de34f259ba
fixing test failed issue change back error type change registry.ErrNotFound back to selector.ErrNotFound change back error type change registry.ErrNotFound back to selector.ErrNotFound remove the single node tunnel test Fix read yaml config from memory package main import ( "fmt" "github.com/micro/go-micro/config" "github.com/micro/go-micro/config/source/memory" ) var configData = []byte(` --- a: 1234 `) func main() { memorySource := memory.NewSource( memory.WithYAML(configData), ) // Create new config conf := config.NewConfig() // Load file source conf.Load(memorySource) fmt.Println(string(conf.Bytes())) }
574 lines
14 KiB
Go
574 lines
14 KiB
Go
// Package grpc provides a gRPC client
|
|
package grpc
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/broker"
|
|
"github.com/micro/go-micro/client"
|
|
"github.com/micro/go-micro/client/selector"
|
|
"github.com/micro/go-micro/codec"
|
|
"github.com/micro/go-micro/errors"
|
|
"github.com/micro/go-micro/metadata"
|
|
"github.com/micro/go-micro/registry"
|
|
"github.com/micro/go-micro/transport"
|
|
|
|
"github.com/micro/go-micro/util/buf"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/encoding"
|
|
gmetadata "google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
type grpcClient struct {
|
|
once sync.Once
|
|
opts client.Options
|
|
pool *pool
|
|
}
|
|
|
|
func init() {
|
|
encoding.RegisterCodec(wrapCodec{jsonCodec{}})
|
|
encoding.RegisterCodec(wrapCodec{protoCodec{}})
|
|
encoding.RegisterCodec(wrapCodec{bytesCodec{}})
|
|
}
|
|
|
|
// secure returns the dial option for whether its a secure or insecure connection
|
|
func (g *grpcClient) secure() grpc.DialOption {
|
|
if g.opts.Context != nil {
|
|
if v := g.opts.Context.Value(tlsAuth{}); v != nil {
|
|
tls := v.(*tls.Config)
|
|
creds := credentials.NewTLS(tls)
|
|
return grpc.WithTransportCredentials(creds)
|
|
}
|
|
}
|
|
return grpc.WithInsecure()
|
|
}
|
|
|
|
func (g *grpcClient) next(request client.Request, opts client.CallOptions) (selector.Next, error) {
|
|
service := request.Service()
|
|
|
|
// get proxy
|
|
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
|
|
service = prx
|
|
}
|
|
|
|
// get proxy address
|
|
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
|
|
opts.Address = []string{prx}
|
|
}
|
|
|
|
// return remote address
|
|
if len(opts.Address) > 0 {
|
|
return func() (*registry.Node, error) {
|
|
return ®istry.Node{
|
|
Address: opts.Address[0],
|
|
}, nil
|
|
}, nil
|
|
}
|
|
|
|
// get next nodes from the selector
|
|
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
|
|
if err != nil {
|
|
if err == selector.ErrNotFound {
|
|
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
|
|
}
|
|
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
|
|
}
|
|
|
|
return next, nil
|
|
}
|
|
|
|
func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.Request, rsp interface{}, opts client.CallOptions) error {
|
|
address := node.Address
|
|
|
|
header := make(map[string]string)
|
|
if md, ok := metadata.FromContext(ctx); ok {
|
|
for k, v := range md {
|
|
header[k] = v
|
|
}
|
|
}
|
|
|
|
// set timeout in nanoseconds
|
|
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
|
|
// set the content type for the request
|
|
header["x-content-type"] = req.ContentType()
|
|
|
|
md := gmetadata.New(header)
|
|
ctx = gmetadata.NewOutgoingContext(ctx, md)
|
|
|
|
cf, err := g.newGRPCCodec(req.ContentType())
|
|
if err != nil {
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
|
|
maxRecvMsgSize := g.maxRecvMsgSizeValue()
|
|
maxSendMsgSize := g.maxSendMsgSizeValue()
|
|
|
|
var grr error
|
|
|
|
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)),
|
|
grpc.WithTimeout(opts.DialTimeout), g.secure(),
|
|
grpc.WithDefaultCallOptions(
|
|
grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
|
|
grpc.MaxCallSendMsgSize(maxSendMsgSize),
|
|
))
|
|
if err != nil {
|
|
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
|
}
|
|
defer func() {
|
|
// defer execution of release
|
|
g.pool.release(address, cc, grr)
|
|
}()
|
|
|
|
ch := make(chan error, 1)
|
|
|
|
go func() {
|
|
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.CallContentSubtype(cf.Name()))
|
|
ch <- microError(err)
|
|
}()
|
|
|
|
select {
|
|
case err := <-ch:
|
|
grr = err
|
|
case <-ctx.Done():
|
|
grr = ctx.Err()
|
|
}
|
|
|
|
return grr
|
|
}
|
|
|
|
func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client.Request, opts client.CallOptions) (client.Stream, error) {
|
|
address := node.Address
|
|
|
|
header := make(map[string]string)
|
|
if md, ok := metadata.FromContext(ctx); ok {
|
|
for k, v := range md {
|
|
header[k] = v
|
|
}
|
|
}
|
|
|
|
// set timeout in nanoseconds
|
|
header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout)
|
|
// set the content type for the request
|
|
header["x-content-type"] = req.ContentType()
|
|
|
|
md := gmetadata.New(header)
|
|
ctx = gmetadata.NewOutgoingContext(ctx, md)
|
|
|
|
cf, err := g.newGRPCCodec(req.ContentType())
|
|
if err != nil {
|
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
|
|
var dialCtx context.Context
|
|
var cancel context.CancelFunc
|
|
if opts.DialTimeout >= 0 {
|
|
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
|
|
} else {
|
|
dialCtx, cancel = context.WithCancel(ctx)
|
|
}
|
|
defer cancel()
|
|
|
|
wc := wrapCodec{cf}
|
|
|
|
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure())
|
|
if err != nil {
|
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
|
|
}
|
|
|
|
desc := &grpc.StreamDesc{
|
|
StreamName: req.Service() + req.Endpoint(),
|
|
ClientStreams: true,
|
|
ServerStreams: true,
|
|
}
|
|
|
|
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()))
|
|
if err != nil {
|
|
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
|
|
}
|
|
|
|
codec := &grpcCodec{
|
|
s: st,
|
|
c: wc,
|
|
}
|
|
|
|
// set request codec
|
|
if r, ok := req.(*grpcRequest); ok {
|
|
r.codec = codec
|
|
}
|
|
|
|
rsp := &response{
|
|
conn: cc,
|
|
stream: st,
|
|
codec: cf,
|
|
gcodec: codec,
|
|
}
|
|
|
|
return &grpcStream{
|
|
context: ctx,
|
|
request: req,
|
|
response: rsp,
|
|
stream: st,
|
|
conn: cc,
|
|
}, nil
|
|
}
|
|
|
|
func (g *grpcClient) maxRecvMsgSizeValue() int {
|
|
if g.opts.Context == nil {
|
|
return DefaultMaxRecvMsgSize
|
|
}
|
|
v := g.opts.Context.Value(maxRecvMsgSizeKey{})
|
|
if v == nil {
|
|
return DefaultMaxRecvMsgSize
|
|
}
|
|
return v.(int)
|
|
}
|
|
|
|
func (g *grpcClient) maxSendMsgSizeValue() int {
|
|
if g.opts.Context == nil {
|
|
return DefaultMaxSendMsgSize
|
|
}
|
|
v := g.opts.Context.Value(maxSendMsgSizeKey{})
|
|
if v == nil {
|
|
return DefaultMaxSendMsgSize
|
|
}
|
|
return v.(int)
|
|
}
|
|
|
|
func (g *grpcClient) newGRPCCodec(contentType string) (encoding.Codec, error) {
|
|
codecs := make(map[string]encoding.Codec)
|
|
if g.opts.Context != nil {
|
|
if v := g.opts.Context.Value(codecsKey{}); v != nil {
|
|
codecs = v.(map[string]encoding.Codec)
|
|
}
|
|
}
|
|
if c, ok := codecs[contentType]; ok {
|
|
return wrapCodec{c}, nil
|
|
}
|
|
if c, ok := defaultGRPCCodecs[contentType]; ok {
|
|
return wrapCodec{c}, nil
|
|
}
|
|
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
|
}
|
|
|
|
func (g *grpcClient) newCodec(contentType string) (codec.NewCodec, error) {
|
|
if c, ok := g.opts.Codecs[contentType]; ok {
|
|
return c, nil
|
|
}
|
|
if cf, ok := defaultRPCCodecs[contentType]; ok {
|
|
return cf, nil
|
|
}
|
|
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
|
}
|
|
|
|
func (g *grpcClient) Init(opts ...client.Option) error {
|
|
size := g.opts.PoolSize
|
|
ttl := g.opts.PoolTTL
|
|
|
|
for _, o := range opts {
|
|
o(&g.opts)
|
|
}
|
|
|
|
// update pool configuration if the options changed
|
|
if size != g.opts.PoolSize || ttl != g.opts.PoolTTL {
|
|
g.pool.Lock()
|
|
g.pool.size = g.opts.PoolSize
|
|
g.pool.ttl = int64(g.opts.PoolTTL.Seconds())
|
|
g.pool.Unlock()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (g *grpcClient) Options() client.Options {
|
|
return g.opts
|
|
}
|
|
|
|
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
|
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
|
|
}
|
|
|
|
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
|
return newGRPCRequest(service, method, req, g.opts.ContentType, reqOpts...)
|
|
}
|
|
|
|
func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
|
// make a copy of call opts
|
|
callOpts := g.opts.CallOptions
|
|
for _, opt := range opts {
|
|
opt(&callOpts)
|
|
}
|
|
|
|
next, err := g.next(req, callOpts)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// check if we already have a deadline
|
|
d, ok := ctx.Deadline()
|
|
if !ok {
|
|
// no deadline so we create a new one
|
|
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
|
|
} else {
|
|
// got a deadline so no need to setup context
|
|
// but we need to set the timeout we pass along
|
|
opt := client.WithRequestTimeout(time.Until(d))
|
|
opt(&callOpts)
|
|
}
|
|
|
|
// should we noop right here?
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
|
default:
|
|
}
|
|
|
|
// make copy of call method
|
|
gcall := g.call
|
|
|
|
// wrap the call in reverse
|
|
for i := len(callOpts.CallWrappers); i > 0; i-- {
|
|
gcall = callOpts.CallWrappers[i-1](gcall)
|
|
}
|
|
|
|
// return errors.New("go.micro.client", "request timeout", 408)
|
|
call := func(i int) error {
|
|
// call backoff first. Someone may want an initial start delay
|
|
t, err := callOpts.Backoff(ctx, req, i)
|
|
if err != nil {
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
|
|
// only sleep if greater than 0
|
|
if t.Seconds() > 0 {
|
|
time.Sleep(t)
|
|
}
|
|
|
|
// select next node
|
|
node, err := next()
|
|
service := req.Service()
|
|
if err != nil {
|
|
if err == selector.ErrNotFound {
|
|
return errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
|
|
}
|
|
return errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
|
|
}
|
|
|
|
// make the call
|
|
err = gcall(ctx, node, req, rsp, callOpts)
|
|
g.opts.Selector.Mark(service, node, err)
|
|
return err
|
|
}
|
|
|
|
ch := make(chan error, callOpts.Retries+1)
|
|
var gerr error
|
|
|
|
for i := 0; i <= callOpts.Retries; i++ {
|
|
go func(i int) {
|
|
ch <- call(i)
|
|
}(i)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
|
case err := <-ch:
|
|
// if the call succeeded lets bail early
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
retry, rerr := callOpts.Retry(ctx, req, i, err)
|
|
if rerr != nil {
|
|
return rerr
|
|
}
|
|
|
|
if !retry {
|
|
return err
|
|
}
|
|
|
|
gerr = err
|
|
}
|
|
}
|
|
|
|
return gerr
|
|
}
|
|
|
|
func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
|
// make a copy of call opts
|
|
callOpts := g.opts.CallOptions
|
|
for _, opt := range opts {
|
|
opt(&callOpts)
|
|
}
|
|
|
|
next, err := g.next(req, callOpts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// #200 - streams shouldn't have a request timeout set on the context
|
|
|
|
// should we noop right here?
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
|
default:
|
|
}
|
|
|
|
call := func(i int) (client.Stream, error) {
|
|
// call backoff first. Someone may want an initial start delay
|
|
t, err := callOpts.Backoff(ctx, req, i)
|
|
if err != nil {
|
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
|
|
// only sleep if greater than 0
|
|
if t.Seconds() > 0 {
|
|
time.Sleep(t)
|
|
}
|
|
|
|
node, err := next()
|
|
service := req.Service()
|
|
if err != nil {
|
|
if err == selector.ErrNotFound {
|
|
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
|
|
}
|
|
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %s", service, err.Error())
|
|
}
|
|
|
|
stream, err := g.stream(ctx, node, req, callOpts)
|
|
g.opts.Selector.Mark(service, node, err)
|
|
return stream, err
|
|
}
|
|
|
|
type response struct {
|
|
stream client.Stream
|
|
err error
|
|
}
|
|
|
|
ch := make(chan response, callOpts.Retries+1)
|
|
var grr error
|
|
|
|
for i := 0; i <= callOpts.Retries; i++ {
|
|
go func(i int) {
|
|
s, err := call(i)
|
|
ch <- response{s, err}
|
|
}(i)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
|
|
case rsp := <-ch:
|
|
// if the call succeeded lets bail early
|
|
if rsp.err == nil {
|
|
return rsp.stream, nil
|
|
}
|
|
|
|
retry, rerr := callOpts.Retry(ctx, req, i, err)
|
|
if rerr != nil {
|
|
return nil, rerr
|
|
}
|
|
|
|
if !retry {
|
|
return nil, rsp.err
|
|
}
|
|
|
|
grr = rsp.err
|
|
}
|
|
}
|
|
|
|
return nil, grr
|
|
}
|
|
|
|
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
|
md, ok := metadata.FromContext(ctx)
|
|
if !ok {
|
|
md = make(map[string]string)
|
|
}
|
|
md["Content-Type"] = p.ContentType()
|
|
|
|
cf, err := g.newCodec(p.ContentType())
|
|
if err != nil {
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
|
|
b := buf.New(nil)
|
|
|
|
if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
|
|
return errors.InternalServerError("go.micro.client", err.Error())
|
|
}
|
|
|
|
g.once.Do(func() {
|
|
g.opts.Broker.Connect()
|
|
})
|
|
|
|
return g.opts.Broker.Publish(p.Topic(), &broker.Message{
|
|
Header: md,
|
|
Body: b.Bytes(),
|
|
})
|
|
}
|
|
|
|
func (g *grpcClient) String() string {
|
|
return "grpc"
|
|
}
|
|
|
|
func newClient(opts ...client.Option) client.Client {
|
|
options := client.Options{
|
|
Codecs: make(map[string]codec.NewCodec),
|
|
CallOptions: client.CallOptions{
|
|
Backoff: client.DefaultBackoff,
|
|
Retry: client.DefaultRetry,
|
|
Retries: client.DefaultRetries,
|
|
RequestTimeout: client.DefaultRequestTimeout,
|
|
DialTimeout: transport.DefaultDialTimeout,
|
|
},
|
|
PoolSize: client.DefaultPoolSize,
|
|
PoolTTL: client.DefaultPoolTTL,
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
if len(options.ContentType) == 0 {
|
|
options.ContentType = "application/grpc+proto"
|
|
}
|
|
|
|
if options.Broker == nil {
|
|
options.Broker = broker.DefaultBroker
|
|
}
|
|
|
|
if options.Registry == nil {
|
|
options.Registry = registry.DefaultRegistry
|
|
}
|
|
|
|
if options.Selector == nil {
|
|
options.Selector = selector.NewSelector(
|
|
selector.Registry(options.Registry),
|
|
)
|
|
}
|
|
|
|
rc := &grpcClient{
|
|
once: sync.Once{},
|
|
opts: options,
|
|
pool: newPool(options.PoolSize, options.PoolTTL),
|
|
}
|
|
|
|
c := client.Client(rc)
|
|
|
|
// wrap in reverse
|
|
for i := len(options.Wrappers); i > 0; i-- {
|
|
c = options.Wrappers[i-1](c)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func NewClient(opts ...client.Option) client.Client {
|
|
return newClient(opts...)
|
|
}
|