Use protocol from node metadata
This commit is contained in:
parent
9bd32645be
commit
6468733d98
@ -4,10 +4,11 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/broker"
|
||||
@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
|
||||
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
||||
}
|
||||
|
||||
func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error {
|
||||
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
|
||||
address := node.Address
|
||||
if node.Port > 0 {
|
||||
address = fmt.Sprintf("%s:%d", address, node.Port)
|
||||
}
|
||||
|
||||
msg := &transport.Message{
|
||||
Header: make(map[string]string),
|
||||
}
|
||||
@ -75,10 +81,17 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
||||
// set the accept header
|
||||
msg.Header["Accept"] = req.ContentType()
|
||||
|
||||
cf, err := r.newCodec(req.ContentType())
|
||||
// setup old protocol
|
||||
cf := setupProtocol(msg, node)
|
||||
|
||||
// no codec specified
|
||||
if cf == nil {
|
||||
var err error
|
||||
cf, err = r.newCodec(req.ContentType())
|
||||
if err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
var grr error
|
||||
c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout))
|
||||
@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
|
||||
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
|
||||
address := node.Address
|
||||
if node.Port > 0 {
|
||||
address = fmt.Sprintf("%s:%d", address, node.Port)
|
||||
}
|
||||
|
||||
msg := &transport.Message{
|
||||
Header: make(map[string]string),
|
||||
}
|
||||
@ -163,10 +181,17 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
|
||||
// set the accept header
|
||||
msg.Header["Accept"] = req.ContentType()
|
||||
|
||||
cf, err := r.newCodec(req.ContentType())
|
||||
// set old codecs
|
||||
cf := setupProtocol(msg, node)
|
||||
|
||||
// no codec specified
|
||||
if cf == nil {
|
||||
var err error
|
||||
cf, err = r.newCodec(req.ContentType())
|
||||
if err != nil {
|
||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
dOpts := []transport.DialOption{
|
||||
transport.WithStream(),
|
||||
@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options {
|
||||
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
|
||||
// return remote address
|
||||
if len(opts.Address) > 0 {
|
||||
address := opts.Address
|
||||
port := 0
|
||||
|
||||
host, sport, err := net.SplitHostPort(opts.Address)
|
||||
if err == nil {
|
||||
address = host
|
||||
port, _ = strconv.Atoi(sport)
|
||||
}
|
||||
|
||||
return func() (*registry.Node, error) {
|
||||
return ®istry.Node{
|
||||
Address: opts.Address,
|
||||
Address: address,
|
||||
Port: port,
|
||||
}, nil
|
||||
}, nil
|
||||
}
|
||||
@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
|
||||
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
// set the address
|
||||
address := node.Address
|
||||
if node.Port > 0 {
|
||||
address = fmt.Sprintf("%s:%d", address, node.Port)
|
||||
}
|
||||
|
||||
// make the call
|
||||
err = rcall(ctx, address, request, response, callOpts)
|
||||
err = rcall(ctx, node, request, response, callOpts)
|
||||
r.opts.Selector.Mark(request.Service(), node, err)
|
||||
return err
|
||||
}
|
||||
@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
|
||||
}
|
||||
|
||||
address := node.Address
|
||||
if node.Port > 0 {
|
||||
address = fmt.Sprintf("%s:%d", address, node.Port)
|
||||
}
|
||||
|
||||
stream, err := r.stream(ctx, address, request, callOpts)
|
||||
stream, err := r.stream(ctx, node, request, callOpts)
|
||||
r.opts.Selector.Mark(request.Service(), node, err)
|
||||
return stream, err
|
||||
}
|
||||
|
@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) {
|
||||
var called bool
|
||||
service := "test.service"
|
||||
endpoint := "Test.Endpoint"
|
||||
address := "10.1.10.1:8080"
|
||||
address := "10.1.10.1"
|
||||
port := 8080
|
||||
|
||||
wrap := func(cf CallFunc) CallFunc {
|
||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
|
||||
called = true
|
||||
|
||||
if req.Service() != service {
|
||||
@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) {
|
||||
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
||||
}
|
||||
|
||||
if addr != address {
|
||||
return fmt.Errorf("expected address: %s got %s", address, addr)
|
||||
if node.Address != address {
|
||||
return fmt.Errorf("expected address: %s got %s", address, node.Address)
|
||||
}
|
||||
|
||||
if node.Port != port {
|
||||
return fmt.Errorf("expected address: %d got %d", port, node.Port)
|
||||
}
|
||||
|
||||
// don't do the call
|
||||
@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) {
|
||||
req := c.NewRequest(service, endpoint, nil)
|
||||
|
||||
// test calling remote address
|
||||
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
|
||||
if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil {
|
||||
t.Fatal("call with address error", err)
|
||||
}
|
||||
|
||||
@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) {
|
||||
func TestCallRetry(t *testing.T) {
|
||||
service := "test.service"
|
||||
endpoint := "Test.Endpoint"
|
||||
address := "10.1.10.1:8080"
|
||||
address := "10.1.10.1"
|
||||
|
||||
var called int
|
||||
|
||||
wrap := func(cf CallFunc) CallFunc {
|
||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
|
||||
called++
|
||||
if called == 1 {
|
||||
return errors.InternalServerError("test.error", "retry request")
|
||||
@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) {
|
||||
id := "test.1"
|
||||
service := "test.service"
|
||||
endpoint := "Test.Endpoint"
|
||||
host := "10.1.10.1"
|
||||
address := "10.1.10.1"
|
||||
port := 8080
|
||||
address := "10.1.10.1:8080"
|
||||
|
||||
wrap := func(cf CallFunc) CallFunc {
|
||||
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
|
||||
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
|
||||
called = true
|
||||
|
||||
if req.Service() != service {
|
||||
@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) {
|
||||
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
|
||||
}
|
||||
|
||||
if addr != address {
|
||||
return fmt.Errorf("expected address: %s got %s", address, addr)
|
||||
if node.Address != address {
|
||||
return fmt.Errorf("expected address: %s got %s", address, node.Address)
|
||||
}
|
||||
|
||||
// don't do the call
|
||||
@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) {
|
||||
Nodes: []*registry.Node{
|
||||
®istry.Node{
|
||||
Id: id,
|
||||
Address: host,
|
||||
Address: address,
|
||||
Port: port,
|
||||
},
|
||||
},
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/micro/go-micro/codec/proto"
|
||||
"github.com/micro/go-micro/codec/protorpc"
|
||||
"github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/transport"
|
||||
)
|
||||
|
||||
@ -58,6 +59,15 @@ var (
|
||||
"application/proto-rpc": protorpc.NewCodec,
|
||||
"application/octet-stream": raw.NewCodec,
|
||||
}
|
||||
|
||||
// TODO: remove legacy codec list
|
||||
defaultCodecs = map[string]codec.NewCodec{
|
||||
"application/json": jsonrpc.NewCodec,
|
||||
"application/json-rpc": jsonrpc.NewCodec,
|
||||
"application/protobuf": protorpc.NewCodec,
|
||||
"application/proto-rpc": protorpc.NewCodec,
|
||||
"application/octet-stream": protorpc.NewCodec,
|
||||
}
|
||||
)
|
||||
|
||||
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
|
||||
@ -74,6 +84,27 @@ func (rwc *readWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupProtocol sets up the old protocol
|
||||
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
|
||||
protocol := node.Metadata["protocol"]
|
||||
|
||||
// got protocol
|
||||
if len(protocol) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// no protocol use old codecs
|
||||
switch msg.Header["Content-Type"] {
|
||||
case "application/json":
|
||||
msg.Header["Content-Type"] = "application/json-rpc"
|
||||
case "application/protobuf":
|
||||
msg.Header["Content-Type"] = "application/proto-rpc"
|
||||
}
|
||||
|
||||
// now return codec
|
||||
return defaultCodecs[msg.Header["Content-Type"]]
|
||||
}
|
||||
|
||||
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
|
||||
rwc := &readWriteCloser{
|
||||
wbuf: bytes.NewBuffer(nil),
|
||||
|
@ -2,10 +2,12 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
// CallFunc represents the individual call func
|
||||
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
|
||||
type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error
|
||||
|
||||
// CallWrapper is a low level wrapper for the CallFunc
|
||||
type CallWrapper func(CallFunc) CallFunc
|
||||
|
@ -283,6 +283,7 @@ func (s *rpcServer) Register() error {
|
||||
node.Metadata["broker"] = config.Broker.String()
|
||||
node.Metadata["server"] = s.String()
|
||||
node.Metadata["registry"] = config.Registry.String()
|
||||
node.Metadata["protocol"] = "mucp"
|
||||
|
||||
s.RLock()
|
||||
// Maps are ordered randomly, sort the keys for consistency
|
||||
|
Loading…
x
Reference in New Issue
Block a user