From 827d4670779d79e7b5c3372fd46d94935e3de559 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 29 Jan 2021 13:17:32 +0300 Subject: [PATCH] micro: rewrite options to support multiple building blocks Signed-off-by: Vasiliy Tolstov --- api/api.go | 4 +- api/handler/http/http.go | 4 +- api/handler/http/http_test.go | 14 +- api/handler/web/web.go | 4 +- api/resolver/options.go | 4 +- api/router/options.go | 10 +- broker/broker.go | 1 + broker/noop.go | 4 + broker/options.go | 22 +- client/client.go | 1 + client/noop.go | 4 + client/options.go | 16 +- context.go | 22 + event.go | 11 + events/events.go | 2 +- function.go | 21 +- function_test.go | 6 +- go.sum | 1 + metadata/metadata.go | 5 + meter/README.md | 22 - meter/meter.go | 1 + meter/meter_test.go | 6 +- meter/noop.go | 4 + meter/options.go | 1 + meter/wrapper/wrapper.go | 232 +++++++ micro.go | 119 ---- network/tunnel/broker/broker.go | 4 + options.go | 617 ++++++++++++++----- {registry => register}/context.go | 16 +- {registry => register}/extractor.go | 2 +- {registry => register}/extractor_test.go | 2 +- register/noop.go | 85 +++ {registry => register}/options.go | 19 +- registry/registry.go => register/register.go | 27 +- {registry => register}/watcher.go | 12 +- registry/noop.go | 81 --- resolver/registry/registry.go | 14 +- router/options.go | 22 +- router/router.go | 1 + server/handler.go | 10 +- server/noop.go | 18 +- server/options.go | 14 +- server/registry.go | 32 +- server/server.go | 10 +- server/subscriber.go | 16 +- service.go | 278 ++++++--- store/noop.go | 5 + store/options.go | 1 + store/store.go | 1 + tracer/noop.go | 9 + tracer/options.go | 1 + tracer/{trace.go => tracer.go} | 4 + util/http/http_test.go | 14 +- util/registry/util.go | 44 +- util/registry/util_test.go | 18 +- util/router/router.go | 4 +- util/sync/sync.go | 5 + 57 files changed, 1283 insertions(+), 644 deletions(-) create mode 100644 context.go delete mode 100644 meter/README.md create mode 100644 meter/wrapper/wrapper.go delete mode 100644 micro.go rename {registry => register}/context.go (53%) rename {registry => register}/extractor.go (99%) rename {registry => register}/extractor_test.go (98%) create mode 100644 register/noop.go rename {registry => register}/options.go (93%) rename registry/registry.go => register/register.go (76%) rename {registry => register}/watcher.go (83%) delete mode 100644 registry/noop.go rename tracer/{trace.go => tracer.go} (90%) diff --git a/api/api.go b/api/api.go index bbea801f..467781f4 100644 --- a/api/api.go +++ b/api/api.go @@ -6,7 +6,7 @@ import ( "strings" "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/server" ) @@ -56,7 +56,7 @@ type Service struct { // The endpoint for this service Endpoint *Endpoint // Versions of this service - Services []*registry.Service + Services []*register.Service } func strip(s string) string { diff --git a/api/handler/http/http.go b/api/handler/http/http.go index 021c93f9..4ab499f3 100644 --- a/api/handler/http/http.go +++ b/api/handler/http/http.go @@ -11,7 +11,7 @@ import ( "github.com/unistack-org/micro/v3/api" "github.com/unistack-org/micro/v3/api/handler" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) const ( @@ -70,7 +70,7 @@ func (h *httpHandler) getService(r *http.Request) (string, error) { } // get the nodes for this service - nodes := make([]*registry.Node, 0, len(service.Services)) + nodes := make([]*register.Node, 0, len(service.Services)) for _, srv := range service.Services { nodes = append(nodes, srv.Nodes...) } diff --git a/api/handler/http/http_test.go b/api/handler/http/http_test.go index 6d8e34c1..65b4a17d 100644 --- a/api/handler/http/http_test.go +++ b/api/handler/http/http_test.go @@ -12,13 +12,13 @@ import ( "github.com/unistack-org/micro/v3/api/resolver" "github.com/unistack-org/micro/v3/api/resolver/vpath" "github.com/unistack-org/micro/v3/api/router" - regRouter "github.com/unistack-org/micro/v3/api/router/registry" - "github.com/unistack-org/micro/v3/registry" - "github.com/unistack-org/micro/v3/registry/memory" + regRouter "github.com/unistack-org/micro/v3/api/router/register" + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/register/memory" ) func testHttp(t *testing.T, path, service, ns string) { - r := memory.NewRegistry() + r := memory.NewRegister() l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -26,9 +26,9 @@ func testHttp(t *testing.T, path, service, ns string) { } defer l.Close() - s := ®istry.Service{ + s := ®ister.Service{ Name: service, - Nodes: []*registry.Node{ + Nodes: []*register.Node{ { Id: service + "-1", Address: l.Addr().String(), @@ -58,7 +58,7 @@ func testHttp(t *testing.T, path, service, ns string) { // initialise the handler rt := regRouter.NewRouter( router.WithHandler("http"), - router.WithRegistry(r), + router.WithRegister(r), router.WithResolver(vpath.NewResolver( resolver.WithServicePrefix(ns), )), diff --git a/api/handler/web/web.go b/api/handler/web/web.go index d4052341..47cd47bc 100644 --- a/api/handler/web/web.go +++ b/api/handler/web/web.go @@ -14,7 +14,7 @@ import ( "github.com/unistack-org/micro/v3/api" "github.com/unistack-org/micro/v3/api/handler" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) const ( @@ -72,7 +72,7 @@ func (wh *webHandler) getService(r *http.Request) (string, error) { } // get the nodes - nodes := make([]*registry.Node, 0, len(service.Services)) + nodes := make([]*register.Node, 0, len(service.Services)) for _, srv := range service.Services { nodes = append(nodes, srv.Nodes...) } diff --git a/api/resolver/options.go b/api/resolver/options.go index 07ec21a6..861ae00b 100644 --- a/api/resolver/options.go +++ b/api/resolver/options.go @@ -3,7 +3,7 @@ package resolver import ( "context" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) // Options struct @@ -58,7 +58,7 @@ func Domain(n string) ResolveOption { // NewResolveOptions returns new initialised resolve options func NewResolveOptions(opts ...ResolveOption) ResolveOptions { - options := ResolveOptions{Domain: registry.DefaultDomain} + options := ResolveOptions{Domain: register.DefaultDomain} for _, o := range opts { o(&options) } diff --git a/api/router/options.go b/api/router/options.go index aec6887e..1ec87d8b 100644 --- a/api/router/options.go +++ b/api/router/options.go @@ -6,12 +6,12 @@ import ( "github.com/unistack-org/micro/v3/api/resolver" "github.com/unistack-org/micro/v3/api/resolver/vpath" "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) type Options struct { Handler string - Registry registry.Registry + Register register.Register Resolver resolver.Resolver Logger logger.Logger Context context.Context @@ -52,10 +52,10 @@ func WithHandler(h string) Option { } } -// WithRegistry sets the registry -func WithRegistry(r registry.Registry) Option { +// WithRegister sets the register +func WithRegister(r register.Register) Option { return func(o *Options) { - o.Registry = r + o.Register = r } } diff --git a/broker/broker.go b/broker/broker.go index e032f800..74780964 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -14,6 +14,7 @@ var ( // Broker is an interface used for asynchronous messaging. type Broker interface { + Name() string Init(...Option) error Options() Options Address() string diff --git a/broker/noop.go b/broker/noop.go index 932da27c..0ab3fb99 100644 --- a/broker/noop.go +++ b/broker/noop.go @@ -16,6 +16,10 @@ func NewBroker(opts ...Option) Broker { return &noopBroker{opts: NewOptions(opts...)} } +func (n *noopBroker) Name() string { + return n.opts.Name +} + // Init initialize broker func (n *noopBroker) Init(opts ...Option) error { for _, o := range opts { diff --git a/broker/options.go b/broker/options.go index d41a313a..0bc3959e 100644 --- a/broker/options.go +++ b/broker/options.go @@ -7,12 +7,13 @@ import ( "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/meter" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/tracer" ) // Options struct type Options struct { + Name string // Addrs useed by broker Addrs []string // ErrorHandler executed when errors occur processing messages @@ -27,8 +28,8 @@ type Options struct { Tracer tracer.Tracer // TLSConfig for secure communication TLSConfig *tls.Config - // Registry used for clustering - Registry registry.Registry + // Register used for clustering + Register register.Register // Context is used for non default options Context context.Context } @@ -36,7 +37,7 @@ type Options struct { // NewOptions create new Options func NewOptions(opts ...Option) Options { options := Options{ - Registry: registry.DefaultRegistry, + Register: register.DefaultRegister, Logger: logger.DefaultLogger, Context: context.Background(), Meter: meter.DefaultMeter, @@ -202,10 +203,10 @@ func SubscribeGroup(name string) SubscribeOption { } } -// Registry sets registry option -func Registry(r registry.Registry) Option { +// Register sets register option +func Register(r register.Register) Option { return func(o *Options) { - o.Registry = r + o.Register = r } } @@ -237,6 +238,13 @@ func Meter(m meter.Meter) Option { } } +// Name sets the name +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + // SubscribeContext set context func SubscribeContext(ctx context.Context) SubscribeOption { return func(o *SubscribeOptions) { diff --git a/client/client.go b/client/client.go index 106b387f..e3ffc87c 100644 --- a/client/client.go +++ b/client/client.go @@ -18,6 +18,7 @@ var ( // It supports Request/Response via Transport and Publishing via the Broker. // It also supports bidirectional streaming of requests. type Client interface { + Name() string Init(...Option) error Options() Options NewMessage(topic string, msg interface{}, opts ...MessageOption) Message diff --git a/client/noop.go b/client/noop.go index 01224a2f..ceffdf46 100644 --- a/client/noop.go +++ b/client/noop.go @@ -49,6 +49,10 @@ func NewClient(opts ...Option) Client { return &noopClient{opts: NewOptions(opts...)} } +func (n *noopClient) Name() string { + return n.opts.Name +} + func (n *noopRequest) Service() string { return n.service } diff --git a/client/options.go b/client/options.go index 82c831e9..9ff18ee7 100644 --- a/client/options.go +++ b/client/options.go @@ -9,7 +9,7 @@ import ( "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/network/transport" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/selector" "github.com/unistack-org/micro/v3/selector/random" @@ -18,6 +18,7 @@ import ( // Options holds client options type Options struct { + Name string // Used to select codec ContentType string // Proxy address to send requests via @@ -246,11 +247,11 @@ func Transport(t transport.Transport) Option { } } -// Registry sets the routers registry -func Registry(r registry.Registry) Option { +// Register sets the routers register +func Register(r register.Register) Option { return func(o *Options) { if o.Router != nil { - o.Router.Init(router.Registry(r)) + o.Router.Init(router.Register(r)) } } } @@ -291,6 +292,13 @@ func Backoff(fn BackoffFunc) Option { } } +// Name sets the client name +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + // Lookup sets the lookup function to use for resolving service names func Lookup(l LookupFunc) Option { return func(o *Options) { diff --git a/context.go b/context.go new file mode 100644 index 00000000..9ad154fa --- /dev/null +++ b/context.go @@ -0,0 +1,22 @@ +package micro + +import "context" + +type serviceKey struct{} + +// FromContext retrieves a Service from the Context. +func FromContext(ctx context.Context) (Service, bool) { + if ctx == nil { + return nil, false + } + s, ok := ctx.Value(serviceKey{}).(Service) + return s, ok +} + +// NewContext returns a new Context with the Service embedded within it. +func NewContext(ctx context.Context, s Service) context.Context { + if ctx == nil { + ctx = context.Background() + } + return context.WithValue(ctx, serviceKey{}, s) +} diff --git a/event.go b/event.go index 3f2c84ce..53603aca 100644 --- a/event.go +++ b/event.go @@ -6,11 +6,22 @@ import ( "github.com/unistack-org/micro/v3/client" ) +// Event is used to publish messages to a topic +type Event interface { + // Publish publishes a message to the event topic + Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error +} + type event struct { c client.Client topic string } +// NewEvent creates a new event publisher +func NewEvent(topic string, c client.Client) Event { + return &event{c, topic} +} + func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error { return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...) } diff --git a/events/events.go b/events/events.go index da0f1a11..333bfd7e 100644 --- a/events/events.go +++ b/events/events.go @@ -33,7 +33,7 @@ type Store interface { type Event struct { // ID to uniquely identify the event ID string - // Topic of event, e.g. "registry.service.created" + // Topic of event, e.g. "register.service.created" Topic string // Timestamp of the event Timestamp time.Time diff --git a/function.go b/function.go index 1b9c1865..e8f4f2b5 100644 --- a/function.go +++ b/function.go @@ -1,3 +1,5 @@ +// +build ignore + package micro import ( @@ -7,11 +9,28 @@ import ( "github.com/unistack-org/micro/v3/server" ) +// Function is a one time executing Service +type Function interface { + // Inherits Service interface + Service + // Done signals to complete execution + Done() error + // Handle registers an RPC handler + Handle(v interface{}) error + // Subscribe registers a subscriber + Subscribe(topic string, v interface{}) error +} + type function struct { cancel context.CancelFunc Service } +// NewFunction returns a new Function for a one time executing Service +func NewFunction(opts ...Option) Function { + return newFunction(opts...) +} + func fnHandlerWrapper(f Function) server.HandlerWrapper { return func(h server.HandlerFunc) server.HandlerFunc { return func(ctx context.Context, req server.Request, rsp interface{}) error { @@ -45,7 +64,7 @@ func newFunction(opts ...Option) Function { // make context the last thing fopts = append(fopts, Context(ctx)) - service := newService(fopts...) + service := &service{opts: NewOptions(opts...)} fn := &function{ cancel: cancel, diff --git a/function_test.go b/function_test.go index bd1c881e..c17e98ec 100644 --- a/function_test.go +++ b/function_test.go @@ -7,18 +7,18 @@ import ( "sync" "testing" - rmemory "github.com/unistack-org/micro-registry-memory" + rmemory "github.com/unistack-org/micro-register-memory" ) func TestFunction(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - r := rmemory.NewRegistry() + r := rmemory.NewRegister() // create service fn := NewFunction( - Registry(r), + Register(r), Name("test.function"), AfterStart(func() error { wg.Done() diff --git a/go.sum b/go.sum index 2169ca48..5e0eb15e 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/unistack-org/micro v1.18.0 h1:EbFiII0bKV0Xcua7o6J30MFmm4/g0Hv3ECOKzsUBihU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= diff --git a/metadata/metadata.go b/metadata/metadata.go index 902f22da..028fa4ba 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -7,6 +7,11 @@ import ( "sort" ) +var ( + // HeaderPrefix for all headers passed + HeaderPrefix = "Micro-" +) + type metadataKey struct{} // Metadata is our way of representing request headers internally. diff --git a/meter/README.md b/meter/README.md deleted file mode 100644 index 424a0164..00000000 --- a/meter/README.md +++ /dev/null @@ -1,22 +0,0 @@ -metrics -======= - -The metrics package provides a simple metrics "Reporter" interface which allows the user to submit counters, gauges and timings (along with key/value tags). - -Implementations ---------------- - -* Prometheus (pull): will be first -* Prometheus (push): certainly achievable -* InfluxDB: could quite easily be done -* Telegraf: almost identical to the InfluxDB implementation -* Micro: Could we provide metrics over Micro's server interface? - - -Todo ----- - -* Include a handler middleware which uses the Reporter interface to generate per-request level metrics - - Throughput - - Errors - - Duration diff --git a/meter/meter.go b/meter/meter.go index b7602f9c..affccb4e 100644 --- a/meter/meter.go +++ b/meter/meter.go @@ -24,6 +24,7 @@ var ( // Meter is an interface for collecting and instrumenting metrics type Meter interface { + Name() string Init(...Option) error Counter(string, ...Option) Counter FloatCounter(string, ...Option) FloatCounter diff --git a/meter/meter_test.go b/meter/meter_test.go index 14a47322..9281adcd 100644 --- a/meter/meter_test.go +++ b/meter/meter_test.go @@ -35,20 +35,20 @@ func TestLabelsAppend(t *testing.T) { ls.vals = []string{"noop", "http"} var nls Labels - nls.keys = []string{"registry"} + nls.keys = []string{"register"} nls.vals = []string{"gossip"} ls = ls.Append(nls) ls.Sort() - if ls.keys[0] != "registry" || ls.vals[0] != "gossip" { + if ls.keys[0] != "register" || ls.vals[0] != "gossip" { t.Fatalf("append error: %v", ls) } } func TestIterator(t *testing.T) { var ls Labels - ls.keys = []string{"type", "server", "registry"} + ls.keys = []string{"type", "server", "register"} ls.vals = []string{"noop", "http", "gossip"} iter := ls.Iter() diff --git a/meter/noop.go b/meter/noop.go index d33258e7..61a8f386 100644 --- a/meter/noop.go +++ b/meter/noop.go @@ -16,6 +16,10 @@ func NewMeter(opts ...Option) Meter { return &noopMeter{opts: NewOptions(opts...)} } +func (r *noopMeter) Name() string { + return r.opts.Name +} + // Init initialize options func (r *noopMeter) Init(opts ...Option) error { for _, o := range opts { diff --git a/meter/options.go b/meter/options.go index 52c3d765..aa461bea 100644 --- a/meter/options.go +++ b/meter/options.go @@ -11,6 +11,7 @@ type Option func(*Options) // Options for metrics implementations: type Options struct { + Name string Address string Path string Labels Labels diff --git a/meter/wrapper/wrapper.go b/meter/wrapper/wrapper.go new file mode 100644 index 00000000..ad5bacb7 --- /dev/null +++ b/meter/wrapper/wrapper.go @@ -0,0 +1,232 @@ +// +build ignore + +package wrapper + +import ( + "context" + "fmt" + "time" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/meter" + "github.com/unistack-org/micro/v3/server" +) + +type Options struct { + Meter meter.Meter + Name string + Version string + ID string +} + +type Option func(*Options) + +func ServiceName(name string) Option { + return func(o *Options) { + o.Name = name + } +} + +func ServiceVersion(version string) Option { + return func(o *Options) { + o.Version = version + } +} + +func ServiceID(id string) Option { + return func(o *Options) { + o.ID = id + } +} + +func Meter(m meter.Meter) Option { + return func(o *Options) { + o.Meter = m + } +} + +type wrapper struct { + options Options + callFunc client.CallFunc + client.Client +} + +func NewClientWrapper(opts ...Option) client.Wrapper { + return func(c client.Client) client.Client { + handler := &wrapper{ + labels: labels, + Client: c, + } + + return handler + } +} + +func NewCallWrapper(opts ...Option) client.CallWrapper { + labels := getLabels(opts...) + + return func(fn client.CallFunc) client.CallFunc { + handler := &wrapper{ + labels: labels, + callFunc: fn, + } + + return handler.CallFunc + } +} + +func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels)) + + ts := time.Now() + err := w.callFunc(ctx, addr, req, rsp, opts) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err +} + +func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels)) + + ts := time.Now() + err := w.Client.Call(ctx, req, rsp, opts...) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err +} + +func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels)) + + ts := time.Now() + stream, err := w.Client.Stream(ctx, req, opts...) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return stream, err +} + +func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + endpoint := p.Topic() + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("publish_message_latency_microseconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("publish_message_duration_seconds", wlabels)) + + ts := time.Now() + err := w.Client.Publish(ctx, p, opts...) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err +} + +func NewHandlerWrapper(opts ...Option) server.HandlerWrapper { + labels := getLabels(opts...) + + handler := &wrapper{ + labels: labels, + } + + return handler.HandlerFunc +} + +func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req server.Request, rsp interface{}) error { + endpoint := req.Endpoint() + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("server_request_latency_microseconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("server_request_duration_seconds", wlabels)) + + ts := time.Now() + err := fn(ctx, req, rsp) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err + } +} + +func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper { + labels := getLabels(opts...) + + handler := &wrapper{ + labels: labels, + } + + return handler.SubscriberFunc +} + +func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc { + return func(ctx context.Context, msg server.Message) error { + endpoint := msg.Topic() + wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint)) + + timeCounterSummary := metrics.GetOrCreateSummary(getName("subscribe_message_latency_microseconds", wlabels)) + timeCounterHistogram := metrics.GetOrCreateSummary(getName("subscribe_message_duration_seconds", wlabels)) + + ts := time.Now() + err := fn(ctx, msg) + te := time.Since(ts) + + timeCounterSummary.Update(float64(te.Seconds())) + timeCounterHistogram.Update(te.Seconds()) + if err == nil { + metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc() + } else { + metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc() + } + + return err + } +} diff --git a/micro.go b/micro.go deleted file mode 100644 index 8e5ed1d9..00000000 --- a/micro.go +++ /dev/null @@ -1,119 +0,0 @@ -// Package micro is a pluggable framework for microservices -package micro - -import ( - "context" - - "github.com/unistack-org/micro/v3/broker" - "github.com/unistack-org/micro/v3/client" - "github.com/unistack-org/micro/v3/server" -) - -type serviceKey struct{} - -// Service is an interface that wraps the lower level libraries -// within micro. Its a convenience method for building -// and initialising services. -type Service interface { - // The service name - Name() string - // Init initialises options - Init(...Option) error - // Options returns the current options - Options() Options - // Client is used to call services - Client() client.Client - // Server is for handling requests and events - Server() server.Server - // Broker is for broker usage - Broker() broker.Broker - // Run the service - Run() error - // The service implementation - String() string -} - -// Function is a one time executing Service -type Function interface { - // Inherits Service interface - Service - // Done signals to complete execution - Done() error - // Handle registers an RPC handler - Handle(v interface{}) error - // Subscribe registers a subscriber - Subscribe(topic string, v interface{}) error -} - -/* -// Type Event is a future type for acting on asynchronous events -type Event interface { - // Publish publishes a message to the event topic - Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error - // Subscribe to the event - Subscribe(ctx context.Context, v in -} - -// Resource is a future type for defining dependencies -type Resource interface { - // Name of the resource - Name() string - // Type of resource - Type() string - // Method of creation - Create() error -} -*/ - -// Event is used to publish messages to a topic -type Event interface { - // Publish publishes a message to the event topic - Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error -} - -var ( - // HeaderPrefix for all headers passed - HeaderPrefix = "Micro-" -) - -// NewService creates and returns a new Service based on the packages within. -func NewService(opts ...Option) Service { - return newService(opts...) -} - -// FromContext retrieves a Service from the Context. -func FromContext(ctx context.Context) (Service, bool) { - if ctx == nil { - return nil, false - } - s, ok := ctx.Value(serviceKey{}).(Service) - return s, ok -} - -// NewContext returns a new Context with the Service embedded within it. -func NewContext(ctx context.Context, s Service) context.Context { - if ctx == nil { - ctx = context.Background() - } - return context.WithValue(ctx, serviceKey{}, s) -} - -// NewFunction returns a new Function for a one time executing Service -func NewFunction(opts ...Option) Function { - return newFunction(opts...) -} - -// NewEvent creates a new event publisher -func NewEvent(topic string, c client.Client) Event { - return &event{c, topic} -} - -// RegisterHandler is syntactic sugar for registering a handler -func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOption) error { - return s.Handle(s.NewHandler(h, opts...)) -} - -// RegisterSubscriber is syntactic sugar for registering a subscriber -func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error { - return s.Subscribe(s.NewSubscriber(topic, h, opts...)) -} diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index 00981d13..096196d1 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -41,6 +41,10 @@ func (t *tunBroker) Init(opts ...broker.Option) error { return nil } +func (t *tunBroker) Name() string { + return t.opts.Name +} + func (t *tunBroker) Options() broker.Options { return t.opts } diff --git a/options.go b/options.go index f023e171..4b73117f 100644 --- a/options.go +++ b/options.go @@ -2,40 +2,44 @@ package micro import ( "context" + "fmt" "time" "github.com/unistack-org/micro/v3/auth" "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/config" - "github.com/unistack-org/micro/v3/debug/profile" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/meter" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/router" - "github.com/unistack-org/micro/v3/runtime" - "github.com/unistack-org/micro/v3/selector" "github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/store" "github.com/unistack-org/micro/v3/tracer" + // "github.com/unistack-org/micro/v3/debug/profile" + // "github.com/unistack-org/micro/v3/runtime" ) // Options for micro service type Options struct { - Auth auth.Auth - Broker broker.Broker - Logger logger.Logger - Meter meter.Meter - Configs []config.Config - Client client.Client - Server server.Server - Store store.Store - Registry registry.Registry - Tracer tracer.Tracer - Router router.Router - Runtime runtime.Runtime - Profile profile.Profile + Name string + Version string + Metadata metadata.Metadata + + Auths []auth.Auth + Brokers []broker.Broker + Loggers []logger.Logger + Meters []meter.Meter + Configs []config.Config + Clients []client.Client + Servers []server.Server + Stores []store.Store + Registers []register.Register + Tracers []tracer.Tracer + Routers []router.Router + // Runtime runtime.Runtime + // Profile profile.Profile // Before and After funcs BeforeStart []func(context.Context) error @@ -51,18 +55,18 @@ type Options struct { // NewOptions returns new Options filled with defaults and overrided by provided opts func NewOptions(opts ...Option) Options { options := Options{ - Context: context.Background(), - Server: server.DefaultServer, - Client: client.DefaultClient, - Broker: broker.DefaultBroker, - Registry: registry.DefaultRegistry, - Router: router.DefaultRouter, - Auth: auth.DefaultAuth, - Logger: logger.DefaultLogger, - Tracer: tracer.DefaultTracer, - Meter: meter.DefaultMeter, - Configs: []config.Config{config.DefaultConfig}, - Store: store.DefaultStore, + Context: context.Background(), + Servers: []server.Server{server.DefaultServer}, + Clients: []client.Client{client.DefaultClient}, + Brokers: []broker.Broker{broker.DefaultBroker}, + Registers: []register.Register{register.DefaultRegister}, + Routers: []router.Router{router.DefaultRouter}, + Auths: []auth.Auth{auth.DefaultAuth}, + Loggers: []logger.Logger{logger.DefaultLogger}, + Tracers: []tracer.Tracer{tracer.DefaultTracer}, + Meters: []meter.Meter{meter.DefaultMeter}, + Configs: []config.Config{config.DefaultConfig}, + Stores: []store.Store{store.DefaultStore}, //Runtime runtime.Runtime //Profile profile.Profile } @@ -75,274 +79,559 @@ func NewOptions(opts ...Option) Options { } // Option func -type Option func(*Options) +type Option func(*Options) error -// Broker to be used for service -func Broker(b broker.Broker) Option { - return func(o *Options) { - o.Broker = b - if o.Client != nil { - // Update Client and Server - o.Client.Init(client.Broker(b)) +// Broker to be used for client and server +func Broker(b broker.Broker, opts ...BrokerOption) Option { + return func(o *Options) error { + var err error + bopts := brokerOptions{} + for _, opt := range opts { + opt(&bopts) } - if o.Server != nil { - o.Server.Init(server.Broker(b)) + all := false + if len(opts) == 0 { + all = true } + for _, srv := range o.Servers { + for _, os := range bopts.servers { + if srv.Name() == os || all { + if err = srv.Init(server.Broker(b)); err != nil { + return err + } + } + } + } + for _, cli := range o.Clients { + for _, oc := range bopts.clients { + if cli.Name() == oc || all { + if err = cli.Init(client.Broker(b)); err != nil { + return err + } + } + } + } + return nil } } -// Client to be used for service -func Client(c client.Client) Option { - return func(o *Options) { - o.Client = c +type brokerOptions struct { + servers []string + clients []string +} + +type BrokerOption func(*brokerOptions) + +func BrokerClient(n string) BrokerOption { + return func(o *brokerOptions) { + o.clients = append(o.clients, n) + } +} + +func BrokerServer(n string) BrokerOption { + return func(o *brokerOptions) { + o.servers = append(o.servers, n) + } +} + +// Clients to be used for service +func Clients(c ...client.Client) Option { + return func(o *Options) error { + o.Clients = c + return nil } } // Context specifies a context for the service. // Can be used to signal shutdown of the service and for extra option values. func Context(ctx context.Context) Option { - return func(o *Options) { + return func(o *Options) error { + // TODO: Pass context to underline stuff ? o.Context = ctx + return nil } } +/* // Profile to be used for debug profile func Profile(p profile.Profile) Option { return func(o *Options) { o.Profile = p } } +*/ -// Server to be used for service -func Server(s server.Server) Option { - return func(o *Options) { - o.Server = s +// Servers to be used for service +func Servers(s ...server.Server) Option { + return func(o *Options) error { + o.Servers = s + return nil } } -// Store sets the store to use -func Store(s store.Store) Option { - return func(o *Options) { - o.Store = s +// Stores sets the store to use +func Stores(s ...store.Store) Option { + return func(o *Options) error { + o.Stores = s + return nil } } // Logger set the logger to use -func Logger(l logger.Logger) Option { - return func(o *Options) { - o.Logger = l +func Logger(l logger.Logger, opts ...LoggerOption) Option { + return func(o *Options) error { + var err error + lopts := loggerOptions{} + for _, opt := range opts { + opt(&lopts) + } + all := false + if len(opts) == 0 { + all = true + } + for _, srv := range o.Servers { + for _, os := range lopts.servers { + if srv.Name() == os || all { + if err = srv.Init(server.Logger(l)); err != nil { + return err + } + } + } + } + for _, cli := range o.Clients { + for _, oc := range lopts.clients { + if cli.Name() == oc || all { + if err = cli.Init(client.Logger(l)); err != nil { + return err + } + } + } + } + for _, brk := range o.Brokers { + for _, ob := range lopts.brokers { + if brk.Name() == ob || all { + if err = brk.Init(broker.Logger(l)); err != nil { + return err + } + } + } + } + for _, reg := range o.Registers { + for _, or := range lopts.registers { + if reg.Name() == or || all { + if err = reg.Init(register.Logger(l)); err != nil { + return err + } + } + } + } + for _, str := range o.Stores { + for _, or := range lopts.stores { + if str.Name() == or || all { + if err = str.Init(store.Logger(l)); err != nil { + return err + } + } + } + } + for _, mtr := range o.Meters { + for _, or := range lopts.meters { + if mtr.Name() == or || all { + if err = mtr.Init(meter.Logger(l)); err != nil { + return err + } + } + } + } + for _, trc := range o.Tracers { + for _, ot := range lopts.tracers { + if trc.Name() == ot || all { + if err = trc.Init(tracer.Logger(l)); err != nil { + return err + } + } + } + } + + return nil } } -// Meter set the meter to use -func Meter(m meter.Meter) Option { - return func(o *Options) { - o.Meter = m +type LoggerOption func(*loggerOptions) + +type loggerOptions struct { + servers []string + clients []string + brokers []string + registers []string + stores []string + meters []string + tracers []string +} + +/* +func LoggerServer(n string) LoggerOption { + +} +*/ + +// Meters set the meter to use +func Meters(m ...meter.Meter) Option { + return func(o *Options) error { + o.Meters = m + return nil } } -// Registry sets the registry for the service +// Register sets the register for the service // and the underlying components -func Registry(r registry.Registry) Option { - return func(o *Options) { - o.Registry = r - if o.Router != nil { - // Update router - o.Router.Init(router.Registry(r)) +func Register(r register.Register, opts ...RegisterOption) Option { + return func(o *Options) error { + var err error + ropts := registerOptions{} + for _, opt := range opts { + opt(&ropts) } - if o.Server != nil { - // Update server - o.Server.Init(server.Registry(r)) + all := false + if len(opts) == 0 { + all = true } - if o.Broker != nil { - // Update Broker - o.Broker.Init(broker.Registry(r)) + for _, rtr := range o.Routers { + for _, os := range ropts.routers { + if rtr.Name() == os || all { + if err = rtr.Init(router.Register(r)); err != nil { + return err + } + } + } } + for _, srv := range o.Servers { + for _, os := range ropts.servers { + if srv.Name() == os || all { + if err = srv.Init(server.Register(r)); err != nil { + return err + } + } + } + } + for _, brk := range o.Brokers { + for _, os := range ropts.brokers { + if brk.Name() == os || all { + if err = brk.Init(broker.Register(r)); err != nil { + return err + } + } + } + } + return nil } } -// Tracer sets the tracer for the service -func Tracer(t tracer.Tracer) Option { - return func(o *Options) { - o.Tracer = t - if o.Server != nil { - o.Server.Init(server.Tracer(t)) - } - if o.Client != nil { - o.Client.Init(client.Tracer(t)) - } +type registerOptions struct { + routers []string + servers []string + brokers []string +} + +type RegisterOption func(*registerOptions) + +func RegisterRouter(n string) RegisterOption { + return func(o *registerOptions) { + o.routers = append(o.routers, n) } } +func RegisterServer(n string) RegisterOption { + return func(o *registerOptions) { + o.servers = append(o.servers, n) + } +} + +func RegisterBroker(n string) RegisterOption { + return func(o *registerOptions) { + o.brokers = append(o.brokers, n) + } +} + +func Tracer(t tracer.Tracer, opts ...TracerOption) Option { + return func(o *Options) error { + var err error + topts := tracerOptions{} + for _, opt := range opts { + opt(&topts) + } + all := false + if len(opts) == 0 { + all = true + } + for _, srv := range o.Servers { + for _, os := range topts.servers { + if srv.Name() == os || all { + if err = srv.Init(server.Tracer(t)); err != nil { + return err + } + } + } + } + for _, cli := range o.Clients { + for _, os := range topts.clients { + if cli.Name() == os || all { + if err = cli.Init(client.Tracer(t)); err != nil { + return err + } + } + } + } + for _, str := range o.Stores { + for _, os := range topts.stores { + if str.Name() == os || all { + if err = str.Init(store.Tracer(t)); err != nil { + return err + } + } + } + } + for _, brk := range o.Brokers { + for _, os := range topts.brokers { + if brk.Name() == os || all { + if err = brk.Init(broker.Tracer(t)); err != nil { + return err + } + } + } + } + return nil + } +} + +type tracerOptions struct { + clients []string + servers []string + brokers []string + stores []string +} + +type TracerOption func(*tracerOptions) + +func TracerClient(n string) TracerOption { + return func(o *tracerOptions) { + o.clients = append(o.clients, n) + } +} + +func TracerServer(n string) TracerOption { + return func(o *tracerOptions) { + o.servers = append(o.servers, n) + } +} + +func TracerBroker(n string) TracerOption { + return func(o *tracerOptions) { + o.brokers = append(o.brokers, n) + } +} + +func TracerStore(n string) TracerOption { + return func(o *tracerOptions) { + o.stores = append(o.stores, n) + } +} + +/* // Auth sets the auth for the service func Auth(a auth.Auth) Option { - return func(o *Options) { + return func(o *Options) error { o.Auth = a if o.Server != nil { o.Server.Init(server.Auth(a)) } + return nil } } +*/ // Configs sets the configs for the service func Configs(c ...config.Config) Option { - return func(o *Options) { + return func(o *Options) error { o.Configs = c + return nil } } +/* // Selector sets the selector for the service client func Selector(s selector.Selector) Option { - return func(o *Options) { + return func(o *Options) error { if o.Client != nil { o.Client.Init(client.Selector(s)) } + return nil } } - +*/ +/* // Runtime sets the runtime func Runtime(r runtime.Runtime) Option { return func(o *Options) { o.Runtime = r } } +*/ // Router sets the router -func Router(r router.Router) Option { - return func(o *Options) { - o.Router = r - // Update client - if o.Client != nil { - o.Client.Init(client.Router(r)) +func Router(r router.Router, opts ...RouterOption) Option { + return func(o *Options) error { + var err error + ropts := routerOptions{} + for _, opt := range opts { + opt(&ropts) } + all := false + if len(opts) == 0 { + all = true + } + for _, cli := range o.Clients { + for _, os := range ropts.clients { + if cli.Name() == os || all { + if err = cli.Init(client.Router(r)); err != nil { + return err + } + } + } + } + return nil + } +} + +type routerOptions struct { + clients []string +} + +type RouterOption func(*routerOptions) + +func RouterClient(n string) RouterOption { + return func(o *routerOptions) { + o.clients = append(o.clients, n) } } // Address sets the address of the server func Address(addr string) Option { - return func(o *Options) { - if o.Server != nil { - o.Server.Init(server.Address(addr)) + return func(o *Options) error { + switch len(o.Servers) { + case 0: + return fmt.Errorf("cant set address on nil server") + case 1: + break + default: + return fmt.Errorf("cant set same address for multiple servers") } + return o.Servers[0].Init(server.Address(addr)) } } // Name of the service func Name(n string) Option { - return func(o *Options) { - if o.Server != nil { - o.Server.Init(server.Name(n)) - } + return func(o *Options) error { + o.Name = n + return nil } } // Version of the service func Version(v string) Option { - return func(o *Options) { - if o.Server != nil { - o.Server.Init(server.Version(v)) - } + return func(o *Options) error { + o.Version = v + return nil } } // Metadata associated with the service func Metadata(md metadata.Metadata) Option { - return func(o *Options) { - if o.Server != nil { - o.Server.Init(server.Metadata(md)) - } + return func(o *Options) error { + o.Metadata = metadata.Copy(md) + return nil } } // RegisterTTL specifies the TTL to use when registering the service -func RegisterTTL(t time.Duration) Option { - return func(o *Options) { - if o.Server != nil { - o.Server.Init(server.RegisterTTL(t)) +func RegisterTTL(td time.Duration, opts ...RegisterOption) Option { + return func(o *Options) error { + var err error + ropts := registerOptions{} + for _, opt := range opts { + opt(&ropts) } + all := false + if len(opts) == 0 { + all = true + } + for _, srv := range o.Servers { + for _, os := range ropts.servers { + if srv.Name() == os || all { + if err = srv.Init(server.RegisterTTL(td)); err != nil { + return err + } + } + } + } + return nil } } // RegisterInterval specifies the interval on which to re-register -func RegisterInterval(t time.Duration) Option { - return func(o *Options) { - if o.Server != nil { - o.Server.Init(server.RegisterInterval(t)) +func RegisterInterval(td time.Duration, opts ...RegisterOption) Option { + return func(o *Options) error { + var err error + ropts := registerOptions{} + for _, opt := range opts { + opt(&ropts) } - } -} - -// WrapClient is a convenience method for wrapping a Client with -// some middleware component. A list of wrappers can be provided. -// Wrappers are applied in reverse order so the last is executed first. -func WrapClient(w ...client.Wrapper) Option { - return func(o *Options) { - // apply in reverse - for i := len(w); i > 0; i-- { - o.Client = w[i-1](o.Client) + all := false + if len(opts) == 0 { + all = true } - } -} - -// WrapCall is a convenience method for wrapping a Client CallFunc -func WrapCall(w ...client.CallWrapper) Option { - return func(o *Options) { - o.Client.Init(client.WrapCall(w...)) - } -} - -// WrapHandler adds a handler Wrapper to a list of options passed into the server -func WrapHandler(w ...server.HandlerWrapper) Option { - return func(o *Options) { - var wrappers []server.Option - - for _, wrap := range w { - wrappers = append(wrappers, server.WrapHandler(wrap)) + for _, srv := range o.Servers { + for _, os := range ropts.servers { + if srv.Name() == os || all { + if err = srv.Init(server.RegisterInterval(td)); err != nil { + return err + } + } + } } - - // Init once - o.Server.Init(wrappers...) - } -} - -// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server -func WrapSubscriber(w ...server.SubscriberWrapper) Option { - return func(o *Options) { - var wrappers []server.Option - - for _, wrap := range w { - wrappers = append(wrappers, server.WrapSubscriber(wrap)) - } - - // Init once - o.Server.Init(wrappers...) + return nil } } // BeforeStart run funcs before service starts func BeforeStart(fn func(context.Context) error) Option { - return func(o *Options) { + return func(o *Options) error { o.BeforeStart = append(o.BeforeStart, fn) + return nil } } // BeforeStop run funcs before service stops func BeforeStop(fn func(context.Context) error) Option { - return func(o *Options) { + return func(o *Options) error { o.BeforeStop = append(o.BeforeStop, fn) + return nil } } // AfterStart run funcs after service starts func AfterStart(fn func(context.Context) error) Option { - return func(o *Options) { + return func(o *Options) error { o.AfterStart = append(o.AfterStart, fn) + return nil } } // AfterStop run funcs after service stops func AfterStop(fn func(context.Context) error) Option { - return func(o *Options) { + return func(o *Options) error { o.AfterStop = append(o.AfterStop, fn) + return nil } } diff --git a/registry/context.go b/register/context.go similarity index 53% rename from registry/context.go rename to register/context.go index f491cea9..35d0b3c2 100644 --- a/registry/context.go +++ b/register/context.go @@ -1,26 +1,26 @@ -package registry +package register import ( "context" ) -type registryKey struct{} +type registerKey struct{} -// FromContext get registry from context -func FromContext(ctx context.Context) (Registry, bool) { +// FromContext get register from context +func FromContext(ctx context.Context) (Register, bool) { if ctx == nil { return nil, false } - c, ok := ctx.Value(registryKey{}).(Registry) + c, ok := ctx.Value(registerKey{}).(Register) return c, ok } -// NewContext put registry in context -func NewContext(ctx context.Context, c Registry) context.Context { +// NewContext put register in context +func NewContext(ctx context.Context, c Register) context.Context { if ctx == nil { ctx = context.Background() } - return context.WithValue(ctx, registryKey{}, c) + return context.WithValue(ctx, registerKey{}, c) } // SetOption returns a function to setup a context with given value diff --git a/registry/extractor.go b/register/extractor.go similarity index 99% rename from registry/extractor.go rename to register/extractor.go index 779e85fd..d1423b74 100644 --- a/registry/extractor.go +++ b/register/extractor.go @@ -1,4 +1,4 @@ -package registry +package register import ( "fmt" diff --git a/registry/extractor_test.go b/register/extractor_test.go similarity index 98% rename from registry/extractor_test.go rename to register/extractor_test.go index be5bd996..901cda2c 100644 --- a/registry/extractor_test.go +++ b/register/extractor_test.go @@ -1,4 +1,4 @@ -package registry +package register import ( "context" diff --git a/register/noop.go b/register/noop.go new file mode 100644 index 00000000..d3433cab --- /dev/null +++ b/register/noop.go @@ -0,0 +1,85 @@ +package register + +import ( + "context" +) + +type noopRegister struct { + opts Options +} + +func (n *noopRegister) Name() string { + return n.opts.Name +} + +// Init initialize register +func (n *noopRegister) Init(opts ...Option) error { + for _, o := range opts { + o(&n.opts) + } + return nil +} + +// Options returns options struct +func (n *noopRegister) Options() Options { + return n.opts +} + +// Connect opens connection to register +func (n *noopRegister) Connect(ctx context.Context) error { + return nil +} + +// Disconnect close connection to register +func (n *noopRegister) Disconnect(ctx context.Context) error { + return nil +} + +// Register registers service +func (n *noopRegister) Register(ctx context.Context, svc *Service, opts ...RegisterOption) error { + return nil +} + +// Deregister deregisters service +func (n *noopRegister) Deregister(ctx context.Context, svc *Service, opts ...DeregisterOption) error { + return nil +} + +// LookupService returns servive info +func (n *noopRegister) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) { + return []*Service{}, nil +} + +// ListServices listing services +func (n *noopRegister) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { + return []*Service{}, nil +} + +// Watch is used to watch for service changes +func (n *noopRegister) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + return &noopWatcher{done: make(chan struct{}), opts: NewWatchOptions(opts...)}, nil +} + +// String returns register string representation +func (n *noopRegister) String() string { + return "noop" +} + +type noopWatcher struct { + opts WatchOptions + done chan struct{} +} + +func (n *noopWatcher) Next() (*Result, error) { + <-n.done + return nil, ErrWatcherStopped +} + +func (n *noopWatcher) Stop() { + close(n.done) +} + +// NewRegister returns a new noop register +func NewRegister(opts ...Option) Register { + return &noopRegister{opts: NewOptions(opts...)} +} diff --git a/registry/options.go b/register/options.go similarity index 93% rename from registry/options.go rename to register/options.go index 64dc5c44..fa906a2e 100644 --- a/registry/options.go +++ b/register/options.go @@ -1,4 +1,4 @@ -package registry +package register import ( "context" @@ -11,6 +11,7 @@ import ( ) type Options struct { + Name string Addrs []string Timeout time.Duration TLSConfig *tls.Config @@ -102,14 +103,14 @@ func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions { return options } -type GetOptions struct { +type LookupOptions struct { Context context.Context // Domain to scope the request to Domain string } -func NewGetOptions(opts ...GetOption) GetOptions { - options := GetOptions{ +func NewLookupOptions(opts ...LookupOption) LookupOptions { + options := LookupOptions{ Domain: DefaultDomain, Context: context.Background(), } @@ -136,7 +137,7 @@ func NewListOptions(opts ...ListOption) ListOptions { return options } -// Addrs is the registry addresses to use +// Addrs is the register addresses to use func Addrs(addrs ...string) Option { return func(o *Options) { o.Addrs = addrs @@ -245,14 +246,14 @@ func DeregisterDomain(d string) DeregisterOption { } } -func GetContext(ctx context.Context) GetOption { - return func(o *GetOptions) { +func GetContext(ctx context.Context) LookupOption { + return func(o *LookupOptions) { o.Context = ctx } } -func GetDomain(d string) GetOption { - return func(o *GetOptions) { +func GetDomain(d string) LookupOption { + return func(o *LookupOptions) { o.Domain = d } } diff --git a/registry/registry.go b/register/register.go similarity index 76% rename from registry/registry.go rename to register/register.go index 25b9fcea..792b41cd 100644 --- a/registry/registry.go +++ b/register/register.go @@ -1,5 +1,5 @@ -// Package registry is an interface for service discovery -package registry +// Package register is an interface for service discovery +package register import ( "context" @@ -16,31 +16,32 @@ const ( ) var ( - // DefaultRegistry is the global default registry - DefaultRegistry Registry = NewRegistry() - // ErrNotFound returned when GetService is called and no services found + // DefaultRegister is the global default register + DefaultRegister Register = NewRegister() + // ErrNotFound returned when LookupService is called and no services found ErrNotFound = errors.New("service not found") // ErrWatcherStopped returned when when watcher is stopped ErrWatcherStopped = errors.New("watcher stopped") ) -// Registry provides an interface for service discovery +// Register provides an interface for service discovery // and an abstraction over varying implementations // {consul, etcd, zookeeper, ...} -type Registry interface { +type Register interface { + Name() string Init(...Option) error Options() Options Connect(context.Context) error Disconnect(context.Context) error Register(context.Context, *Service, ...RegisterOption) error Deregister(context.Context, *Service, ...DeregisterOption) error - GetService(context.Context, string, ...GetOption) ([]*Service, error) + LookupService(context.Context, string, ...LookupOption) ([]*Service, error) ListServices(context.Context, ...ListOption) ([]*Service, error) Watch(context.Context, ...WatchOption) (Watcher, error) String() string } -// Service holds service registry info +// Service holds service register info type Service struct { Name string `json:"name"` Version string `json:"version"` @@ -49,14 +50,14 @@ type Service struct { Nodes []*Node `json:"nodes"` } -// Node holds node registry info +// Node holds node register info type Node struct { Id string `json:"id"` Address string `json:"address"` Metadata metadata.Metadata `json:"metadata"` } -// Endpoint holds endpoint registry info +// Endpoint holds endpoint register info type Endpoint struct { Name string `json:"name"` Request *Value `json:"request"` @@ -83,8 +84,8 @@ type WatchOption func(*WatchOptions) // DeregisterOption option is used to deregister service type DeregisterOption func(*DeregisterOptions) -// GetOption option is used to get service -type GetOption func(*GetOptions) +// LookupOption option is used to get service +type LookupOption func(*LookupOptions) // ListOption option is used to list services type ListOption func(*ListOptions) diff --git a/registry/watcher.go b/register/watcher.go similarity index 83% rename from registry/watcher.go rename to register/watcher.go index c0dba682..edc5bc72 100644 --- a/registry/watcher.go +++ b/register/watcher.go @@ -1,9 +1,9 @@ -package registry +package register import "time" // Watcher is an interface that returns updates -// about services within the registry. +// about services within the register. type Watcher interface { // Next is a blocking call Next() (*Result, error) @@ -17,7 +17,7 @@ type Result struct { Service *Service } -// EventType defines registry event type +// EventType defines register event type type EventType int const ( @@ -43,14 +43,14 @@ func (t EventType) String() string { } } -// Event is registry event +// Event is register event type Event struct { - // Id is registry id + // Id is register id Id string // Type defines type of event Type EventType // Timestamp is event timestamp Timestamp time.Time - // Service is registry service + // Service is register service Service *Service } diff --git a/registry/noop.go b/registry/noop.go deleted file mode 100644 index f6633c0a..00000000 --- a/registry/noop.go +++ /dev/null @@ -1,81 +0,0 @@ -package registry - -import ( - "context" -) - -type noopRegistry struct { - opts Options -} - -// Init initialize registry -func (n *noopRegistry) Init(opts ...Option) error { - for _, o := range opts { - o(&n.opts) - } - return nil -} - -// Options returns options struct -func (n *noopRegistry) Options() Options { - return n.opts -} - -// Connect opens connection to registry -func (n *noopRegistry) Connect(ctx context.Context) error { - return nil -} - -// Disconnect close connection to registry -func (n *noopRegistry) Disconnect(ctx context.Context) error { - return nil -} - -// Register registers service -func (n *noopRegistry) Register(ctx context.Context, svc *Service, opts ...RegisterOption) error { - return nil -} - -// Deregister deregisters service -func (n *noopRegistry) Deregister(ctx context.Context, svc *Service, opts ...DeregisterOption) error { - return nil -} - -// GetService returns servive info -func (n *noopRegistry) GetService(ctx context.Context, name string, opts ...GetOption) ([]*Service, error) { - return []*Service{}, nil -} - -// ListServices listing services -func (n *noopRegistry) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { - return []*Service{}, nil -} - -// Watch is used to watch for service changes -func (n *noopRegistry) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { - return &noopWatcher{done: make(chan struct{}), opts: NewWatchOptions(opts...)}, nil -} - -// String returns registry string representation -func (n *noopRegistry) String() string { - return "noop" -} - -type noopWatcher struct { - opts WatchOptions - done chan struct{} -} - -func (n *noopWatcher) Next() (*Result, error) { - <-n.done - return nil, ErrWatcherStopped -} - -func (n *noopWatcher) Stop() { - close(n.done) -} - -// NewRegistry returns a new noop registry -func NewRegistry(opts ...Option) Registry { - return &noopRegistry{opts: NewOptions(opts...)} -} diff --git a/resolver/registry/registry.go b/resolver/registry/registry.go index 6ebab330..460de75d 100644 --- a/resolver/registry/registry.go +++ b/resolver/registry/registry.go @@ -1,22 +1,22 @@ -// Package registry resolves names using the micro registry -package registry +// Package register resolves names using the micro register +package register import ( "context" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/resolver" ) -// Resolver is a registry network resolver +// Resolver is a register network resolver type Resolver struct { - // Registry is the registry to use otherwise we use the defaul - Registry registry.Registry + // Register is the register to use otherwise we use the defaul + Register register.Register } // Resolve assumes ID is a domain name e.g micro.mu func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) { - services, err := r.Registry.GetService(context.TODO(), name) + services, err := r.Register.LookupService(context.TODO(), name) if err != nil { return nil, err } diff --git a/router/options.go b/router/options.go index c2c56ee5..8db15101 100644 --- a/router/options.go +++ b/router/options.go @@ -5,11 +5,12 @@ import ( "github.com/google/uuid" "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) // Options are router options type Options struct { + Name string // Id is router id Id string // Address is router address @@ -18,8 +19,8 @@ type Options struct { Gateway string // Network is network address Network string - // Registry is the local registry - Registry registry.Registry + // Register is the local register + Register register.Register // Precache routes Precache bool // Logger @@ -63,10 +64,10 @@ func Logger(l logger.Logger) Option { } } -// Registry sets the local registry -func Registry(r registry.Registry) Option { +// Register sets the local register +func Register(r register.Register) Option { return func(o *Options) { - o.Registry = r + o.Register = r } } @@ -77,12 +78,19 @@ func Precache() Option { } } +// Name of the router +func Name(n string) Option { + return func(o *Options) { + o.Name = n + } +} + // NewOptions returns router default options func NewOptions(opts ...Option) Options { options := Options{ Id: uuid.New().String(), Network: DefaultNetwork, - Registry: registry.DefaultRegistry, + Register: register.DefaultRegister, Logger: logger.DefaultLogger, Context: context.Background(), } diff --git a/router/router.go b/router/router.go index be07c687..b2235e7e 100644 --- a/router/router.go +++ b/router/router.go @@ -18,6 +18,7 @@ var ( // Router is an interface for a routing control plane type Router interface { + Name() string // Init initializes the router with options Init(...Option) error // Options returns the router options diff --git a/server/handler.go b/server/handler.go index 44fe1e61..b0fc6511 100644 --- a/server/handler.go +++ b/server/handler.go @@ -3,13 +3,13 @@ package server import ( "reflect" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) type rpcHandler struct { name string handler interface{} - endpoints []*registry.Endpoint + endpoints []*register.Endpoint opts HandlerOptions } @@ -20,10 +20,10 @@ func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler { hdlr := reflect.ValueOf(handler) name := reflect.Indirect(hdlr).Type().Name() - var endpoints []*registry.Endpoint + var endpoints []*register.Endpoint for m := 0; m < typ.NumMethod(); m++ { - if e := registry.ExtractEndpoint(typ.Method(m)); e != nil { + if e := register.ExtractEndpoint(typ.Method(m)); e != nil { e.Name = name + "." + e.Name for k, v := range options.Metadata[e.Name] { @@ -50,7 +50,7 @@ func (r *rpcHandler) Handler() interface{} { return r.handler } -func (r *rpcHandler) Endpoints() []*registry.Endpoint { +func (r *rpcHandler) Endpoints() []*register.Endpoint { return r.endpoints } diff --git a/server/noop.go b/server/noop.go index b288d55f..9e651dbe 100644 --- a/server/noop.go +++ b/server/noop.go @@ -13,7 +13,7 @@ import ( "github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) var ( @@ -34,7 +34,7 @@ const ( type noopServer struct { h Handler opts Options - rsvc *registry.Service + rsvc *register.Service handlers map[string]Handler subscribers map[*subscriber][]broker.Subscriber registered bool @@ -64,6 +64,10 @@ func (n *noopServer) Handle(handler Handler) error { return nil } +func (n *noopServer) Name() string { + return n.opts.Name +} + func (n *noopServer) Subscribe(sb Subscriber) error { sub, ok := sb.(*subscriber) if !ok { @@ -137,10 +141,10 @@ func (n *noopServer) Register() error { } var err error - var service *registry.Service + var service *register.Service var cacheService bool - service, err = NewRegistryService(n) + service, err = NewRegisterService(n) if err != nil { return err } @@ -168,7 +172,7 @@ func (n *noopServer) Register() error { return subscriberList[i].topic > subscriberList[j].topic }) - endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) + endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList)) for _, h := range handlerList { endpoints = append(endpoints, n.handlers[h].Endpoints()...) } @@ -187,7 +191,7 @@ func (n *noopServer) Register() error { if !registered { if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(n.opts.Context, "registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) + config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].Id) } } @@ -244,7 +248,7 @@ func (n *noopServer) Deregister() error { config := n.opts n.RUnlock() - service, err := NewRegistryService(n) + service, err := NewRegisterService(n) if err != nil { return err } diff --git a/server/options.go b/server/options.go index 7bac29f0..ff6b2543 100644 --- a/server/options.go +++ b/server/options.go @@ -14,7 +14,7 @@ import ( "github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/network/transport" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/tracer" ) @@ -25,7 +25,7 @@ type Option func(*Options) type Options struct { Codecs map[string]codec.Codec Broker broker.Broker - Registry registry.Registry + Register register.Register Tracer tracer.Tracer Auth auth.Auth Logger logger.Logger @@ -83,7 +83,7 @@ func NewOptions(opts ...Option) Options { Meter: meter.DefaultMeter, Tracer: tracer.DefaultTracer, Broker: broker.DefaultBroker, - Registry: registry.DefaultRegistry, + Register: register.DefaultRegister, Transport: transport.DefaultTransport, Address: DefaultAddress, Name: DefaultName, @@ -178,10 +178,10 @@ func Context(ctx context.Context) Option { } } -// Registry used for discovery -func Registry(r registry.Registry) Option { +// Register used for discovery +func Register(r register.Register) Option { return func(o *Options) { - o.Registry = r + o.Register = r } } @@ -213,7 +213,7 @@ func Metadata(md metadata.Metadata) Option { } } -// RegisterCheck run func before registry service +// RegisterCheck run func before register service func RegisterCheck(fn func(context.Context) error) Option { return func(o *Options) { o.RegisterCheck = fn diff --git a/server/registry.go b/server/registry.go index 5e040d16..2fd6459a 100644 --- a/server/registry.go +++ b/server/registry.go @@ -5,23 +5,23 @@ import ( "time" "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/util/addr" "github.com/unistack-org/micro/v3/util/backoff" ) var ( // DefaultRegisterFunc uses backoff to register service - DefaultRegisterFunc = func(svc *registry.Service, config Options) error { + DefaultRegisterFunc = func(svc *register.Service, config Options) error { var err error - opts := []registry.RegisterOption{ - registry.RegisterTTL(config.RegisterTTL), - registry.RegisterDomain(config.Namespace), + opts := []register.RegisterOption{ + register.RegisterTTL(config.RegisterTTL), + register.RegisterDomain(config.Namespace), } for i := 0; i <= config.RegisterAttempts; i++ { - err = config.Registry.Register(config.Context, svc, opts...) + err = config.Register.Register(config.Context, svc, opts...) if err == nil { break } @@ -32,15 +32,15 @@ var ( return err } // DefaultDeregisterFunc uses backoff to deregister service - DefaultDeregisterFunc = func(svc *registry.Service, config Options) error { + DefaultDeregisterFunc = func(svc *register.Service, config Options) error { var err error - opts := []registry.DeregisterOption{ - registry.DeregisterDomain(config.Namespace), + opts := []register.DeregisterOption{ + register.DeregisterDomain(config.Namespace), } for i := 0; i <= config.DeregisterAttempts; i++ { - err = config.Registry.Deregister(config.Context, svc, opts...) + err = config.Register.Deregister(config.Context, svc, opts...) if err == nil { break } @@ -52,8 +52,8 @@ var ( } ) -// NewRegistryService returns *registry.Service from Server -func NewRegistryService(s Server) (*registry.Service, error) { +// NewRegisterService returns *register.Service from Server +func NewRegisterService(s Server) (*register.Service, error) { opts := s.Options() advt := opts.Address @@ -71,7 +71,7 @@ func NewRegistryService(s Server) (*registry.Service, error) { addr = host } - node := ®istry.Node{ + node := ®ister.Node{ Id: opts.Name + "-" + opts.Id, Address: net.JoinHostPort(addr, port), } @@ -79,12 +79,12 @@ func NewRegistryService(s Server) (*registry.Service, error) { node.Metadata["server"] = s.String() node.Metadata["broker"] = opts.Broker.String() - node.Metadata["registry"] = opts.Registry.String() + node.Metadata["register"] = opts.Register.String() - return ®istry.Service{ + return ®ister.Service{ Name: opts.Name, Version: opts.Version, - Nodes: []*registry.Node{node}, + Nodes: []*register.Node{node}, Metadata: metadata.New(0), }, nil } diff --git a/server/server.go b/server/server.go index f4e97973..fd498630 100644 --- a/server/server.go +++ b/server/server.go @@ -8,7 +8,7 @@ import ( "github.com/google/uuid" "github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) var ( @@ -29,7 +29,7 @@ var ( DefaultRegisterCheck = func(context.Context) error { return nil } // DefaultRegisterInterval holds interval for register DefaultRegisterInterval = time.Second * 30 - // DefaultRegisterTTL holds registry record ttl, must be multiple of DefaultRegisterInterval + // DefaultRegisterTTL holds register record ttl, must be multiple of DefaultRegisterInterval DefaultRegisterTTL = time.Second * 90 // DefaultNamespace will be used if no namespace passed DefaultNamespace = "micro" @@ -43,6 +43,8 @@ var ( // Server is a simple micro server abstraction type Server interface { + // Name returns server name + Name() string // Initialise options Init(...Option) error // Retrieve the options @@ -147,7 +149,7 @@ type Stream interface { type Handler interface { Name() string Handler() interface{} - Endpoints() []*registry.Endpoint + Endpoints() []*register.Endpoint Options() HandlerOptions } @@ -157,6 +159,6 @@ type Handler interface { type Subscriber interface { Topic() string Subscriber() interface{} - Endpoints() []*registry.Endpoint + Endpoints() []*register.Endpoint Options() SubscriberOptions } diff --git a/server/subscriber.go b/server/subscriber.go index 3bebc732..d4685546 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -14,7 +14,7 @@ import ( "github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/metadata" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) const ( @@ -39,7 +39,7 @@ type subscriber struct { typ reflect.Type subscriber interface{} handlers []*handler - endpoints []*registry.Endpoint + endpoints []*register.Endpoint opts SubscriberOptions } @@ -115,7 +115,7 @@ func ValidateSubscriber(sub Subscriber) error { } func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { - var endpoints []*registry.Endpoint + var endpoints []*register.Endpoint var handlers []*handler options := NewSubscriberOptions(opts...) @@ -134,9 +134,9 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } handlers = append(handlers, h) - ep := ®istry.Endpoint{ + ep := ®ister.Endpoint{ Name: "Func", - Request: registry.ExtractSubValue(typ), + Request: register.ExtractSubValue(typ), Metadata: metadata.New(2), } ep.Metadata.Set("topic", topic) @@ -161,9 +161,9 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } handlers = append(handlers, h) - ep := ®istry.Endpoint{ + ep := ®ister.Endpoint{ Name: name + "." + method.Name, - Request: registry.ExtractSubValue(method.Type), + Request: register.ExtractSubValue(method.Type), Metadata: metadata.New(2), } ep.Metadata.Set("topic", topic) @@ -304,7 +304,7 @@ func (s *subscriber) Subscriber() interface{} { return s.subscriber } -func (s *subscriber) Endpoints() []*registry.Endpoint { +func (s *subscriber) Endpoints() []*register.Endpoint { return s.endpoints } diff --git a/service.go b/service.go index cca23d04..6480a0b4 100644 --- a/service.go +++ b/service.go @@ -1,8 +1,8 @@ +// Package micro is a pluggable framework for microservices package micro import ( "fmt" - rtime "runtime" "sync" "github.com/unistack-org/micro/v3/auth" @@ -10,31 +10,86 @@ import ( "github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/config" "github.com/unistack-org/micro/v3/logger" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/meter" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/store" + "github.com/unistack-org/micro/v3/tracer" ) +// Service is an interface that wraps the lower level components. +// Its works as container with building blocks for service. +type Service interface { + // The service name + Name() string + // Init initialises options + Init(...Option) error + // Options returns the current options + Options() Options + // Auth is for handling auth + Auth(...string) auth.Auth + // Logger is for logs + Logger(...string) logger.Logger + // Config if for config + Config(...string) config.Config + // Client is for calling services + Client(...string) client.Client + // Broker is for sending and receiving events + Broker(...string) broker.Broker + // Server is for handling requests and events + Server(...string) server.Server + // Store is for key/val store + Store(...string) store.Store + // Register + Register(...string) register.Register + // Tracer + Tracer(...string) tracer.Tracer + // Router + Router(...string) router.Router + // Meter + Meter(...string) meter.Meter + + // Runtime + // Runtime(string) (runtime.Runtime, bool) + // Profile + // Profile(string) (profile.Profile, bool) + // Run the service + Run() error + // The service implementation + String() string +} + +// RegisterHandler is syntactic sugar for registering a handler +func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOption) error { + return s.Handle(s.NewHandler(h, opts...)) +} + +// RegisterSubscriber is syntactic sugar for registering a subscriber +func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error { + return s.Subscribe(s.NewSubscriber(topic, h, opts...)) +} + type service struct { opts Options sync.RWMutex // once sync.Once } -func newService(opts ...Option) Service { - service := &service{opts: NewOptions(opts...)} - return service +// NewService creates and returns a new Service based on the packages within. +func NewService(opts ...Option) Service { + return &service{opts: NewOptions(opts...)} } func (s *service) Name() string { - return s.opts.Server.Options().Name + return s.opts.Name } // Init initialises options. Additionally it calls cmd.Init // which parses command line flags. cmd.Init is only called // on first Init. func (s *service) Init(opts ...Option) error { + var err error // process options for _, o := range opts { o(&s.opts) @@ -45,60 +100,48 @@ func (s *service) Init(opts ...Option) error { // skip config as the struct not passed continue } - if err := cfg.Init(config.Context(s.opts.Context)); err != nil { + if err = cfg.Init(config.Context(s.opts.Context)); err != nil { return err } - if err := cfg.Load(s.opts.Context); err != nil { + if err = cfg.Load(s.opts.Context); err != nil { return err } } - if s.opts.Logger != nil { - if err := s.opts.Logger.Init( - logger.WithContext(s.opts.Context), - ); err != nil { + for _, log := range s.opts.Loggers { + if err = log.Init(logger.WithContext(s.opts.Context)); err != nil { return err } } - if s.opts.Registry != nil { - if err := s.opts.Registry.Init( - registry.Context(s.opts.Context), - ); err != nil { + for _, reg := range s.opts.Registers { + if err = reg.Init(register.Context(s.opts.Context)); err != nil { return err } } - if s.opts.Broker != nil { - if err := s.opts.Broker.Init( - broker.Context(s.opts.Context), - ); err != nil { + for _, brk := range s.opts.Brokers { + if err = brk.Init(broker.Context(s.opts.Context)); err != nil { return err } } - if s.opts.Store != nil { - if err := s.opts.Store.Init( - store.Context(s.opts.Context), - ); err != nil { + for _, str := range s.opts.Stores { + if err = str.Init(store.Context(s.opts.Context)); err != nil { return err } } - if s.opts.Server != nil { - if err := s.opts.Server.Init( - server.Context(s.opts.Context), - ); err != nil { + for _, srv := range s.opts.Servers { + if err = srv.Init(server.Context(s.opts.Context)); err != nil { return err } } - if s.opts.Client != nil { - if err := s.opts.Client.Init( - client.Context(s.opts.Context), - ); err != nil { + for _, cli := range s.opts.Clients { + if err = cli.Init(client.Context(s.opts.Context)); err != nil { return err } } @@ -110,36 +153,93 @@ func (s *service) Options() Options { return s.opts } -func (s *service) Broker() broker.Broker { - return s.opts.Broker +func (s *service) Broker(names ...string) broker.Broker { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Brokers) + } + return s.opts.Brokers[idx] + } -func (s *service) Client() client.Client { - return s.opts.Client +func (s *service) Tracer(names ...string) tracer.Tracer { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Tracers) + } + return s.opts.Tracers[idx] } -func (s *service) Server() server.Server { - return s.opts.Server +func (s *service) Config(names ...string) config.Config { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Configs) + } + return s.opts.Configs[idx] } -func (s *service) Store() store.Store { - return s.opts.Store +func (s *service) Client(names ...string) client.Client { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Clients) + } + return s.opts.Clients[idx] } -func (s *service) Registry() registry.Registry { - return s.opts.Registry +func (s *service) Server(names ...string) server.Server { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Servers) + } + return s.opts.Servers[idx] } -func (s *service) Logger() logger.Logger { - return s.opts.Logger +func (s *service) Store(names ...string) store.Store { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Stores) + } + return s.opts.Stores[idx] } -func (s *service) Auth() auth.Auth { - return s.opts.Auth +func (s *service) Register(names ...string) register.Register { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Registers) + } + return s.opts.Registers[idx] } -func (s *service) Router() router.Router { - return s.opts.Router +func (s *service) Logger(names ...string) logger.Logger { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Loggers) + } + return s.opts.Loggers[idx] +} + +func (s *service) Auth(names ...string) auth.Auth { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Auths) + } + return s.opts.Auths[idx] +} + +func (s *service) Router(names ...string) router.Router { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Routers) + } + return s.opts.Routers[idx] +} + +func (s *service) Meter(names ...string) meter.Meter { + idx := 0 + if len(names) == 1 { + idx = getNameIndex(names[0], s.opts.Meters) + } + return s.opts.Meters[idx] } func (s *service) String() string { @@ -153,8 +253,8 @@ func (s *service) Start() error { config := s.opts s.RUnlock() - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(s.opts.Context, "starting [service] %s", s.Name()) + if config.Loggers[0].V(logger.InfoLevel) { + config.Loggers[0].Infof(s.opts.Context, "starting [service] %s", s.Name()) } for _, fn := range s.opts.BeforeStart { @@ -169,35 +269,37 @@ func (s *service) Start() error { continue } - if err := cfg.Load(s.opts.Context); err != nil { + if err = cfg.Load(s.opts.Context); err != nil { return err } } - if s.opts.Server == nil { + if len(s.opts.Servers) == 0 { return fmt.Errorf("cant start nil server") } - if s.opts.Registry != nil { - if err := s.opts.Registry.Connect(s.opts.Context); err != nil { + for _, reg := range s.opts.Registers { + if err = reg.Connect(s.opts.Context); err != nil { return err } } - if s.opts.Broker != nil { - if err := s.opts.Broker.Connect(s.opts.Context); err != nil { + for _, brk := range s.opts.Brokers { + if err = brk.Connect(s.opts.Context); err != nil { return err } } - if s.opts.Store != nil { - if err := s.opts.Store.Connect(s.opts.Context); err != nil { + for _, str := range s.opts.Stores { + if err = str.Connect(s.opts.Context); err != nil { return err } } - if err = s.opts.Server.Start(); err != nil { - return err + for _, srv := range s.opts.Servers { + if err = srv.Start(); err != nil { + return err + } } for _, fn := range s.opts.AfterStart { @@ -214,8 +316,8 @@ func (s *service) Stop() error { config := s.opts s.RUnlock() - if config.Logger.V(logger.InfoLevel) { - config.Logger.Infof(s.opts.Context, "stoppping [service] %s", s.Name()) + if config.Loggers[0].V(logger.InfoLevel) { + config.Loggers[0].Infof(s.opts.Context, "stoppping [service] %s", s.Name()) } var err error @@ -225,8 +327,10 @@ func (s *service) Stop() error { } } - if err = s.opts.Server.Stop(); err != nil { - return err + for _, srv := range s.opts.Servers { + if err = srv.Stop(); err != nil { + return err + } } for _, fn := range s.opts.AfterStop { @@ -235,20 +339,20 @@ func (s *service) Stop() error { } } - if s.opts.Registry != nil { - if err := s.opts.Registry.Disconnect(s.opts.Context); err != nil { + for _, reg := range s.opts.Registers { + if err = reg.Disconnect(s.opts.Context); err != nil { return err } } - if s.opts.Broker != nil { - if err := s.opts.Broker.Disconnect(s.opts.Context); err != nil { + for _, brk := range s.opts.Brokers { + if err = brk.Disconnect(s.opts.Context); err != nil { return err } } - if s.opts.Store != nil { - if err := s.opts.Store.Disconnect(s.opts.Context); err != nil { + for _, str := range s.opts.Stores { + if err = str.Disconnect(s.opts.Context); err != nil { return err } } @@ -258,18 +362,19 @@ func (s *service) Stop() error { func (s *service) Run() error { // start the profiler - if s.opts.Profile != nil { - // to view mutex contention - rtime.SetMutexProfileFraction(5) - // to view blocking profile - rtime.SetBlockProfileRate(1) + /* + if s.opts.Profile != nil { + // to view mutex contention + rtime.SetMutexProfileFraction(5) + // to view blocking profile + rtime.SetBlockProfileRate(1) - if err := s.opts.Profile.Start(); err != nil { - return err + if err := s.opts.Profile.Start(); err != nil { + return err + } + defer s.opts.Profile.Stop() } - defer s.opts.Profile.Stop() - } - + */ if err := s.Start(); err != nil { return err } @@ -279,3 +384,16 @@ func (s *service) Run() error { return s.Stop() } + +type nameIface interface { + Name() string +} + +func getNameIndex(n string, ifaces ...interface{}) int { + for idx, iface := range ifaces { + if ifc, ok := iface.(nameIface); ok && ifc.Name() == n { + return idx + } + } + return 0 +} diff --git a/store/noop.go b/store/noop.go index 59767ae1..2313c10a 100644 --- a/store/noop.go +++ b/store/noop.go @@ -23,6 +23,11 @@ func (n *noopStore) Options() Options { return n.opts } +// Name +func (n *noopStore) Name() string { + return n.opts.Name +} + // String returns string representation func (n *noopStore) String() string { return "noop" diff --git a/store/options.go b/store/options.go index 3cdf0f49..f028a0a2 100644 --- a/store/options.go +++ b/store/options.go @@ -14,6 +14,7 @@ import ( // Options contains configuration for the Store type Options struct { + Name string // Nodes contains the addresses or other connection information of the backing storage. // For example, an etcd implementation would contain the nodes of the cluster. // A SQL implementation could contain one or more connection strings. diff --git a/store/store.go b/store/store.go index ad526974..34d0e08d 100644 --- a/store/store.go +++ b/store/store.go @@ -20,6 +20,7 @@ var ( // Store is a data storage interface type Store interface { + Name() string // Init initialises the store Init(opts ...Option) error // Connect is used when store needs to be connected diff --git a/tracer/noop.go b/tracer/noop.go index 09f108df..935cbdb2 100644 --- a/tracer/noop.go +++ b/tracer/noop.go @@ -6,6 +6,10 @@ type noopTracer struct { opts Options } +func (n *noopTracer) Name() string { + return n.opts.Name +} + // Init initilize tracer func (n *noopTracer) Init(opts ...Option) error { for _, o := range opts { @@ -19,6 +23,11 @@ func (n *noopTracer) Start(ctx context.Context, name string) (context.Context, * return nil, nil } +// Lookup get span from context +func (n *noopTracer) Lookup(ctx context.Context) (*Span, error) { + return nil, nil +} + // Finish finishes span func (n *noopTracer) Finish(*Span) error { return nil diff --git a/tracer/options.go b/tracer/options.go index 29fbbdc2..9a6b58b1 100644 --- a/tracer/options.go +++ b/tracer/options.go @@ -9,6 +9,7 @@ var ( // Options struct type Options struct { + Name string // Logger is the logger for messages Logger logger.Logger // Size is the size of ring buffer diff --git a/tracer/trace.go b/tracer/tracer.go similarity index 90% rename from tracer/trace.go rename to tracer/tracer.go index 434843cb..09c82085 100644 --- a/tracer/trace.go +++ b/tracer/tracer.go @@ -15,10 +15,14 @@ var ( // Tracer is an interface for distributed tracing type Tracer interface { + Name() string + Init(...Option) error // Start a trace Start(ctx context.Context, name string) (context.Context, *Span) // Finish the trace Finish(*Span) error + // Lookup get span from context + Lookup(ctx context.Context) (*Span, error) // Read the traces Read(...ReadOption) ([]*Span, error) } diff --git a/util/http/http_test.go b/util/http/http_test.go index 391e0797..dc322bc7 100644 --- a/util/http/http_test.go +++ b/util/http/http_test.go @@ -8,15 +8,15 @@ import ( "net/http" "testing" - "github.com/unistack-org/micro/v3/registry" - "github.com/unistack-org/micro/v3/registry/memory" + "github.com/unistack-org/micro/v3/register" + "github.com/unistack-org/micro/v3/register/memory" "github.com/unistack-org/micro/v3/router" - regRouter "github.com/unistack-org/micro/v3/router/registry" + regRouter "github.com/unistack-org/micro/v3/router/register" ) func TestRoundTripper(t *testing.T) { - m := memory.NewRegistry() - r := regRouter.NewRouter(router.Registry(m)) + m := memory.NewRegister() + r := regRouter.NewRouter(router.Register(m)) rt := NewRoundTripper(WithRouter(r)) @@ -32,9 +32,9 @@ func TestRoundTripper(t *testing.T) { go http.Serve(l, nil) - m.Register(®istry.Service{ + m.Register(®ister.Service{ Name: "example.com", - Nodes: []*registry.Node{ + Nodes: []*register.Node{ { Id: "1", Address: l.Addr().String(), diff --git a/util/registry/util.go b/util/registry/util.go index 41b51f7e..f7d7f440 100644 --- a/util/registry/util.go +++ b/util/registry/util.go @@ -1,11 +1,11 @@ -package registry +package register import ( - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) -func addNodes(old, neu []*registry.Node) []*registry.Node { - nodes := make([]*registry.Node, len(neu)) +func addNodes(old, neu []*register.Node) []*register.Node { + nodes := make([]*register.Node, len(neu)) // add all new nodes for i, n := range neu { node := *n @@ -35,8 +35,8 @@ func addNodes(old, neu []*registry.Node) []*registry.Node { return nodes } -func delNodes(old, del []*registry.Node) []*registry.Node { - var nodes []*registry.Node +func delNodes(old, del []*register.Node) []*register.Node { + var nodes []*register.Node for _, o := range old { var rem bool for _, n := range del { @@ -53,24 +53,24 @@ func delNodes(old, del []*registry.Node) []*registry.Node { } // CopyService make a copy of service -func CopyService(service *registry.Service) *registry.Service { +func CopyService(service *register.Service) *register.Service { // copy service - s := ®istry.Service{} + s := ®ister.Service{} *s = *service // copy nodes - nodes := make([]*registry.Node, len(service.Nodes)) + nodes := make([]*register.Node, len(service.Nodes)) for j, node := range service.Nodes { - n := ®istry.Node{} + n := ®ister.Node{} *n = *node nodes[j] = n } s.Nodes = nodes // copy endpoints - eps := make([]*registry.Endpoint, len(service.Endpoints)) + eps := make([]*register.Endpoint, len(service.Endpoints)) for j, ep := range service.Endpoints { - e := ®istry.Endpoint{} + e := ®ister.Endpoint{} *e = *ep eps[j] = e } @@ -79,8 +79,8 @@ func CopyService(service *registry.Service) *registry.Service { } // Copy makes a copy of services -func Copy(current []*registry.Service) []*registry.Service { - services := make([]*registry.Service, len(current)) +func Copy(current []*register.Service) []*register.Service { + services := make([]*register.Service, len(current)) for i, service := range current { services[i] = CopyService(service) } @@ -88,14 +88,14 @@ func Copy(current []*registry.Service) []*registry.Service { } // Merge merges two lists of services and returns a new copy -func Merge(olist []*registry.Service, nlist []*registry.Service) []*registry.Service { - var srv []*registry.Service +func Merge(olist []*register.Service, nlist []*register.Service) []*register.Service { + var srv []*register.Service for _, n := range nlist { var seen bool for _, o := range olist { if o.Version == n.Version { - sp := ®istry.Service{} + sp := ®ister.Service{} // make copy *sp = *o // set nodes @@ -106,25 +106,25 @@ func Merge(olist []*registry.Service, nlist []*registry.Service) []*registry.Ser srv = append(srv, sp) break } else { - sp := ®istry.Service{} + sp := ®ister.Service{} // make copy *sp = *o srv = append(srv, sp) } } if !seen { - srv = append(srv, Copy([]*registry.Service{n})...) + srv = append(srv, Copy([]*register.Service{n})...) } } return srv } // Remove removes services and returns a new copy -func Remove(old, del []*registry.Service) []*registry.Service { - var services []*registry.Service +func Remove(old, del []*register.Service) []*register.Service { + var services []*register.Service for _, o := range old { - srv := ®istry.Service{} + srv := ®ister.Service{} *srv = *o var rem bool diff --git a/util/registry/util_test.go b/util/registry/util_test.go index dadd6232..2e3120f0 100644 --- a/util/registry/util_test.go +++ b/util/registry/util_test.go @@ -1,18 +1,18 @@ -package registry +package register import ( "os" "testing" - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" ) func TestRemove(t *testing.T) { - services := []*registry.Service{ + services := []*register.Service{ { Name: "foo", Version: "1.0.0", - Nodes: []*registry.Node{ + Nodes: []*register.Node{ { Id: "foo-123", Address: "localhost:9999", @@ -22,7 +22,7 @@ func TestRemove(t *testing.T) { { Name: "foo", Version: "1.0.0", - Nodes: []*registry.Node{ + Nodes: []*register.Node{ { Id: "foo-123", Address: "localhost:6666", @@ -31,7 +31,7 @@ func TestRemove(t *testing.T) { }, } - servs := Remove([]*registry.Service{services[0]}, []*registry.Service{services[1]}) + servs := Remove([]*register.Service{services[0]}, []*register.Service{services[1]}) if i := len(servs); i > 0 { t.Errorf("Expected 0 nodes, got %d: %+v", i, servs) } @@ -41,11 +41,11 @@ func TestRemove(t *testing.T) { } func TestRemoveNodes(t *testing.T) { - services := []*registry.Service{ + services := []*register.Service{ { Name: "foo", Version: "1.0.0", - Nodes: []*registry.Node{ + Nodes: []*register.Node{ { Id: "foo-123", Address: "localhost:9999", @@ -59,7 +59,7 @@ func TestRemoveNodes(t *testing.T) { { Name: "foo", Version: "1.0.0", - Nodes: []*registry.Node{ + Nodes: []*register.Node{ { Id: "foo-123", Address: "localhost:6666", diff --git a/util/router/router.go b/util/router/router.go index 151188e8..35a039b5 100644 --- a/util/router/router.go +++ b/util/router/router.go @@ -1,7 +1,7 @@ package router import ( - "github.com/unistack-org/micro/v3/registry" + "github.com/unistack-org/micro/v3/register" "github.com/unistack-org/micro/v3/router" ) @@ -19,7 +19,7 @@ func (r *apiRouter) String() string { } // New router is a hack for API routing -func New(srvs []*registry.Service) router.Router { +func New(srvs []*register.Service) router.Router { var routes []router.Route for _, srv := range srvs { diff --git a/util/sync/sync.go b/util/sync/sync.go index 25779a8e..9c0b62c6 100644 --- a/util/sync/sync.go +++ b/util/sync/sync.go @@ -78,6 +78,11 @@ func (c *syncStore) Init(opts ...store.Option) error { return nil } +// Name returns the store name +func (c *syncStore) Name() string { + return c.storeOpts.Name +} + // Options returns the sync's store options func (c *syncStore) Options() store.Options { return c.storeOpts