Support micro proxy

This commit is contained in:
Asim Aslam 2019-01-30 18:42:11 +00:00
parent 107b571019
commit b1a31134bd
3 changed files with 28 additions and 9 deletions

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"os"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -213,12 +214,17 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
codec: codec, codec: codec,
} }
// set request codec
if r, ok := req.(*rpcRequest); ok {
r.codec = codec
}
stream := &rpcStream{ stream := &rpcStream{
context: ctx, context: ctx,
request: req, request: req,
response: rsp, response: rsp,
closed: make(chan bool), closed: make(chan bool),
codec: newRpcCodec(msg, c, cf), codec: codec,
} }
ch := make(chan error, 1) ch := make(chan error, 1)
@ -268,6 +274,18 @@ func (r *rpcClient) Options() Options {
} }
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
}
// return remote address // return remote address
if len(opts.Address) > 0 { if len(opts.Address) > 0 {
address := opts.Address address := opts.Address
@ -288,11 +306,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
} }
// get next nodes from the selector // get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...) next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error())
} else if err != nil { } else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error()) return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error())
} }
return next, nil return next, nil

View File

@ -115,6 +115,9 @@ func getHeaders(m *codec.Message) {
func setHeaders(m *codec.Message) { func setHeaders(m *codec.Message) {
set := func(hdr, v string) { set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v m.Header[hdr] = v
m.Header["X-"+hdr] = v m.Header["X-"+hdr] = v
} }
@ -200,12 +203,10 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
Header: m.Header, Header: m.Header,
Body: m.Body, Body: m.Body,
} }
// send the request // send the request
if err := c.client.Send(&msg); err != nil { if err := c.client.Send(&msg); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error()) return errors.InternalServerError("go.micro.client.transport", err.Error())
} }
return nil return nil
} }

View File

@ -35,7 +35,7 @@ type httpTransportClient struct {
dialOpts DialOptions dialOpts DialOptions
once sync.Once once sync.Once
sync.Mutex sync.RWMutex
r chan *http.Request r chan *http.Request
bl []*http.Request bl []*http.Request
buff *bufio.Reader buff *bufio.Reader
@ -133,11 +133,11 @@ func (h *httpTransportClient) Recv(m *Message) error {
r = rc r = rc
} }
h.Lock() h.RLock()
defer h.Unlock()
if h.buff == nil { if h.buff == nil {
return io.EOF return io.EOF
} }
h.RUnlock()
// set timeout if its greater than 0 // set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) { if h.ht.opts.Timeout > time.Duration(0) {