diff --git a/api/server/http/http.go b/api/server/http/http.go index e0ddde53..7486d1be 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -21,13 +21,8 @@ type httpServer struct { } func NewServer(address string, opts ...server.Option) server.Server { - var options server.Options - for _, o := range opts { - o(&options) - } - return &httpServer{ - opts: options, + opts: server.NewOptions(opts...), mux: http.NewServeMux(), address: address, exit: make(chan chan error), diff --git a/api/server/options.go b/api/server/options.go index 644cc386..5fbb152b 100644 --- a/api/server/options.go +++ b/api/server/options.go @@ -21,6 +21,14 @@ type Options struct { Wrappers []Wrapper } +func NewOptions(opts ...Option) Options { + options := Options{} + for _, o := range opts { + o(&options) + } + return options +} + type Wrapper func(h http.Handler) http.Handler func WrapHandler(w ...Wrapper) Option { diff --git a/auth/auth.go b/auth/auth.go index 8fa5b77b..4559f69d 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -17,7 +17,7 @@ const ( ) var ( - DefaultAuth Auth = newAuth() + DefaultAuth Auth = &NoopAuth{opts: NewOptions()} // ErrInvalidToken is when the token provided is not valid ErrInvalidToken = errors.New("invalid token provided") // ErrForbidden is when a user does not have the necessary scope to access a resource diff --git a/auth/noop.go b/auth/noop.go index e7dbbda2..ed0f485f 100644 --- a/auth/noop.go +++ b/auth/noop.go @@ -4,40 +4,29 @@ import ( "github.com/google/uuid" ) -func newAuth(opts ...Option) Auth { - var options Options - for _, o := range opts { - o(&options) - } - - return &noop{ - opts: options, - } -} - -type noop struct { +type NoopAuth struct { opts Options } // String returns the name of the implementation -func (n *noop) String() string { +func (n *NoopAuth) String() string { return "noop" } // Init the auth -func (n *noop) Init(opts ...Option) { +func (n *NoopAuth) Init(opts ...Option) { for _, o := range opts { o(&n.opts) } } // Options set for auth -func (n *noop) Options() Options { +func (n *NoopAuth) Options() Options { return n.opts } // Generate a new account -func (n *noop) Generate(id string, opts ...GenerateOption) (*Account, error) { +func (n *NoopAuth) Generate(id string, opts ...GenerateOption) (*Account, error) { options := NewGenerateOptions(opts...) return &Account{ @@ -50,27 +39,27 @@ func (n *noop) Generate(id string, opts ...GenerateOption) (*Account, error) { } // Grant access to a resource -func (n *noop) Grant(rule *Rule) error { +func (n *NoopAuth) Grant(rule *Rule) error { return nil } // Revoke access to a resource -func (n *noop) Revoke(rule *Rule) error { +func (n *NoopAuth) Revoke(rule *Rule) error { return nil } // Rules used to verify requests -func (n *noop) Rules(opts ...RulesOption) ([]*Rule, error) { +func (n *NoopAuth) Rules(opts ...RulesOption) ([]*Rule, error) { return []*Rule{}, nil } // Verify an account has access to a resource -func (n *noop) Verify(acc *Account, res *Resource, opts ...VerifyOption) error { +func (n *NoopAuth) Verify(acc *Account, res *Resource, opts ...VerifyOption) error { return nil } // Inspect a token -func (n *noop) Inspect(token string) (*Account, error) { +func (n *NoopAuth) Inspect(token string) (*Account, error) { uid, err := uuid.NewRandom() if err != nil { return nil, err @@ -79,6 +68,6 @@ func (n *noop) Inspect(token string) (*Account, error) { } // Token generation using an account id and secret -func (n *noop) Token(opts ...TokenOption) (*Token, error) { +func (n *NoopAuth) Token(opts ...TokenOption) (*Token, error) { return &Token{}, nil } diff --git a/broker/broker.go b/broker/broker.go index d0c185f3..c93d1132 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,8 +1,10 @@ // Package broker is an interface used for asynchronous messaging package broker +import "context" + var ( - DefaultBroker Broker = newBroker() + DefaultBroker Broker = &NoopBroker{opts: NewOptions()} ) // Broker is an interface used for asynchronous messaging. @@ -10,10 +12,10 @@ type Broker interface { Init(...Option) error Options() Options Address() string - Connect() error - Disconnect() error - Publish(topic string, m *Message, opts ...PublishOption) error - Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) + Connect(context.Context) error + Disconnect(context.Context) error + Publish(context.Context, string, *Message, ...PublishOption) error + Subscribe(context.Context, string, Handler, ...SubscribeOption) (Subscriber, error) String() string } @@ -39,5 +41,5 @@ type Message struct { type Subscriber interface { Options() SubscribeOptions Topic() string - Unsubscribe() error + Unsubscribe(context.Context) error } diff --git a/broker/noop.go b/broker/noop.go index 29dad2c5..9dfcac87 100644 --- a/broker/noop.go +++ b/broker/noop.go @@ -1,6 +1,8 @@ package broker -type noopBroker struct { +import "context" + +type NoopBroker struct { opts Options } @@ -9,7 +11,7 @@ type noopSubscriber struct { opts SubscribeOptions } -func (n *noopBroker) Init(opts ...Option) error { +func (n *NoopBroker) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) } @@ -17,27 +19,27 @@ func (n *noopBroker) Init(opts ...Option) error { return nil } -func (n *noopBroker) Options() Options { +func (n *NoopBroker) Options() Options { return n.opts } -func (n *noopBroker) Address() string { +func (n *NoopBroker) Address() string { return "" } -func (n *noopBroker) Connect() error { +func (n *NoopBroker) Connect(ctx context.Context) error { return nil } -func (n *noopBroker) Disconnect() error { +func (n *NoopBroker) Disconnect(ctx context.Context) error { return nil } -func (n *noopBroker) Publish(topic string, m *Message, opts ...PublishOption) error { +func (n *NoopBroker) Publish(ctx context.Context, topic string, m *Message, opts ...PublishOption) error { return nil } -func (n *noopBroker) Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) { +func (n *NoopBroker) Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) { options := NewSubscribeOptions() for _, o := range opts { @@ -47,7 +49,7 @@ func (n *noopBroker) Subscribe(topic string, h Handler, opts ...SubscribeOption) return &noopSubscriber{topic: topic, opts: options}, nil } -func (n *noopBroker) String() string { +func (n *NoopBroker) String() string { return "noop" } @@ -59,16 +61,6 @@ func (n *noopSubscriber) Topic() string { return n.topic } -func (n *noopSubscriber) Unsubscribe() error { +func (n *noopSubscriber) Unsubscribe(ctx context.Context) error { return nil } - -// newBroker returns a new noop broker -func newBroker(opts ...Option) Broker { - options := NewOptions() - - for _, o := range opts { - o(&options) - } - return &noopBroker{opts: options} -} diff --git a/broker/options.go b/broker/options.go index 9542c59f..b48cf5d4 100644 --- a/broker/options.go +++ b/broker/options.go @@ -39,6 +39,12 @@ func NewOptions(opts ...Option) Options { return options } +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + type PublishOptions struct { // Other options for implementations of the interface // can be stored in a context diff --git a/client/client.go b/client/client.go index 5364dc1b..4cb10297 100644 --- a/client/client.go +++ b/client/client.go @@ -9,7 +9,7 @@ import ( ) var ( - DefaultClient Client = newClient() + DefaultClient Client = &NoopClient{opts: NewOptions()} ) // Client is the interface used to make requests to services. diff --git a/client/noop.go b/client/noop.go index ba3c811f..5a06ee0a 100644 --- a/client/noop.go +++ b/client/noop.go @@ -2,17 +2,16 @@ package client import ( "context" - "sync/atomic" raw "github.com/unistack-org/micro-codec-bytes" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" + "github.com/unistack-org/micro/v3/codec/json" "github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/metadata" ) -type noopClient struct { - once atomic.Value +type NoopClient struct { opts Options } @@ -119,30 +118,30 @@ func (n *noopMessage) ContentType() string { return n.opts.ContentType } -func (n *noopClient) Init(opts ...Option) error { +func (n *NoopClient) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) } return nil } -func (n *noopClient) Options() Options { +func (n *NoopClient) Options() Options { return n.opts } -func (n *noopClient) String() string { +func (n *NoopClient) String() string { return "noop" } -func (n *noopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { +func (n *NoopClient) Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error { return nil } -func (n *noopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request { +func (n *NoopClient) NewRequest(service, endpoint string, req interface{}, opts ...RequestOption) Request { return &noopRequest{} } -func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message { +func (n *NoopClient) NewMessage(topic string, msg interface{}, opts ...MessageOption) Message { options := MessageOptions{} for _, o := range opts { o(&options) @@ -151,25 +150,18 @@ func (n *noopClient) NewMessage(topic string, msg interface{}, opts ...MessageOp return &noopMessage{topic: topic, payload: msg, opts: options} } -func (n *noopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { +func (n *NoopClient) Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error) { return &noopStream{}, nil } -func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { - var options PublishOptions +func (n *NoopClient) Publish(ctx context.Context, p Message, opts ...PublishOption) error { var body []byte - // fail early on connect error - if !n.once.Load().(bool) { - if err := n.opts.Broker.Connect(); err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) - } - n.once.Store(true) + if err := n.opts.Broker.Connect(ctx); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) } - for _, o := range opts { - o(&options) - } + options := NewPublishOptions(opts...) md, ok := metadata.FromContext(ctx) if !ok { @@ -182,6 +174,11 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti if d, ok := p.Payload().(*raw.Frame); ok { body = d.Data } else { + cf := n.opts.Broker.Options().Codec + if cf == nil { + cf = json.Marshaler{} + } + /* // use codec for payload cf, err := n.opts.Codecs[p.ContentType()] @@ -190,7 +187,7 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti } */ // set the body - b, err := n.opts.Broker.Options().Codec.Marshal(p.Payload()) + b, err := cf.Marshal(p.Payload()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -204,7 +201,7 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti topic = options.Exchange } - return n.opts.Broker.Publish(topic, &broker.Message{ + return n.opts.Broker.Publish(ctx, topic, &broker.Message{ Header: md, Body: body, }, broker.PublishContext(options.Context)) @@ -217,5 +214,5 @@ func newClient(opts ...Option) Client { for _, o := range opts { o(&options) } - return &noopClient{opts: options} + return &NoopClient{opts: options} } diff --git a/client/options.go b/client/options.go index b14aae18..5cf1125a 100644 --- a/client/options.go +++ b/client/options.go @@ -7,11 +7,11 @@ import ( "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/selector" "github.com/unistack-org/micro/v3/selector/random" - "github.com/unistack-org/micro/v3/network/transport" ) type Options struct { @@ -48,6 +48,14 @@ type Options struct { Context context.Context } +func NewCallOptions(opts ...CallOption) CallOptions { + options := CallOptions{} + for _, o := range opts { + o(&options) + } + return options +} + type CallOptions struct { // Address of remote hosts Address []string @@ -84,6 +92,20 @@ type CallOptions struct { Context context.Context } +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + +func NewPublishOptions(opts ...PublishOption) PublishOptions { + options := PublishOptions{} + for _, o := range opts { + o(&options) + } + return options +} + type PublishOptions struct { // Exchange is the routing exchange for the message Exchange string @@ -92,10 +114,26 @@ type PublishOptions struct { Context context.Context } +func NewMessageOptions(opts ...MessageOption) MessageOptions { + options := MessageOptions{} + for _, o := range opts { + o(&options) + } + return options +} + type MessageOptions struct { ContentType string } +func NewRequestOptions(opts ...RequestOption) RequestOptions { + options := RequestOptions{} + for _, o := range opts { + o(&options) + } + return options +} + type RequestOptions struct { ContentType string Stream bool diff --git a/codec/json/json.go b/codec/json/json.go index 74ffe267..94f91b2f 100644 --- a/codec/json/json.go +++ b/codec/json/json.go @@ -25,12 +25,13 @@ func (c *Codec) ReadBody(b interface{}) error { if b == nil { return nil } - if pb, ok := b.(proto.Message); ok { + switch m := b.(type) { + case proto.Message: buf, err := ioutil.ReadAll(c.Conn) if err != nil { return err } - return jsonpb.Unmarshal(buf, pb) + return jsonpb.Unmarshal(buf, m) } return c.Decoder.Decode(b) } diff --git a/debug/profile/profile.go b/debug/profile/profile.go index d043e524..3ecf4b7a 100644 --- a/debug/profile/profile.go +++ b/debug/profile/profile.go @@ -11,20 +11,20 @@ type Profile interface { } var ( - DefaultProfile Profile = new(noop) + DefaultProfile Profile = &NoopProfile{} ) -type noop struct{} +type NoopProfile struct{} -func (p *noop) Start() error { +func (p *NoopProfile) Start() error { return nil } -func (p *noop) Stop() error { +func (p *NoopProfile) Stop() error { return nil } -func (p *noop) String() string { +func (p *NoopProfile) String() string { return "noop" } diff --git a/logger/logger.go b/logger/logger.go index 4af78358..d496e5df 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -9,7 +9,7 @@ var ( // Logger is a generic logging interface type Logger interface { // Init initialises options - Init(options ...Option) error + Init(opts ...Option) error // V compare provided verbosity level with current log level V(level Level) bool // The Logger options diff --git a/logger/options.go b/logger/options.go index d0a03302..c0a94604 100644 --- a/logger/options.go +++ b/logger/options.go @@ -20,6 +20,14 @@ type Options struct { Context context.Context } +func NewOptions(opts ...Option) Options { + options := Options{} + for _, o := range opts { + o(&options) + } + return options +} + // WithFields set default fields for the logger func WithFields(fields map[string]interface{}) Option { return func(args *Options) { diff --git a/micro.go b/micro.go index fe002853..a1775098 100644 --- a/micro.go +++ b/micro.go @@ -4,6 +4,7 @@ package micro import ( "context" + "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/server" ) @@ -17,13 +18,15 @@ type Service interface { // The service name Name() string // Init initialises options - Init(...Option) + Init(...Option) error // Options returns the current options Options() Options // Client is used to call services Client() client.Client // Server is for handling requests and events Server() server.Server + // Broker is for broker usage + Broker() broker.Broker // Run the service Run() error // The service implementation diff --git a/network/transport/noop.go b/network/transport/noop.go new file mode 100644 index 00000000..95a039a0 --- /dev/null +++ b/network/transport/noop.go @@ -0,0 +1,70 @@ +package transport + +type NoopTransport struct { + opts Options +} + +func (t *NoopTransport) Init(opts ...Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *NoopTransport) Options() Options { + return t.opts +} + +func (t *NoopTransport) Dial(addr string, opts ...DialOption) (Client, error) { + options := NewDialOptions(opts...) + return &noopClient{opts: options}, nil +} + +func (t *NoopTransport) Listen(addr string, opts ...ListenOption) (Listener, error) { + options := NewListenOptions(opts...) + return &noopListener{opts: options}, nil +} + +func (t *NoopTransport) String() string { + return "noop" +} + +type noopClient struct { + opts DialOptions +} + +func (c *noopClient) Close() error { + return nil +} + +func (c *noopClient) Local() string { + return "" +} + +func (c *noopClient) Remote() string { + return "" +} + +func (c *noopClient) Recv(*Message) error { + return nil +} + +func (c *noopClient) Send(*Message) error { + return nil +} + +type noopListener struct { + opts ListenOptions +} + +func (l *noopListener) Addr() string { + return "" +} + +func (l *noopListener) Accept(fn func(Socket)) error { + return nil +} + +func (l *noopListener) Close() error { + return nil +} diff --git a/network/transport/options.go b/network/transport/options.go index 377c4162..ce7d19d5 100644 --- a/network/transport/options.go +++ b/network/transport/options.go @@ -106,6 +106,12 @@ func Logger(l logger.Logger) Option { } } +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + // Codec sets the codec used for encoding where the transport // does not support message headers func Codec(c codec.Marshaler) Option { diff --git a/network/transport/transport.go b/network/transport/transport.go index 440217ff..e3b0a957 100644 --- a/network/transport/transport.go +++ b/network/transport/transport.go @@ -6,7 +6,7 @@ import ( ) var ( - DefaultTransport Transport + DefaultTransport Transport = &NoopTransport{opts: NewOptions()} ) // Transport is an interface which is used for communication between diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index 404ba2c5..48379c6a 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -49,15 +49,15 @@ func (t *tunBroker) Address() string { return t.tunnel.Address() } -func (t *tunBroker) Connect() error { +func (t *tunBroker) Connect(ctx context.Context) error { return t.tunnel.Connect() } -func (t *tunBroker) Disconnect() error { +func (t *tunBroker) Disconnect(ctx context.Context) error { return t.tunnel.Close() } -func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error { +func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error { // TODO: this is probably inefficient, we might want to just maintain an open connection // it may be easier to add broadcast to the tunnel c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast)) @@ -72,21 +72,16 @@ func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.Publ }) } -func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { +func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast)) if err != nil { return nil, err } - var options broker.SubscribeOptions - for _, o := range opts { - o(&options) - } - tunSub := &tunSubscriber{ topic: topic, handler: h, - opts: options, + opts: broker.NewSubscribeOptions(opts...), closed: make(chan bool), listener: l, } @@ -150,7 +145,7 @@ func (t *tunSubscriber) Topic() string { return t.topic } -func (t *tunSubscriber) Unsubscribe() error { +func (t *tunSubscriber) Unsubscribe(ctx context.Context) error { select { case <-t.closed: return nil @@ -177,12 +172,7 @@ func (t *tunEvent) Error() error { } func NewBroker(opts ...broker.Option) (broker.Broker, error) { - options := broker.Options{ - Context: context.Background(), - } - for _, o := range opts { - o(&options) - } + options := broker.NewOptions(opts...) t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) if !ok { diff --git a/registry/noop.go b/registry/noop.go index 2370a642..4b995790 100644 --- a/registry/noop.go +++ b/registry/noop.go @@ -1,53 +1,59 @@ package registry -import "fmt" +import ( + "context" + "fmt" +) -type noopRegistry struct { +type NoopRegistry struct { opts Options } -func (n *noopRegistry) Init(opts ...Option) error { +func (n *NoopRegistry) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) } return nil } -func (n *noopRegistry) Options() Options { +func (n *NoopRegistry) Options() Options { return n.opts } -func (n *noopRegistry) Register(*Service, ...RegisterOption) error { +func (n *NoopRegistry) Connect(ctx context.Context) error { return nil } -func (n *noopRegistry) Deregister(*Service, ...DeregisterOption) error { +func (n *NoopRegistry) Disconnect(ctx context.Context) error { return nil } -func (n *noopRegistry) GetService(string, ...GetOption) ([]*Service, error) { +func (n *NoopRegistry) Register(*Service, ...RegisterOption) error { + return nil +} + +func (n *NoopRegistry) Deregister(*Service, ...DeregisterOption) error { + return nil +} + +func (n *NoopRegistry) GetService(string, ...GetOption) ([]*Service, error) { return []*Service{}, nil } -func (n *noopRegistry) ListServices(...ListOption) ([]*Service, error) { +func (n *NoopRegistry) ListServices(...ListOption) ([]*Service, error) { return []*Service{}, nil } -func (n *noopRegistry) Watch(...WatchOption) (Watcher, error) { +func (n *NoopRegistry) Watch(...WatchOption) (Watcher, error) { return nil, fmt.Errorf("not implemented") } -func (n *noopRegistry) String() string { +func (n *NoopRegistry) String() string { return "noop" } -// newRegistry returns a new noop registry -func newRegistry(opts ...Option) Registry { - options := NewOptions() - - for _, o := range opts { - o(&options) - } - - return &noopRegistry{opts: options} +// NewRegistry returns a new noop registry +func NewRegistry(opts ...Option) Registry { + options := NewOptions(opts...) + return &NoopRegistry{opts: options} } diff --git a/registry/options.go b/registry/options.go index 0b4702ed..0134cfbd 100644 --- a/registry/options.go +++ b/registry/options.go @@ -37,6 +37,8 @@ type RegisterOptions struct { Context context.Context // Domain to register the service in Domain string + // Attempts specify attempts for register + Attempts int } type WatchOptions struct { @@ -54,6 +56,8 @@ type DeregisterOptions struct { Context context.Context // Domain the service was registered in Domain string + // Atempts specify max attempts for deregister + Attempts int } type GetOptions struct { @@ -95,6 +99,13 @@ func Logger(l logger.Logger) Option { } } +// Context sets the context +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + // Specify TLS Config func TLSConfig(t *tls.Config) Option { return func(o *Options) { @@ -102,6 +113,12 @@ func TLSConfig(t *tls.Config) Option { } } +func RegisterAttempts(t int) RegisterOption { + return func(o *RegisterOptions) { + o.Attempts = t + } +} + func RegisterTTL(t time.Duration) RegisterOption { return func(o *RegisterOptions) { o.TTL = t @@ -139,6 +156,12 @@ func WatchDomain(d string) WatchOption { } } +func DeregisterTimeout(t int) DeregisterOption { + return func(o *DeregisterOptions) { + o.Attempts = t + } +} + func DeregisterContext(ctx context.Context) DeregisterOption { return func(o *DeregisterOptions) { o.Context = ctx diff --git a/registry/registry.go b/registry/registry.go index 4c042dac..c76f7982 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -2,6 +2,7 @@ package registry import ( + "context" "errors" ) @@ -13,7 +14,7 @@ const ( ) var ( - DefaultRegistry Registry = newRegistry() + DefaultRegistry Registry = NewRegistry() // ErrNotFound returned when GetService is called and no services found ErrNotFound = errors.New("service not found") // ErrWatcherStopped returned when when watcher is stopped @@ -26,6 +27,8 @@ var ( type Registry interface { Init(...Option) error Options() Options + Connect(context.Context) error + Disconnect(context.Context) error Register(*Service, ...RegisterOption) error Deregister(*Service, ...DeregisterOption) error GetService(string, ...GetOption) ([]*Service, error) diff --git a/router/query.go b/router/query.go index 39efd5c2..72e43f0f 100644 --- a/router/query.go +++ b/router/query.go @@ -65,7 +65,7 @@ func QueryLink(link string) QueryOption { // NewQuery creates new query and returns it func NewQuery(opts ...QueryOption) QueryOptions { // default options - qopts := QueryOptions{ + options := QueryOptions{ Service: "*", Address: "*", Gateway: "*", @@ -75,8 +75,8 @@ func NewQuery(opts ...QueryOption) QueryOptions { } for _, o := range opts { - o(&qopts) + o(&options) } - return qopts + return options } diff --git a/server/handler.go b/server/handler.go index 7c7821ff..44fe1e61 100644 --- a/server/handler.go +++ b/server/handler.go @@ -1,94 +1,59 @@ package server -import "context" +import ( + "reflect" -type HandlerOption func(*HandlerOptions) + "github.com/unistack-org/micro/v3/registry" +) -type HandlerOptions struct { - Internal bool - Metadata map[string]map[string]string - Context context.Context +type rpcHandler struct { + name string + handler interface{} + endpoints []*registry.Endpoint + opts HandlerOptions } -func NewHandlerOptions(opts ...HandlerOption) HandlerOptions { - options := HandlerOptions{ - Context: context.Background(), +func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler { + options := NewHandlerOptions(opts...) + + typ := reflect.TypeOf(handler) + hdlr := reflect.ValueOf(handler) + name := reflect.Indirect(hdlr).Type().Name() + + var endpoints []*registry.Endpoint + + for m := 0; m < typ.NumMethod(); m++ { + if e := registry.ExtractEndpoint(typ.Method(m)); e != nil { + e.Name = name + "." + e.Name + + for k, v := range options.Metadata[e.Name] { + e.Metadata[k] = v + } + + endpoints = append(endpoints, e) + } } - for _, o := range opts { - o(&options) - } - - return options -} - -type SubscriberOption func(*SubscriberOptions) - -type SubscriberOptions struct { - // AutoAck defaults to true. When a handler returns - // with a nil error the message is acked. - AutoAck bool - Queue string - Internal bool - Context context.Context -} - -func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { - options := SubscriberOptions{ - AutoAck: true, - Context: context.Background(), - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// EndpointMetadata is a Handler option that allows metadata to be added to -// individual endpoints. -func EndpointMetadata(name string, md map[string]string) HandlerOption { - return func(o *HandlerOptions) { - o.Metadata[name] = md + return &rpcHandler{ + name: name, + handler: handler, + endpoints: endpoints, + opts: options, } } -// Internal Handler options specifies that a handler is not advertised -// to the discovery system. In the future this may also limit request -// to the internal network or authorised user. -func InternalHandler(b bool) HandlerOption { - return func(o *HandlerOptions) { - o.Internal = b - } +func (r *rpcHandler) Name() string { + return r.name } -// Internal Subscriber options specifies that a subscriber is not advertised -// to the discovery system. -func InternalSubscriber(b bool) SubscriberOption { - return func(o *SubscriberOptions) { - o.Internal = b - } +func (r *rpcHandler) Handler() interface{} { + return r.handler } -// DisableAutoAck will disable auto acking of messages -// after they have been handled. -func DisableAutoAck() SubscriberOption { - return func(o *SubscriberOptions) { - o.AutoAck = false - } +func (r *rpcHandler) Endpoints() []*registry.Endpoint { + return r.endpoints } -// Shared queue name distributed messages across subscribers -func SubscriberQueue(n string) SubscriberOption { - return func(o *SubscriberOptions) { - o.Queue = n - } -} - -// SubscriberContext set context options to allow broker SubscriberOption passed -func SubscriberContext(ctx context.Context) SubscriberOption { - return func(o *SubscriberOptions) { - o.Context = ctx - } +func (r *rpcHandler) Options() HandlerOptions { + return r.opts } diff --git a/server/noop.go b/server/noop.go index 8fdd61eb..98f65c6f 100644 --- a/server/noop.go +++ b/server/noop.go @@ -1,108 +1,462 @@ package server -import "github.com/unistack-org/micro/v3/registry" +import ( + "bytes" + "fmt" + "sort" + "sync" + "time" -type noopServer struct { - h Handler - opts Options + craw "github.com/unistack-org/micro-codec-bytes" + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/codec" + cjson "github.com/unistack-org/micro/v3/codec/json" + cjsonrpc "github.com/unistack-org/micro/v3/codec/jsonrpc" + cproto "github.com/unistack-org/micro/v3/codec/proto" + cprotorpc "github.com/unistack-org/micro/v3/codec/protorpc" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/registry" +) + +var ( + DefaultCodecs = map[string]codec.NewCodec{ + "application/json": cjson.NewCodec, + "application/json-rpc": cjsonrpc.NewCodec, + "application/protobuf": cproto.NewCodec, + "application/proto-rpc": cprotorpc.NewCodec, + "application/octet-stream": craw.NewCodec, + } +) + +const ( + defaultContentType = "application/json" +) + +type NoopServer struct { + h Handler + opts Options + rsvc *registry.Service + handlers map[string]Handler + subscribers map[*subscriber][]broker.Subscriber + registered bool + started bool + exit chan chan error + wg *sync.WaitGroup + sync.RWMutex } -type noopHandler struct { - opts HandlerOptions - h interface{} +func (n *NoopServer) newCodec(contentType string) (codec.NewCodec, error) { + if cf, ok := n.opts.Codecs[contentType]; ok { + return cf, nil + } + if cf, ok := DefaultCodecs[contentType]; ok { + return cf, nil + } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } -type noopSubscriber struct { - topic string - opts SubscriberOptions - h interface{} -} - -func (n *noopSubscriber) Topic() string { - return n.topic -} - -func (n *noopSubscriber) Subscriber() interface{} { - return n.h -} - -func (n *noopSubscriber) Endpoints() []*registry.Endpoint { - return nil -} - -func (n *noopSubscriber) Options() SubscriberOptions { - return n.opts -} - -func (n *noopHandler) Endpoints() []*registry.Endpoint { - return nil -} - -func (n *noopHandler) Handler() interface{} { - return nil -} - -func (n *noopHandler) Options() HandlerOptions { - return n.opts -} - -func (n *noopHandler) Name() string { - return "noop" -} - -func (n *noopServer) Handle(handler Handler) error { +func (n *NoopServer) Handle(handler Handler) error { n.h = handler return nil } -func (n *noopServer) Subscribe(subscriber Subscriber) error { - // n.s = handler +func (n *NoopServer) Subscribe(sb Subscriber) error { + sub, ok := sb.(*subscriber) + if !ok { + return fmt.Errorf("invalid subscriber: expected *subscriber") + } + if len(sub.handlers) == 0 { + return fmt.Errorf("invalid subscriber: no handler functions") + } + + if err := ValidateSubscriber(sb); err != nil { + return err + } + + n.Lock() + if _, ok = n.subscribers[sub]; ok { + n.Unlock() + return fmt.Errorf("subscriber %v already exists", sub) + } + + n.subscribers[sub] = nil + n.Unlock() return nil } -func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { - options := NewHandlerOptions() - for _, o := range opts { - o(&options) - } - return &noopHandler{opts: options, h: h} +func (n *NoopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { + return newRpcHandler(h, opts...) } -func (n *noopServer) NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber { - options := NewSubscriberOptions() - for _, o := range opts { - o(&options) - } - return &noopSubscriber{topic: topic, opts: options, h: h} +func (n *NoopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { + return newSubscriber(topic, sb, opts...) } -func (n *noopServer) Init(opts ...Option) error { +func (n *NoopServer) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) } + + if n.handlers == nil { + n.handlers = make(map[string]Handler) + } + if n.subscribers == nil { + n.subscribers = make(map[*subscriber][]broker.Subscriber) + } + if n.exit == nil { + n.exit = make(chan chan error) + } + return nil } -func (n *noopServer) Start() error { - return nil -} - -func (n *noopServer) Stop() error { - return nil -} - -func (n *noopServer) Options() Options { +func (n *NoopServer) Options() Options { return n.opts } -func (n *noopServer) String() string { +func (n *NoopServer) String() string { return "noop" } -func newServer(opts ...Option) Server { - options := NewOptions() - for _, o := range opts { - o(&options) +func (n *NoopServer) Register() error { + n.RLock() + rsvc := n.rsvc + config := n.opts + n.RUnlock() + + // if service already filled, reuse it and return early + if rsvc != nil { + if err := DefaultRegisterFunc(rsvc, config); err != nil { + return err + } + return nil } - return &noopServer{opts: options} + + var err error + var service *registry.Service + var cacheService bool + + service, err = NewRegistryService(n) + if err != nil { + return err + } + + n.RLock() + // Maps are ordered randomly, sort the keys for consistency + var handlerList []string + for n, e := range n.handlers { + // Only advertise non internal handlers + if !e.Options().Internal { + handlerList = append(handlerList, n) + } + } + + sort.Strings(handlerList) + + var subscriberList []*subscriber + for e := range n.subscribers { + // Only advertise non internal subscribers + if !e.Options().Internal { + subscriberList = append(subscriberList, e) + } + } + sort.Slice(subscriberList, func(i, j int) bool { + return subscriberList[i].topic > subscriberList[j].topic + }) + + endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) + for _, h := range handlerList { + endpoints = append(endpoints, n.handlers[h].Endpoints()...) + } + for _, e := range subscriberList { + endpoints = append(endpoints, e.Endpoints()...) + } + n.RUnlock() + + service.Nodes[0].Metadata["protocol"] = "noop" + service.Nodes[0].Metadata["transport"] = "noop" + service.Endpoints = endpoints + + n.RLock() + registered := n.registered + n.RUnlock() + + if !registered { + if logger.V(logger.InfoLevel) { + logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + } + } + + // register the service + if err := DefaultRegisterFunc(service, config); err != nil { + return err + } + + // already registered? don't need to register subscribers + if registered { + return nil + } + + n.Lock() + defer n.Unlock() + + cx := config.Context + + for sb := range n.subscribers { + handler := n.createSubHandler(sb, config) + var opts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + opts = append(opts, broker.SubscribeGroup(queue)) + } + + if sb.Options().Context != nil { + cx = sb.Options().Context + } + + opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck)) + + if logger.V(logger.InfoLevel) { + logger.Infof("Subscribing to topic: %s", sb.Topic()) + } + sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...) + if err != nil { + return err + } + n.subscribers[sb] = []broker.Subscriber{sub} + } + + n.registered = true + if cacheService { + n.rsvc = service + } + + return nil +} + +func (n *NoopServer) Deregister() error { + var err error + + n.RLock() + config := n.opts + n.RUnlock() + + service, err := NewRegistryService(n) + if err != nil { + return err + } + + if logger.V(logger.InfoLevel) { + logger.Infof("Deregistering node: %s", service.Nodes[0].Id) + } + + if err := DefaultDeregisterFunc(service, config); err != nil { + return err + } + + n.Lock() + n.rsvc = nil + + if !n.registered { + n.Unlock() + return nil + } + + n.registered = false + + cx := config.Context + + wg := sync.WaitGroup{} + for sb, subs := range n.subscribers { + for _, sub := range subs { + if sb.Options().Context != nil { + cx = sb.Options().Context + } + + wg.Add(1) + go func(s broker.Subscriber) { + defer wg.Done() + if logger.V(logger.InfoLevel) { + logger.Infof("Unsubscribing from topic: %s", s.Topic()) + } + if err := s.Unsubscribe(cx); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err) + } + } + }(sub) + } + n.subscribers[sb] = nil + } + wg.Wait() + + n.Unlock() + return nil +} + +func (n *NoopServer) Start() error { + n.RLock() + if n.started { + n.RUnlock() + return nil + } + config := n.Options() + n.RUnlock() + + if logger.V(logger.InfoLevel) { + logger.Infof("Server [noop] Listening on %s", config.Address) + } + n.Lock() + if len(config.Advertise) == 0 { + config.Advertise = config.Address + } + n.Unlock() + + // only connect if we're subscribed + if len(n.subscribers) > 0 { + // connect to the broker + if err := config.Broker.Connect(config.Context); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err) + } + return err + } + + if logger.V(logger.InfoLevel) { + logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()) + } + } + + // use RegisterCheck func before register + if err := config.RegisterCheck(config.Context); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err) + } + } else { + // announce self to the world + if err := n.Register(); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Server register error: %v", err) + } + } + } + + go func() { + t := new(time.Ticker) + + // only process if it exists + if config.RegisterInterval > time.Duration(0) { + // new ticker + t = time.NewTicker(config.RegisterInterval) + } + + // return error chan + var ch chan error + + Loop: + for { + select { + // register self on interval + case <-t.C: + n.RLock() + registered := n.registered + n.RUnlock() + rerr := config.RegisterCheck(config.Context) + if rerr != nil && registered { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr) + } + // deregister self in case of error + if err := n.Deregister(); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err) + } + } + } else if rerr != nil && !registered { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr) + } + continue + } + if err := n.Register(); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err) + } + } + // wait for exit + case ch = <-n.exit: + break Loop + } + } + + // deregister self + if err := n.Deregister(); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Error("Server deregister error: ", err) + } + } + + // wait for waitgroup + if n.wg != nil { + n.wg.Wait() + } + + // stop the grpc server + exit := make(chan bool) + + go func() { + close(exit) + }() + + select { + case <-exit: + } + + // close transport + ch <- nil + + if logger.V(logger.InfoLevel) { + logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address()) + } + // disconnect broker + if err := config.Broker.Disconnect(config.Context); err != nil { + if logger.V(logger.ErrorLevel) { + logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err) + } + } + }() + + // mark the server as started + n.Lock() + n.started = true + n.Unlock() + + return nil +} + +func (n *NoopServer) Stop() error { + n.RLock() + if !n.started { + n.RUnlock() + return nil + } + n.RUnlock() + + ch := make(chan error) + n.exit <- ch + + err := <-ch + n.Lock() + n.rsvc = nil + n.started = false + n.Unlock() + + return err +} + +type noopcodec struct { + *bytes.Buffer +} + +func (c noopcodec) Close() error { + return nil } diff --git a/server/options.go b/server/options.go index c1c246c1..ef6a0298 100644 --- a/server/options.go +++ b/server/options.go @@ -39,6 +39,10 @@ type Options struct { RegisterTTL time.Duration // The interval on which to register RegisterInterval time.Duration + // RegisterAttempts specify how many times try to register + RegisterAttempts int + // DeegisterAttempts specify how many times try to deregister + DeregisterAttempts int // The router for requests Router Router @@ -61,6 +65,8 @@ func NewOptions(opts ...Option) Options { RegisterInterval: DefaultRegisterInterval, RegisterTTL: DefaultRegisterTTL, RegisterCheck: DefaultRegisterCheck, + Logger: logger.DefaultLogger, + Tracer: tracer.DefaultTracer, Broker: broker.DefaultBroker, Registry: registry.DefaultRegistry, Address: DefaultAddress, @@ -255,3 +261,94 @@ func WrapSubscriber(w SubscriberWrapper) Option { o.SubWrappers = append(o.SubWrappers, w) } } + +type HandlerOption func(*HandlerOptions) + +type HandlerOptions struct { + Internal bool + Metadata map[string]map[string]string + Context context.Context +} + +func NewHandlerOptions(opts ...HandlerOption) HandlerOptions { + options := HandlerOptions{ + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + return options +} + +type SubscriberOption func(*SubscriberOptions) + +type SubscriberOptions struct { + // AutoAck defaults to true. When a handler returns + // with a nil error the message is acked. + AutoAck bool + Queue string + Internal bool + Context context.Context +} + +func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { + options := SubscriberOptions{ + AutoAck: true, + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + return options +} + +// EndpointMetadata is a Handler option that allows metadata to be added to +// individual endpoints. +func EndpointMetadata(name string, md map[string]string) HandlerOption { + return func(o *HandlerOptions) { + o.Metadata[name] = md + } +} + +// Internal Handler options specifies that a handler is not advertised +// to the discovery system. In the future this may also limit request +// to the internal network or authorised user. +func InternalHandler(b bool) HandlerOption { + return func(o *HandlerOptions) { + o.Internal = b + } +} + +// Internal Subscriber options specifies that a subscriber is not advertised +// to the discovery system. +func InternalSubscriber(b bool) SubscriberOption { + return func(o *SubscriberOptions) { + o.Internal = b + } +} + +// DisableAutoAck will disable auto acking of messages +// after they have been handled. +func DisableAutoAck() SubscriberOption { + return func(o *SubscriberOptions) { + o.AutoAck = false + } +} + +// Shared queue name distributed messages across subscribers +func SubscriberQueue(n string) SubscriberOption { + return func(o *SubscriberOptions) { + o.Queue = n + } +} + +// SubscriberContext set context options to allow broker SubscriberOption passed +func SubscriberContext(ctx context.Context) SubscriberOption { + return func(o *SubscriberOptions) { + o.Context = ctx + } +} diff --git a/server/registry.go b/server/registry.go new file mode 100644 index 00000000..191b4e3b --- /dev/null +++ b/server/registry.go @@ -0,0 +1,89 @@ +package server + +import ( + "net" + "time" + + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/util/addr" + "github.com/unistack-org/micro/v3/util/backoff" +) + +var ( + // DefaultRegisterFunc uses backoff to register service + DefaultRegisterFunc = func(service *registry.Service, config Options) error { + var err error + + opts := []registry.RegisterOption{ + registry.RegisterTTL(config.RegisterTTL), + registry.RegisterDomain(config.Namespace), + } + + for i := 0; i <= config.RegisterAttempts; i++ { + err = config.Registry.Register(service, opts...) + if err == nil { + break + } + // backoff then retry + time.Sleep(backoff.Do(i + 1)) + continue + } + return err + } + // DefaultDeregisterFunc uses backoff to deregister service + DefaultDeregisterFunc = func(service *registry.Service, config Options) error { + var err error + + opts := []registry.DeregisterOption{ + registry.DeregisterDomain(config.Namespace), + } + + for i := 0; i <= config.DeregisterAttempts; i++ { + err = config.Registry.Deregister(service, opts...) + if err == nil { + break + } + // backoff then retry + time.Sleep(backoff.Do(i + 1)) + continue + } + return err + } +) + +func NewRegistryService(s Server) (*registry.Service, error) { + opts := s.Options() + + advt := opts.Address + if len(opts.Advertise) > 0 { + advt = opts.Advertise + } + + host, port, err := net.SplitHostPort(advt) + if err != nil { + return nil, err + } + + addr, err := addr.Extract(host) + if err != nil { + addr = host + } + + node := ®istry.Node{ + Id: opts.Name + "-" + opts.Id, + Address: net.JoinHostPort(addr, port), + } + node.Metadata = metadata.Copy(opts.Metadata) + + node.Metadata["server"] = s.String() + node.Metadata["broker"] = opts.Broker.String() + node.Metadata["registry"] = opts.Registry.String() + + return ®istry.Service{ + Name: opts.Name, + Version: opts.Version, + Nodes: []*registry.Node{node}, + Metadata: metadata.New(0), + }, nil +} diff --git a/server/request.go b/server/request.go new file mode 100644 index 00000000..2e0472ba --- /dev/null +++ b/server/request.go @@ -0,0 +1,90 @@ +package server + +import ( + raw "github.com/unistack-org/micro-codec-bytes" + "github.com/unistack-org/micro/v3/codec" +) + +type rpcRequest struct { + service string + method string + contentType string + codec codec.Codec + header map[string]string + body []byte + stream bool + payload interface{} +} + +type rpcMessage struct { + topic string + contentType string + payload interface{} + header map[string]string + body []byte + codec codec.Codec +} + +func (r *rpcRequest) ContentType() string { + return r.contentType +} + +func (r *rpcRequest) Service() string { + return r.service +} + +func (r *rpcRequest) Method() string { + return r.method +} + +func (r *rpcRequest) Endpoint() string { + return r.method +} + +func (r *rpcRequest) Codec() codec.Reader { + return r.codec +} + +func (r *rpcRequest) Header() map[string]string { + return r.header +} + +func (r *rpcRequest) Read() ([]byte, error) { + f := &raw.Frame{} + if err := r.codec.ReadBody(f); err != nil { + return nil, err + } + return f.Data, nil +} + +func (r *rpcRequest) Stream() bool { + return r.stream +} + +func (r *rpcRequest) Body() interface{} { + return r.payload +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Payload() interface{} { + return r.payload +} + +func (r *rpcMessage) Header() map[string]string { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Reader { + return r.codec +} diff --git a/server/server.go b/server/server.go index 87387275..d8901833 100644 --- a/server/server.go +++ b/server/server.go @@ -11,7 +11,7 @@ import ( ) var ( - DefaultServer Server = newServer() + DefaultServer Server = &NoopServer{opts: NewOptions()} ) // Server is a simple micro server abstraction diff --git a/server/subscriber.go b/server/subscriber.go index bcff0e66..cfeca8ba 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -1,10 +1,20 @@ package server import ( + "bytes" + "context" "fmt" "reflect" + "runtime/debug" + "strings" "unicode" "unicode/utf8" + + "github.com/unistack-org/micro/v3/broker" + "github.com/unistack-org/micro/v3/errors" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" + "github.com/unistack-org/micro/v3/registry" ) const ( @@ -17,6 +27,22 @@ var ( typeOfError = reflect.TypeOf((*error)(nil)).Elem() ) +type handler struct { + method reflect.Value + reqType reflect.Type + ctxType reflect.Type +} + +type subscriber struct { + topic string + rcvr reflect.Value + typ reflect.Type + subscriber interface{} + handlers []*handler + endpoints []*registry.Endpoint + opts SubscriberOptions +} + // Is this an exported - upper case - name? func isExported(name string) bool { rune, _ := utf8.DecodeRuneInString(name) @@ -86,3 +112,200 @@ func ValidateSubscriber(sub Subscriber) error { return nil } + +func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { + var endpoints []*registry.Endpoint + var handlers []*handler + + options := NewSubscriberOptions(opts...) + + if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { + h := &handler{ + method: reflect.ValueOf(sub), + } + + switch typ.NumIn() { + case 1: + h.reqType = typ.In(0) + case 2: + h.ctxType = typ.In(0) + h.reqType = typ.In(1) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: "Func", + Request: registry.ExtractSubValue(typ), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } else { + hdlr := reflect.ValueOf(sub) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + h := &handler{ + method: method.Func, + } + + switch method.Type.NumIn() { + case 2: + h.reqType = method.Type.In(1) + case 3: + h.ctxType = method.Type.In(1) + h.reqType = method.Type.In(2) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: name + "." + method.Name, + Request: registry.ExtractSubValue(method.Type), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } + } + + return &subscriber{ + rcvr: reflect.ValueOf(sub), + typ: reflect.TypeOf(sub), + topic: topic, + subscriber: sub, + handlers: handlers, + endpoints: endpoints, + opts: options, + } +} + +func (n *NoopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { + return func(p broker.Event) (err error) { + defer func() { + if r := recover(); r != nil { + if logger.V(logger.ErrorLevel) { + logger.Error("panic recovered: ", r) + logger.Error(string(debug.Stack())) + } + err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r) + } + }() + + msg := p.Message() + // if we don't have headers, create empty map + if msg.Header == nil { + msg.Header = make(map[string]string) + } + + ct := msg.Header["Content-Type"] + if len(ct) == 0 { + msg.Header["Content-Type"] = defaultContentType + ct = defaultContentType + } + cf, err := n.newCodec(ct) + if err != nil { + return err + } + + hdr := make(map[string]string, len(msg.Header)) + for k, v := range msg.Header { + hdr[k] = v + } + delete(hdr, "Content-Type") + ctx := metadata.NewContext(sb.opts.Context, hdr) + + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var isVal bool + var req reflect.Value + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + if isVal { + req = req.Elem() + } + + if err = cf(noopcodec{bytes.NewBuffer(msg.Body)}).ReadBody(req.Interface()); err != nil { + return err + } + + fn := func(ctx context.Context, msg Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + vals = append(vals, reflect.ValueOf(msg.Payload())) + + returnValues := handler.method.Call(vals) + if rerr := returnValues[0].Interface(); rerr != nil { + return rerr.(error) + } + return nil + } + + for i := len(opts.SubWrappers); i > 0; i-- { + fn = opts.SubWrappers[i-1](fn) + } + + if n.wg != nil { + n.wg.Add(1) + } + go func() { + if n.wg != nil { + defer n.wg.Done() + } + err := fn(ctx, &rpcMessage{ + topic: sb.topic, + contentType: ct, + payload: req.Interface(), + header: msg.Header, + body: msg.Body, + }) + results <- err + }() + } + var errors []string + for i := 0; i < len(sb.handlers); i++ { + if rerr := <-results; rerr != nil { + errors = append(errors, rerr.Error()) + } + } + if len(errors) > 0 { + err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + + return err + } +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Subscriber() interface{} { + return s.subscriber +} + +func (s *subscriber) Endpoints() []*registry.Endpoint { + return s.endpoints +} + +func (s *subscriber) Options() SubscriberOptions { + return s.opts +} diff --git a/service.go b/service.go index ee6d229b..854e1cc2 100644 --- a/service.go +++ b/service.go @@ -5,9 +5,13 @@ import ( "sync" cmd "github.com/unistack-org/micro-config-cmd" + "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/network/transport" + "github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/server" + "github.com/unistack-org/micro/v3/store" ) type service struct { @@ -31,84 +35,94 @@ func (s *service) Name() string { // Init initialises options. Additionally it calls cmd.Init // which parses command line flags. cmd.Init is only called // on first Init. -func (s *service) Init(opts ...Option) { +func (s *service) Init(opts ...Option) error { // process options for _, o := range opts { o(&s.opts) } - s.once.Do(func() { - if s.opts.Cmd != nil { - // set cmd name - if len(s.opts.Cmd.App().Name) == 0 { - s.opts.Cmd.App().Name = s.Server().Options().Name - } - - // Initialise the command options - if err := s.opts.Cmd.Init( - cmd.Auth(&s.opts.Auth), - cmd.Broker(&s.opts.Broker), - cmd.Registry(&s.opts.Registry), - cmd.Runtime(&s.opts.Runtime), - cmd.Transport(&s.opts.Transport), - cmd.Client(&s.opts.Client), - cmd.Config(&s.opts.Config), - cmd.Server(&s.opts.Server), - cmd.Store(&s.opts.Store), - cmd.Profile(&s.opts.Profile), - ); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) - } + if s.opts.Cmd != nil { + // set cmd name + if len(s.opts.Cmd.App().Name) == 0 { + s.opts.Cmd.App().Name = s.Server().Options().Name } - }) + + // Initialise the command options + if err := s.opts.Cmd.Init( + cmd.Auth(&s.opts.Auth), + cmd.Broker(&s.opts.Broker), + cmd.Registry(&s.opts.Registry), + cmd.Runtime(&s.opts.Runtime), + cmd.Transport(&s.opts.Transport), + cmd.Client(&s.opts.Client), + cmd.Config(&s.opts.Config), + cmd.Server(&s.opts.Server), + cmd.Store(&s.opts.Store), + cmd.Profile(&s.opts.Profile), + ); err != nil { + return err + } + } if s.opts.Registry != nil { - if err := s.opts.Registry.Init(); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) + if err := s.opts.Registry.Init( + registry.Context(s.opts.Context), + ); err != nil { + return err } } if s.opts.Broker != nil { - if err := s.opts.Broker.Init(); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) + if err := s.opts.Broker.Init( + broker.Context(s.opts.Context), + ); err != nil { + return err } } if s.opts.Transport != nil { - if err := s.opts.Transport.Init(); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) + if err := s.opts.Transport.Init( + transport.Context(s.opts.Context), + ); err != nil { + return err } } if s.opts.Store != nil { - if err := s.opts.Store.Init(s.opts.Context); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) + if err := s.opts.Store.Init( + store.Context(s.opts.Context), + ); err != nil { + return err } } if s.opts.Server != nil { - if err := s.opts.Server.Init(); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) + if err := s.opts.Server.Init( + server.Context(s.opts.Context), + ); err != nil { + return err } } if s.opts.Client != nil { - if err := s.opts.Client.Init(); err != nil { - logger.Fatalf("[cmd] init failed: %v", err) + if err := s.opts.Client.Init( + client.Context(s.opts.Context), + ); err != nil { + return err } } - // execute the command - // TODO: do this in service.Run() - if err := s.opts.Cmd.Run(); err != nil { - logger.Fatalf("[cmd] run failed: %v", err) - } + return nil } func (s *service) Options() Options { return s.opts } +func (s *service) Broker() broker.Broker { + return s.opts.Broker +} + func (s *service) Client() client.Client { return s.opts.Client } @@ -185,6 +199,12 @@ func (s *service) Run() error { defer s.opts.Profile.Stop() } + if s.opts.Cmd != nil { + if err := s.opts.Cmd.Run(); err != nil { + return err + } + } + if err := s.Start(); err != nil { return err } diff --git a/store/noop.go b/store/noop.go index 6f3b2d5d..d71927eb 100644 --- a/store/noop.go +++ b/store/noop.go @@ -2,49 +2,45 @@ package store import "context" -type noopStore struct { +type NoopStore struct { opts Options } -func newStore(opts ...Option) Store { - options := NewOptions() - for _, o := range opts { - o(&options) - } - return &noopStore{opts: options} -} - -func (n *noopStore) Init(ctx context.Context, opts ...Option) error { +func (n *NoopStore) Init(opts ...Option) error { for _, o := range opts { o(&n.opts) } return nil } -func (n *noopStore) Options() Options { +func (n *NoopStore) Options() Options { return n.opts } -func (n *noopStore) String() string { +func (n *NoopStore) String() string { return "noop" } -func (n *noopStore) Read(ctx context.Context, key string, opts ...ReadOption) ([]*Record, error) { +func (n *NoopStore) Read(ctx context.Context, key string, opts ...ReadOption) ([]*Record, error) { return []*Record{}, nil } -func (n *noopStore) Write(ctx context.Context, r *Record, opts ...WriteOption) error { +func (n *NoopStore) Write(ctx context.Context, r *Record, opts ...WriteOption) error { return nil } -func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { +func (n *NoopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { return nil } -func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { +func (n *NoopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { return []string{}, nil } -func (n *noopStore) Close(ctx context.Context) error { +func (n *NoopStore) Connect(ctx context.Context) error { + return nil +} + +func (n *NoopStore) Disconnect(ctx context.Context) error { return nil } diff --git a/store/options.go b/store/options.go index b8dcf28d..4ebf9ecc 100644 --- a/store/options.go +++ b/store/options.go @@ -37,6 +37,12 @@ func NewOptions(opts ...Option) Options { // Option sets values in Options type Option func(o *Options) +func Context(ctx context.Context) Option { + return func(o *Options) { + o.Context = ctx + } +} + // Logger sets the logger func Logger(l logger.Logger) Option { return func(o *Options) { diff --git a/store/store.go b/store/store.go index 131d2a0a..6f17e973 100644 --- a/store/store.go +++ b/store/store.go @@ -11,13 +11,14 @@ import ( var ( // ErrNotFound is returned when a key doesn't exist ErrNotFound = errors.New("not found") - DefaultStore Store = newStore() + DefaultStore Store = &NoopStore{opts: NewOptions()} ) // Store is a data storage interface type Store interface { // Init initialises the store. It must perform any required setup on the backing storage implementation and check that it is ready for use, returning any errors. - Init(ctx context.Context, opts ...Option) error + Init(opts ...Option) error + Connect(ctx context.Context) error // Options allows you to view the current options. Options() Options // Read takes a single key name and optional ReadOptions. It returns matching []*Record or an error. @@ -28,8 +29,8 @@ type Store interface { Delete(ctx context.Context, key string, opts ...DeleteOption) error // List returns any keys that match, or an empty list with no error if none matched. List(ctx context.Context, opts ...ListOption) ([]string, error) - // Close the store - Close(ctx context.Context) error + // Disconnect the store + Disconnect(ctx context.Context) error // String returns the name of the implementation. String() string } diff --git a/tracer/context.go b/tracer/context.go new file mode 100644 index 00000000..a2fd4372 --- /dev/null +++ b/tracer/context.go @@ -0,0 +1,36 @@ +// Package trace provides an interface for distributed tracing +package tracer + +import ( + "context" + + "github.com/unistack-org/micro/v3/metadata" +) + +const ( + traceIDKey = "Micro-Trace-Id" + spanIDKey = "Micro-Span-Id" +) + +// FromContext returns a span from context +func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) { + traceID, traceOk := metadata.Get(ctx, traceIDKey) + microID, microOk := metadata.Get(ctx, "Micro-Id") + if !traceOk && !microOk { + isFound = false + return + } + if !traceOk { + traceID = microID + } + parentSpanID, ok := metadata.Get(ctx, spanIDKey) + return traceID, parentSpanID, ok +} + +// NewContext saves the trace and span ids in the context +func NewContext(ctx context.Context, traceID, parentSpanID string) context.Context { + return metadata.MergeContext(ctx, map[string]string{ + traceIDKey: traceID, + spanIDKey: parentSpanID, + }, true) +} diff --git a/tracer/noop.go b/tracer/noop.go index 4443dcce..3acac73e 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -2,24 +2,24 @@ package tracer import "context" -type noop struct{} +type NoopTracer struct{} -func (n *noop) Init(...Option) error { +func (n *NoopTracer) Init(...Option) error { return nil } -func (n *noop) Start(ctx context.Context, name string) (context.Context, *Span) { +func (n *NoopTracer) Start(ctx context.Context, name string) (context.Context, *Span) { return nil, nil } -func (n *noop) Finish(*Span) error { +func (n *NoopTracer) Finish(*Span) error { return nil } -func (n *noop) Read(...ReadOption) ([]*Span, error) { +func (n *NoopTracer) Read(...ReadOption) ([]*Span, error) { return nil, nil } -func newTracer(opts ...Option) Tracer { - return &noop{} +func NewTracer(opts ...Option) Tracer { + return &NoopTracer{} } diff --git a/tracer/trace.go b/tracer/trace.go index d4a91a89..02004eee 100644 --- a/tracer/trace.go +++ b/tracer/trace.go @@ -4,12 +4,10 @@ package tracer import ( "context" "time" - - "github.com/unistack-org/micro/v3/metadata" ) var ( - DefaultTracer Tracer = newTracer() + DefaultTracer Tracer = NewTracer() ) // Tracer is an interface for distributed tracing @@ -51,31 +49,3 @@ type Span struct { // Type Type SpanType } - -const ( - traceIDKey = "Micro-Trace-Id" - spanIDKey = "Micro-Span-Id" -) - -// FromContext returns a span from context -func FromContext(ctx context.Context) (traceID string, parentSpanID string, isFound bool) { - traceID, traceOk := metadata.Get(ctx, traceIDKey) - microID, microOk := metadata.Get(ctx, "Micro-Id") - if !traceOk && !microOk { - isFound = false - return - } - if !traceOk { - traceID = microID - } - parentSpanID, ok := metadata.Get(ctx, spanIDKey) - return traceID, parentSpanID, ok -} - -// ToContext saves the trace and span ids in the context -func ToContext(ctx context.Context, traceID, parentSpanID string) context.Context { - return metadata.MergeContext(ctx, map[string]string{ - traceIDKey: traceID, - spanIDKey: parentSpanID, - }, true) -} diff --git a/util/registry/util.go b/util/registry/util.go index 9fa94ed9..41b51f7e 100644 --- a/util/registry/util.go +++ b/util/registry/util.go @@ -1,12 +1,7 @@ package registry import ( - "net" - - "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/registry" - "github.com/unistack-org/micro/v3/server" - "github.com/unistack-org/micro/v3/util/addr" ) func addNodes(old, neu []*registry.Node) []*registry.Node { @@ -60,13 +55,13 @@ func delNodes(old, del []*registry.Node) []*registry.Node { // CopyService make a copy of service func CopyService(service *registry.Service) *registry.Service { // copy service - s := new(registry.Service) + s := ®istry.Service{} *s = *service // copy nodes nodes := make([]*registry.Node, len(service.Nodes)) for j, node := range service.Nodes { - n := new(registry.Node) + n := ®istry.Node{} *n = *node nodes[j] = n } @@ -75,7 +70,7 @@ func CopyService(service *registry.Service) *registry.Service { // copy endpoints eps := make([]*registry.Endpoint, len(service.Endpoints)) for j, ep := range service.Endpoints { - e := new(registry.Endpoint) + e := ®istry.Endpoint{} *e = *ep eps[j] = e } @@ -100,7 +95,7 @@ func Merge(olist []*registry.Service, nlist []*registry.Service) []*registry.Ser var seen bool for _, o := range olist { if o.Version == n.Version { - sp := new(registry.Service) + sp := ®istry.Service{} // make copy *sp = *o // set nodes @@ -111,7 +106,7 @@ func Merge(olist []*registry.Service, nlist []*registry.Service) []*registry.Ser srv = append(srv, sp) break } else { - sp := new(registry.Service) + sp := ®istry.Service{} // make copy *sp = *o srv = append(srv, sp) @@ -129,7 +124,7 @@ func Remove(old, del []*registry.Service) []*registry.Service { var services []*registry.Service for _, o := range old { - srv := new(registry.Service) + srv := ®istry.Service{} *srv = *o var rem bool @@ -151,38 +146,3 @@ func Remove(old, del []*registry.Service) []*registry.Service { return services } - -func NewService(s server.Server) (*registry.Service, error) { - opts := s.Options() - - advt := opts.Address - if len(opts.Advertise) > 0 { - advt = opts.Advertise - } - - host, port, err := net.SplitHostPort(advt) - if err != nil { - return nil, err - } - - addr, err := addr.Extract(host) - if err != nil { - addr = host - } - - node := ®istry.Node{ - Id: opts.Name + "-" + opts.Id, - Address: net.JoinHostPort(addr, port), - } - node.Metadata = metadata.Copy(opts.Metadata) - - node.Metadata["server"] = s.String() - node.Metadata["broker"] = opts.Broker.String() - node.Metadata["registry"] = opts.Registry.String() - - return ®istry.Service{ - Name: opts.Name, - Version: opts.Version, - Nodes: []*registry.Node{node}, - }, nil -} diff --git a/util/sync/sync.go b/util/sync/sync.go index 38bf4b36..1667e6a5 100644 --- a/util/sync/sync.go +++ b/util/sync/sync.go @@ -42,12 +42,20 @@ func NewSync(opts ...Option) Sync { return c } +func (c *syncStore) Connect(ctx context.Context) error { + return nil +} + +func (c *syncStore) Disconnect(ctx context.Context) error { + return nil +} + func (c *syncStore) Close(ctx context.Context) error { return nil } // Init initialises the storeOptions -func (c *syncStore) Init(ctx context.Context, opts ...store.Option) error { +func (c *syncStore) Init(opts ...store.Option) error { for _, o := range opts { o(&c.storeOpts) } @@ -55,7 +63,7 @@ func (c *syncStore) Init(ctx context.Context, opts ...store.Option) error { return fmt.Errorf("the sync has no stores") } for _, s := range c.syncOpts.Stores { - if err := s.Init(ctx); err != nil { + if err := s.Init(); err != nil { return fmt.Errorf("Store %s failed to Init(): %w", s.String(), err) } }