From b1a31134bd5840cf95d371c0ce4d06f8647083d5 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 30 Jan 2019 18:42:11 +0000 Subject: [PATCH] Support micro proxy --- client/rpc_client.go | 26 ++++++++++++++++++++++---- client/rpc_codec.go | 5 +++-- transport/http_transport.go | 6 +++--- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/client/rpc_client.go b/client/rpc_client.go index 77c546cb..864eaad1 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net" + "os" "strconv" "sync" "sync/atomic" @@ -213,12 +214,17 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request codec: codec, } + // set request codec + if r, ok := req.(*rpcRequest); ok { + r.codec = codec + } + stream := &rpcStream{ context: ctx, request: req, response: rsp, closed: make(chan bool), - codec: newRpcCodec(msg, c, cf), + codec: codec, } ch := make(chan error, 1) @@ -268,6 +274,18 @@ func (r *rpcClient) Options() Options { } func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { + service := request.Service() + + // get proxy + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + service = prx + } + + // get proxy address + if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 { + opts.Address = prx + } + // return remote address if len(opts.Address) > 0 { address := opts.Address @@ -288,11 +306,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro } // get next nodes from the selector - next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...) + next, err := r.opts.Selector.Select(service, opts.SelectOptions...) if err != nil && err == selector.ErrNotFound { - return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) + return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error()) } else if err != nil { - return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error()) + return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error()) } return next, nil diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 53588ab9..6ff84a64 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -115,6 +115,9 @@ func getHeaders(m *codec.Message) { func setHeaders(m *codec.Message) { set := func(hdr, v string) { + if len(v) == 0 { + return + } m.Header[hdr] = v m.Header["X-"+hdr] = v } @@ -200,12 +203,10 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error { Header: m.Header, Body: m.Body, } - // send the request if err := c.client.Send(&msg); err != nil { return errors.InternalServerError("go.micro.client.transport", err.Error()) } - return nil } diff --git a/transport/http_transport.go b/transport/http_transport.go index 154448da..2b3619ed 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -35,7 +35,7 @@ type httpTransportClient struct { dialOpts DialOptions once sync.Once - sync.Mutex + sync.RWMutex r chan *http.Request bl []*http.Request buff *bufio.Reader @@ -133,11 +133,11 @@ func (h *httpTransportClient) Recv(m *Message) error { r = rc } - h.Lock() - defer h.Unlock() + h.RLock() if h.buff == nil { return io.EOF } + h.RUnlock() // set timeout if its greater than 0 if h.ht.opts.Timeout > time.Duration(0) {