From 9d59e4cdb4bfaaa5f32d7545efe900d1de2ba5b4 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 12 Sep 2021 23:24:22 +0300 Subject: [PATCH] add needed files Signed-off-by: Vasiliy Tolstov --- LICENSE | 15 +- codec.go | 26 ++ drpc.go | 722 ++++++++++++++++++++++++++++++++++++++++++++++++ drpc_pool.go | 209 ++++++++++++++ error.go | 44 +++ go.mod | 11 +- go.sum | 167 +++++++++++ message.go | 40 +++ options.go | 77 ++++++ request.go | 87 ++++++ request_test.go | 41 +++ response.go | 45 +++ stream.go | 92 ++++++ 13 files changed, 1563 insertions(+), 13 deletions(-) create mode 100644 codec.go create mode 100644 drpc.go create mode 100644 drpc_pool.go create mode 100644 error.go create mode 100644 go.sum create mode 100644 message.go create mode 100644 options.go create mode 100644 request.go create mode 100644 request_test.go create mode 100644 response.go create mode 100644 stream.go diff --git a/LICENSE b/LICENSE index 261eeb9..6e701f1 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,4 @@ + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ @@ -175,18 +176,8 @@ END OF TERMS AND CONDITIONS - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] + Copyright 2015-2020 Asim Aslam. + Copyright 2019-2020 Unistack LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..36f4876 --- /dev/null +++ b/codec.go @@ -0,0 +1,26 @@ +package drpc + +import ( + "github.com/unistack-org/micro/v3/codec" + "storj.io/drpc" +) + +type wrapMicroCodec struct{ codec.Codec } + +func (w *wrapMicroCodec) Marshal(v drpc.Message) ([]byte, error) { + if m, ok := v.(*codec.Frame); ok { + return m.Data, nil + } + return w.Codec.Marshal(v.(interface{})) +} + +func (w *wrapMicroCodec) Unmarshal(d []byte, v drpc.Message) error { + if d == nil || v == nil { + return nil + } + if m, ok := v.(*codec.Frame); ok { + m.Data = d + return nil + } + return w.Codec.Unmarshal(d, v.(interface{})) +} diff --git a/drpc.go b/drpc.go new file mode 100644 index 0000000..3c4f610 --- /dev/null +++ b/drpc.go @@ -0,0 +1,722 @@ +// Package drpc provides a drpc client +package drpc + +import ( + "context" + "fmt" + "net" + "strings" + "sync" + "time" + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/metadata" + "storj.io/drpc/drpcconn" + dmetadata "storj.io/drpc/drpcmetadata" +) + +var ( + DefaultContentType = "application/drpc+proto" +) + +type drpcClient struct { + opts client.Options + //pool *pool + init bool + sync.RWMutex +} + +func (d *drpcClient) call(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + var header map[string]string + + if md, ok := metadata.FromOutgoingContext(ctx); ok { + header = make(map[string]string, len(md)) + for k, v := range md { + header[strings.ToLower(k)] = v + } + } else { + header = make(map[string]string, 2) + } + + // set timeout in nanoseconds + header[metadata.HeaderTimeout] = fmt.Sprintf("%d", opts.RequestTimeout) + // set the content type for the request + // header["x-content-type"] = req.ContentType() + + ctx = dmetadata.AddPairs(ctx, header) + cf, err := d.newCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + //maxRecvMsgSize := d.maxRecvMsgSizeValue() + //maxSendMsgSize := d.maxSendMsgSizeValue() + + var grr error + + var dialCtx context.Context + var cancel context.CancelFunc + if opts.DialTimeout >= 0 { + dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) + } else { + dialCtx, cancel = context.WithCancel(ctx) + } + defer cancel() + + // TODO: handle g.secure + /* + grpcDialOptions := []grpc.DialOption{ + g.secure(addr), + grpc.WithDefaultCallOptions( + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + } + + if opts := g.getGrpcDialOptions(); opts != nil { + grpcDialOptions = append(grpcDialOptions, opts...) + } + + cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + } + defer func() { + // defer execution of release + g.pool.release(cc, grr) + }() + */ + ch := make(chan error, 1) + _ = dialCtx + //rc, err := net.DialContext(ctx, "tcp", addr) + rc, err := net.Dial("tcp", addr) + if err != nil { + return err + } + defer rc.Close() + + cc := drpcconn.New(rc) + defer cc.Close() + go func() { + err := cc.Invoke(ctx, methodToDRPC(req.Service(), req.Endpoint()), &wrapMicroCodec{cf}, req.Body(), rsp) + ch <- microError(err) + }() + + select { + case err := <-ch: + grr = err + case <-ctx.Done(): + grr = errors.Timeout("go.micro.client", "%v", ctx.Err()) + } + + return grr +} + +/* +func (g *drpcClient) stream(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + var header map[string]string + + if md, ok := metadata.FromOutgoingContext(ctx); ok { + header = make(map[string]string, len(md)) + for k, v := range md { + header[k] = v + } + } else { + header = make(map[string]string) + } + + // set timeout in nanoseconds + if opts.StreamTimeout > time.Duration(0) { + header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } + // set the content type for the request + header["x-content-type"] = req.ContentType() + + md := gmetadata.New(header) + ctx = gmetadata.NewOutgoingContext(ctx, md) + + cf, err := g.newCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + var dialCtx context.Context + var cancel context.CancelFunc + if opts.DialTimeout >= 0 { + dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) + } else { + dialCtx, cancel = context.WithCancel(ctx) + } + defer cancel() + + grpcDialOptions := []grpc.DialOption{ + // g.secure(addr), + } + + if opts := g.getGrpcDialOptions(); opts != nil { + grpcDialOptions = append(grpcDialOptions, opts...) + } + + cc, err := g.pool.getConn(dialCtx, addr, grpcDialOptions...) + if err != nil { + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) + } + + desc := &grpc.StreamDesc{ + StreamName: req.Service() + req.Endpoint(), + ClientStreams: true, + ServerStreams: true, + } + + grpcCallOptions := []grpc.CallOption{ + grpc.ForceCodec(cf), + grpc.CallContentSubtype(cf.String()), + } + if opts := g.getGrpcCallOptions(); opts != nil { + grpcCallOptions = append(grpcCallOptions, opts...) + } + + // create a new cancelling context + newCtx, cancel := context.WithCancel(ctx) + + st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) + if err != nil { + // we need to cleanup as we dialled and created a context + // cancel the context + cancel() + // release the connection + g.pool.release(cc, err) + // now return the error + return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) + } + + // set request codec + if r, ok := req.(*grpcRequest); ok { + r.codec = cf + } + + // setup the stream response + stream := &grpcStream{ + ClientStream: st, + context: ctx, + request: req, + response: &response{ + conn: cc, + stream: st, + codec: cf, + }, + conn: cc, + close: func(err error) { + // cancel the context if an error occurred + if err != nil { + cancel() + } + + // defer execution of release + g.pool.release(cc, err) + }, + } + + // set the stream as the response + val := reflect.ValueOf(rsp).Elem() + val.Set(reflect.ValueOf(stream).Elem()) + return nil +} + +func (g *drpcClient) poolMaxStreams() int { + if g.opts.Context == nil { + return DefaultPoolMaxStreams + } + v := g.opts.Context.Value(poolMaxStreams{}) + if v == nil { + return DefaultPoolMaxStreams + } + return v.(int) +} + +func (g *drpcClient) poolMaxIdle() int { + if g.opts.Context == nil { + return DefaultPoolMaxIdle + } + v := g.opts.Context.Value(poolMaxIdle{}) + if v == nil { + return DefaultPoolMaxIdle + } + return v.(int) +} + +func (g *drpcClient) maxRecvMsgSizeValue() int { + if g.opts.Context == nil { + return DefaultMaxRecvMsgSize + } + v := g.opts.Context.Value(maxRecvMsgSizeKey{}) + if v == nil { + return DefaultMaxRecvMsgSize + } + return v.(int) +} + +func (g *drpcClient) maxSendMsgSizeValue() int { + if g.opts.Context == nil { + return DefaultMaxSendMsgSize + } + v := g.opts.Context.Value(maxSendMsgSizeKey{}) + if v == nil { + return DefaultMaxSendMsgSize + } + return v.(int) +} +*/ + +func (d *drpcClient) newCodec(ct string) (codec.Codec, error) { + d.RLock() + defer d.RUnlock() + + if idx := strings.IndexRune(ct, ';'); idx >= 0 { + ct = ct[:idx] + } + + if c, ok := d.opts.Codecs[ct]; ok { + return c, nil + } + return nil, codec.ErrUnknownContentType +} + +func (d *drpcClient) Init(opts ...client.Option) error { + if len(opts) == 0 && d.init { + return nil + } + // size := d.opts.PoolSize + // ttl := d.opts.PoolTTL + + for _, o := range opts { + o(&d.opts) + } + + /* + // update pool configuration if the options changed + if size != d.opts.PoolSize || ttl != d.opts.PoolTTL { + d.pool.Lock() + d.pool.size = d.opts.PoolSize + d.pool.ttl = int64(d.opts.PoolTTL.Seconds()) + d.pool.Unlock() + } + */ + + if err := d.opts.Broker.Init(); err != nil { + return err + } + if err := d.opts.Tracer.Init(); err != nil { + return err + } + if err := d.opts.Router.Init(); err != nil { + return err + } + if err := d.opts.Logger.Init(); err != nil { + return err + } + if err := d.opts.Meter.Init(); err != nil { + return err + } + if err := d.opts.Transport.Init(); err != nil { + return err + } + + d.init = true + + return nil +} + +func (d *drpcClient) Options() client.Options { + return d.opts +} + +func (d *drpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { + return newDRPCEvent(topic, msg, d.opts.ContentType, opts...) +} + +func (d *drpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { + return newDRPCRequest(service, method, req, d.opts.ContentType, reqOpts...) +} + +func (d *drpcClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + if req == nil { + return errors.InternalServerError("go.micro.client", "req is nil") + } else if rsp == nil { + return errors.InternalServerError("go.micro.client", "rsp is nil") + } + // make a copy of call opts + callOpts := d.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + // check if we already have a deadline + td, ok := ctx.Deadline() + if !ok { + // no deadline so we create a new one + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, callOpts.RequestTimeout) + defer cancel() + } else { + // got a deadline so no need to setup context + // but we need to set the timeout we pass along + opt := client.WithRequestTimeout(time.Until(td)) + opt(&callOpts) + } + + // should we noop right here? + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + // make copy of call method + dcall := d.call + + // wrap the call in reverse + for i := len(callOpts.CallWrappers); i > 0; i-- { + dcall = callOpts.CallWrappers[i-1](dcall) + } + + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = d.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = d.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(d.opts.Proxy) > 0 { + callOpts.Address = []string{d.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO apply any filtering here + routes, err := d.opts.Lookup(ctx, req, callOpts) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return err + } + + // return errors.New("go.micro.client", "request timeout", 408) + call := func(i int) error { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + // get the next node + node := next() + + // make the call + err = dcall(ctx, node, req, rsp, callOpts) + + // record the result of the call to inform future routing decisions + if verr := d.opts.Selector.Record(node, err); verr != nil { + return verr + } + + // try and transform the error to a go-micro error + if verr, ok := err.(*errors.Error); ok { + return verr + } + + return err + } + + ch := make(chan error, callOpts.Retries+1) + var derr error + + for i := 0; i <= callOpts.Retries; i++ { + go func(i int) { + ch <- call(i) + }(i) + + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case err := <-ch: + // if the call succeeded lets bail early + if err == nil { + return nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return rerr + } + + if !retry { + return err + } + + derr = err + } + } + + return derr +} + +func (g *drpcClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + return nil, nil + /* + // make a copy of call opts + callOpts := g.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + // #200 - streams shouldn't have a request timeout set on the context + + // should we noop right here? + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + // make a copy of stream + gstream := g.stream + + // wrap the call in reverse + for i := len(callOpts.CallWrappers); i > 0; i-- { + gstream = callOpts.CallWrappers[i-1](gstream) + } + + // use the router passed as a call option, or fallback to the rpc clients router + if callOpts.Router == nil { + callOpts.Router = g.opts.Router + } + + if callOpts.Selector == nil { + callOpts.Selector = g.opts.Selector + } + + // inject proxy address + // TODO: don't even bother using Lookup/Select in this case + if len(g.opts.Proxy) > 0 { + callOpts.Address = []string{g.opts.Proxy} + } + + // lookup the route to send the reques to + // TODO: move to internal lookup func + routes, err := g.opts.Lookup(ctx, req, callOpts) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // balance the list of nodes + next, err := callOpts.Selector.Select(routes) + if err != nil { + return nil, err + } + + call := func(i int) (client.Stream, error) { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + // get the next node + node := next() + + // make the call + stream := &grpcStream{} + err = gstream(ctx, node, req, stream, callOpts) + + // record the result of the call to inform future routing decisions + if verr := g.opts.Selector.Record(node, err); verr != nil { + return nil, verr + } + + // try and transform the error to a go-micro error + if verr, ok := err.(*errors.Error); ok { + return nil, verr + } + + if rerr := g.opts.Selector.Record(node, err); rerr != nil { + return nil, rerr + } + + return stream, err + } + + type response struct { + stream client.Stream + err error + } + + ch := make(chan response, callOpts.Retries+1) + var grr error + + for i := 0; i <= callOpts.Retries; i++ { + go func(i int) { + s, err := call(i) + ch <- response{s, err} + }(i) + + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case rsp := <-ch: + // if the call succeeded lets bail early + if rsp.err == nil { + return rsp.stream, nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, grr) + if rerr != nil { + return nil, rerr + } + + if !retry { + return nil, rsp.err + } + + grr = rsp.err + } + } + + return nil, grr + */ +} + +func (c *drpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + var body []byte + + options := client.NewPublishOptions(opts...) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(2) + } + md[metadata.HeaderContentType] = p.ContentType() + md[metadata.HeaderTopic] = p.Topic() + + // passed in raw data + if d, ok := p.Payload().(*codec.Frame); ok { + body = d.Data + } else { + // use codec for payload + cf, err := c.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b + } + + topic := p.Topic() + + // get the exchange + if len(options.Exchange) > 0 { + topic = options.Exchange + } + + return c.opts.Broker.Publish(metadata.NewOutgoingContext(ctx, md), topic, &broker.Message{ + Header: md, + Body: body, + }, + broker.PublishContext(options.Context), + broker.PublishBodyOnly(options.BodyOnly), + ) +} + +func (d *drpcClient) String() string { + return "drpc" +} + +func (d *drpcClient) Name() string { + return d.opts.Name +} + +/* +func (g *drpcClient) getGrpcDialOptions() []grpc.DialOption { + if g.opts.CallOptions.Context == nil { + return nil + } + + v := g.opts.CallOptions.Context.Value(grpcDialOptions{}) + + if v == nil { + return nil + } + + opts, ok := v.([]grpc.DialOption) + + if !ok { + return nil + } + + return opts +} + +func (g *drpcClient) getGrpcCallOptions() []grpc.CallOption { + if g.opts.CallOptions.Context == nil { + return nil + } + + v := g.opts.CallOptions.Context.Value(grpcCallOptions{}) + + if v == nil { + return nil + } + + opts, ok := v.([]grpc.CallOption) + + if !ok { + return nil + } + + return opts +} + +*/ + +func NewClient(opts ...client.Option) client.Client { + options := client.NewOptions(opts...) + // default content type for grpc + options.ContentType = DefaultContentType + + rc := &drpcClient{ + opts: options, + } + + c := client.Client(rc) + + // wrap in reverse + for i := len(options.Wrappers); i > 0; i-- { + c = options.Wrappers[i-1](c) + } + + return c +} diff --git a/drpc_pool.go b/drpc_pool.go new file mode 100644 index 0000000..9f12bb0 --- /dev/null +++ b/drpc_pool.go @@ -0,0 +1,209 @@ +// +build ignore + +package grpc + +import ( + "context" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" +) + +type pool struct { + conns map[string]*streamsPool + size int + ttl int64 + maxStreams int + maxIdle int + sync.Mutex +} + +type streamsPool struct { + // head of list + head *poolConn + // busy conns list + busy *poolConn + // the siza of list + count int + // idle conn + idle int +} + +type poolConn struct { + err error + *grpc.ClientConn + next *poolConn + pool *pool + sp *streamsPool + pre *poolConn + addr string + streams int + created int64 + in bool +} + +func newPool(size int, ttl time.Duration, idle int, ms int) *pool { + if ms <= 0 { + ms = 1 + } + if idle < 0 { + idle = 0 + } + return &pool{ + size: size, + ttl: int64(ttl.Seconds()), + maxStreams: ms, + maxIdle: idle, + conns: make(map[string]*streamsPool), + } +} + +func (p *pool) getConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*poolConn, error) { + now := time.Now().Unix() + p.Lock() + sp, ok := p.conns[addr] + if !ok { + sp = &streamsPool{head: &poolConn{}, busy: &poolConn{}, count: 0, idle: 0} + p.conns[addr] = sp + } + // while we have conns check streams and then return one + // otherwise we'll create a new conn + conn := sp.head.next + for conn != nil { + // check conn state + // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md + switch conn.GetState() { + case connectivity.Connecting: + conn = conn.next + continue + case connectivity.Shutdown: + next := conn.next + if conn.streams == 0 { + removeConn(conn) + sp.idle-- + } + conn = next + continue + case connectivity.TransientFailure: + next := conn.next + if conn.streams == 0 { + removeConn(conn) + conn.ClientConn.Close() + sp.idle-- + } + conn = next + continue + case connectivity.Ready: + case connectivity.Idle: + } + // a old conn + if now-conn.created > p.ttl { + next := conn.next + if conn.streams == 0 { + removeConn(conn) + conn.ClientConn.Close() + sp.idle-- + } + conn = next + continue + } + // a busy conn + if conn.streams >= p.maxStreams { + next := conn.next + removeConn(conn) + addConnAfter(conn, sp.busy) + conn = next + continue + } + // a idle conn + if conn.streams == 0 { + sp.idle-- + } + // a good conn + conn.streams++ + p.Unlock() + return conn, nil + } + p.Unlock() + + // create new conn + cc, err := grpc.DialContext(ctx, addr, opts...) + if err != nil { + return nil, err + } + conn = &poolConn{ClientConn: cc, err: nil, addr: addr, pool: p, sp: sp, streams: 1, created: time.Now().Unix(), pre: nil, next: nil, in: false} + + // add conn to streams pool + p.Lock() + if sp.count < p.size { + addConnAfter(conn, sp.head) + } + p.Unlock() + + return conn, nil +} + +func (p *pool) release(conn *poolConn, err error) { + p.Lock() + p, sp, created := conn.pool, conn.sp, conn.created + // try to add conn + if !conn.in && sp.count < p.size { + addConnAfter(conn, sp.head) + } + if !conn.in { + p.Unlock() + conn.ClientConn.Close() + return + } + // a busy conn + if conn.streams >= p.maxStreams { + removeConn(conn) + addConnAfter(conn, sp.head) + } + conn.streams-- + // if streams == 0, we can do something + if conn.streams == 0 { + // 1. it has errored + // 2. too many idle conn or + // 3. conn is too old + now := time.Now().Unix() + if err != nil || sp.idle >= p.maxIdle || now-created > p.ttl { + removeConn(conn) + p.Unlock() + conn.ClientConn.Close() + return + } + sp.idle++ + } + p.Unlock() +} + +func (conn *poolConn) Close() { + conn.pool.release(conn, conn.err) +} + +func removeConn(conn *poolConn) { + if conn.pre != nil { + conn.pre.next = conn.next + } + if conn.next != nil { + conn.next.pre = conn.pre + } + conn.pre = nil + conn.next = nil + conn.in = false + conn.sp.count-- +} + +func addConnAfter(conn *poolConn, after *poolConn) { + conn.next = after.next + conn.pre = after + if after.next != nil { + after.next.pre = conn + } + after.next = conn + conn.in = true + conn.sp.count++ +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..d61c947 --- /dev/null +++ b/error.go @@ -0,0 +1,44 @@ +package drpc + +import ( + "github.com/unistack-org/micro/v3/errors" + "google.golang.org/grpc/status" +) + +func microError(err error) error { + // no error + + if err == nil { + return nil + } + + if verr, ok := err.(*errors.Error); ok { + return verr + } + + // grpc error + s, ok := status.FromError(err) + if !ok { + return err + } + + // return first error from details + if details := s.Details(); len(details) > 0 { + if verr, ok := details[0].(error); ok { + return microError(verr) + } + } + + // try to decode micro *errors.Error + if e := errors.Parse(s.Message()); e.Code > 0 { + return e // actually a micro error + } + + // fallback + return &errors.Error{ + Id: "go.micro.client", + Code: int32(s.Code()), + Detail: s.Message(), + Status: s.Code().String(), + } +} diff --git a/go.mod b/go.mod index 683b030..200a5fa 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,12 @@ -module github.com/unistack-org/micro-client-drpc/v3 +module github.com/unistack-org/micro-client-grpc/v3 go 1.16 + +require ( + github.com/unistack-org/micro/v3 v3.5.2 + golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect + golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect + google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f // indirect + google.golang.org/grpc v1.39.0 + storj.io/drpc v0.0.24 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d90a76d --- /dev/null +++ b/go.sum @@ -0,0 +1,167 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/silas/dag v0.0.0-20210121180416-41cf55125c34/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/unistack-org/micro/v3 v3.5.2 h1:8b9Mk4FLWRLp8SduBh5Xs6g/3xJ+ZIBOnH82eHuLWnw= +github.com/unistack-org/micro/v3 v3.5.2/go.mod h1:1ZkwpEqpiHiVhM2hiF9DamtpsF04oFybFhEQ4zEMcro= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= +github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= +github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210510120150-4163338589ed h1:p9UgmWI9wKpfYmgaV/IZKGdXc5qEK45tDwwwDyjS26I= +golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20210716203947-853a461950ff h1:j2EK/QoxYNBsXI4R7fQkkRUk8y6wnOBI+6hgPdP/6Ds= +golang.org/x/net v0.0.0-20210716203947-853a461950ff/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f h1:YORWxaStkWBnWgELOHTmDrqNlFXuVGEbhwbB5iK94bQ= +google.golang.org/genproto v0.0.0-20210722135532-667f2b7c528f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0= +google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI= +google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +storj.io/drpc v0.0.23 h1:MHVOMVoBbQkPuL17GerlGTA9PLwE1HF6zAOYFvWwYG4= +storj.io/drpc v0.0.23/go.mod h1:OSJH7wvH3yKlhnMHwblKJioaaeyI6X8xbXT1SG9woe8= +storj.io/drpc v0.0.24 h1:9WgD+q8WWDIz7XCMib+U6xX9SjblMqOz9R6sU7rJnS8= +storj.io/drpc v0.0.24/go.mod h1:ofQUDPQbbIymRDKE0tms48k8bLP5Y+dsI9CbXGv3gko= diff --git a/message.go b/message.go new file mode 100644 index 0000000..6981d09 --- /dev/null +++ b/message.go @@ -0,0 +1,40 @@ +package drpc + +import ( + "github.com/unistack-org/micro/v3/client" +) + +type drpcEvent struct { + payload interface{} + topic string + contentType string +} + +func newDRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { + var options client.MessageOptions + for _, o := range opts { + o(&options) + } + + if len(options.ContentType) > 0 { + contentType = options.ContentType + } + + return &drpcEvent{ + payload: payload, + topic: topic, + contentType: contentType, + } +} + +func (d *drpcEvent) ContentType() string { + return d.contentType +} + +func (d *drpcEvent) Topic() string { + return d.topic +} + +func (d *drpcEvent) Payload() interface{} { + return d.payload +} diff --git a/options.go b/options.go new file mode 100644 index 0000000..18966e6 --- /dev/null +++ b/options.go @@ -0,0 +1,77 @@ +package drpc + +import ( + "context" + + "github.com/unistack-org/micro/v3/client" +) + +var ( + // DefaultPoolMaxStreams maximum streams on a connectioin + // (20) + DefaultPoolMaxStreams = 20 + + // DefaultPoolMaxIdle maximum idle conns of a pool + // (50) + DefaultPoolMaxIdle = 50 + + // DefaultMaxRecvMsgSize maximum message that client can receive + // (4 MB). + DefaultMaxRecvMsgSize = 1024 * 1024 * 4 + + // DefaultMaxSendMsgSize maximum message that client can send + // (4 MB). + DefaultMaxSendMsgSize = 1024 * 1024 * 4 +) + +type poolMaxStreams struct{} + +// maximum streams on a connectioin +func PoolMaxStreams(n int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, poolMaxStreams{}, n) + } +} + +type poolMaxIdle struct{} + +// maximum idle conns of a pool +func PoolMaxIdle(d int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, poolMaxIdle{}, d) + } +} + +type maxRecvMsgSizeKey struct{} + +// +// MaxRecvMsgSize set the maximum size of message that client can receive. +// +func MaxRecvMsgSize(s int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, maxRecvMsgSizeKey{}, s) + } +} + +type maxSendMsgSizeKey struct{} + +// +// MaxSendMsgSize set the maximum size of message that client can send. +// +func MaxSendMsgSize(s int) client.Option { + return func(o *client.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s) + } +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..a99c84e --- /dev/null +++ b/request.go @@ -0,0 +1,87 @@ +package drpc + +import ( + "fmt" + "strings" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/codec" +) + +type drpcRequest struct { + request interface{} + codec codec.Codec + service string + method string + contentType string + opts client.RequestOptions +} + +// service Struct.Method /service.Struct/Method +func methodToDRPC(service, method string) string { + // no method or already grpc method + if len(method) == 0 || method[0] == '/' { + return method + } + + // assume method is Foo.Bar + mParts := strings.Split(method, ".") + if len(mParts) != 2 { + return method + } + + if len(service) == 0 { + return fmt.Sprintf("/%s/%s", mParts[0], mParts[1]) + } + + // return /pkg.Foo/Bar + return fmt.Sprintf("/%s.%s/%s", strings.Title(service), mParts[0], mParts[1]) +} + +func newDRPCRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { + var opts client.RequestOptions + for _, o := range reqOpts { + o(&opts) + } + + // set the content-type specified + if len(opts.ContentType) > 0 { + contentType = opts.ContentType + } + + return &drpcRequest{ + service: service, + method: method, + request: request, + contentType: contentType, + opts: opts, + } +} + +func (d *drpcRequest) ContentType() string { + return d.contentType +} + +func (d *drpcRequest) Service() string { + return d.service +} + +func (d *drpcRequest) Method() string { + return d.method +} + +func (d *drpcRequest) Endpoint() string { + return d.method +} + +func (d *drpcRequest) Codec() codec.Codec { + return d.codec +} + +func (d *drpcRequest) Body() interface{} { + return d.request +} + +func (d *drpcRequest) Stream() bool { + return d.opts.Stream +} diff --git a/request_test.go b/request_test.go new file mode 100644 index 0000000..9369d05 --- /dev/null +++ b/request_test.go @@ -0,0 +1,41 @@ +package drpc + +import ( + "testing" +) + +func TestMethodToDRPC(t *testing.T) { + testData := []struct { + service string + method string + expect string + }{ + { + "helloworld", + "Greeter.SayHello", + "/Helloworld.Greeter/SayHello", + }, + { + "helloworld", + "/Helloworld.Greeter/SayHello", + "/Helloworld.Greeter/SayHello", + }, + { + "", + "/Helloworld.Greeter/SayHello", + "/Helloworld.Greeter/SayHello", + }, + { + "", + "Greeter.SayHello", + "/Greeter/SayHello", + }, + } + + for _, d := range testData { + method := methodToDRPC(d.service, d.method) + if method != d.expect { + t.Fatalf("expected %s got %s", d.expect, method) + } + } +} diff --git a/response.go b/response.go new file mode 100644 index 0000000..fddffba --- /dev/null +++ b/response.go @@ -0,0 +1,45 @@ +package drpc + +import ( + "io" + + "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/metadata" +) + +type response struct { + //conn *poolConn + stream io.ReadWriteCloser + codec codec.Codec +} + +// Read the response +func (r *response) Codec() codec.Codec { + return r.codec +} + +// read the header +func (r *response) Header() metadata.Metadata { + return nil + /* + meta, err := r.stream.Header() + if err != nil { + return metadata.New(0) + } + md := metadata.New(len(meta)) + for k, v := range meta { + md.Set(k, strings.Join(v, ",")) + } + return md + */ +} + +// Read the undecoded response +func (r *response) Read() ([]byte, error) { + f := &codec.Frame{} + //if err := r.codec.ReadBody(&wrapStream{r.stream}, f); err != nil { + if err := r.codec.ReadBody(r.stream, f); err != nil { + return nil, err + } + return f.Data, nil +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..ef5211e --- /dev/null +++ b/stream.go @@ -0,0 +1,92 @@ +package drpc + +import ( + "context" + "io" + "sync" + + "github.com/unistack-org/micro/v3/client" + "google.golang.org/grpc" +) + +// Implements the streamer interface +type drpcStream struct { + grpc.ClientStream + context context.Context + err error + request client.Request + response client.Response + close func(err error) + //conn *poolConn + sync.RWMutex + closed bool +} + +func (d *drpcStream) Context() context.Context { + return d.context +} + +func (d *drpcStream) Request() client.Request { + return d.request +} + +func (d *drpcStream) Response() client.Response { + return d.response +} + +func (d *drpcStream) Send(msg interface{}) error { + if err := d.ClientStream.SendMsg(msg); err != nil { + d.setError(err) + return err + } + return nil +} + +func (d *drpcStream) Recv(msg interface{}) (err error) { + defer d.setError(err) + + if err = d.ClientStream.RecvMsg(msg); err != nil { + // #202 - inconsistent gRPC stream behavior + // the only way to tell if the stream is done is when we get a EOF on the Recv + // here we should close the underlying gRPC ClientConn + closeErr := d.Close() + if err == io.EOF && closeErr != nil { + err = closeErr + } + + return err + } + + return +} + +func (d *drpcStream) Error() error { + d.RLock() + defer d.RUnlock() + return d.err +} + +func (d *drpcStream) setError(e error) { + d.Lock() + d.err = e + d.Unlock() +} + +// Close the gRPC send stream +// #202 - inconsistent gRPC stream behavior +// The underlying gRPC stream should not be closed here since the +// stream should still be able to receive after this function call +// TODO: should the conn be closed in another way? +func (d *drpcStream) Close() error { + d.Lock() + defer d.Unlock() + + if d.closed { + return nil + } + + // close the connection + d.closed = true + d.close(d.err) + return d.ClientStream.CloseSend() +}