Merge pull request #393 from micro/legacy
Evolution of Codecs and Methods
This commit is contained in:
commit
943219f203
@ -38,7 +38,9 @@ type Message interface {
|
|||||||
type Request interface {
|
type Request interface {
|
||||||
// The service to call
|
// The service to call
|
||||||
Service() string
|
Service() string
|
||||||
// The endpoint to call
|
// The action to take
|
||||||
|
Method() string
|
||||||
|
// The endpoint to invoke
|
||||||
Endpoint() string
|
Endpoint() string
|
||||||
// The content type
|
// The content type
|
||||||
ContentType() string
|
ContentType() string
|
||||||
|
@ -4,10 +4,11 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/micro/go-micro/broker"
|
"github.com/micro/go-micro/broker"
|
||||||
@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
|
|||||||
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error {
|
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
|
||||||
|
address := node.Address
|
||||||
|
if node.Port > 0 {
|
||||||
|
address = fmt.Sprintf("%s:%d", address, node.Port)
|
||||||
|
}
|
||||||
|
|
||||||
msg := &transport.Message{
|
msg := &transport.Message{
|
||||||
Header: make(map[string]string),
|
Header: make(map[string]string),
|
||||||
}
|
}
|
||||||
@ -75,10 +81,17 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
|||||||
// set the accept header
|
// set the accept header
|
||||||
msg.Header["Accept"] = req.ContentType()
|
msg.Header["Accept"] = req.ContentType()
|
||||||
|
|
||||||
cf, err := r.newCodec(req.ContentType())
|
// setup old protocol
|
||||||
|
cf := setupProtocol(msg, node)
|
||||||
|
|
||||||
|
// no codec specified
|
||||||
|
if cf == nil {
|
||||||
|
var err error
|
||||||
|
cf, err = r.newCodec(req.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var grr error
|
var grr error
|
||||||
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
|
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
|
||||||
@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
|
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
|
||||||
|
address := node.Address
|
||||||
|
if node.Port > 0 {
|
||||||
|
address = fmt.Sprintf("%s:%d", address, node.Port)
|
||||||
|
}
|
||||||
|
|
||||||
msg := &transport.Message{
|
msg := &transport.Message{
|
||||||
Header: make(map[string]string),
|
Header: make(map[string]string),
|
||||||
}
|
}
|
||||||
@ -163,10 +181,17 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
|
|||||||
// set the accept header
|
// set the accept header
|
||||||
msg.Header["Accept"] = req.ContentType()
|
msg.Header["Accept"] = req.ContentType()
|
||||||
|
|
||||||
cf, err := r.newCodec(req.ContentType())
|
// set old codecs
|
||||||
|
cf := setupProtocol(msg, node)
|
||||||
|
|
||||||
|
// no codec specified
|
||||||
|
if cf == nil {
|
||||||
|
var err error
|
||||||
|
cf, err = r.newCodec(req.ContentType())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dOpts := []transport.DialOption{
|
dOpts := []transport.DialOption{
|
||||||
transport.WithStream(),
|
transport.WithStream(),
|
||||||
@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options {
|
|||||||
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
|
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
|
||||||
// return remote address
|
// return remote address
|
||||||
if len(opts.Address) > 0 {
|
if len(opts.Address) > 0 {
|
||||||
|
address := opts.Address
|
||||||
|
port := 0
|
||||||
|
|
||||||
|
host, sport, err := net.SplitHostPort(opts.Address)
|
||||||
|
if err == nil {
|
||||||
|
address = host
|
||||||
|
port, _ = strconv.Atoi(sport)
|
||||||
|
}
|
||||||
|
|
||||||
return func() (*registry.Node, error) {
|
return func() (*registry.Node, error) {
|
||||||
return ®istry.Node{
|
return ®istry.Node{
|
||||||
Address: opts.Address,
|
Address: address,
|
||||||
|
Port: port,
|
||||||
}, nil
|
}, nil
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
|||||||
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the address
|
|
||||||
address := node.Address
|
|
||||||
if node.Port > 0 {
|
|
||||||
address = fmt.Sprintf("%s:%d", address, node.Port)
|
|
||||||
}
|
|
||||||
|
|
||||||
// make the call
|
// make the call
|
||||||
err = rcall(ctx, address, request, response, callOpts)
|
err = rcall(ctx, node, request, response, callOpts)
|
||||||
r.opts.Selector.Mark(request.Service(), node, err)
|
r.opts.Selector.Mark(request.Service(), node, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
|||||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
address := node.Address
|
stream, err := r.stream(ctx, node, request, callOpts)
|
||||||
if node.Port > 0 {
|
|
||||||
address = fmt.Sprintf("%s:%d", address, node.Port)
|
|
||||||
}
|
|
||||||
|
|
||||||
stream, err := r.stream(ctx, address, request, callOpts)
|
|
||||||
r.opts.Selector.Mark(request.Service(), node, err)
|
r.opts.Selector.Mark(request.Service(), node, err)
|
||||||
return stream, err
|
return stream, err
|
||||||
}
|
}
|
||||||
|
@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) {
|
|||||||
var called bool
|
var called bool
|
||||||
service := "test.service"
|
service := "test.service"
|
||||||
endpoint := "Test.Endpoint"
|
endpoint := "Test.Endpoint"
|
||||||
address := "10.1.10.1:8080"
|
address := "10.1.10.1"
|
||||||
|
port := 8080
|
||||||
|
|
||||||
wrap := func(cf CallFunc) CallFunc {
|
wrap := func(cf CallFunc) CallFunc {
|
||||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
|
||||||
called = true
|
called = true
|
||||||
|
|
||||||
if req.Service() != service {
|
if req.Service() != service {
|
||||||
@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) {
|
|||||||
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
||||||
}
|
}
|
||||||
|
|
||||||
if addr != address {
|
if node.Address != address {
|
||||||
return fmt.Errorf("expected address: %s got %s", address, addr)
|
return fmt.Errorf("expected address: %s got %s", address, node.Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.Port != port {
|
||||||
|
return fmt.Errorf("expected address: %d got %d", port, node.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
// don't do the call
|
// don't do the call
|
||||||
@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) {
|
|||||||
req := c.NewRequest(service, endpoint, nil)
|
req := c.NewRequest(service, endpoint, nil)
|
||||||
|
|
||||||
// test calling remote address
|
// test calling remote address
|
||||||
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil {
|
||||||
t.Fatal("call with address error", err)
|
t.Fatal("call with address error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) {
|
|||||||
func TestCallRetry(t *testing.T) {
|
func TestCallRetry(t *testing.T) {
|
||||||
service := "test.service"
|
service := "test.service"
|
||||||
endpoint := "Test.Endpoint"
|
endpoint := "Test.Endpoint"
|
||||||
address := "10.1.10.1:8080"
|
address := "10.1.10.1"
|
||||||
|
|
||||||
var called int
|
var called int
|
||||||
|
|
||||||
wrap := func(cf CallFunc) CallFunc {
|
wrap := func(cf CallFunc) CallFunc {
|
||||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
|
||||||
called++
|
called++
|
||||||
if called == 1 {
|
if called == 1 {
|
||||||
return errors.InternalServerError("test.error", "retry request")
|
return errors.InternalServerError("test.error", "retry request")
|
||||||
@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) {
|
|||||||
id := "test.1"
|
id := "test.1"
|
||||||
service := "test.service"
|
service := "test.service"
|
||||||
endpoint := "Test.Endpoint"
|
endpoint := "Test.Endpoint"
|
||||||
host := "10.1.10.1"
|
address := "10.1.10.1"
|
||||||
port := 8080
|
port := 8080
|
||||||
address := "10.1.10.1:8080"
|
|
||||||
|
|
||||||
wrap := func(cf CallFunc) CallFunc {
|
wrap := func(cf CallFunc) CallFunc {
|
||||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
|
||||||
called = true
|
called = true
|
||||||
|
|
||||||
if req.Service() != service {
|
if req.Service() != service {
|
||||||
@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) {
|
|||||||
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
||||||
}
|
}
|
||||||
|
|
||||||
if addr != address {
|
if node.Address != address {
|
||||||
return fmt.Errorf("expected address: %s got %s", address, addr)
|
return fmt.Errorf("expected address: %s got %s", address, node.Address)
|
||||||
}
|
}
|
||||||
|
|
||||||
// don't do the call
|
// don't do the call
|
||||||
@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) {
|
|||||||
Nodes: []*registry.Node{
|
Nodes: []*registry.Node{
|
||||||
®istry.Node{
|
®istry.Node{
|
||||||
Id: id,
|
Id: id,
|
||||||
Address: host,
|
Address: address,
|
||||||
Port: port,
|
Port: port,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/micro/go-micro/codec/proto"
|
"github.com/micro/go-micro/codec/proto"
|
||||||
"github.com/micro/go-micro/codec/protorpc"
|
"github.com/micro/go-micro/codec/protorpc"
|
||||||
"github.com/micro/go-micro/errors"
|
"github.com/micro/go-micro/errors"
|
||||||
|
"github.com/micro/go-micro/registry"
|
||||||
"github.com/micro/go-micro/transport"
|
"github.com/micro/go-micro/transport"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -58,6 +59,15 @@ var (
|
|||||||
"application/proto-rpc": protorpc.NewCodec,
|
"application/proto-rpc": protorpc.NewCodec,
|
||||||
"application/octet-stream": raw.NewCodec,
|
"application/octet-stream": raw.NewCodec,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: remove legacy codec list
|
||||||
|
defaultCodecs = map[string]codec.NewCodec{
|
||||||
|
"application/json": jsonrpc.NewCodec,
|
||||||
|
"application/json-rpc": jsonrpc.NewCodec,
|
||||||
|
"application/protobuf": protorpc.NewCodec,
|
||||||
|
"application/proto-rpc": protorpc.NewCodec,
|
||||||
|
"application/octet-stream": protorpc.NewCodec,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
|
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
|
||||||
@ -74,6 +84,27 @@ func (rwc *readWriteCloser) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setupProtocol sets up the old protocol
|
||||||
|
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
|
||||||
|
protocol := node.Metadata["protocol"]
|
||||||
|
|
||||||
|
// got protocol
|
||||||
|
if len(protocol) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// no protocol use old codecs
|
||||||
|
switch msg.Header["Content-Type"] {
|
||||||
|
case "application/json":
|
||||||
|
msg.Header["Content-Type"] = "application/json-rpc"
|
||||||
|
case "application/protobuf":
|
||||||
|
msg.Header["Content-Type"] = "application/proto-rpc"
|
||||||
|
}
|
||||||
|
|
||||||
|
// now return codec
|
||||||
|
return defaultCodecs[msg.Header["Content-Type"]]
|
||||||
|
}
|
||||||
|
|
||||||
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
|
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
|
||||||
rwc := &readWriteCloser{
|
rwc := &readWriteCloser{
|
||||||
wbuf: bytes.NewBuffer(nil),
|
wbuf: bytes.NewBuffer(nil),
|
||||||
@ -104,6 +135,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
|
|||||||
// set the mucp headers
|
// set the mucp headers
|
||||||
m.Header["X-Micro-Id"] = m.Id
|
m.Header["X-Micro-Id"] = m.Id
|
||||||
m.Header["X-Micro-Service"] = m.Target
|
m.Header["X-Micro-Service"] = m.Target
|
||||||
|
m.Header["X-Micro-Method"] = m.Method
|
||||||
m.Header["X-Micro-Endpoint"] = m.Endpoint
|
m.Header["X-Micro-Endpoint"] = m.Endpoint
|
||||||
|
|
||||||
// if body is bytes Frame don't encode
|
// if body is bytes Frame don't encode
|
||||||
@ -154,6 +186,7 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
|
|||||||
// read header
|
// read header
|
||||||
err := c.codec.ReadHeader(&me, r)
|
err := c.codec.ReadHeader(&me, r)
|
||||||
wm.Endpoint = me.Endpoint
|
wm.Endpoint = me.Endpoint
|
||||||
|
wm.Method = me.Method
|
||||||
wm.Id = me.Id
|
wm.Id = me.Id
|
||||||
wm.Error = me.Error
|
wm.Error = me.Error
|
||||||
|
|
||||||
@ -162,11 +195,16 @@ func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
|
|||||||
wm.Error = me.Header["X-Micro-Error"]
|
wm.Error = me.Header["X-Micro-Error"]
|
||||||
}
|
}
|
||||||
|
|
||||||
// check method in header
|
// check endpoint in header
|
||||||
if len(me.Endpoint) == 0 {
|
if len(me.Endpoint) == 0 {
|
||||||
wm.Endpoint = me.Header["X-Micro-Endpoint"]
|
wm.Endpoint = me.Header["X-Micro-Endpoint"]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check method in header
|
||||||
|
if len(me.Method) == 0 {
|
||||||
|
wm.Method = me.Header["X-Micro-Method"]
|
||||||
|
}
|
||||||
|
|
||||||
if len(me.Id) == 0 {
|
if len(me.Id) == 0 {
|
||||||
wm.Id = me.Header["X-Micro-Id"]
|
wm.Id = me.Header["X-Micro-Id"]
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
|
|
||||||
type rpcRequest struct {
|
type rpcRequest struct {
|
||||||
service string
|
service string
|
||||||
|
method string
|
||||||
endpoint string
|
endpoint string
|
||||||
contentType string
|
contentType string
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
@ -27,6 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin
|
|||||||
|
|
||||||
return &rpcRequest{
|
return &rpcRequest{
|
||||||
service: service,
|
service: service,
|
||||||
|
method: endpoint,
|
||||||
endpoint: endpoint,
|
endpoint: endpoint,
|
||||||
body: request,
|
body: request,
|
||||||
contentType: contentType,
|
contentType: contentType,
|
||||||
@ -42,6 +44,10 @@ func (r *rpcRequest) Service() string {
|
|||||||
return r.service
|
return r.service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Method() string {
|
||||||
|
return r.method
|
||||||
|
}
|
||||||
|
|
||||||
func (r *rpcRequest) Endpoint() string {
|
func (r *rpcRequest) Endpoint() string {
|
||||||
return r.endpoint
|
return r.endpoint
|
||||||
}
|
}
|
||||||
|
@ -53,6 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error {
|
|||||||
req := codec.Message{
|
req := codec.Message{
|
||||||
Id: r.id,
|
Id: r.id,
|
||||||
Target: r.request.Service(),
|
Target: r.request.Service(),
|
||||||
|
Method: r.request.Method(),
|
||||||
Endpoint: r.request.Endpoint(),
|
Endpoint: r.request.Endpoint(),
|
||||||
Type: codec.Request,
|
Type: codec.Request,
|
||||||
}
|
}
|
||||||
|
@ -2,10 +2,12 @@ package client
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/micro/go-micro/registry"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CallFunc represents the individual call func
|
// CallFunc represents the individual call func
|
||||||
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
|
type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error
|
||||||
|
|
||||||
// CallWrapper is a low level wrapper for the CallFunc
|
// CallWrapper is a low level wrapper for the CallFunc
|
||||||
type CallWrapper func(CallFunc) CallFunc
|
type CallWrapper func(CallFunc) CallFunc
|
||||||
|
@ -53,6 +53,7 @@ type Message struct {
|
|||||||
Id string
|
Id string
|
||||||
Type MessageType
|
Type MessageType
|
||||||
Target string
|
Target string
|
||||||
|
Method string
|
||||||
Endpoint string
|
Endpoint string
|
||||||
Error string
|
Error string
|
||||||
|
|
||||||
|
@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
|
|||||||
|
|
||||||
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
|
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
c.pending[m.Id] = m.Endpoint
|
c.pending[m.Id] = m.Method
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
c.req.Method = m.Endpoint
|
c.req.Method = m.Method
|
||||||
c.req.Params[0] = b
|
c.req.Params[0] = b
|
||||||
c.req.ID = m.Id
|
c.req.ID = m.Id
|
||||||
return c.enc.Encode(&c.req)
|
return c.enc.Encode(&c.req)
|
||||||
@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
c.Lock()
|
c.Lock()
|
||||||
m.Endpoint = c.pending[c.resp.ID]
|
m.Method = c.pending[c.resp.ID]
|
||||||
delete(c.pending, c.resp.ID)
|
delete(c.pending, c.resp.ID)
|
||||||
c.Unlock()
|
c.Unlock()
|
||||||
|
|
||||||
|
@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error {
|
|||||||
if err := c.dec.Decode(&c.req); err != nil {
|
if err := c.dec.Decode(&c.req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Endpoint = c.req.Method
|
m.Method = c.req.Method
|
||||||
m.Id = fmt.Sprintf("%v", c.req.ID)
|
m.Id = fmt.Sprintf("%v", c.req.ID)
|
||||||
c.req.ID = nil
|
c.req.ID = nil
|
||||||
return nil
|
return nil
|
||||||
|
@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
// This is protobuf, of course we copy it.
|
// This is protobuf, of course we copy it.
|
||||||
pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)}
|
pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)}
|
||||||
data, err := proto.Marshal(pbr)
|
data, err := proto.Marshal(pbr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
case codec.Response, codec.Error:
|
case codec.Response, codec.Error:
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error}
|
rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error}
|
||||||
data, err := proto.Marshal(rtmp)
|
data, err := proto.Marshal(rtmp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Endpoint = rtmp.GetServiceMethod()
|
m.Method = rtmp.GetServiceMethod()
|
||||||
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
||||||
case codec.Response:
|
case codec.Response:
|
||||||
data, err := ReadNetString(c.rwc)
|
data, err := ReadNetString(c.rwc)
|
||||||
@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
m.Endpoint = rtmp.GetServiceMethod()
|
m.Method = rtmp.GetServiceMethod()
|
||||||
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
||||||
m.Error = rtmp.GetError()
|
m.Error = rtmp.GetError()
|
||||||
case codec.Publication:
|
case codec.Publication:
|
||||||
|
@ -41,6 +41,15 @@ var (
|
|||||||
"application/proto-rpc": protorpc.NewCodec,
|
"application/proto-rpc": protorpc.NewCodec,
|
||||||
"application/octet-stream": raw.NewCodec,
|
"application/octet-stream": raw.NewCodec,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: remove legacy codec list
|
||||||
|
defaultCodecs = map[string]codec.NewCodec{
|
||||||
|
"application/json": jsonrpc.NewCodec,
|
||||||
|
"application/json-rpc": jsonrpc.NewCodec,
|
||||||
|
"application/protobuf": protorpc.NewCodec,
|
||||||
|
"application/proto-rpc": protorpc.NewCodec,
|
||||||
|
"application/octet-stream": protorpc.NewCodec,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
|
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
|
||||||
@ -57,6 +66,42 @@ func (rwc *readWriteCloser) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setupProtocol sets up the old protocol
|
||||||
|
func setupProtocol(msg *transport.Message) codec.NewCodec {
|
||||||
|
service := msg.Header["X-Micro-Service"]
|
||||||
|
method := msg.Header["X-Micro-Method"]
|
||||||
|
endpoint := msg.Header["X-Micro-Endpoint"]
|
||||||
|
protocol := msg.Header["X-Micro-Protocol"]
|
||||||
|
target := msg.Header["X-Micro-Target"]
|
||||||
|
|
||||||
|
// if the protocol exists (mucp) do nothing
|
||||||
|
if len(protocol) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no service/method/endpoint then it's the old protocol
|
||||||
|
if len(service) == 0 && len(method) == 0 && len(endpoint) == 0 {
|
||||||
|
return defaultCodecs[msg.Header["Content-Type"]]
|
||||||
|
}
|
||||||
|
|
||||||
|
// old target method specified
|
||||||
|
if len(target) > 0 {
|
||||||
|
return defaultCodecs[msg.Header["Content-Type"]]
|
||||||
|
}
|
||||||
|
|
||||||
|
// no method then set to endpoint
|
||||||
|
if len(method) == 0 {
|
||||||
|
msg.Header["X-Micro-Method"] = method
|
||||||
|
}
|
||||||
|
|
||||||
|
// no endpoint then set to method
|
||||||
|
if len(endpoint) == 0 {
|
||||||
|
msg.Header["X-Micro-Endpoint"] = method
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
|
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
|
||||||
rwc := &readWriteCloser{
|
rwc := &readWriteCloser{
|
||||||
rbuf: bytes.NewBuffer(req.Body),
|
rbuf: bytes.NewBuffer(req.Body),
|
||||||
@ -109,6 +154,7 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
|||||||
|
|
||||||
// set some internal things
|
// set some internal things
|
||||||
m.Target = m.Header["X-Micro-Service"]
|
m.Target = m.Header["X-Micro-Service"]
|
||||||
|
m.Method = m.Header["X-Micro-Method"]
|
||||||
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
||||||
m.Id = m.Header["X-Micro-Id"]
|
m.Id = m.Header["X-Micro-Id"]
|
||||||
|
|
||||||
@ -116,9 +162,15 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
|||||||
err := c.codec.ReadHeader(&m, codec.Request)
|
err := c.codec.ReadHeader(&m, codec.Request)
|
||||||
|
|
||||||
// set the method/id
|
// set the method/id
|
||||||
|
r.Method = m.Method
|
||||||
r.Endpoint = m.Endpoint
|
r.Endpoint = m.Endpoint
|
||||||
r.Id = m.Id
|
r.Id = m.Id
|
||||||
|
|
||||||
|
// TODO: remove the old legacy cruft
|
||||||
|
if len(r.Endpoint) == 0 {
|
||||||
|
r.Endpoint = r.Method
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,6 +193,8 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
|||||||
|
|
||||||
// create a new message
|
// create a new message
|
||||||
m := &codec.Message{
|
m := &codec.Message{
|
||||||
|
Target: r.Target,
|
||||||
|
Method: r.Method,
|
||||||
Endpoint: r.Endpoint,
|
Endpoint: r.Endpoint,
|
||||||
Id: r.Id,
|
Id: r.Id,
|
||||||
Error: r.Error,
|
Error: r.Error,
|
||||||
@ -162,6 +216,11 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
|||||||
m.Header["X-Micro-Service"] = r.Target
|
m.Header["X-Micro-Service"] = r.Target
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set request method
|
||||||
|
if len(r.Method) > 0 {
|
||||||
|
m.Header["X-Micro-Method"] = r.Method
|
||||||
|
}
|
||||||
|
|
||||||
// set request endpoint
|
// set request endpoint
|
||||||
if len(r.Endpoint) > 0 {
|
if len(r.Endpoint) > 0 {
|
||||||
m.Header["X-Micro-Endpoint"] = r.Endpoint
|
m.Header["X-Micro-Endpoint"] = r.Endpoint
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
|
|
||||||
type rpcRequest struct {
|
type rpcRequest struct {
|
||||||
service string
|
service string
|
||||||
|
method string
|
||||||
endpoint string
|
endpoint string
|
||||||
contentType string
|
contentType string
|
||||||
socket transport.Socket
|
socket transport.Socket
|
||||||
@ -34,6 +35,10 @@ func (r *rpcRequest) Service() string {
|
|||||||
return r.service
|
return r.service
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *rpcRequest) Method() string {
|
||||||
|
return r.method
|
||||||
|
}
|
||||||
|
|
||||||
func (r *rpcRequest) Endpoint() string {
|
func (r *rpcRequest) Endpoint() string {
|
||||||
return r.endpoint
|
return r.endpoint
|
||||||
}
|
}
|
||||||
|
@ -189,6 +189,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
|
|||||||
r := &rpcRequest{
|
r := &rpcRequest{
|
||||||
service: req.msg.Target,
|
service: req.msg.Target,
|
||||||
contentType: req.msg.Header["Content-Type"],
|
contentType: req.msg.Header["Content-Type"],
|
||||||
|
method: req.msg.Method,
|
||||||
endpoint: req.msg.Endpoint,
|
endpoint: req.msg.Endpoint,
|
||||||
body: req.msg.Body,
|
body: req.msg.Body,
|
||||||
}
|
}
|
||||||
|
@ -97,9 +97,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
ct = DefaultContentType
|
ct = DefaultContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// setup old protocol
|
||||||
|
cf := setupProtocol(&msg)
|
||||||
|
|
||||||
|
// no old codec
|
||||||
|
if cf == nil {
|
||||||
// TODO: needs better error handling
|
// TODO: needs better error handling
|
||||||
cf, err := s.newCodec(ct)
|
var err error
|
||||||
if err != nil {
|
if cf, err = s.newCodec(ct); err != nil {
|
||||||
sock.Send(&transport.Message{
|
sock.Send(&transport.Message{
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"Content-Type": "text/plain",
|
"Content-Type": "text/plain",
|
||||||
@ -109,12 +114,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
|||||||
s.wg.Done()
|
s.wg.Done()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rcodec := newRpcCodec(&msg, sock, cf)
|
rcodec := newRpcCodec(&msg, sock, cf)
|
||||||
|
|
||||||
// internal request
|
// internal request
|
||||||
request := &rpcRequest{
|
request := &rpcRequest{
|
||||||
service: msg.Header["X-Micro-Service"],
|
service: msg.Header["X-Micro-Service"],
|
||||||
|
method: msg.Header["X-Micro-Method"],
|
||||||
endpoint: msg.Header["X-Micro-Endpoint"],
|
endpoint: msg.Header["X-Micro-Endpoint"],
|
||||||
contentType: ct,
|
contentType: ct,
|
||||||
codec: rcodec,
|
codec: rcodec,
|
||||||
@ -276,6 +283,7 @@ func (s *rpcServer) Register() error {
|
|||||||
node.Metadata["broker"] = config.Broker.String()
|
node.Metadata["broker"] = config.Broker.String()
|
||||||
node.Metadata["server"] = s.String()
|
node.Metadata["server"] = s.String()
|
||||||
node.Metadata["registry"] = config.Registry.String()
|
node.Metadata["registry"] = config.Registry.String()
|
||||||
|
node.Metadata["protocol"] = "mucp"
|
||||||
|
|
||||||
s.RLock()
|
s.RLock()
|
||||||
// Maps are ordered randomly, sort the keys for consistency
|
// Maps are ordered randomly, sort the keys for consistency
|
||||||
|
@ -31,6 +31,8 @@ func (r *rpcStream) Send(msg interface{}) error {
|
|||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
|
|
||||||
resp := codec.Message{
|
resp := codec.Message{
|
||||||
|
Target: r.request.Service(),
|
||||||
|
Method: r.request.Method(),
|
||||||
Endpoint: r.request.Endpoint(),
|
Endpoint: r.request.Endpoint(),
|
||||||
Id: r.id,
|
Id: r.id,
|
||||||
Type: codec.Response,
|
Type: codec.Response,
|
||||||
|
@ -45,6 +45,8 @@ type Message interface {
|
|||||||
type Request interface {
|
type Request interface {
|
||||||
// Service name requested
|
// Service name requested
|
||||||
Service() string
|
Service() string
|
||||||
|
// The action requested
|
||||||
|
Method() string
|
||||||
// Endpoint name requested
|
// Endpoint name requested
|
||||||
Endpoint() string
|
Endpoint() string
|
||||||
// Content type provided
|
// Content type provided
|
||||||
|
Loading…
Reference in New Issue
Block a user