Merge pull request #393 from micro/legacy
Evolution of Codecs and Methods
This commit is contained in:
		| @@ -38,7 +38,9 @@ type Message interface { | |||||||
| type Request interface { | type Request interface { | ||||||
| 	// The service to call | 	// The service to call | ||||||
| 	Service() string | 	Service() string | ||||||
| 	// The endpoint to call | 	// The action to take | ||||||
|  | 	Method() string | ||||||
|  | 	// The endpoint to invoke | ||||||
| 	Endpoint() string | 	Endpoint() string | ||||||
| 	// The content type | 	// The content type | ||||||
| 	ContentType() string | 	ContentType() string | ||||||
|   | |||||||
| @@ -4,10 +4,11 @@ import ( | |||||||
| 	"bytes" | 	"bytes" | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"net" | ||||||
|  | 	"strconv" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" |  | ||||||
|  |  | ||||||
| 	"sync/atomic" | 	"sync/atomic" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/google/uuid" | 	"github.com/google/uuid" | ||||||
| 	"github.com/micro/go-micro/broker" | 	"github.com/micro/go-micro/broker" | ||||||
| @@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { | |||||||
| 	return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) | 	return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error { | func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error { | ||||||
|  | 	address := node.Address | ||||||
|  | 	if node.Port > 0 { | ||||||
|  | 		address = fmt.Sprintf("%s:%d", address, node.Port) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	msg := &transport.Message{ | 	msg := &transport.Message{ | ||||||
| 		Header: make(map[string]string), | 		Header: make(map[string]string), | ||||||
| 	} | 	} | ||||||
| @@ -75,9 +81,16 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp | |||||||
| 	// set the accept header | 	// set the accept header | ||||||
| 	msg.Header["Accept"] = req.ContentType() | 	msg.Header["Accept"] = req.ContentType() | ||||||
|  |  | ||||||
| 	cf, err := r.newCodec(req.ContentType()) | 	// setup old protocol | ||||||
| 	if err != nil { | 	cf := setupProtocol(msg, node) | ||||||
| 		return errors.InternalServerError("go.micro.client", err.Error()) |  | ||||||
|  | 	// no codec specified | ||||||
|  | 	if cf == nil { | ||||||
|  | 		var err error | ||||||
|  | 		cf, err = r.newCodec(req.ContentType()) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return errors.InternalServerError("go.micro.client", err.Error()) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	var grr error | 	var grr error | ||||||
| @@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) { | func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { | ||||||
|  | 	address := node.Address | ||||||
|  | 	if node.Port > 0 { | ||||||
|  | 		address = fmt.Sprintf("%s:%d", address, node.Port) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	msg := &transport.Message{ | 	msg := &transport.Message{ | ||||||
| 		Header: make(map[string]string), | 		Header: make(map[string]string), | ||||||
| 	} | 	} | ||||||
| @@ -163,9 +181,16 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt | |||||||
| 	// set the accept header | 	// set the accept header | ||||||
| 	msg.Header["Accept"] = req.ContentType() | 	msg.Header["Accept"] = req.ContentType() | ||||||
|  |  | ||||||
| 	cf, err := r.newCodec(req.ContentType()) | 	// set old codecs | ||||||
| 	if err != nil { | 	cf := setupProtocol(msg, node) | ||||||
| 		return nil, errors.InternalServerError("go.micro.client", err.Error()) |  | ||||||
|  | 	// no codec specified | ||||||
|  | 	if cf == nil { | ||||||
|  | 		var err error | ||||||
|  | 		cf, err = r.newCodec(req.ContentType()) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, errors.InternalServerError("go.micro.client", err.Error()) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	dOpts := []transport.DialOption{ | 	dOpts := []transport.DialOption{ | ||||||
| @@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options { | |||||||
| func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { | func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { | ||||||
| 	// return remote address | 	// return remote address | ||||||
| 	if len(opts.Address) > 0 { | 	if len(opts.Address) > 0 { | ||||||
|  | 		address := opts.Address | ||||||
|  | 		port := 0 | ||||||
|  |  | ||||||
|  | 		host, sport, err := net.SplitHostPort(opts.Address) | ||||||
|  | 		if err == nil { | ||||||
|  | 			address = host | ||||||
|  | 			port, _ = strconv.Atoi(sport) | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		return func() (*registry.Node, error) { | 		return func() (*registry.Node, error) { | ||||||
| 			return ®istry.Node{ | 			return ®istry.Node{ | ||||||
| 				Address: opts.Address, | 				Address: address, | ||||||
|  | 				Port:    port, | ||||||
| 			}, nil | 			}, nil | ||||||
| 		}, nil | 		}, nil | ||||||
| 	} | 	} | ||||||
| @@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac | |||||||
| 			return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) | 			return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// set the address |  | ||||||
| 		address := node.Address |  | ||||||
| 		if node.Port > 0 { |  | ||||||
| 			address = fmt.Sprintf("%s:%d", address, node.Port) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		// make the call | 		// make the call | ||||||
| 		err = rcall(ctx, address, request, response, callOpts) | 		err = rcall(ctx, node, request, response, callOpts) | ||||||
| 		r.opts.Selector.Mark(request.Service(), node, err) | 		r.opts.Selector.Mark(request.Service(), node, err) | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| @@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt | |||||||
| 			return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) | 			return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error()) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		address := node.Address | 		stream, err := r.stream(ctx, node, request, callOpts) | ||||||
| 		if node.Port > 0 { |  | ||||||
| 			address = fmt.Sprintf("%s:%d", address, node.Port) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		stream, err := r.stream(ctx, address, request, callOpts) |  | ||||||
| 		r.opts.Selector.Mark(request.Service(), node, err) | 		r.opts.Selector.Mark(request.Service(), node, err) | ||||||
| 		return stream, err | 		return stream, err | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) { | |||||||
| 	var called bool | 	var called bool | ||||||
| 	service := "test.service" | 	service := "test.service" | ||||||
| 	endpoint := "Test.Endpoint" | 	endpoint := "Test.Endpoint" | ||||||
| 	address := "10.1.10.1:8080" | 	address := "10.1.10.1" | ||||||
|  | 	port := 8080 | ||||||
|  |  | ||||||
| 	wrap := func(cf CallFunc) CallFunc { | 	wrap := func(cf CallFunc) CallFunc { | ||||||
| 		return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { | 		return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { | ||||||
| 			called = true | 			called = true | ||||||
|  |  | ||||||
| 			if req.Service() != service { | 			if req.Service() != service { | ||||||
| @@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) { | |||||||
| 				return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) | 				return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			if addr != address { | 			if node.Address != address { | ||||||
| 				return fmt.Errorf("expected address: %s got %s", address, addr) | 				return fmt.Errorf("expected address: %s got %s", address, node.Address) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			if node.Port != port { | ||||||
|  | 				return fmt.Errorf("expected address: %d got %d", port, node.Port) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// don't do the call | 			// don't do the call | ||||||
| @@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) { | |||||||
| 	req := c.NewRequest(service, endpoint, nil) | 	req := c.NewRequest(service, endpoint, nil) | ||||||
|  |  | ||||||
| 	// test calling remote address | 	// test calling remote address | ||||||
| 	if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil { | 	if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil { | ||||||
| 		t.Fatal("call with address error", err) | 		t.Fatal("call with address error", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) { | |||||||
| func TestCallRetry(t *testing.T) { | func TestCallRetry(t *testing.T) { | ||||||
| 	service := "test.service" | 	service := "test.service" | ||||||
| 	endpoint := "Test.Endpoint" | 	endpoint := "Test.Endpoint" | ||||||
| 	address := "10.1.10.1:8080" | 	address := "10.1.10.1" | ||||||
|  |  | ||||||
| 	var called int | 	var called int | ||||||
|  |  | ||||||
| 	wrap := func(cf CallFunc) CallFunc { | 	wrap := func(cf CallFunc) CallFunc { | ||||||
| 		return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { | 		return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { | ||||||
| 			called++ | 			called++ | ||||||
| 			if called == 1 { | 			if called == 1 { | ||||||
| 				return errors.InternalServerError("test.error", "retry request") | 				return errors.InternalServerError("test.error", "retry request") | ||||||
| @@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) { | |||||||
| 	id := "test.1" | 	id := "test.1" | ||||||
| 	service := "test.service" | 	service := "test.service" | ||||||
| 	endpoint := "Test.Endpoint" | 	endpoint := "Test.Endpoint" | ||||||
| 	host := "10.1.10.1" | 	address := "10.1.10.1" | ||||||
| 	port := 8080 | 	port := 8080 | ||||||
| 	address := "10.1.10.1:8080" |  | ||||||
|  |  | ||||||
| 	wrap := func(cf CallFunc) CallFunc { | 	wrap := func(cf CallFunc) CallFunc { | ||||||
| 		return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error { | 		return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error { | ||||||
| 			called = true | 			called = true | ||||||
|  |  | ||||||
| 			if req.Service() != service { | 			if req.Service() != service { | ||||||
| @@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) { | |||||||
| 				return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) | 				return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint()) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			if addr != address { | 			if node.Address != address { | ||||||
| 				return fmt.Errorf("expected address: %s got %s", address, addr) | 				return fmt.Errorf("expected address: %s got %s", address, node.Address) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// don't do the call | 			// don't do the call | ||||||
| @@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) { | |||||||
| 		Nodes: []*registry.Node{ | 		Nodes: []*registry.Node{ | ||||||
| 			®istry.Node{ | 			®istry.Node{ | ||||||
| 				Id:      id, | 				Id:      id, | ||||||
| 				Address: host, | 				Address: address, | ||||||
| 				Port:    port, | 				Port:    port, | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
|   | |||||||
| @@ -12,6 +12,7 @@ import ( | |||||||
| 	"github.com/micro/go-micro/codec/proto" | 	"github.com/micro/go-micro/codec/proto" | ||||||
| 	"github.com/micro/go-micro/codec/protorpc" | 	"github.com/micro/go-micro/codec/protorpc" | ||||||
| 	"github.com/micro/go-micro/errors" | 	"github.com/micro/go-micro/errors" | ||||||
|  | 	"github.com/micro/go-micro/registry" | ||||||
| 	"github.com/micro/go-micro/transport" | 	"github.com/micro/go-micro/transport" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -58,6 +59,15 @@ var ( | |||||||
| 		"application/proto-rpc":    protorpc.NewCodec, | 		"application/proto-rpc":    protorpc.NewCodec, | ||||||
| 		"application/octet-stream": raw.NewCodec, | 		"application/octet-stream": raw.NewCodec, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// TODO: remove legacy codec list | ||||||
|  | 	defaultCodecs = map[string]codec.NewCodec{ | ||||||
|  | 		"application/json":         jsonrpc.NewCodec, | ||||||
|  | 		"application/json-rpc":     jsonrpc.NewCodec, | ||||||
|  | 		"application/protobuf":     protorpc.NewCodec, | ||||||
|  | 		"application/proto-rpc":    protorpc.NewCodec, | ||||||
|  | 		"application/octet-stream": protorpc.NewCodec, | ||||||
|  | 	} | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { | func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { | ||||||
| @@ -74,6 +84,27 @@ func (rwc *readWriteCloser) Close() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // setupProtocol sets up the old protocol | ||||||
|  | func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { | ||||||
|  | 	protocol := node.Metadata["protocol"] | ||||||
|  |  | ||||||
|  | 	// got protocol | ||||||
|  | 	if len(protocol) > 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// no protocol use old codecs | ||||||
|  | 	switch msg.Header["Content-Type"] { | ||||||
|  | 	case "application/json": | ||||||
|  | 		msg.Header["Content-Type"] = "application/json-rpc" | ||||||
|  | 	case "application/protobuf": | ||||||
|  | 		msg.Header["Content-Type"] = "application/proto-rpc" | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// now return codec | ||||||
|  | 	return defaultCodecs[msg.Header["Content-Type"]] | ||||||
|  | } | ||||||
|  |  | ||||||
| func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { | func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec { | ||||||
| 	rwc := &readWriteCloser{ | 	rwc := &readWriteCloser{ | ||||||
| 		wbuf: bytes.NewBuffer(nil), | 		wbuf: bytes.NewBuffer(nil), | ||||||
| @@ -104,6 +135,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { | |||||||
| 	// set the mucp headers | 	// set the mucp headers | ||||||
| 	m.Header["X-Micro-Id"] = m.Id | 	m.Header["X-Micro-Id"] = m.Id | ||||||
| 	m.Header["X-Micro-Service"] = m.Target | 	m.Header["X-Micro-Service"] = m.Target | ||||||
|  | 	m.Header["X-Micro-Method"] = m.Method | ||||||
| 	m.Header["X-Micro-Endpoint"] = m.Endpoint | 	m.Header["X-Micro-Endpoint"] = m.Endpoint | ||||||
|  |  | ||||||
| 	// if body is bytes Frame don't encode | 	// if body is bytes Frame don't encode | ||||||
| @@ -154,6 +186,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { | |||||||
| 	// read header | 	// read header | ||||||
| 	err := c.codec.ReadHeader(&me, r) | 	err := c.codec.ReadHeader(&me, r) | ||||||
| 	wm.Endpoint = me.Endpoint | 	wm.Endpoint = me.Endpoint | ||||||
|  | 	wm.Method = me.Method | ||||||
| 	wm.Id = me.Id | 	wm.Id = me.Id | ||||||
| 	wm.Error = me.Error | 	wm.Error = me.Error | ||||||
|  |  | ||||||
| @@ -162,11 +195,16 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { | |||||||
| 		wm.Error = me.Header["X-Micro-Error"] | 		wm.Error = me.Header["X-Micro-Error"] | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// check method in header | 	// check endpoint in header | ||||||
| 	if len(me.Endpoint) == 0 { | 	if len(me.Endpoint) == 0 { | ||||||
| 		wm.Endpoint = me.Header["X-Micro-Endpoint"] | 		wm.Endpoint = me.Header["X-Micro-Endpoint"] | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// check method in header | ||||||
|  | 	if len(me.Method) == 0 { | ||||||
|  | 		wm.Method = me.Header["X-Micro-Method"] | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	if len(me.Id) == 0 { | 	if len(me.Id) == 0 { | ||||||
| 		wm.Id = me.Header["X-Micro-Id"] | 		wm.Id = me.Header["X-Micro-Id"] | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -6,6 +6,7 @@ import ( | |||||||
|  |  | ||||||
| type rpcRequest struct { | type rpcRequest struct { | ||||||
| 	service     string | 	service     string | ||||||
|  | 	method      string | ||||||
| 	endpoint    string | 	endpoint    string | ||||||
| 	contentType string | 	contentType string | ||||||
| 	codec       codec.Codec | 	codec       codec.Codec | ||||||
| @@ -27,6 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin | |||||||
|  |  | ||||||
| 	return &rpcRequest{ | 	return &rpcRequest{ | ||||||
| 		service:     service, | 		service:     service, | ||||||
|  | 		method:      endpoint, | ||||||
| 		endpoint:    endpoint, | 		endpoint:    endpoint, | ||||||
| 		body:        request, | 		body:        request, | ||||||
| 		contentType: contentType, | 		contentType: contentType, | ||||||
| @@ -42,6 +44,10 @@ func (r *rpcRequest) Service() string { | |||||||
| 	return r.service | 	return r.service | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (r *rpcRequest) Method() string { | ||||||
|  | 	return r.method | ||||||
|  | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) Endpoint() string { | func (r *rpcRequest) Endpoint() string { | ||||||
| 	return r.endpoint | 	return r.endpoint | ||||||
| } | } | ||||||
|   | |||||||
| @@ -53,6 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error { | |||||||
| 	req := codec.Message{ | 	req := codec.Message{ | ||||||
| 		Id:       r.id, | 		Id:       r.id, | ||||||
| 		Target:   r.request.Service(), | 		Target:   r.request.Service(), | ||||||
|  | 		Method:   r.request.Method(), | ||||||
| 		Endpoint: r.request.Endpoint(), | 		Endpoint: r.request.Endpoint(), | ||||||
| 		Type:     codec.Request, | 		Type:     codec.Request, | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -2,10 +2,12 @@ package client | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/registry" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // CallFunc represents the individual call func | // CallFunc represents the individual call func | ||||||
| type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error | type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error | ||||||
|  |  | ||||||
| // CallWrapper is a low level wrapper for the CallFunc | // CallWrapper is a low level wrapper for the CallFunc | ||||||
| type CallWrapper func(CallFunc) CallFunc | type CallWrapper func(CallFunc) CallFunc | ||||||
|   | |||||||
| @@ -53,6 +53,7 @@ type Message struct { | |||||||
| 	Id       string | 	Id       string | ||||||
| 	Type     MessageType | 	Type     MessageType | ||||||
| 	Target   string | 	Target   string | ||||||
|  | 	Method   string | ||||||
| 	Endpoint string | 	Endpoint string | ||||||
| 	Error    string | 	Error    string | ||||||
|  |  | ||||||
|   | |||||||
| @@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec { | |||||||
|  |  | ||||||
| func (c *clientCodec) Write(m *codec.Message, b interface{}) error { | func (c *clientCodec) Write(m *codec.Message, b interface{}) error { | ||||||
| 	c.Lock() | 	c.Lock() | ||||||
| 	c.pending[m.Id] = m.Endpoint | 	c.pending[m.Id] = m.Method | ||||||
| 	c.Unlock() | 	c.Unlock() | ||||||
| 	c.req.Method = m.Endpoint | 	c.req.Method = m.Method | ||||||
| 	c.req.Params[0] = b | 	c.req.Params[0] = b | ||||||
| 	c.req.ID = m.Id | 	c.req.ID = m.Id | ||||||
| 	return c.enc.Encode(&c.req) | 	return c.enc.Encode(&c.req) | ||||||
| @@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	c.Lock() | 	c.Lock() | ||||||
| 	m.Endpoint = c.pending[c.resp.ID] | 	m.Method = c.pending[c.resp.ID] | ||||||
| 	delete(c.pending, c.resp.ID) | 	delete(c.pending, c.resp.ID) | ||||||
| 	c.Unlock() | 	c.Unlock() | ||||||
|  |  | ||||||
|   | |||||||
| @@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error { | |||||||
| 	if err := c.dec.Decode(&c.req); err != nil { | 	if err := c.dec.Decode(&c.req); err != nil { | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	m.Endpoint = c.req.Method | 	m.Method = c.req.Method | ||||||
| 	m.Id = fmt.Sprintf("%v", c.req.ID) | 	m.Id = fmt.Sprintf("%v", c.req.ID) | ||||||
| 	c.req.ID = nil | 	c.req.ID = nil | ||||||
| 	return nil | 	return nil | ||||||
|   | |||||||
| @@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { | |||||||
| 		c.Lock() | 		c.Lock() | ||||||
| 		defer c.Unlock() | 		defer c.Unlock() | ||||||
| 		// This is protobuf, of course we copy it. | 		// This is protobuf, of course we copy it. | ||||||
| 		pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)} | 		pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)} | ||||||
| 		data, err := proto.Marshal(pbr) | 		data, err := proto.Marshal(pbr) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| @@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { | |||||||
| 	case codec.Response, codec.Error: | 	case codec.Response, codec.Error: | ||||||
| 		c.Lock() | 		c.Lock() | ||||||
| 		defer c.Unlock() | 		defer c.Unlock() | ||||||
| 		rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error} | 		rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error} | ||||||
| 		data, err := proto.Marshal(rtmp) | 		data, err := proto.Marshal(rtmp) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| @@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		m.Endpoint = rtmp.GetServiceMethod() | 		m.Method = rtmp.GetServiceMethod() | ||||||
| 		m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) | 		m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) | ||||||
| 	case codec.Response: | 	case codec.Response: | ||||||
| 		data, err := ReadNetString(c.rwc) | 		data, err := ReadNetString(c.rwc) | ||||||
| @@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { | |||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| 		} | 		} | ||||||
| 		m.Endpoint = rtmp.GetServiceMethod() | 		m.Method = rtmp.GetServiceMethod() | ||||||
| 		m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) | 		m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) | ||||||
| 		m.Error = rtmp.GetError() | 		m.Error = rtmp.GetError() | ||||||
| 	case codec.Publication: | 	case codec.Publication: | ||||||
|   | |||||||
| @@ -41,6 +41,15 @@ var ( | |||||||
| 		"application/proto-rpc":    protorpc.NewCodec, | 		"application/proto-rpc":    protorpc.NewCodec, | ||||||
| 		"application/octet-stream": raw.NewCodec, | 		"application/octet-stream": raw.NewCodec, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// TODO: remove legacy codec list | ||||||
|  | 	defaultCodecs = map[string]codec.NewCodec{ | ||||||
|  | 		"application/json":         jsonrpc.NewCodec, | ||||||
|  | 		"application/json-rpc":     jsonrpc.NewCodec, | ||||||
|  | 		"application/protobuf":     protorpc.NewCodec, | ||||||
|  | 		"application/proto-rpc":    protorpc.NewCodec, | ||||||
|  | 		"application/octet-stream": protorpc.NewCodec, | ||||||
|  | 	} | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { | func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { | ||||||
| @@ -57,6 +66,42 @@ func (rwc *readWriteCloser) Close() error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // setupProtocol sets up the old protocol | ||||||
|  | func setupProtocol(msg *transport.Message) codec.NewCodec { | ||||||
|  | 	service := msg.Header["X-Micro-Service"] | ||||||
|  | 	method := msg.Header["X-Micro-Method"] | ||||||
|  | 	endpoint := msg.Header["X-Micro-Endpoint"] | ||||||
|  | 	protocol := msg.Header["X-Micro-Protocol"] | ||||||
|  | 	target := msg.Header["X-Micro-Target"] | ||||||
|  |  | ||||||
|  | 	// if the protocol exists (mucp) do nothing | ||||||
|  | 	if len(protocol) > 0 { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// if no service/method/endpoint then it's the old protocol | ||||||
|  | 	if len(service) == 0 && len(method) == 0 && len(endpoint) == 0 { | ||||||
|  | 		return defaultCodecs[msg.Header["Content-Type"]] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// old target method specified | ||||||
|  | 	if len(target) > 0 { | ||||||
|  | 		return defaultCodecs[msg.Header["Content-Type"]] | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// no method then set to endpoint | ||||||
|  | 	if len(method) == 0 { | ||||||
|  | 		msg.Header["X-Micro-Method"] = method | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// no endpoint then set to method | ||||||
|  | 	if len(endpoint) == 0 { | ||||||
|  | 		msg.Header["X-Micro-Endpoint"] = method | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { | func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec { | ||||||
| 	rwc := &readWriteCloser{ | 	rwc := &readWriteCloser{ | ||||||
| 		rbuf: bytes.NewBuffer(req.Body), | 		rbuf: bytes.NewBuffer(req.Body), | ||||||
| @@ -109,6 +154,7 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { | |||||||
|  |  | ||||||
| 	// set some internal things | 	// set some internal things | ||||||
| 	m.Target = m.Header["X-Micro-Service"] | 	m.Target = m.Header["X-Micro-Service"] | ||||||
|  | 	m.Method = m.Header["X-Micro-Method"] | ||||||
| 	m.Endpoint = m.Header["X-Micro-Endpoint"] | 	m.Endpoint = m.Header["X-Micro-Endpoint"] | ||||||
| 	m.Id = m.Header["X-Micro-Id"] | 	m.Id = m.Header["X-Micro-Id"] | ||||||
|  |  | ||||||
| @@ -116,9 +162,15 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { | |||||||
| 	err := c.codec.ReadHeader(&m, codec.Request) | 	err := c.codec.ReadHeader(&m, codec.Request) | ||||||
|  |  | ||||||
| 	// set the method/id | 	// set the method/id | ||||||
|  | 	r.Method = m.Method | ||||||
| 	r.Endpoint = m.Endpoint | 	r.Endpoint = m.Endpoint | ||||||
| 	r.Id = m.Id | 	r.Id = m.Id | ||||||
|  |  | ||||||
|  | 	// TODO: remove the old legacy cruft | ||||||
|  | 	if len(r.Endpoint) == 0 { | ||||||
|  | 		r.Endpoint = r.Method | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return err | 	return err | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -141,6 +193,8 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { | |||||||
|  |  | ||||||
| 	// create a new message | 	// create a new message | ||||||
| 	m := &codec.Message{ | 	m := &codec.Message{ | ||||||
|  | 		Target:   r.Target, | ||||||
|  | 		Method:   r.Method, | ||||||
| 		Endpoint: r.Endpoint, | 		Endpoint: r.Endpoint, | ||||||
| 		Id:       r.Id, | 		Id:       r.Id, | ||||||
| 		Error:    r.Error, | 		Error:    r.Error, | ||||||
| @@ -162,6 +216,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error { | |||||||
| 		m.Header["X-Micro-Service"] = r.Target | 		m.Header["X-Micro-Service"] = r.Target | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// set request method | ||||||
|  | 	if len(r.Method) > 0 { | ||||||
|  | 		m.Header["X-Micro-Method"] = r.Method | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	// set request endpoint | 	// set request endpoint | ||||||
| 	if len(r.Endpoint) > 0 { | 	if len(r.Endpoint) > 0 { | ||||||
| 		m.Header["X-Micro-Endpoint"] = r.Endpoint | 		m.Header["X-Micro-Endpoint"] = r.Endpoint | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ import ( | |||||||
|  |  | ||||||
| type rpcRequest struct { | type rpcRequest struct { | ||||||
| 	service     string | 	service     string | ||||||
|  | 	method      string | ||||||
| 	endpoint    string | 	endpoint    string | ||||||
| 	contentType string | 	contentType string | ||||||
| 	socket      transport.Socket | 	socket      transport.Socket | ||||||
| @@ -34,6 +35,10 @@ func (r *rpcRequest) Service() string { | |||||||
| 	return r.service | 	return r.service | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (r *rpcRequest) Method() string { | ||||||
|  | 	return r.method | ||||||
|  | } | ||||||
|  |  | ||||||
| func (r *rpcRequest) Endpoint() string { | func (r *rpcRequest) Endpoint() string { | ||||||
| 	return r.endpoint | 	return r.endpoint | ||||||
| } | } | ||||||
|   | |||||||
| @@ -189,6 +189,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, | |||||||
| 	r := &rpcRequest{ | 	r := &rpcRequest{ | ||||||
| 		service:     req.msg.Target, | 		service:     req.msg.Target, | ||||||
| 		contentType: req.msg.Header["Content-Type"], | 		contentType: req.msg.Header["Content-Type"], | ||||||
|  | 		method:      req.msg.Method, | ||||||
| 		endpoint:    req.msg.Endpoint, | 		endpoint:    req.msg.Endpoint, | ||||||
| 		body:        req.msg.Body, | 		body:        req.msg.Body, | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -97,17 +97,23 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { | |||||||
| 			ct = DefaultContentType | 			ct = DefaultContentType | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		// TODO: needs better error handling | 		// setup old protocol | ||||||
| 		cf, err := s.newCodec(ct) | 		cf := setupProtocol(&msg) | ||||||
| 		if err != nil { |  | ||||||
| 			sock.Send(&transport.Message{ | 		// no old codec | ||||||
| 				Header: map[string]string{ | 		if cf == nil { | ||||||
| 					"Content-Type": "text/plain", | 			// TODO: needs better error handling | ||||||
| 				}, | 			var err error | ||||||
| 				Body: []byte(err.Error()), | 			if cf, err = s.newCodec(ct); err != nil { | ||||||
| 			}) | 				sock.Send(&transport.Message{ | ||||||
| 			s.wg.Done() | 					Header: map[string]string{ | ||||||
| 			return | 						"Content-Type": "text/plain", | ||||||
|  | 					}, | ||||||
|  | 					Body: []byte(err.Error()), | ||||||
|  | 				}) | ||||||
|  | 				s.wg.Done() | ||||||
|  | 				return | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		rcodec := newRpcCodec(&msg, sock, cf) | 		rcodec := newRpcCodec(&msg, sock, cf) | ||||||
| @@ -115,6 +121,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { | |||||||
| 		// internal request | 		// internal request | ||||||
| 		request := &rpcRequest{ | 		request := &rpcRequest{ | ||||||
| 			service:     msg.Header["X-Micro-Service"], | 			service:     msg.Header["X-Micro-Service"], | ||||||
|  | 			method:      msg.Header["X-Micro-Method"], | ||||||
| 			endpoint:    msg.Header["X-Micro-Endpoint"], | 			endpoint:    msg.Header["X-Micro-Endpoint"], | ||||||
| 			contentType: ct, | 			contentType: ct, | ||||||
| 			codec:       rcodec, | 			codec:       rcodec, | ||||||
| @@ -276,6 +283,7 @@ func (s *rpcServer) Register() error { | |||||||
| 	node.Metadata["broker"] = config.Broker.String() | 	node.Metadata["broker"] = config.Broker.String() | ||||||
| 	node.Metadata["server"] = s.String() | 	node.Metadata["server"] = s.String() | ||||||
| 	node.Metadata["registry"] = config.Registry.String() | 	node.Metadata["registry"] = config.Registry.String() | ||||||
|  | 	node.Metadata["protocol"] = "mucp" | ||||||
|  |  | ||||||
| 	s.RLock() | 	s.RLock() | ||||||
| 	// Maps are ordered randomly, sort the keys for consistency | 	// Maps are ordered randomly, sort the keys for consistency | ||||||
|   | |||||||
| @@ -31,6 +31,8 @@ func (r *rpcStream) Send(msg interface{}) error { | |||||||
| 	defer r.Unlock() | 	defer r.Unlock() | ||||||
|  |  | ||||||
| 	resp := codec.Message{ | 	resp := codec.Message{ | ||||||
|  | 		Target:   r.request.Service(), | ||||||
|  | 		Method:   r.request.Method(), | ||||||
| 		Endpoint: r.request.Endpoint(), | 		Endpoint: r.request.Endpoint(), | ||||||
| 		Id:       r.id, | 		Id:       r.id, | ||||||
| 		Type:     codec.Response, | 		Type:     codec.Response, | ||||||
|   | |||||||
| @@ -45,6 +45,8 @@ type Message interface { | |||||||
| type Request interface { | type Request interface { | ||||||
| 	// Service name requested | 	// Service name requested | ||||||
| 	Service() string | 	Service() string | ||||||
|  | 	// The action requested | ||||||
|  | 	Method() string | ||||||
| 	// Endpoint name requested | 	// Endpoint name requested | ||||||
| 	Endpoint() string | 	Endpoint() string | ||||||
| 	// Content type provided | 	// Content type provided | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user