commit
04103fe048
@ -15,7 +15,7 @@ type Client interface {
|
|||||||
Init(...Option) error
|
Init(...Option) error
|
||||||
Options() Options
|
Options() Options
|
||||||
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
|
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
|
||||||
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
|
NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
|
||||||
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
|
||||||
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
|
||||||
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
|
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
|
||||||
@ -38,8 +38,8 @@ type Message interface {
|
|||||||
type Request interface {
|
type Request interface {
|
||||||
// The service to call
|
// The service to call
|
||||||
Service() string
|
Service() string
|
||||||
// The method to call
|
// The endpoint to call
|
||||||
Method() string
|
Endpoint() string
|
||||||
// The content type
|
// The content type
|
||||||
ContentType() string
|
ContentType() string
|
||||||
// The unencoded request body
|
// The unencoded request body
|
||||||
@ -125,8 +125,8 @@ func NewClient(opt ...Option) Client {
|
|||||||
|
|
||||||
// Creates a new request using the default client. Content Type will
|
// Creates a new request using the default client. Content Type will
|
||||||
// be set to the default within options and use the appropriate codec
|
// be set to the default within options and use the appropriate codec
|
||||||
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
|
func NewRequest(service, endpoint string, request interface{}, reqOpts ...RequestOption) Request {
|
||||||
return DefaultClient.NewRequest(service, method, request, reqOpts...)
|
return DefaultClient.NewRequest(service, endpoint, request, reqOpts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a streaming connection with a service and returns responses on the
|
// Creates a streaming connection with a service and returns responses on the
|
||||||
|
@ -16,7 +16,7 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MockResponse struct {
|
type MockResponse struct {
|
||||||
Method string
|
Endpoint string
|
||||||
Response interface{}
|
Response interface{}
|
||||||
Error error
|
Error error
|
||||||
}
|
}
|
||||||
@ -54,8 +54,8 @@ func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.Me
|
|||||||
return m.Client.NewMessage(topic, msg, opts...)
|
return m.Client.NewMessage(topic, msg, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
func (m *MockClient) NewRequest(service, endpoint string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
||||||
return m.Client.NewRequest(service, method, req, reqOpts...)
|
return m.Client.NewRequest(service, endpoint, req, reqOpts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
@ -68,7 +68,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, r := range response {
|
for _, r := range response {
|
||||||
if r.Method != req.Method() {
|
if r.Endpoint != req.Endpoint() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,7 +91,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("rpc: can't find service %s", req.Method())
|
return fmt.Errorf("rpc: can't find service %s", req.Endpoint())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
|
@ -13,17 +13,17 @@ func TestClient(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
response := []MockResponse{
|
response := []MockResponse{
|
||||||
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
|
{Endpoint: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
|
||||||
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
|
{Endpoint: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
|
||||||
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
|
{Endpoint: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
|
||||||
{Method: "Foo.Func", Response: func() string { return "string" }},
|
{Endpoint: "Foo.Func", Response: func() string { return "string" }},
|
||||||
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
|
{Endpoint: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
|
||||||
}
|
}
|
||||||
|
|
||||||
c := NewClient(Response("go.mock", response))
|
c := NewClient(Response("go.mock", response))
|
||||||
|
|
||||||
for _, r := range response {
|
for _, r := range response {
|
||||||
req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
|
req := c.NewRequest("go.mock", r.Endpoint, map[string]interface{}{"foo": "bar"})
|
||||||
var rsp interface{}
|
var rsp interface{}
|
||||||
|
|
||||||
err := c.Call(context.TODO(), req, &rsp)
|
err := c.Call(context.TODO(), req, &rsp)
|
||||||
|
@ -458,7 +458,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
|||||||
}
|
}
|
||||||
b := &buffer{bytes.NewBuffer(nil)}
|
b := &buffer{bytes.NewBuffer(nil)}
|
||||||
if err := cf(b).Write(&codec.Message{
|
if err := cf(b).Write(&codec.Message{
|
||||||
Type: codec.Publication,
|
Target: msg.Topic(),
|
||||||
|
Type: codec.Publication,
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"X-Micro-Id": id,
|
"X-Micro-Id": id,
|
||||||
"X-Micro-Topic": msg.Topic(),
|
"X-Micro-Topic": msg.Topic(),
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
func TestCallAddress(t *testing.T) {
|
func TestCallAddress(t *testing.T) {
|
||||||
var called bool
|
var called bool
|
||||||
service := "test.service"
|
service := "test.service"
|
||||||
method := "Test.Method"
|
endpoint := "Test.Endpoint"
|
||||||
address := "10.1.10.1:8080"
|
address := "10.1.10.1:8080"
|
||||||
|
|
||||||
wrap := func(cf CallFunc) CallFunc {
|
wrap := func(cf CallFunc) CallFunc {
|
||||||
@ -25,8 +25,8 @@ func TestCallAddress(t *testing.T) {
|
|||||||
return fmt.Errorf("expected service: %s got %s", service, req.Service())
|
return fmt.Errorf("expected service: %s got %s", service, req.Service())
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Method() != method {
|
if req.Endpoint() != endpoint {
|
||||||
return fmt.Errorf("expected service: %s got %s", method, req.Method())
|
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
||||||
}
|
}
|
||||||
|
|
||||||
if addr != address {
|
if addr != address {
|
||||||
@ -45,7 +45,7 @@ func TestCallAddress(t *testing.T) {
|
|||||||
)
|
)
|
||||||
c.Options().Selector.Init(selector.Registry(r))
|
c.Options().Selector.Init(selector.Registry(r))
|
||||||
|
|
||||||
req := c.NewRequest(service, method, nil)
|
req := c.NewRequest(service, endpoint, nil)
|
||||||
|
|
||||||
// test calling remote address
|
// test calling remote address
|
||||||
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
||||||
@ -60,7 +60,7 @@ func TestCallAddress(t *testing.T) {
|
|||||||
|
|
||||||
func TestCallRetry(t *testing.T) {
|
func TestCallRetry(t *testing.T) {
|
||||||
service := "test.service"
|
service := "test.service"
|
||||||
method := "Test.Method"
|
endpoint := "Test.Endpoint"
|
||||||
address := "10.1.10.1:8080"
|
address := "10.1.10.1:8080"
|
||||||
|
|
||||||
var called int
|
var called int
|
||||||
@ -84,7 +84,7 @@ func TestCallRetry(t *testing.T) {
|
|||||||
)
|
)
|
||||||
c.Options().Selector.Init(selector.Registry(r))
|
c.Options().Selector.Init(selector.Registry(r))
|
||||||
|
|
||||||
req := c.NewRequest(service, method, nil)
|
req := c.NewRequest(service, endpoint, nil)
|
||||||
|
|
||||||
// test calling remote address
|
// test calling remote address
|
||||||
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
||||||
@ -101,7 +101,7 @@ func TestCallWrapper(t *testing.T) {
|
|||||||
var called bool
|
var called bool
|
||||||
id := "test.1"
|
id := "test.1"
|
||||||
service := "test.service"
|
service := "test.service"
|
||||||
method := "Test.Method"
|
endpoint := "Test.Endpoint"
|
||||||
host := "10.1.10.1"
|
host := "10.1.10.1"
|
||||||
port := 8080
|
port := 8080
|
||||||
address := "10.1.10.1:8080"
|
address := "10.1.10.1:8080"
|
||||||
@ -114,8 +114,8 @@ func TestCallWrapper(t *testing.T) {
|
|||||||
return fmt.Errorf("expected service: %s got %s", service, req.Service())
|
return fmt.Errorf("expected service: %s got %s", service, req.Service())
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Method() != method {
|
if req.Endpoint() != endpoint {
|
||||||
return fmt.Errorf("expected service: %s got %s", method, req.Method())
|
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
||||||
}
|
}
|
||||||
|
|
||||||
if addr != address {
|
if addr != address {
|
||||||
@ -146,7 +146,7 @@ func TestCallWrapper(t *testing.T) {
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
req := c.NewRequest(service, method, nil)
|
req := c.NewRequest(service, endpoint, nil)
|
||||||
if err := c.Call(context.Background(), req, nil); err != nil {
|
if err := c.Call(context.Background(), req, nil); err != nil {
|
||||||
t.Fatal("call wrapper error", err)
|
t.Fatal("call wrapper error", err)
|
||||||
}
|
}
|
||||||
|
@ -102,14 +102,14 @@ func (c *rpcCodec) Write(wm *codec.Message, body interface{}) error {
|
|||||||
c.buf.wbuf.Reset()
|
c.buf.wbuf.Reset()
|
||||||
|
|
||||||
m := &codec.Message{
|
m := &codec.Message{
|
||||||
Id: wm.Id,
|
Id: wm.Id,
|
||||||
Target: wm.Target,
|
Target: wm.Target,
|
||||||
Method: wm.Method,
|
Endpoint: wm.Endpoint,
|
||||||
Type: codec.Request,
|
Type: codec.Request,
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"X-Micro-Id": wm.Id,
|
"X-Micro-Id": wm.Id,
|
||||||
"X-Micro-Service": wm.Target,
|
"X-Micro-Service": wm.Target,
|
||||||
"X-Micro-Method": wm.Method,
|
"X-Micro-Endpoint": wm.Endpoint,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,7 +150,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
|
|||||||
|
|
||||||
// read header
|
// read header
|
||||||
err := c.codec.ReadHeader(&me, r)
|
err := c.codec.ReadHeader(&me, r)
|
||||||
wm.Method = me.Method
|
wm.Endpoint = me.Endpoint
|
||||||
wm.Id = me.Id
|
wm.Id = me.Id
|
||||||
wm.Error = me.Error
|
wm.Error = me.Error
|
||||||
|
|
||||||
@ -160,8 +160,8 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check method in header
|
// check method in header
|
||||||
if len(me.Method) == 0 {
|
if len(me.Endpoint) == 0 {
|
||||||
wm.Method = me.Header["X-Micro-Method"]
|
wm.Endpoint = me.Header["X-Micro-Endpoint"]
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(me.Id) == 0 {
|
if len(me.Id) == 0 {
|
||||||
|
@ -6,14 +6,14 @@ import (
|
|||||||
|
|
||||||
type rpcRequest struct {
|
type rpcRequest struct {
|
||||||
service string
|
service string
|
||||||
method string
|
endpoint string
|
||||||
contentType string
|
contentType string
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
body interface{}
|
body interface{}
|
||||||
opts RequestOptions
|
opts RequestOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
|
func newRequest(service, endpoint string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
|
||||||
var opts RequestOptions
|
var opts RequestOptions
|
||||||
|
|
||||||
for _, o := range reqOpts {
|
for _, o := range reqOpts {
|
||||||
@ -27,7 +27,7 @@ func newRequest(service, method string, request interface{}, contentType string,
|
|||||||
|
|
||||||
return &rpcRequest{
|
return &rpcRequest{
|
||||||
service: service,
|
service: service,
|
||||||
method: method,
|
endpoint: endpoint,
|
||||||
body: request,
|
body: request,
|
||||||
contentType: contentType,
|
contentType: contentType,
|
||||||
opts: opts,
|
opts: opts,
|
||||||
@ -42,8 +42,8 @@ func (r *rpcRequest) Service() string {
|
|||||||
return r.service
|
return r.service
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcRequest) Method() string {
|
func (r *rpcRequest) Endpoint() string {
|
||||||
return r.method
|
return r.endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcRequest) Body() interface{} {
|
func (r *rpcRequest) Body() interface{} {
|
||||||
|
@ -5,19 +5,19 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestRequestOptions(t *testing.T) {
|
func TestRequestOptions(t *testing.T) {
|
||||||
r := newRequest("service", "method", nil, "application/json")
|
r := newRequest("service", "endpoint", nil, "application/json")
|
||||||
if r.Service() != "service" {
|
if r.Service() != "service" {
|
||||||
t.Fatalf("expected 'service' got %s", r.Service())
|
t.Fatalf("expected 'service' got %s", r.Service())
|
||||||
}
|
}
|
||||||
if r.Method() != "method" {
|
if r.Endpoint() != "endpoint" {
|
||||||
t.Fatalf("expected 'method' got %s", r.Method())
|
t.Fatalf("expected 'endpoint' got %s", r.Endpoint())
|
||||||
}
|
}
|
||||||
if r.ContentType() != "application/json" {
|
if r.ContentType() != "application/json" {
|
||||||
t.Fatalf("expected 'method' got %s", r.ContentType())
|
t.Fatalf("expected 'endpoint' got %s", r.ContentType())
|
||||||
}
|
}
|
||||||
|
|
||||||
r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf"))
|
r2 := newRequest("service", "endpoint", nil, "application/json", WithContentType("application/protobuf"))
|
||||||
if r2.ContentType() != "application/protobuf" {
|
if r2.ContentType() != "application/protobuf" {
|
||||||
t.Fatalf("expected 'method' got %s", r2.ContentType())
|
t.Fatalf("expected 'endpoint' got %s", r2.ContentType())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -46,10 +46,10 @@ func (r *rpcStream) Send(msg interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
req := codec.Message{
|
req := codec.Message{
|
||||||
Id: r.id,
|
Id: r.id,
|
||||||
Target: r.request.Service(),
|
Target: r.request.Service(),
|
||||||
Method: r.request.Method(),
|
Endpoint: r.request.Endpoint(),
|
||||||
Type: codec.Request,
|
Type: codec.Request,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.codec.Write(&req, msg); err != nil {
|
if err := r.codec.Write(&req, msg); err != nil {
|
||||||
|
@ -50,11 +50,11 @@ type Marshaler interface {
|
|||||||
// the communication, likely followed by the body.
|
// the communication, likely followed by the body.
|
||||||
// In the case of an error, body may be nil.
|
// In the case of an error, body may be nil.
|
||||||
type Message struct {
|
type Message struct {
|
||||||
Id string
|
Id string
|
||||||
Type MessageType
|
Type MessageType
|
||||||
Target string
|
Target string
|
||||||
Method string
|
Endpoint string
|
||||||
Error string
|
Error string
|
||||||
|
|
||||||
// The values read from the socket
|
// The values read from the socket
|
||||||
Header map[string]string
|
Header map[string]string
|
||||||
|
@ -29,7 +29,7 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
|
|||||||
path := m.Header[":path"]
|
path := m.Header[":path"]
|
||||||
if len(path) == 0 || path[0] != '/' {
|
if len(path) == 0 || path[0] != '/' {
|
||||||
m.Target = m.Header["X-Micro-Service"]
|
m.Target = m.Header["X-Micro-Service"]
|
||||||
m.Method = m.Header["X-Micro-Method"]
|
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
||||||
} else {
|
} else {
|
||||||
// [ , a.package.Foo, Bar]
|
// [ , a.package.Foo, Bar]
|
||||||
parts := strings.Split(path, "/")
|
parts := strings.Split(path, "/")
|
||||||
@ -37,7 +37,7 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
|
|||||||
return errors.New("Unknown request path")
|
return errors.New("Unknown request path")
|
||||||
}
|
}
|
||||||
service := strings.Split(parts[1], ".")
|
service := strings.Split(parts[1], ".")
|
||||||
m.Method = strings.Join([]string{service[len(service)-1], parts[2]}, ".")
|
m.Endpoint = strings.Join([]string{service[len(service)-1], parts[2]}, ".")
|
||||||
m.Target = strings.Join(service[:len(service)-1], ".")
|
m.Target = strings.Join(service[:len(service)-1], ".")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
|
|||||||
|
|
||||||
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
|
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
c.pending[m.Id] = m.Method
|
c.pending[m.Id] = m.Endpoint
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
c.req.Method = m.Method
|
c.req.Method = m.Endpoint
|
||||||
c.req.Params[0] = b
|
c.req.Params[0] = b
|
||||||
c.req.ID = m.Id
|
c.req.ID = m.Id
|
||||||
return c.enc.Encode(&c.req)
|
return c.enc.Encode(&c.req)
|
||||||
@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
m.Method = c.pending[c.resp.ID]
|
m.Endpoint = c.pending[c.resp.ID]
|
||||||
delete(c.pending, c.resp.ID)
|
delete(c.pending, c.resp.ID)
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error {
|
|||||||
if err := c.dec.Decode(&c.req); err != nil {
|
if err := c.dec.Decode(&c.req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Method = c.req.Method
|
m.Endpoint = c.req.Method
|
||||||
m.Id = fmt.Sprintf("%v", c.req.ID)
|
m.Id = fmt.Sprintf("%v", c.req.ID)
|
||||||
c.req.ID = nil
|
c.req.ID = nil
|
||||||
return nil
|
return nil
|
||||||
|
@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
// This is protobuf, of course we copy it.
|
// This is protobuf, of course we copy it.
|
||||||
pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)}
|
pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)}
|
||||||
data, err := proto.Marshal(pbr)
|
data, err := proto.Marshal(pbr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
case codec.Response:
|
case codec.Response:
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error}
|
rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error}
|
||||||
data, err := proto.Marshal(rtmp)
|
data, err := proto.Marshal(rtmp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Method = rtmp.GetServiceMethod()
|
m.Endpoint = rtmp.GetServiceMethod()
|
||||||
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
||||||
case codec.Response:
|
case codec.Response:
|
||||||
data, err := ReadNetString(c.rwc)
|
data, err := ReadNetString(c.rwc)
|
||||||
@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Method = rtmp.GetServiceMethod()
|
m.Endpoint = rtmp.GetServiceMethod()
|
||||||
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
||||||
m.Error = rtmp.GetError()
|
m.Error = rtmp.GetError()
|
||||||
case codec.Publication:
|
case codec.Publication:
|
||||||
|
@ -106,14 +106,14 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
|||||||
|
|
||||||
// set some internal things
|
// set some internal things
|
||||||
m.Target = m.Header["X-Micro-Service"]
|
m.Target = m.Header["X-Micro-Service"]
|
||||||
m.Method = m.Header["X-Micro-Method"]
|
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
||||||
m.Id = m.Header["X-Micro-Id"]
|
m.Id = m.Header["X-Micro-Id"]
|
||||||
|
|
||||||
// read header via codec
|
// read header via codec
|
||||||
err := c.codec.ReadHeader(&m, codec.Request)
|
err := c.codec.ReadHeader(&m, codec.Request)
|
||||||
|
|
||||||
// set the method/id
|
// set the method/id
|
||||||
r.Method = m.Method
|
r.Endpoint = m.Endpoint
|
||||||
r.Id = m.Id
|
r.Id = m.Id
|
||||||
|
|
||||||
return err
|
return err
|
||||||
@ -128,15 +128,15 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
|||||||
|
|
||||||
// create a new message
|
// create a new message
|
||||||
m := &codec.Message{
|
m := &codec.Message{
|
||||||
Method: r.Method,
|
Endpoint: r.Endpoint,
|
||||||
Id: r.Id,
|
Id: r.Id,
|
||||||
Error: r.Error,
|
Error: r.Error,
|
||||||
Type: r.Type,
|
Type: r.Type,
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"X-Micro-Id": r.Id,
|
"X-Micro-Id": r.Id,
|
||||||
"X-Micro-Method": r.Method,
|
"X-Micro-Endpoint": r.Endpoint,
|
||||||
"X-Micro-Error": r.Error,
|
"X-Micro-Error": r.Error,
|
||||||
"Content-Type": c.req.Header["Content-Type"],
|
"Content-Type": c.req.Header["Content-Type"],
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,9 +48,9 @@ func TestCodecWriteError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
err := c.Write(&codec.Message{
|
err := c.Write(&codec.Message{
|
||||||
Method: "Service.Method",
|
Endpoint: "Service.Endpoint",
|
||||||
Id: "0",
|
Id: "0",
|
||||||
Error: "",
|
Error: "",
|
||||||
}, "body")
|
}, "body")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -7,7 +7,7 @@ import (
|
|||||||
|
|
||||||
type rpcRequest struct {
|
type rpcRequest struct {
|
||||||
service string
|
service string
|
||||||
method string
|
endpoint string
|
||||||
contentType string
|
contentType string
|
||||||
socket transport.Socket
|
socket transport.Socket
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
@ -34,8 +34,8 @@ func (r *rpcRequest) Service() string {
|
|||||||
return r.service
|
return r.service
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcRequest) Method() string {
|
func (r *rpcRequest) Endpoint() string {
|
||||||
return r.method
|
return r.endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcRequest) Header() map[string]string {
|
func (r *rpcRequest) Header() map[string]string {
|
||||||
|
@ -171,7 +171,7 @@ func (router *router) sendResponse(sending sync.Locker, req *request, reply inte
|
|||||||
resp.msg = msg
|
resp.msg = msg
|
||||||
|
|
||||||
// Encode the response header
|
// Encode the response header
|
||||||
resp.msg.Method = req.msg.Method
|
resp.msg.Endpoint = req.msg.Endpoint
|
||||||
if errmsg != "" {
|
if errmsg != "" {
|
||||||
resp.msg.Error = errmsg
|
resp.msg.Error = errmsg
|
||||||
reply = invalidRequest
|
reply = invalidRequest
|
||||||
@ -191,7 +191,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
|
|||||||
r := &rpcRequest{
|
r := &rpcRequest{
|
||||||
service: req.msg.Target,
|
service: req.msg.Target,
|
||||||
contentType: req.msg.Header["Content-Type"],
|
contentType: req.msg.Header["Content-Type"],
|
||||||
method: req.msg.Method,
|
endpoint: req.msg.Endpoint,
|
||||||
body: req.msg.Body,
|
body: req.msg.Body,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,9 +379,9 @@ func (router *router) readHeader(cc codec.Reader) (service *service, mtype *meth
|
|||||||
// we can still recover and move on to the next request.
|
// we can still recover and move on to the next request.
|
||||||
keepReading = true
|
keepReading = true
|
||||||
|
|
||||||
serviceMethod := strings.Split(req.msg.Method, ".")
|
serviceMethod := strings.Split(req.msg.Endpoint, ".")
|
||||||
if len(serviceMethod) != 2 {
|
if len(serviceMethod) != 2 {
|
||||||
err = errors.New("rpc: service/method request ill-formed: " + req.msg.Method)
|
err = errors.New("rpc: service/method request ill-formed: " + req.msg.Endpoint)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Look up the request.
|
// Look up the request.
|
||||||
@ -389,12 +389,12 @@ func (router *router) readHeader(cc codec.Reader) (service *service, mtype *meth
|
|||||||
service = router.serviceMap[serviceMethod[0]]
|
service = router.serviceMap[serviceMethod[0]]
|
||||||
router.mu.Unlock()
|
router.mu.Unlock()
|
||||||
if service == nil {
|
if service == nil {
|
||||||
err = errors.New("rpc: can't find service " + req.msg.Method)
|
err = errors.New("rpc: can't find service " + req.msg.Endpoint)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mtype = service.method[serviceMethod[1]]
|
mtype = service.method[serviceMethod[1]]
|
||||||
if mtype == nil {
|
if mtype == nil {
|
||||||
err = errors.New("rpc: can't find method " + req.msg.Method)
|
err = errors.New("rpc: can't find method " + req.msg.Endpoint)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -115,7 +115,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
// internal request
|
// internal request
|
||||||
request := &rpcRequest{
|
request := &rpcRequest{
|
||||||
service: msg.Header["X-Micro-Service"],
|
service: msg.Header["X-Micro-Service"],
|
||||||
method: msg.Header["X-Micro-Method"],
|
endpoint: msg.Header["X-Micro-Endpoint"],
|
||||||
contentType: ct,
|
contentType: ct,
|
||||||
codec: codec,
|
codec: codec,
|
||||||
header: msg.Header,
|
header: msg.Header,
|
||||||
|
@ -31,9 +31,9 @@ func (r *rpcStream) Send(msg interface{}) error {
|
|||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
resp := codec.Message{
|
resp := codec.Message{
|
||||||
Method: r.request.Method(),
|
Endpoint: r.request.Endpoint(),
|
||||||
Id: r.id,
|
Id: r.id,
|
||||||
Type: codec.Response,
|
Type: codec.Response,
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.codec.Write(&resp, msg)
|
return r.codec.Write(&resp, msg)
|
||||||
|
@ -45,8 +45,8 @@ type Message interface {
|
|||||||
type Request interface {
|
type Request interface {
|
||||||
// Service name requested
|
// Service name requested
|
||||||
Service() string
|
Service() string
|
||||||
// Method name requested
|
// Endpoint name requested
|
||||||
Method() string
|
Endpoint() string
|
||||||
// Content type provided
|
// Content type provided
|
||||||
ContentType() string
|
ContentType() string
|
||||||
// Header of the request
|
// Header of the request
|
||||||
@ -83,7 +83,7 @@ type Stream interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handler interface represents a request handler. It's generated
|
// Handler interface represents a request handler. It's generated
|
||||||
// by passing any type of public concrete object with methods into server.NewHandler.
|
// by passing any type of public concrete object with endpoints into server.NewHandler.
|
||||||
// Most will pass in a struct.
|
// Most will pass in a struct.
|
||||||
//
|
//
|
||||||
// Example:
|
// Example:
|
||||||
@ -102,7 +102,7 @@ type Handler interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Subscriber interface represents a subscription to a given topic using
|
// Subscriber interface represents a subscription to a given topic using
|
||||||
// a specific subscriber function or object with methods.
|
// a specific subscriber function or object with endpoints.
|
||||||
type Subscriber interface {
|
type Subscriber interface {
|
||||||
Topic() string
|
Topic() string
|
||||||
Subscriber() interface{}
|
Subscriber() interface{}
|
||||||
@ -151,7 +151,7 @@ func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscr
|
|||||||
|
|
||||||
// NewHandler creates a new handler interface using the default server
|
// NewHandler creates a new handler interface using the default server
|
||||||
// Handlers are required to be a public object with public
|
// Handlers are required to be a public object with public
|
||||||
// methods. Call to a service method such as Foo.Bar expects
|
// endpoints. Call to a service endpoint such as Foo.Bar expects
|
||||||
// the type:
|
// the type:
|
||||||
//
|
//
|
||||||
// type Foo struct {}
|
// type Foo struct {}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user