micro/client/rpc_client.go
Vasiliy Tolstov 58598d0fe0
fixes for safe conversion and avoiding panics (#1213)
* fixes for safe conversion

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix client publish panic

If the broker connect returns an error, we don't check its status and still
use the broker later to publish the message. This is mostly unexpected, because
the broker connection failed and we can't use it.
The proposed solution also has a benefit: we flag the connection status
only when we have a successful broker connection.

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* api/handler/broker: fix possible broker publish panic

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2020-02-18 23:05:38 +00:00

671 lines
14 KiB
Go

package client
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/client/selector"
"github.com/micro/go-micro/v2/codec"
raw "github.com/micro/go-micro/v2/codec/bytes"
"github.com/micro/go-micro/v2/errors"
"github.com/micro/go-micro/v2/metadata"
"github.com/micro/go-micro/v2/registry"
"github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/util/buf"
"github.com/micro/go-micro/v2/util/pool"
)
// rpcClient is the Client implementation returned by newRpcClient.
type rpcClient struct {
	// seq is the request sequence counter; it is only accessed through
	// sync/atomic. It is deliberately the first field so that it is 64-bit
	// aligned on 32-bit platforms, which 64-bit atomic operations require.
	seq uint64
	// once stores a bool recording whether the broker has been connected.
	once atomic.Value
	// opts holds the client options.
	opts Options
	// pool is the connection pool used for unary calls.
	pool pool.Pool
}
// newRpcClient builds an rpcClient from the supplied options, backs it with a
// connection pool configured from those options and returns it wrapped by the
// configured client wrappers.
func newRpcClient(opt ...Option) Client {
	options := NewOptions(opt...)

	client := &rpcClient{
		opts: options,
		pool: pool.NewPool(
			pool.Size(options.PoolSize),
			pool.TTL(options.PoolTTL),
			pool.Transport(options.Transport),
		),
	}
	// the broker has not been connected yet
	client.once.Store(false)

	// apply the wrappers last-to-first so the first wrapper ends up outermost
	var wrapped Client = client
	for idx := len(options.Wrappers) - 1; idx >= 0; idx-- {
		wrapped = options.Wrappers[idx](wrapped)
	}

	return wrapped
}
// newCodec resolves the codec constructor for contentType. Codecs configured
// on the client take precedence over DefaultCodecs; an error is returned when
// neither has an entry for the content type.
func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
	custom, found := r.opts.Codecs[contentType]
	if found {
		return custom, nil
	}

	builtin, found := DefaultCodecs[contentType]
	if !found {
		return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
	}

	return builtin, nil
}
// call performs a single unary request against node and decodes the reply
// into resp.
//
// The outgoing headers are built from the context metadata (minus
// "Micro-Topic") plus Timeout, Content-Type and Accept. The connection is
// taken from the pool, and the stream is given a release hook that hands the
// connection back to the pool. The send/receive happens in a goroutine so the
// call can be abandoned when ctx is done.
//
// It returns the send/receive error, or a timeout error if ctx is done first.
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
	address := node.Address

	msg := &transport.Message{
		Header: make(map[string]string),
	}

	md, ok := metadata.FromContext(ctx)
	if ok {
		for k, v := range md {
			// don't copy Micro-Topic header, that used for pub/sub
			// this fix case then client uses the same context that received in subscriber
			if k == "Micro-Topic" {
				continue
			}
			msg.Header[k] = v
		}
	}

	// set timeout in nanoseconds
	msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
	// set the content type for the request
	msg.Header["Content-Type"] = req.ContentType()
	// set the accept header
	msg.Header["Accept"] = req.ContentType()

	// setup old protocol
	cf := setupProtocol(msg, node)

	// no codec specified
	if cf == nil {
		var err error
		cf, err = r.newCodec(req.ContentType())
		if err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}
	}

	dOpts := []transport.DialOption{
		transport.WithStream(),
	}

	if opts.DialTimeout >= 0 {
		dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
	}

	c, err := r.pool.Get(address, dOpts...)
	if err != nil {
		return errors.InternalServerError("go.micro.client", "connection error: %v", err)
	}

	// reserve the sequence number in a single atomic step; a separate load
	// followed by an add lets two concurrent calls observe the same value
	seq := atomic.AddUint64(&r.seq, 1) - 1
	codec := newRpcCodec(msg, c, cf, "")

	rsp := &rpcResponse{
		socket: c,
		codec:  codec,
	}

	stream := &rpcStream{
		id:       fmt.Sprintf("%v", seq),
		context:  ctx,
		request:  req,
		response: rsp,
		codec:    codec,
		closed:   make(chan bool),
		release:  func(err error) { r.pool.Release(c, err) },
		sendEOS:  false,
	}
	// close the stream on exiting this function
	defer stream.Close()

	// wait for error response; buffered so the goroutine never blocks if we
	// have already returned because of ctx
	ch := make(chan error, 1)

	go func() {
		defer func() {
			if r := recover(); r != nil {
				ch <- errors.InternalServerError("go.micro.client", "panic recovered: %v", r)
			}
		}()

		// send request
		if err := stream.Send(req.Body()); err != nil {
			ch <- err
			return
		}

		// recv request
		if err := stream.Recv(resp); err != nil {
			ch <- err
			return
		}

		// success
		ch <- nil
	}()

	var grr error

	select {
	case err := <-ch:
		return err
	case <-ctx.Done():
		grr = errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
	}

	// set the stream error
	if grr != nil {
		stream.Lock()
		stream.err = grr
		stream.Unlock()

		return grr
	}

	return nil
}
// stream opens a streaming exchange with node and sends the first message.
//
// Unlike call, the connection is dialled directly on the transport rather than
// taken from the pool, and the stream's release hook closes it. The request
// sequence number is used as the stream id and passed to the codec.
//
// It returns the stream once the first message has been sent, or the send
// error / a timeout error (after closing the stream) if that fails or ctx is
// done first.
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
	address := node.Address

	msg := &transport.Message{
		Header: make(map[string]string),
	}

	md, ok := metadata.FromContext(ctx)
	if ok {
		for k, v := range md {
			msg.Header[k] = v
		}
	}

	// set timeout in nanoseconds
	msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
	// set the content type for the request
	msg.Header["Content-Type"] = req.ContentType()
	// set the accept header
	msg.Header["Accept"] = req.ContentType()

	// set old codecs
	cf := setupProtocol(msg, node)

	// no codec specified
	if cf == nil {
		var err error
		cf, err = r.newCodec(req.ContentType())
		if err != nil {
			return nil, errors.InternalServerError("go.micro.client", err.Error())
		}
	}

	dOpts := []transport.DialOption{
		transport.WithStream(),
	}

	if opts.DialTimeout >= 0 {
		dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
	}

	c, err := r.opts.Transport.Dial(address, dOpts...)
	if err != nil {
		return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
	}

	// reserve the sequence number in a single atomic step; a separate load
	// followed by an add lets two concurrent streams end up with the same id
	seq := atomic.AddUint64(&r.seq, 1) - 1
	id := fmt.Sprintf("%v", seq)

	// create codec with stream id
	codec := newRpcCodec(msg, c, cf, id)

	rsp := &rpcResponse{
		socket: c,
		codec:  codec,
	}

	// set request codec
	if r, ok := req.(*rpcRequest); ok {
		r.codec = codec
	}

	stream := &rpcStream{
		id:       id,
		context:  ctx,
		request:  req,
		response: rsp,
		codec:    codec,
		// used to close the stream
		closed: make(chan bool),
		// signal the end of stream,
		sendEOS: true,
		// release func
		release: func(err error) { c.Close() },
	}

	// wait for error response; buffered so the goroutine never blocks if we
	// have already returned because of ctx
	ch := make(chan error, 1)

	go func() {
		// send the first message
		ch <- stream.Send(req.Body())
	}()

	var grr error

	select {
	case err := <-ch:
		grr = err
	case <-ctx.Done():
		grr = errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
	}

	if grr != nil {
		// set the error
		stream.Lock()
		stream.err = grr
		stream.Unlock()

		// close the stream
		stream.Close()
		return nil, grr
	}

	return stream, nil
}
// Init applies the given options to the client. When the pool size, pool TTL
// or transport differ afterwards, the current pool is closed and replaced by
// one built from the new settings. It always returns nil.
func (r *rpcClient) Init(opts ...Option) error {
	// remember the pool-related settings before the options are applied
	prevSize, prevTTL, prevTransport := r.opts.PoolSize, r.opts.PoolTTL, r.opts.Transport

	for i := range opts {
		opts[i](&r.opts)
	}

	unchanged := prevSize == r.opts.PoolSize &&
		prevTTL == r.opts.PoolTTL &&
		prevTransport == r.opts.Transport
	if unchanged {
		return nil
	}

	// the pool configuration changed: drop the old pool and build a new one
	r.pool.Close()
	r.pool = pool.NewPool(
		pool.Size(r.opts.PoolSize),
		pool.TTL(r.opts.PoolTTL),
		pool.Transport(r.opts.Transport),
	)

	return nil
}
// Options returns the client's current options by value.
func (r *rpcClient) Options() Options {
	current := r.opts
	return current
}
// hasProxy reports whether a proxy is configured through the environment,
// i.e. whether MICRO_PROXY or MICRO_PROXY_ADDRESS is set to a non-empty value.
func (r *rpcClient) hasProxy() bool {
	for _, key := range []string{"MICRO_PROXY", "MICRO_PROXY_ADDRESS"} {
		if os.Getenv(key) != "" {
			return true
		}
	}
	return false
}
// next returns the function used to pick the node for each call attempt.
//
// MICRO_PROXY replaces the service name to look up ("service" is shorthand for
// "go.micro.proxy") and MICRO_PROXY_ADDRESS replaces the address list. When
// addresses are set, the nodes are built from them directly and the selector
// is bypassed; otherwise the selector is asked for the service.
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
	target := request.Service()

	// a configured proxy replaces the service we look up
	if proxy := os.Getenv("MICRO_PROXY"); proxy != "" {
		// default name
		if proxy == "service" {
			proxy = "go.micro.proxy"
		}
		target = proxy
	}

	// a configured proxy address replaces any explicit addresses
	if proxyAddr := os.Getenv("MICRO_PROXY_ADDRESS"); proxyAddr != "" {
		opts.Address = []string{proxyAddr}
	}

	// no fixed addresses: defer to the selector
	if len(opts.Address) == 0 {
		picker, err := r.opts.Selector.Select(target, opts.SelectOptions...)
		switch {
		case err == nil:
			return picker, nil
		case err == selector.ErrNotFound:
			return nil, errors.InternalServerError("go.micro.client", "service %s: %s", target, err.Error())
		default:
			return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", target, err.Error())
		}
	}

	// build one node per fixed address
	static := make([]*registry.Node, 0, len(opts.Address))
	for _, addr := range opts.Address {
		static = append(static, &registry.Node{
			Address: addr,
			// Set the protocol
			Metadata: map[string]string{
				"protocol": "mucp",
			},
		})
	}

	// crude rotation: the current unix second picks the node
	return func() (*registry.Node, error) {
		idx := time.Now().Unix() % int64(len(static))
		return static[idx], nil
	}, nil
}
// Call makes a unary request and decodes the reply into response.
//
// The client's call options are copied and then overridden by opts. If ctx has
// no deadline, one is derived from the request timeout; if it has one, the
// remaining time becomes the request timeout that is passed along. Each
// attempt runs the backoff, picks a node, makes the (wrapped) call and marks
// the result on the selector. Attempts are repeated up to the configured
// number of retries while the Retry function allows it; retries are disabled
// when a proxy is configured.
//
// It returns nil on the first successful attempt, a timeout error when ctx is
// done, the Retry function's error if it fails, or the last attempt's error.
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
	// make a copy of call opts
	callOpts := r.opts.CallOptions
	for _, opt := range opts {
		opt(&callOpts)
	}

	next, err := r.next(request, callOpts)
	if err != nil {
		return err
	}

	// check if we already have a deadline
	d, ok := ctx.Deadline()
	if !ok {
		// no deadline so we create a new one
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout)
		defer cancel()
	} else {
		// got a deadline so no need to setup context
		// but we need to set the timeout we pass along
		opt := WithRequestTimeout(time.Until(d))
		opt(&callOpts)
	}

	// should we noop right here?
	select {
	case <-ctx.Done():
		return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
	default:
	}

	// make copy of call method
	rcall := r.call

	// wrap the call in reverse
	for i := len(callOpts.CallWrappers); i > 0; i-- {
		rcall = callOpts.CallWrappers[i-1](rcall)
	}

	// return errors.New("go.micro.client", "request timeout", 408)
	call := func(i int) error {
		// call backoff first. Someone may want an initial start delay
		t, err := callOpts.Backoff(ctx, request, i)
		if err != nil {
			return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
		}

		// only wait if greater than 0; give up as soon as ctx is done so an
		// abandoned attempt does not go on to select and mark a node
		if t > 0 {
			timer := time.NewTimer(t)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
			}
		}

		// select next node
		node, err := next()
		service := request.Service()
		if err != nil {
			if err == selector.ErrNotFound {
				return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
			}
			return errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
		}

		// make the call
		err = rcall(ctx, node, request, response, callOpts)
		r.opts.Selector.Mark(service, node, err)
		return err
	}

	// get the retries
	retries := callOpts.Retries

	// disable retries when using a proxy
	if r.hasProxy() {
		retries = 0
	}

	// buffered for every attempt so an attempt goroutine can always deliver
	// its result even after we have returned
	ch := make(chan error, retries+1)
	var gerr error

	for i := 0; i <= retries; i++ {
		go func(i int) {
			ch <- call(i)
		}(i)

		select {
		case <-ctx.Done():
			return errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
		case err := <-ch:
			// if the call succeeded lets bail early
			if err == nil {
				return nil
			}

			retry, rerr := callOpts.Retry(ctx, request, i, err)
			if rerr != nil {
				return rerr
			}

			if !retry {
				return err
			}

			gerr = err
		}
	}

	return gerr
}
// Stream opens a streaming request and returns the resulting stream.
//
// The client's call options are copied and then overridden by opts. Each
// attempt runs the backoff, picks a node, opens the stream and marks the
// result on the selector. Attempts are repeated up to the configured number of
// retries while the Retry function allows it; retries are disabled when a
// proxy is configured.
//
// It returns the stream of the first successful attempt, a timeout error when
// ctx is done, the Retry function's error if it fails, or the last attempt's
// error.
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
	// make a copy of call opts
	callOpts := r.opts.CallOptions
	for _, opt := range opts {
		opt(&callOpts)
	}

	next, err := r.next(request, callOpts)
	if err != nil {
		return nil, err
	}

	// should we noop right here?
	select {
	case <-ctx.Done():
		return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
	default:
	}

	call := func(i int) (Stream, error) {
		// call backoff first. Someone may want an initial start delay
		t, err := callOpts.Backoff(ctx, request, i)
		if err != nil {
			return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
		}

		// only wait if greater than 0; give up as soon as ctx is done so an
		// abandoned attempt does not go on to select and mark a node
		if t > 0 {
			timer := time.NewTimer(t)
			select {
			case <-timer.C:
			case <-ctx.Done():
				timer.Stop()
				return nil, errors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
			}
		}

		node, err := next()
		service := request.Service()
		if err != nil {
			if err == selector.ErrNotFound {
				return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
			}
			return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %s", service, err.Error())
		}

		stream, err := r.stream(ctx, node, request, callOpts)
		r.opts.Selector.Mark(service, node, err)
		return stream, err
	}

	// response pairs the outcome of one attempt
	type response struct {
		stream Stream
		err    error
	}

	// get the retries
	retries := callOpts.Retries

	// disable retries when using a proxy
	if r.hasProxy() {
		retries = 0
	}

	// buffered for every attempt so an attempt goroutine can always deliver
	// its result even after we have returned
	ch := make(chan response, retries+1)
	var grr error

	for i := 0; i <= retries; i++ {
		go func(i int) {
			s, err := call(i)
			ch <- response{s, err}
		}(i)

		select {
		case <-ctx.Done():
			return nil, errors.Timeout("go.micro.client", fmt.Sprintf("call timeout: %v", ctx.Err()))
		case rsp := <-ch:
			// if the call succeeded lets bail early
			if rsp.err == nil {
				return rsp.stream, nil
			}

			retry, rerr := callOpts.Retry(ctx, request, i, rsp.err)
			if rerr != nil {
				return nil, rerr
			}

			if !retry {
				return nil, rsp.err
			}

			grr = rsp.err
		}
	}

	return nil, grr
}
// Publish sends msg to the broker.
//
// The broker headers are the context metadata plus Content-Type, Micro-Topic
// and a freshly generated Micro-Id. The message is published to the message
// topic unless an exchange is set in the publish options, in which case the
// exchange is used. A *raw.Frame payload is sent as-is; any other payload is
// encoded with the codec for the message content type. The broker is
// connected on first use, and the connected flag is only set after a
// successful connect.
//
// It returns an internal server error if the codec is unsupported, encoding
// fails or the broker connect fails, otherwise the broker's publish error.
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
	options := PublishOptions{
		Context: context.Background(),
	}
	for _, o := range opts {
		o(&options)
	}

	// build the headers in a fresh map: writing into the map obtained from
	// the context would modify metadata owned by the caller (and shared with
	// anything else using the same context)
	ctxMD, _ := metadata.FromContext(ctx)
	md := make(map[string]string, len(ctxMD)+3)
	for k, v := range ctxMD {
		md[k] = v
	}

	id := uuid.New().String()
	md["Content-Type"] = msg.ContentType()
	md["Micro-Topic"] = msg.Topic()
	md["Micro-Id"] = id

	// set the topic
	topic := msg.Topic()

	// get the exchange
	if len(options.Exchange) > 0 {
		topic = options.Exchange
	}

	// encode message body
	cf, err := r.newCodec(msg.ContentType())
	if err != nil {
		return errors.InternalServerError("go.micro.client", err.Error())
	}

	var body []byte

	// passed in raw data
	if d, ok := msg.Payload().(*raw.Frame); ok {
		body = d.Data
	} else {
		// new buffer
		b := buf.New(nil)

		if err := cf(b).Write(&codec.Message{
			Target: topic,
			Type:   codec.Event,
			Header: map[string]string{
				"Micro-Id":    id,
				"Micro-Topic": msg.Topic(),
			},
		}, msg.Payload()); err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}

		// set the body
		body = b.Bytes()
	}

	// connect the broker on first use; only flag it once connect succeeded
	if !r.once.Load().(bool) {
		if err = r.opts.Broker.Connect(); err != nil {
			return errors.InternalServerError("go.micro.client", err.Error())
		}
		r.once.Store(true)
	}

	return r.opts.Broker.Publish(topic, &broker.Message{
		Header: md,
		Body:   body,
	})
}
// NewMessage builds a Message for topic carrying message, passing the client's
// configured content type and the given message options to newMessage.
func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {
	contentType := r.opts.ContentType
	return newMessage(topic, message, contentType, opts...)
}
// NewRequest builds a Request for the given service and method carrying
// request, passing the client's configured content type and the given request
// options to newRequest.
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
	contentType := r.opts.ContentType
	return newRequest(service, method, request, contentType, reqOpts...)
}
// String returns the name of this client implementation.
func (r *rpcClient) String() string {
	const name = "mucp"
	return name
}