From d18bf3dab9b21732b833e4f4cbb65d8c588091a5 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 3 Nov 2020 01:57:11 +0300 Subject: [PATCH] many fixes for lint and context.Context usage Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 4 +- api/router/options.go | 14 ++ api/router/registry/registry.go | 10 +- api/router/static/static.go | 2 +- broker/options.go | 2 +- cache/cache.go | 13 +- client/client.go | 1 + client/noop.go | 2 +- codec/codec.go | 6 +- codec/json/json.go | 60 ------- codec/json/marshaler.go | 45 ----- codec/jsonrpc/client.go | 97 ----------- codec/jsonrpc/jsonrpc.go | 88 ---------- codec/jsonrpc/server.go | 84 --------- codec/proto/marshaler.go | 47 ----- codec/proto/message.go | 37 ---- codec/proto/proto.go | 64 ------- codec/protorpc/envelope.pb.go | 238 -------------------------- codec/protorpc/envelope.pb.micro.go | 21 --- codec/protorpc/envelope.proto | 14 -- codec/protorpc/netstring.go | 36 ---- codec/protorpc/protorpc.go | 186 -------------------- codec/text/text.go | 80 --------- events/events.go | 9 +- go.mod | 6 +- go.sum | 13 ++ network/transport/noop.go | 6 +- network/transport/options.go | 10 +- network/transport/transport.go | 5 +- network/tunnel/broker/broker.go | 8 +- network/tunnel/transport/transport.go | 14 +- network/tunnel/tunnel.go | 9 +- options.go | 1 + registry/noop.go | 10 +- registry/registry.go | 10 +- resolver/registry/registry.go | 4 +- server/noop.go | 8 +- server/registry.go | 8 +- sync/sync.go | 1 + tracer/noop.go | 22 ++- tracer/options.go | 21 ++- tracer/trace.go | 3 +- util/pool/default.go | 5 +- util/pool/pool.go | 3 +- 44 files changed, 152 insertions(+), 1175 deletions(-) delete mode 100644 codec/json/json.go delete mode 100644 codec/json/marshaler.go delete mode 100644 codec/jsonrpc/client.go delete mode 100644 codec/jsonrpc/jsonrpc.go delete mode 100644 codec/jsonrpc/server.go delete mode 100644 codec/proto/marshaler.go delete mode 100644 codec/proto/message.go delete mode 100644 codec/proto/proto.go delete mode 100644 codec/protorpc/envelope.pb.go delete mode 100644 codec/protorpc/envelope.pb.micro.go delete mode 100644 codec/protorpc/envelope.proto delete mode 100644 codec/protorpc/netstring.go delete mode 100644 codec/protorpc/protorpc.go delete mode 100644 codec/text/text.go diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index c66f715c..07950363 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -10,13 +10,13 @@ import ( jsonpatch "github.com/evanphx/json-patch/v5" "github.com/oxtoacart/bpool" + jsonrpc "github.com/unistack-org/micro-codec-jsonrpc" + protorpc "github.com/unistack-org/micro-codec-protorpc" "github.com/unistack-org/micro/v3/api" "github.com/unistack-org/micro/v3/api/handler" "github.com/unistack-org/micro/v3/api/internal/proto" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/codec" - "github.com/unistack-org/micro/v3/codec/jsonrpc" - "github.com/unistack-org/micro/v3/codec/protorpc" "github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" diff --git a/api/router/options.go b/api/router/options.go index a0794816..70cd566e 100644 --- a/api/router/options.go +++ b/api/router/options.go @@ -1,6 +1,8 @@ package router import ( + "context" + "github.com/unistack-org/micro/v3/api/resolver" "github.com/unistack-org/micro/v3/api/resolver/vpath" "github.com/unistack-org/micro/v3/registry" @@ -10,12 +12,14 @@ type Options struct { Handler string Registry registry.Registry Resolver resolver.Resolver + Context context.Context } type Option func(o *Options) func NewOptions(opts ...Option) Options { options := Options{ + Context: context.Background(), Handler: "meta", } @@ -32,18 +36,28 @@ func NewOptions(opts ...Option) Options { return options } +// WithContext sets the context +func WithContext(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +// WithHandler sets the handler func WithHandler(h string) Option { return func(o *Options) { o.Handler = h } } +// WithRegistry sets the registry func WithRegistry(r registry.Registry) Option { return func(o *Options) { o.Registry = r } } +// WithResolver sets the resolver func WithResolver(r resolver.Resolver) Option { return func(o *Options) { o.Resolver = r diff --git a/api/router/registry/registry.go b/api/router/registry/registry.go index 2293317b..e6844464 100644 --- a/api/router/registry/registry.go +++ b/api/router/registry/registry.go @@ -50,7 +50,7 @@ func (r *registryRouter) refresh() { var attempts int for { - services, err := r.opts.Registry.ListServices() + services, err := r.opts.Registry.ListServices(r.opts.Context) if err != nil { attempts++ if logger.V(logger.ErrorLevel) { @@ -64,7 +64,7 @@ func (r *registryRouter) refresh() { // for each service, get service and store endpoints for _, s := range services { - service, err := r.opts.Registry.GetService(s.Name) + service, err := r.opts.Registry.GetService(r.opts.Context, s.Name) if err != nil { if logger.V(logger.ErrorLevel) { logger.Errorf("unable to get service: %v", err) @@ -92,7 +92,7 @@ func (r *registryRouter) process(res *registry.Result) { } // get entry from cache - service, err := r.opts.Registry.GetService(res.Service.Name) + service, err := r.opts.Registry.GetService(r.opts.Context, res.Service.Name) if err != nil { if logger.V(logger.ErrorLevel) { logger.Errorf("unable to get %v service: %v", res.Service.Name, err) @@ -230,7 +230,7 @@ func (r *registryRouter) watch() { } // watch for changes - w, err := r.opts.Registry.Watch() + w, err := r.opts.Registry.Watch(r.opts.Context) if err != nil { attempts++ if logger.V(logger.ErrorLevel) { @@ -432,7 +432,7 @@ func (r *registryRouter) Route(req *http.Request) (*api.Service, error) { name := rp.Name // get service - services, err := r.opts.Registry.GetService(name, registry.GetDomain(rp.Domain)) + services, err := r.opts.Registry.GetService(r.opts.Context, name, registry.GetDomain(rp.Domain)) if err != nil { return nil, err } diff --git a/api/router/static/static.go b/api/router/static/static.go index 881157f9..8969ab59 100644 --- a/api/router/static/static.go +++ b/api/router/static/static.go @@ -177,7 +177,7 @@ func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) { } epf := strings.Split(ep.apiep.Name, ".") - services, err := r.opts.Registry.GetService(epf[0]) + services, err := r.opts.Registry.GetService(r.opts.Context, epf[0]) if err != nil { return nil, err } diff --git a/broker/options.go b/broker/options.go index cef47c23..22751108 100644 --- a/broker/options.go +++ b/broker/options.go @@ -190,7 +190,7 @@ func Secure(b bool) Option { } } -// Specify TLS Config +// TLSConfig sets the TLS Config func TLSConfig(t *tls.Config) Option { return func(o *Options) { o.TLSConfig = t diff --git a/cache/cache.go b/cache/cache.go index 79922926..be75b839 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -1,24 +1,29 @@ // Package cache is a caching interface package cache +import "context" + // Cache is an interface for caching type Cache interface { // Initialise options Init(...Option) error // Get a value - Get(key string) (interface{}, error) + Get(ctx context.Context, key string) (interface{}, error) // Set a value - Set(key string, val interface{}) error + Set(ctx context.Context, key string, val interface{}) error // Delete a value - Delete(key string) error + Delete(ctx context.Context, key string) error // Name of the implementation String() string } +// Options struct type Options struct { - Nodes []string + Nodes []string + Context context.Context } +// Option func type Option func(o *Options) // Nodes sets the nodes for the cache diff --git a/client/client.go b/client/client.go index 1615cec8..b74ed654 100644 --- a/client/client.go +++ b/client/client.go @@ -9,6 +9,7 @@ import ( ) var ( + // DefaultClient is the global default client DefaultClient Client = NewClient() ) diff --git a/client/noop.go b/client/noop.go index 351094c6..cee3c396 100644 --- a/client/noop.go +++ b/client/noop.go @@ -4,9 +4,9 @@ import ( "context" raw "github.com/unistack-org/micro-codec-bytes" + json "github.com/unistack-org/micro-codec-json" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" - "github.com/unistack-org/micro/v3/codec/json" "github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/metadata" ) diff --git a/codec/codec.go b/codec/codec.go index 107bdb35..e1685f9e 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -14,12 +14,14 @@ const ( ) var ( + // ErrInvalidMessage returned when invalid messge passed to codec ErrInvalidMessage = errors.New("invalid message") ) +// MessageType type MessageType int -// Takes in a connection/buffer and returns a new Codec +// NewCodec takes in a connection/buffer and returns a new Codec type NewCodec func(io.ReadWriteCloser) Codec // Codec encodes/decodes various types of messages used within go-micro. @@ -34,11 +36,13 @@ type Codec interface { String() string } +// Reader interface type Reader interface { ReadHeader(*Message, MessageType) error ReadBody(interface{}) error } +// Writer interface type Writer interface { Write(*Message, interface{}) error } diff --git a/codec/json/json.go b/codec/json/json.go deleted file mode 100644 index 94f91b2f..00000000 --- a/codec/json/json.go +++ /dev/null @@ -1,60 +0,0 @@ -// Package json provides a json codec -package json - -import ( - "encoding/json" - "io" - "io/ioutil" - - "github.com/unistack-org/micro/v3/codec" - jsonpb "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" -) - -type Codec struct { - Conn io.ReadWriteCloser - Encoder *json.Encoder - Decoder *json.Decoder -} - -func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { - return nil -} - -func (c *Codec) ReadBody(b interface{}) error { - if b == nil { - return nil - } - switch m := b.(type) { - case proto.Message: - buf, err := ioutil.ReadAll(c.Conn) - if err != nil { - return err - } - return jsonpb.Unmarshal(buf, m) - } - return c.Decoder.Decode(b) -} - -func (c *Codec) Write(m *codec.Message, b interface{}) error { - if b == nil { - return nil - } - return c.Encoder.Encode(b) -} - -func (c *Codec) Close() error { - return c.Conn.Close() -} - -func (c *Codec) String() string { - return "json" -} - -func NewCodec(c io.ReadWriteCloser) codec.Codec { - return &Codec{ - Conn: c, - Decoder: json.NewDecoder(c), - Encoder: json.NewEncoder(c), - } -} diff --git a/codec/json/marshaler.go b/codec/json/marshaler.go deleted file mode 100644 index 85930742..00000000 --- a/codec/json/marshaler.go +++ /dev/null @@ -1,45 +0,0 @@ -package json - -import ( - "bytes" - "encoding/json" - - oldjsonpb "github.com/golang/protobuf/jsonpb" - oldproto "github.com/golang/protobuf/proto" - "github.com/oxtoacart/bpool" - jsonpb "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" -) - -var jsonpbMarshaler = &jsonpb.MarshalOptions{} -var oldjsonpbMarshaler = &oldjsonpb.Marshaler{} - -// create buffer pool with 16 instances each preallocated with 256 bytes -var bufferPool = bpool.NewSizedBufferPool(16, 256) - -type Marshaler struct{} - -func (j Marshaler) Marshal(v interface{}) ([]byte, error) { - switch m := v.(type) { - case proto.Message: - return jsonpbMarshaler.Marshal(m) - case oldproto.Message: - buf, err := oldjsonpbMarshaler.MarshalToString(m) - return []byte(buf), err - } - return json.Marshal(v) -} - -func (j Marshaler) Unmarshal(d []byte, v interface{}) error { - switch m := v.(type) { - case proto.Message: - return jsonpb.Unmarshal(d, m) - case oldproto.Message: - return oldjsonpb.Unmarshal(bytes.NewReader(d), m) - } - return json.Unmarshal(d, v) -} - -func (j Marshaler) String() string { - return "json" -} diff --git a/codec/jsonrpc/client.go b/codec/jsonrpc/client.go deleted file mode 100644 index 4bf56ff0..00000000 --- a/codec/jsonrpc/client.go +++ /dev/null @@ -1,97 +0,0 @@ -package jsonrpc - -import ( - "encoding/json" - "fmt" - "io" - "sync" - - "github.com/unistack-org/micro/v3/codec" -) - -type clientCodec struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - req clientRequest - resp clientResponse - - sync.Mutex - pending map[interface{}]string -} - -type clientRequest struct { - Method string `json:"method"` - Params [1]interface{} `json:"params"` - ID interface{} `json:"id"` -} - -type clientResponse struct { - ID interface{} `json:"id"` - Result *json.RawMessage `json:"result"` - Error interface{} `json:"error"` -} - -func newClientCodec(conn io.ReadWriteCloser) *clientCodec { - return &clientCodec{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - pending: make(map[interface{}]string), - } -} - -func (c *clientCodec) Write(m *codec.Message, b interface{}) error { - c.Lock() - c.pending[m.Id] = m.Method - c.Unlock() - c.req.Method = m.Method - c.req.Params[0] = b - c.req.ID = m.Id - return c.enc.Encode(&c.req) -} - -func (r *clientResponse) reset() { - r.ID = 0 - r.Result = nil - r.Error = nil -} - -func (c *clientCodec) ReadHeader(m *codec.Message) error { - c.resp.reset() - if err := c.dec.Decode(&c.resp); err != nil { - return err - } - - c.Lock() - m.Method = c.pending[c.resp.ID] - delete(c.pending, c.resp.ID) - c.Unlock() - - m.Error = "" - m.Id = fmt.Sprintf("%v", c.resp.ID) - if c.resp.Error != nil { - x, ok := c.resp.Error.(string) - if !ok { - return fmt.Errorf("invalid error %v", c.resp.Error) - } - if x == "" { - x = "unspecified error" - } - m.Error = x - } - return nil -} - -func (c *clientCodec) ReadBody(x interface{}) error { - if x == nil || c.resp.Result == nil { - return nil - } - return json.Unmarshal(*c.resp.Result, x) -} - -func (c *clientCodec) Close() error { - return c.c.Close() -} diff --git a/codec/jsonrpc/jsonrpc.go b/codec/jsonrpc/jsonrpc.go deleted file mode 100644 index 1311da25..00000000 --- a/codec/jsonrpc/jsonrpc.go +++ /dev/null @@ -1,88 +0,0 @@ -// Package jsonrpc provides a json-rpc 1.0 codec -package jsonrpc - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - - "github.com/unistack-org/micro/v3/codec" -) - -type jsonCodec struct { - buf *bytes.Buffer - mt codec.MessageType - rwc io.ReadWriteCloser - c *clientCodec - s *serverCodec -} - -func (j *jsonCodec) Close() error { - j.buf.Reset() - return j.rwc.Close() -} - -func (j *jsonCodec) String() string { - return "json-rpc" -} - -func (j *jsonCodec) Write(m *codec.Message, b interface{}) error { - switch m.Type { - case codec.Request: - return j.c.Write(m, b) - case codec.Response, codec.Error: - return j.s.Write(m, b) - case codec.Event: - data, err := json.Marshal(b) - if err != nil { - return err - } - _, err = j.rwc.Write(data) - return err - default: - return fmt.Errorf("Unrecognised message type: %v", m.Type) - } -} - -func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { - j.buf.Reset() - j.mt = mt - - switch mt { - case codec.Request: - return j.s.ReadHeader(m) - case codec.Response: - return j.c.ReadHeader(m) - case codec.Event: - _, err := io.Copy(j.buf, j.rwc) - return err - default: - return fmt.Errorf("Unrecognised message type: %v", mt) - } -} - -func (j *jsonCodec) ReadBody(b interface{}) error { - switch j.mt { - case codec.Request: - return j.s.ReadBody(b) - case codec.Response: - return j.c.ReadBody(b) - case codec.Event: - if b != nil { - return json.Unmarshal(j.buf.Bytes(), b) - } - default: - return fmt.Errorf("Unrecognised message type: %v", j.mt) - } - return nil -} - -func NewCodec(rwc io.ReadWriteCloser) codec.Codec { - return &jsonCodec{ - buf: bytes.NewBuffer(nil), - rwc: rwc, - c: newClientCodec(rwc), - s: newServerCodec(rwc), - } -} diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go deleted file mode 100644 index 28b2af0f..00000000 --- a/codec/jsonrpc/server.go +++ /dev/null @@ -1,84 +0,0 @@ -package jsonrpc - -import ( - "encoding/json" - "fmt" - "io" - - "github.com/unistack-org/micro/v3/codec" -) - -type serverCodec struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - req serverRequest -} - -type serverRequest struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` - ID interface{} `json:"id"` -} - -type serverResponse struct { - ID interface{} `json:"id"` - Result interface{} `json:"result"` - Error interface{} `json:"error"` -} - -func newServerCodec(conn io.ReadWriteCloser) *serverCodec { - return &serverCodec{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - } -} - -func (r *serverRequest) reset() { - r.Method = "" - if r.Params != nil { - *r.Params = (*r.Params)[0:0] - } - if r.ID != nil { - r.ID = nil - } -} - -func (c *serverCodec) ReadHeader(m *codec.Message) error { - c.req.reset() - if err := c.dec.Decode(&c.req); err != nil { - return err - } - m.Method = c.req.Method - m.Id = fmt.Sprintf("%v", c.req.ID) - c.req.ID = nil - return nil -} - -func (c *serverCodec) ReadBody(x interface{}) error { - if x == nil { - return nil - } - var params [1]interface{} - params[0] = x - return json.Unmarshal(*c.req.Params, ¶ms) -} - -func (c *serverCodec) Write(m *codec.Message, x interface{}) error { - var resp serverResponse - resp.ID = m.Id - resp.Result = x - if m.Error == "" { - resp.Error = nil - } else { - resp.Error = m.Error - } - return c.enc.Encode(resp) -} - -func (c *serverCodec) Close() error { - return c.c.Close() -} diff --git a/codec/proto/marshaler.go b/codec/proto/marshaler.go deleted file mode 100644 index a110638e..00000000 --- a/codec/proto/marshaler.go +++ /dev/null @@ -1,47 +0,0 @@ -package proto - -import ( - "bytes" - - "github.com/golang/protobuf/proto" - "github.com/oxtoacart/bpool" - "github.com/unistack-org/micro/v3/codec" -) - -// create buffer pool with 16 instances each preallocated with 256 bytes -var bufferPool = bpool.NewSizedBufferPool(16, 256) - -type Marshaler struct{} - -func (Marshaler) Marshal(v interface{}) ([]byte, error) { - pb, ok := v.(proto.Message) - if !ok { - return nil, codec.ErrInvalidMessage - } - - // looks not good, but allows to reuse underlining bytes - buf := bufferPool.Get() - pbuf := proto.NewBuffer(buf.Bytes()) - defer func() { - bufferPool.Put(bytes.NewBuffer(pbuf.Bytes())) - }() - - if err := pbuf.Marshal(pb); err != nil { - return nil, err - } - - return pbuf.Bytes(), nil -} - -func (Marshaler) Unmarshal(data []byte, v interface{}) error { - pb, ok := v.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - - return proto.Unmarshal(data, pb) -} - -func (Marshaler) String() string { - return "proto" -} diff --git a/codec/proto/message.go b/codec/proto/message.go deleted file mode 100644 index fd5cc6fa..00000000 --- a/codec/proto/message.go +++ /dev/null @@ -1,37 +0,0 @@ -package proto - -type Message struct { - Data []byte -} - -func (m *Message) MarshalJSON() ([]byte, error) { - return m.Data, nil -} - -func (m *Message) UnmarshalJSON(data []byte) error { - m.Data = data - return nil -} - -func (m *Message) ProtoMessage() {} - -func (m *Message) Reset() { - *m = Message{} -} - -func (m *Message) String() string { - return string(m.Data) -} - -func (m *Message) Marshal() ([]byte, error) { - return m.Data, nil -} - -func (m *Message) Unmarshal(data []byte) error { - m.Data = data - return nil -} - -func NewMessage(data []byte) *Message { - return &Message{data} -} diff --git a/codec/proto/proto.go b/codec/proto/proto.go deleted file mode 100644 index cbfc2496..00000000 --- a/codec/proto/proto.go +++ /dev/null @@ -1,64 +0,0 @@ -// Package proto provides a proto codec -package proto - -import ( - "io" - "io/ioutil" - - "github.com/unistack-org/micro/v3/codec" - "google.golang.org/protobuf/proto" -) - -type Codec struct { - Conn io.ReadWriteCloser -} - -func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { - return nil -} - -func (c *Codec) ReadBody(b interface{}) error { - if b == nil { - return nil - } - buf, err := ioutil.ReadAll(c.Conn) - if err != nil { - return err - } - m, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - return proto.Unmarshal(buf, m) -} - -func (c *Codec) Write(m *codec.Message, b interface{}) error { - if b == nil { - // Nothing to write - return nil - } - p, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - buf, err := proto.Marshal(p) - if err != nil { - return err - } - _, err = c.Conn.Write(buf) - return err -} - -func (c *Codec) Close() error { - return c.Conn.Close() -} - -func (c *Codec) String() string { - return "proto" -} - -func NewCodec(c io.ReadWriteCloser) codec.Codec { - return &Codec{ - Conn: c, - } -} diff --git a/codec/protorpc/envelope.pb.go b/codec/protorpc/envelope.pb.go deleted file mode 100644 index 81953524..00000000 --- a/codec/protorpc/envelope.pb.go +++ /dev/null @@ -1,238 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.25.0 -// protoc v3.6.1 -// source: codec/protorpc/envelope.proto - -package protorpc - -import ( - proto "github.com/golang/protobuf/proto" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -// This is a compile-time assertion that a sufficiently up-to-date version -// of the legacy proto package is being used. -const _ = proto.ProtoPackageIsVersion4 - -type Request struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ServiceMethod string `protobuf:"bytes,1,opt,name=service_method,json=serviceMethod,proto3" json:"service_method,omitempty"` - Seq uint64 `protobuf:"fixed64,2,opt,name=seq,proto3" json:"seq,omitempty"` -} - -func (x *Request) Reset() { - *x = Request{} - if protoimpl.UnsafeEnabled { - mi := &file_codec_protorpc_envelope_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Request) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Request) ProtoMessage() {} - -func (x *Request) ProtoReflect() protoreflect.Message { - mi := &file_codec_protorpc_envelope_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Request.ProtoReflect.Descriptor instead. -func (*Request) Descriptor() ([]byte, []int) { - return file_codec_protorpc_envelope_proto_rawDescGZIP(), []int{0} -} - -func (x *Request) GetServiceMethod() string { - if x != nil { - return x.ServiceMethod - } - return "" -} - -func (x *Request) GetSeq() uint64 { - if x != nil { - return x.Seq - } - return 0 -} - -type Response struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ServiceMethod string `protobuf:"bytes,1,opt,name=service_method,json=serviceMethod,proto3" json:"service_method,omitempty"` - Seq uint64 `protobuf:"fixed64,2,opt,name=seq,proto3" json:"seq,omitempty"` - Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` -} - -func (x *Response) Reset() { - *x = Response{} - if protoimpl.UnsafeEnabled { - mi := &file_codec_protorpc_envelope_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Response) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Response) ProtoMessage() {} - -func (x *Response) ProtoReflect() protoreflect.Message { - mi := &file_codec_protorpc_envelope_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Response.ProtoReflect.Descriptor instead. -func (*Response) Descriptor() ([]byte, []int) { - return file_codec_protorpc_envelope_proto_rawDescGZIP(), []int{1} -} - -func (x *Response) GetServiceMethod() string { - if x != nil { - return x.ServiceMethod - } - return "" -} - -func (x *Response) GetSeq() uint64 { - if x != nil { - return x.Seq - } - return 0 -} - -func (x *Response) GetError() string { - if x != nil { - return x.Error - } - return "" -} - -var File_codec_protorpc_envelope_proto protoreflect.FileDescriptor - -var file_codec_protorpc_envelope_proto_rawDesc = []byte{ - 0x0a, 0x1d, 0x63, 0x6f, 0x64, 0x65, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x72, 0x70, 0x63, - 0x2f, 0x65, 0x6e, 0x76, 0x65, 0x6c, 0x6f, 0x70, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, - 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x72, 0x70, 0x63, 0x22, 0x42, 0x0a, 0x07, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x5f, - 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x73, - 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, 0x28, 0x06, 0x52, 0x03, 0x73, 0x65, 0x71, 0x22, 0x59, 0x0a, - 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, - 0x12, 0x10, 0x0a, 0x03, 0x73, 0x65, 0x71, 0x18, 0x02, 0x20, 0x01, 0x28, 0x06, 0x52, 0x03, 0x73, - 0x65, 0x71, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_codec_protorpc_envelope_proto_rawDescOnce sync.Once - file_codec_protorpc_envelope_proto_rawDescData = file_codec_protorpc_envelope_proto_rawDesc -) - -func file_codec_protorpc_envelope_proto_rawDescGZIP() []byte { - file_codec_protorpc_envelope_proto_rawDescOnce.Do(func() { - file_codec_protorpc_envelope_proto_rawDescData = protoimpl.X.CompressGZIP(file_codec_protorpc_envelope_proto_rawDescData) - }) - return file_codec_protorpc_envelope_proto_rawDescData -} - -var file_codec_protorpc_envelope_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_codec_protorpc_envelope_proto_goTypes = []interface{}{ - (*Request)(nil), // 0: protorpc.Request - (*Response)(nil), // 1: protorpc.Response -} -var file_codec_protorpc_envelope_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_codec_protorpc_envelope_proto_init() } -func file_codec_protorpc_envelope_proto_init() { - if File_codec_protorpc_envelope_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_codec_protorpc_envelope_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Request); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_codec_protorpc_envelope_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Response); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_codec_protorpc_envelope_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_codec_protorpc_envelope_proto_goTypes, - DependencyIndexes: file_codec_protorpc_envelope_proto_depIdxs, - MessageInfos: file_codec_protorpc_envelope_proto_msgTypes, - }.Build() - File_codec_protorpc_envelope_proto = out.File - file_codec_protorpc_envelope_proto_rawDesc = nil - file_codec_protorpc_envelope_proto_goTypes = nil - file_codec_protorpc_envelope_proto_depIdxs = nil -} diff --git a/codec/protorpc/envelope.pb.micro.go b/codec/protorpc/envelope.pb.micro.go deleted file mode 100644 index fcaa757c..00000000 --- a/codec/protorpc/envelope.pb.micro.go +++ /dev/null @@ -1,21 +0,0 @@ -// Code generated by protoc-gen-micro. DO NOT EDIT. -// source: codec/protorpc/envelope.proto - -package protorpc - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package diff --git a/codec/protorpc/envelope.proto b/codec/protorpc/envelope.proto deleted file mode 100644 index 5b2f9599..00000000 --- a/codec/protorpc/envelope.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; - -package protorpc; - -message Request { - string service_method = 1; - fixed64 seq = 2; -} - -message Response { - string service_method = 1; - fixed64 seq = 2; - string error = 3; -} diff --git a/codec/protorpc/netstring.go b/codec/protorpc/netstring.go deleted file mode 100644 index 8204a0e2..00000000 --- a/codec/protorpc/netstring.go +++ /dev/null @@ -1,36 +0,0 @@ -package protorpc - -import ( - "encoding/binary" - "io" -) - -// WriteNetString writes data to a big-endian netstring on a Writer. -// Size is always a 32-bit unsigned int. -func WriteNetString(w io.Writer, data []byte) (written int, err error) { - size := make([]byte, 4) - binary.BigEndian.PutUint32(size, uint32(len(data))) - if written, err = w.Write(size); err != nil { - return - } - return w.Write(data) -} - -// ReadNetString reads data from a big-endian netstring. -func ReadNetString(r io.Reader) (data []byte, err error) { - sizeBuf := make([]byte, 4) - _, err = r.Read(sizeBuf) - if err != nil { - return nil, err - } - size := binary.BigEndian.Uint32(sizeBuf) - if size == 0 { - return nil, nil - } - data = make([]byte, size) - _, err = r.Read(data) - if err != nil { - return nil, err - } - return -} diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go deleted file mode 100644 index 92a93e80..00000000 --- a/codec/protorpc/protorpc.go +++ /dev/null @@ -1,186 +0,0 @@ -// Protorpc provides a net/rpc proto-rpc codec. See envelope.proto for the format. -package protorpc - -import ( - "bytes" - "fmt" - "io" - "strconv" - "sync" - - "github.com/golang/protobuf/proto" - "github.com/unistack-org/micro/v3/codec" -) - -type flusher interface { - Flush() error -} - -type protoCodec struct { - sync.Mutex - rwc io.ReadWriteCloser - mt codec.MessageType - buf *bytes.Buffer -} - -func (c *protoCodec) Close() error { - c.buf.Reset() - return c.rwc.Close() -} - -func (c *protoCodec) String() string { - return "proto-rpc" -} - -func id(id string) uint64 { - p, err := strconv.ParseInt(id, 10, 64) - if err != nil { - p = 0 - } - i := uint64(p) - return i -} - -func (c *protoCodec) Write(m *codec.Message, b interface{}) error { - switch m.Type { - case codec.Request: - c.Lock() - defer c.Unlock() - // This is protobuf, of course we copy it. - pbr := &Request{ServiceMethod: m.Method, Seq: id(m.Id)} - data, err := proto.Marshal(pbr) - if err != nil { - return err - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - // dont trust or incoming message - m, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - data, err = proto.Marshal(m) - if err != nil { - return err - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - if flusher, ok := c.rwc.(flusher); ok { - if err = flusher.Flush(); err != nil { - return err - } - } - case codec.Response, codec.Error: - c.Lock() - defer c.Unlock() - rtmp := &Response{ServiceMethod: m.Method, Seq: id(m.Id), Error: m.Error} - data, err := proto.Marshal(rtmp) - if err != nil { - return err - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - if pb, ok := b.(proto.Message); ok { - data, err = proto.Marshal(pb) - if err != nil { - return err - } - } else { - data = nil - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - if flusher, ok := c.rwc.(flusher); ok { - if err = flusher.Flush(); err != nil { - return err - } - } - case codec.Event: - m, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - data, err := proto.Marshal(m) - if err != nil { - return err - } - c.rwc.Write(data) - default: - return fmt.Errorf("Unrecognised message type: %v", m.Type) - } - return nil -} - -func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { - c.buf.Reset() - c.mt = mt - - switch mt { - case codec.Request: - data, err := ReadNetString(c.rwc) - if err != nil { - return err - } - rtmp := new(Request) - err = proto.Unmarshal(data, rtmp) - if err != nil { - return err - } - m.Method = rtmp.GetServiceMethod() - m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) - case codec.Response: - data, err := ReadNetString(c.rwc) - if err != nil { - return err - } - rtmp := new(Response) - err = proto.Unmarshal(data, rtmp) - if err != nil { - return err - } - m.Method = rtmp.GetServiceMethod() - m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) - m.Error = rtmp.GetError() - case codec.Event: - _, err := io.Copy(c.buf, c.rwc) - return err - default: - return fmt.Errorf("Unrecognised message type: %v", mt) - } - return nil -} - -func (c *protoCodec) ReadBody(b interface{}) error { - var data []byte - switch c.mt { - case codec.Request, codec.Response: - var err error - data, err = ReadNetString(c.rwc) - if err != nil { - return err - } - case codec.Event: - data = c.buf.Bytes() - default: - return fmt.Errorf("Unrecognised message type: %v", c.mt) - } - if b != nil { - return proto.Unmarshal(data, b.(proto.Message)) - } - return nil -} - -func NewCodec(rwc io.ReadWriteCloser) codec.Codec { - return &protoCodec{ - buf: bytes.NewBuffer(nil), - rwc: rwc, - } -} diff --git a/codec/text/text.go b/codec/text/text.go deleted file mode 100644 index 6b83146c..00000000 --- a/codec/text/text.go +++ /dev/null @@ -1,80 +0,0 @@ -// Package text reads any text/* content-type -package text - -import ( - "fmt" - "io" - "io/ioutil" - - "github.com/unistack-org/micro/v3/codec" -) - -type Codec struct { - Conn io.ReadWriteCloser -} - -// Frame gives us the ability to define raw data to send over the pipes -type Frame struct { - Data []byte -} - -func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { - return nil -} - -func (c *Codec) ReadBody(b interface{}) error { - // read bytes - buf, err := ioutil.ReadAll(c.Conn) - if err != nil { - return err - } - - switch v := b.(type) { - case *string: - *v = string(buf) - case *[]byte: - *v = buf - case *Frame: - v.Data = buf - default: - return fmt.Errorf("failed to read body: %v is not type of *[]byte", b) - } - - return nil -} - -func (c *Codec) Write(m *codec.Message, b interface{}) error { - var v []byte - switch ve := b.(type) { - case nil: - return nil - case *Frame: - v = ve.Data - case *[]byte: - v = *ve - case *string: - v = []byte(*ve) - case string: - v = []byte(ve) - case []byte: - v = ve - default: - return fmt.Errorf("failed to write: %v is not type of *[]byte or []byte", b) - } - _, err := c.Conn.Write(v) - return err -} - -func (c *Codec) Close() error { - return c.Conn.Close() -} - -func (c *Codec) String() string { - return "text" -} - -func NewCodec(c io.ReadWriteCloser) codec.Codec { - return &Codec{ - Conn: c, - } -} diff --git a/events/events.go b/events/events.go index fb55bb3f..4d0ffa45 100644 --- a/events/events.go +++ b/events/events.go @@ -2,6 +2,7 @@ package events import ( + "context" "encoding/json" "errors" "time" @@ -16,14 +17,14 @@ var ( // Stream of events type Stream interface { - Publish(topic string, msg interface{}, opts ...PublishOption) error - Subscribe(topic string, opts ...SubscribeOption) (<-chan Event, error) + Publish(ctx context.Context, topic string, msg interface{}, opts ...PublishOption) error + Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (<-chan Event, error) } // Store of events type Store interface { - Read(opts ...ReadOption) ([]*Event, error) - Write(event *Event, opts ...WriteOption) error + Read(ctx context.Context, opts ...ReadOption) ([]*Event, error) + Write(ctx context.Context, event *Event, opts ...WriteOption) error } // Event is the object returned by the broker when you subscribe to a topic diff --git a/go.mod b/go.mod index 72720aea..f3c30d0a 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/go-acme/lego/v3 v3.4.0 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee github.com/gobwas/ws v1.0.3 - github.com/golang/protobuf v1.4.2 + github.com/golang/protobuf v1.4.3 github.com/google/uuid v1.1.2 github.com/hashicorp/hcl v1.0.0 github.com/micro/cli/v2 v2.1.2 @@ -21,6 +21,10 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.5.1 github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 + github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c + github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd + github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92 + github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a golang.org/x/net v0.0.0-20200904194848-62affa334b73 diff --git a/go.sum b/go.sum index 18e27c30..063f4afa 100644 --- a/go.sum +++ b/go.sum @@ -121,6 +121,8 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= @@ -281,8 +283,18 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLplxEC/etjIhdr3dNzV3JeT27LbVu5pYWm0JCBY= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= +github.com/unistack-org/micro-codec-bytes v0.0.0-20200827104921-3616a69473a6/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844 h1:5b1yuSllbsMm/9fUIlIXSr8DbsKT/sAKSCgOx6+SAfI= github.com/unistack-org/micro-codec-bytes v0.0.0-20200828083432-4e49e953d844/go.mod h1:g5sOI8TWgGZiVHe8zoUPdtz7+0oLnqTnfBoai6Qb7jE= +github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c h1:RtcNaK8rQSl7xAoy1W437dvZLCVjSC6e4JcolepSQs0= +github.com/unistack-org/micro-codec-json v0.0.0-20201102222734-a29c895ec05c/go.mod h1:dG5aUyhBv+ebOl/UFW2Aj2GTfVxxXWi6AcynpePOAhQ= +github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd h1:qXSiEfVnCgrwTHYvAnEPSHEai3+5EUH9ZYovLpxGDwg= +github.com/unistack-org/micro-codec-jsonrpc v0.0.0-20201102222451-ff6a69988bcd/go.mod h1:PFyvkGhavl+3tEPgOaLAhoJJX4/webVGW59BSOXDfNM= +github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92 h1:1rPDBu7Nwo3ZL6r6H5rj7qNchHSdBF4zcewAeTUEMC4= +github.com/unistack-org/micro-codec-proto v0.0.0-20201102222202-769c2d6a4b92/go.mod h1:31JMo683bBQ+uN9YufpUU6ESHphyx3DFmTXEnjpJV9Y= +github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077 h1:uK7owL8TPSwoQiDM1V/0swmgCEepSQKXoi8GEnGxtlU= +github.com/unistack-org/micro-codec-protorpc v0.0.0-20201102222610-3a343898c077/go.mod h1:Ct4uAVZaDEyBZj9Q0poDkbzu6zKXUCcSqJkv/MWPpeI= +github.com/unistack-org/micro-config-cmd v0.0.0-20200828075439-d859b9d7265b/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c h1:GbcjxyOyA9tnNoe4FcnzzLDa8JwEBnQKN/7Bhd8t47I= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210346-ec89783dc46c/go.mod h1:6pm1cadbwsFcEW1ZbV5Fp0i3goR3TNfROMNSPih3I8k= github.com/unistack-org/micro-config-cmd v0.0.0-20200909210755-6e7e85eeab34 h1:VHc98t4SoiCF/jbkFu2e/j+IyJ/+MFQ1T+INNL7LubU= @@ -292,6 +304,7 @@ github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.m github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a h1:VjlqP1qZkjC0Chmx5MKFPIbtSCigeICFDf8vaLZGh9o= github.com/unistack-org/micro-config-cmd v0.0.0-20201028144621-5a55f1aad70a/go.mod h1:MzMg+qh1wORZwYtg5AVgFkNFrXVVbdPKW7s/Is+A994= github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= +github.com/unistack-org/micro/v3 v3.0.0-gamma/go.mod h1:iEtpu3wTYCRs3pQ3VsFEO7JBO4lOMpkOwMyrpZyIDPo= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200922103357-4c4fa00a5d94/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= diff --git a/network/transport/noop.go b/network/transport/noop.go index 9b5efdf0..646c42e2 100644 --- a/network/transport/noop.go +++ b/network/transport/noop.go @@ -1,5 +1,7 @@ package transport +import "context" + type noopTransport struct { opts Options } @@ -19,12 +21,12 @@ func (t *noopTransport) Options() Options { return t.opts } -func (t *noopTransport) Dial(addr string, opts ...DialOption) (Client, error) { +func (t *noopTransport) Dial(ctx context.Context, addr string, opts ...DialOption) (Client, error) { options := NewDialOptions(opts...) return &noopClient{opts: options}, nil } -func (t *noopTransport) Listen(addr string, opts ...ListenOption) (Listener, error) { +func (t *noopTransport) Listen(ctx context.Context, addr string, opts ...ListenOption) (Listener, error) { options := NewListenOptions(opts...) return &noopListener{opts: options}, nil } diff --git a/network/transport/options.go b/network/transport/options.go index ce7d19d5..921038ca 100644 --- a/network/transport/options.go +++ b/network/transport/options.go @@ -31,6 +31,7 @@ type Options struct { Context context.Context } +// NewOptions returns new options func NewOptions(opts ...Option) Options { options := Options{ Logger: logger.DefaultLogger, @@ -44,6 +45,7 @@ func NewOptions(opts ...Option) Options { return options } +// DialOptions struct type DialOptions struct { // Tells the transport this is a streaming connection with // multiple calls to send/recv and that send may not even be called @@ -59,6 +61,7 @@ type DialOptions struct { Context context.Context } +// NewDialOptions returns new DialOptions func NewDialOptions(opts ...DialOption) DialOptions { options := DialOptions{ Context: context.Background(), @@ -71,6 +74,7 @@ func NewDialOptions(opts ...DialOption) DialOptions { return options } +// ListenOptions struct type ListenOptions struct { // TODO: add tls options when listening // Currently set in global options @@ -80,6 +84,7 @@ type ListenOptions struct { Context context.Context } +// NewListenOptions returns new ListenOptions func NewListenOptions(opts ...ListenOption) ListenOptions { options := ListenOptions{ Context: context.Background(), @@ -106,6 +111,7 @@ func Logger(l logger.Logger) Option { } } +// Context sets the context func Context(ctx context.Context) Option { return func(o *Options) { o.Context = ctx @@ -142,14 +148,14 @@ func TLSConfig(t *tls.Config) Option { } } -// Indicates whether this is a streaming connection +// WithStream indicates whether this is a streaming connection func WithStream() DialOption { return func(o *DialOptions) { o.Stream = true } } -// Timeout used when dialling the remote side +// WithTimeout used when dialling the remote side func WithTimeout(d time.Duration) DialOption { return func(o *DialOptions) { o.Timeout = d diff --git a/network/transport/transport.go b/network/transport/transport.go index f9314516..b43df771 100644 --- a/network/transport/transport.go +++ b/network/transport/transport.go @@ -2,6 +2,7 @@ package transport import ( + "context" "time" ) @@ -16,8 +17,8 @@ var ( type Transport interface { Init(...Option) error Options() Options - Dial(addr string, opts ...DialOption) (Client, error) - Listen(addr string, opts ...ListenOption) (Listener, error) + Dial(ctx context.Context, addr string, opts ...DialOption) (Client, error) + Listen(ctx context.Context, addr string, opts ...ListenOption) (Listener, error) String() string } diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index 48379c6a..f798f2a3 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -50,17 +50,17 @@ func (t *tunBroker) Address() string { } func (t *tunBroker) Connect(ctx context.Context) error { - return t.tunnel.Connect() + return t.tunnel.Connect(ctx) } func (t *tunBroker) Disconnect(ctx context.Context) error { - return t.tunnel.Close() + return t.tunnel.Close(ctx) } func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error { // TODO: this is probably inefficient, we might want to just maintain an open connection // it may be easier to add broadcast to the tunnel - c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast)) + c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast)) if err != nil { return err } @@ -73,7 +73,7 @@ func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message } func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast)) + l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast)) if err != nil { return nil, err } diff --git a/network/tunnel/transport/transport.go b/network/tunnel/transport/transport.go index cb6ea8a6..46cf21b7 100644 --- a/network/tunnel/transport/transport.go +++ b/network/tunnel/transport/transport.go @@ -26,7 +26,7 @@ func (t *tunTransport) Init(opts ...transport.Option) error { // close the existing tunnel if t.tunnel != nil { - t.tunnel.Close() + t.tunnel.Close(context.TODO()) } // get the tunnel @@ -47,12 +47,12 @@ func (t *tunTransport) Init(opts ...transport.Option) error { return nil } -func (t *tunTransport) Dial(addr string, opts ...transport.DialOption) (transport.Client, error) { - if err := t.tunnel.Connect(); err != nil { +func (t *tunTransport) Dial(ctx context.Context, addr string, opts ...transport.DialOption) (transport.Client, error) { + if err := t.tunnel.Connect(ctx); err != nil { return nil, err } - c, err := t.tunnel.Dial(addr) + c, err := t.tunnel.Dial(ctx, addr) if err != nil { return nil, err } @@ -60,12 +60,12 @@ func (t *tunTransport) Dial(addr string, opts ...transport.DialOption) (transpor return c, nil } -func (t *tunTransport) Listen(addr string, opts ...transport.ListenOption) (transport.Listener, error) { - if err := t.tunnel.Connect(); err != nil { +func (t *tunTransport) Listen(ctx context.Context, addr string, opts ...transport.ListenOption) (transport.Listener, error) { + if err := t.tunnel.Connect(ctx); err != nil { return nil, err } - l, err := t.tunnel.Listen(addr) + l, err := t.tunnel.Listen(ctx, addr) if err != nil { return nil, err } diff --git a/network/tunnel/tunnel.go b/network/tunnel/tunnel.go index 1c342c34..64d794d2 100644 --- a/network/tunnel/tunnel.go +++ b/network/tunnel/tunnel.go @@ -2,6 +2,7 @@ package tunnel import ( + "context" "errors" "time" @@ -55,15 +56,15 @@ type Tunnel interface { // Address returns the address the tunnel is listening on Address() string // Connect connects the tunnel - Connect() error + Connect(ctx context.Context) error // Close closes the tunnel - Close() error + Close(ctx context.Context) error // Links returns all the links the tunnel is connected to Links() []Link // Dial allows a client to connect to a channel - Dial(channel string, opts ...DialOption) (Session, error) + Dial(ctx context.Context, channel string, opts ...DialOption) (Session, error) // Listen allows to accept connections on a channel - Listen(channel string, opts ...ListenOption) (Listener, error) + Listen(ctx context.Context, channel string, opts ...ListenOption) (Listener, error) // String returns the name of the tunnel implementation String() string } diff --git a/options.go b/options.go index 7d56e9db..2c6012e6 100644 --- a/options.go +++ b/options.go @@ -91,6 +91,7 @@ func Broker(b broker.Broker) Option { } } +// Cmd to be used for service func Cmd(c cmd.Cmd) Option { return func(o *Options) { o.Cmd = c diff --git a/registry/noop.go b/registry/noop.go index 0a637b95..7c79126a 100644 --- a/registry/noop.go +++ b/registry/noop.go @@ -33,27 +33,27 @@ func (n *noopRegistry) Disconnect(ctx context.Context) error { } // Register registers service -func (n *noopRegistry) Register(*Service, ...RegisterOption) error { +func (n *noopRegistry) Register(ctx context.Context, svc *Service, opts ...RegisterOption) error { return nil } // Deregister deregisters service -func (n *noopRegistry) Deregister(*Service, ...DeregisterOption) error { +func (n *noopRegistry) Deregister(ctx context.Context, svc *Service, opts ...DeregisterOption) error { return nil } // GetService returns servive info -func (n *noopRegistry) GetService(string, ...GetOption) ([]*Service, error) { +func (n *noopRegistry) GetService(ctx context.Context, name string, opts ...GetOption) ([]*Service, error) { return []*Service{}, nil } // ListServices listing services -func (n *noopRegistry) ListServices(...ListOption) ([]*Service, error) { +func (n *noopRegistry) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { return []*Service{}, nil } // Watch is used to watch for service changes -func (n *noopRegistry) Watch(...WatchOption) (Watcher, error) { +func (n *noopRegistry) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { return nil, fmt.Errorf("not implemented") } diff --git a/registry/registry.go b/registry/registry.go index 742416b4..3898d926 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -30,11 +30,11 @@ type Registry interface { Options() Options Connect(context.Context) error Disconnect(context.Context) error - Register(*Service, ...RegisterOption) error - Deregister(*Service, ...DeregisterOption) error - GetService(string, ...GetOption) ([]*Service, error) - ListServices(...ListOption) ([]*Service, error) - Watch(...WatchOption) (Watcher, error) + Register(context.Context, *Service, ...RegisterOption) error + Deregister(context.Context, *Service, ...DeregisterOption) error + GetService(context.Context, string, ...GetOption) ([]*Service, error) + ListServices(context.Context, ...ListOption) ([]*Service, error) + Watch(context.Context, ...WatchOption) (Watcher, error) String() string } diff --git a/resolver/registry/registry.go b/resolver/registry/registry.go index 41e4fe36..b91c57c9 100644 --- a/resolver/registry/registry.go +++ b/resolver/registry/registry.go @@ -2,6 +2,8 @@ package registry import ( + "context" + "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/resolver" ) @@ -14,7 +16,7 @@ type Resolver struct { // Resolve assumes ID is a domain name e.g micro.mu func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) { - services, err := r.Registry.GetService(name) + services, err := r.Registry.GetService(context.TODO(), name) if err != nil { return nil, err } diff --git a/server/noop.go b/server/noop.go index 7393eec0..f3466253 100644 --- a/server/noop.go +++ b/server/noop.go @@ -8,12 +8,12 @@ import ( "time" craw "github.com/unistack-org/micro-codec-bytes" + cjson "github.com/unistack-org/micro-codec-json" + cjsonrpc "github.com/unistack-org/micro-codec-jsonrpc" + cproto "github.com/unistack-org/micro-codec-proto" + cprotorpc "github.com/unistack-org/micro-codec-protorpc" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" - cjson "github.com/unistack-org/micro/v3/codec/json" - cjsonrpc "github.com/unistack-org/micro/v3/codec/jsonrpc" - cproto "github.com/unistack-org/micro/v3/codec/proto" - cprotorpc "github.com/unistack-org/micro/v3/codec/protorpc" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/registry" ) diff --git a/server/registry.go b/server/registry.go index 1ed54341..5e040d16 100644 --- a/server/registry.go +++ b/server/registry.go @@ -12,7 +12,7 @@ import ( var ( // DefaultRegisterFunc uses backoff to register service - DefaultRegisterFunc = func(service *registry.Service, config Options) error { + DefaultRegisterFunc = func(svc *registry.Service, config Options) error { var err error opts := []registry.RegisterOption{ @@ -21,7 +21,7 @@ var ( } for i := 0; i <= config.RegisterAttempts; i++ { - err = config.Registry.Register(service, opts...) + err = config.Registry.Register(config.Context, svc, opts...) if err == nil { break } @@ -32,7 +32,7 @@ var ( return err } // DefaultDeregisterFunc uses backoff to deregister service - DefaultDeregisterFunc = func(service *registry.Service, config Options) error { + DefaultDeregisterFunc = func(svc *registry.Service, config Options) error { var err error opts := []registry.DeregisterOption{ @@ -40,7 +40,7 @@ var ( } for i := 0; i <= config.DeregisterAttempts; i++ { - err = config.Registry.Deregister(service, opts...) + err = config.Registry.Deregister(config.Context, svc, opts...) if err == nil { break } diff --git a/sync/sync.go b/sync/sync.go index f15eb706..6d51abaf 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -6,6 +6,7 @@ import ( ) var ( + // ErrLockTimeout error ErrLockTimeout = errors.New("lock timeout") ) diff --git a/tracer/noop.go b/tracer/noop.go index 3acac73e..09f108df 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -2,24 +2,34 @@ package tracer import "context" -type NoopTracer struct{} +type noopTracer struct { + opts Options +} -func (n *NoopTracer) Init(...Option) error { +// Init initilize tracer +func (n *noopTracer) Init(opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } return nil } -func (n *NoopTracer) Start(ctx context.Context, name string) (context.Context, *Span) { +// Start starts new span +func (n *noopTracer) Start(ctx context.Context, name string) (context.Context, *Span) { return nil, nil } -func (n *NoopTracer) Finish(*Span) error { +// Finish finishes span +func (n *noopTracer) Finish(*Span) error { return nil } -func (n *NoopTracer) Read(...ReadOption) ([]*Span, error) { +// Read reads span +func (n *noopTracer) Read(...ReadOption) ([]*Span, error) { return nil, nil } +// NewTracer returns new noop tracer func NewTracer(opts ...Option) Tracer { - return &NoopTracer{} + return &noopTracer{opts: NewOptions(opts...)} } diff --git a/tracer/options.go b/tracer/options.go index c1f20d82..88f51d0b 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -1,5 +1,10 @@ package tracer +var ( + // DefaultSize of the buffer + DefaultSize = 64 +) + // Options struct type Options struct { // Size is the size of ring buffer @@ -9,6 +14,7 @@ type Options struct { // Option func type Option func(o *Options) +// ReadOptions struct type ReadOptions struct { // Trace id Trace string @@ -24,14 +30,13 @@ func ReadTrace(t string) ReadOption { } } -const ( - // DefaultSize of the buffer - DefaultSize = 64 -) - -// DefaultOptions returns default options -func DefaultOptions() Options { - return Options{ +// NewOptions returns default options +func NewOptions(opts ...Option) Options { + options := Options{ Size: DefaultSize, } + for _, o := range opts { + o(&options) + } + return options } diff --git a/tracer/trace.go b/tracer/trace.go index 02004eee..f4bb1b35 100644 --- a/tracer/trace.go +++ b/tracer/trace.go @@ -1,4 +1,4 @@ -// Package trace provides an interface for distributed tracing +// Package tracer provides an interface for distributed tracing package tracer import ( @@ -7,6 +7,7 @@ import ( ) var ( + // DefaultTracer is the global default tracer DefaultTracer Tracer = NewTracer() ) diff --git a/util/pool/default.go b/util/pool/default.go index 9baf3e25..c897c796 100644 --- a/util/pool/default.go +++ b/util/pool/default.go @@ -1,6 +1,7 @@ package pool import ( + "context" "sync" "time" @@ -57,7 +58,7 @@ func (p *poolConn) Created() time.Time { return p.created } -func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) { +func (p *pool) Get(ctx context.Context, addr string, opts ...transport.DialOption) (Conn, error) { p.Lock() conns := p.conns[addr] @@ -83,7 +84,7 @@ func (p *pool) Get(addr string, opts ...transport.DialOption) (Conn, error) { p.Unlock() // create new conn - c, err := p.tr.Dial(addr, opts...) + c, err := p.tr.Dial(ctx, addr, opts...) if err != nil { return nil, err } diff --git a/util/pool/pool.go b/util/pool/pool.go index 56ae5c6b..f0d58cd3 100644 --- a/util/pool/pool.go +++ b/util/pool/pool.go @@ -2,6 +2,7 @@ package pool import ( + "context" "time" "github.com/unistack-org/micro/v3/network/transport" @@ -12,7 +13,7 @@ type Pool interface { // Close the pool Close() error // Get a connection - Get(addr string, opts ...transport.DialOption) (Conn, error) + Get(ctx context.Context, addr string, opts ...transport.DialOption) (Conn, error) // Release the connection Release(c Conn, status error) error }