micro: rewrite options to support multiple building blocks

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2021-01-29 13:17:32 +03:00
parent ac8a3a12c4
commit 827d467077
57 changed files with 1283 additions and 644 deletions

View File

@ -6,7 +6,7 @@ import (
"strings" "strings"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
@ -56,7 +56,7 @@ type Service struct {
// The endpoint for this service // The endpoint for this service
Endpoint *Endpoint Endpoint *Endpoint
// Versions of this service // Versions of this service
Services []*registry.Service Services []*register.Service
} }
func strip(s string) string { func strip(s string) string {

View File

@ -11,7 +11,7 @@ import (
"github.com/unistack-org/micro/v3/api" "github.com/unistack-org/micro/v3/api"
"github.com/unistack-org/micro/v3/api/handler" "github.com/unistack-org/micro/v3/api/handler"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
const ( const (
@ -70,7 +70,7 @@ func (h *httpHandler) getService(r *http.Request) (string, error) {
} }
// get the nodes for this service // get the nodes for this service
nodes := make([]*registry.Node, 0, len(service.Services)) nodes := make([]*register.Node, 0, len(service.Services))
for _, srv := range service.Services { for _, srv := range service.Services {
nodes = append(nodes, srv.Nodes...) nodes = append(nodes, srv.Nodes...)
} }

View File

@ -12,13 +12,13 @@ import (
"github.com/unistack-org/micro/v3/api/resolver" "github.com/unistack-org/micro/v3/api/resolver"
"github.com/unistack-org/micro/v3/api/resolver/vpath" "github.com/unistack-org/micro/v3/api/resolver/vpath"
"github.com/unistack-org/micro/v3/api/router" "github.com/unistack-org/micro/v3/api/router"
regRouter "github.com/unistack-org/micro/v3/api/router/registry" regRouter "github.com/unistack-org/micro/v3/api/router/register"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/registry/memory" "github.com/unistack-org/micro/v3/register/memory"
) )
func testHttp(t *testing.T, path, service, ns string) { func testHttp(t *testing.T, path, service, ns string) {
r := memory.NewRegistry() r := memory.NewRegister()
l, err := net.Listen("tcp", "127.0.0.1:0") l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil { if err != nil {
@ -26,9 +26,9 @@ func testHttp(t *testing.T, path, service, ns string) {
} }
defer l.Close() defer l.Close()
s := &registry.Service{ s := &register.Service{
Name: service, Name: service,
Nodes: []*registry.Node{ Nodes: []*register.Node{
{ {
Id: service + "-1", Id: service + "-1",
Address: l.Addr().String(), Address: l.Addr().String(),
@ -58,7 +58,7 @@ func testHttp(t *testing.T, path, service, ns string) {
// initialise the handler // initialise the handler
rt := regRouter.NewRouter( rt := regRouter.NewRouter(
router.WithHandler("http"), router.WithHandler("http"),
router.WithRegistry(r), router.WithRegister(r),
router.WithResolver(vpath.NewResolver( router.WithResolver(vpath.NewResolver(
resolver.WithServicePrefix(ns), resolver.WithServicePrefix(ns),
)), )),

View File

@ -14,7 +14,7 @@ import (
"github.com/unistack-org/micro/v3/api" "github.com/unistack-org/micro/v3/api"
"github.com/unistack-org/micro/v3/api/handler" "github.com/unistack-org/micro/v3/api/handler"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
const ( const (
@ -72,7 +72,7 @@ func (wh *webHandler) getService(r *http.Request) (string, error) {
} }
// get the nodes // get the nodes
nodes := make([]*registry.Node, 0, len(service.Services)) nodes := make([]*register.Node, 0, len(service.Services))
for _, srv := range service.Services { for _, srv := range service.Services {
nodes = append(nodes, srv.Nodes...) nodes = append(nodes, srv.Nodes...)
} }

View File

@ -3,7 +3,7 @@ package resolver
import ( import (
"context" "context"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
// Options struct // Options struct
@ -58,7 +58,7 @@ func Domain(n string) ResolveOption {
// NewResolveOptions returns new initialised resolve options // NewResolveOptions returns new initialised resolve options
func NewResolveOptions(opts ...ResolveOption) ResolveOptions { func NewResolveOptions(opts ...ResolveOption) ResolveOptions {
options := ResolveOptions{Domain: registry.DefaultDomain} options := ResolveOptions{Domain: register.DefaultDomain}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }

View File

@ -6,12 +6,12 @@ import (
"github.com/unistack-org/micro/v3/api/resolver" "github.com/unistack-org/micro/v3/api/resolver"
"github.com/unistack-org/micro/v3/api/resolver/vpath" "github.com/unistack-org/micro/v3/api/resolver/vpath"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
type Options struct { type Options struct {
Handler string Handler string
Registry registry.Registry Register register.Register
Resolver resolver.Resolver Resolver resolver.Resolver
Logger logger.Logger Logger logger.Logger
Context context.Context Context context.Context
@ -52,10 +52,10 @@ func WithHandler(h string) Option {
} }
} }
// WithRegistry sets the registry // WithRegister sets the register
func WithRegistry(r registry.Registry) Option { func WithRegister(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
o.Registry = r o.Register = r
} }
} }

View File

@ -14,6 +14,7 @@ var (
// Broker is an interface used for asynchronous messaging. // Broker is an interface used for asynchronous messaging.
type Broker interface { type Broker interface {
Name() string
Init(...Option) error Init(...Option) error
Options() Options Options() Options
Address() string Address() string

View File

@ -16,6 +16,10 @@ func NewBroker(opts ...Option) Broker {
return &noopBroker{opts: NewOptions(opts...)} return &noopBroker{opts: NewOptions(opts...)}
} }
func (n *noopBroker) Name() string {
return n.opts.Name
}
// Init initialize broker // Init initialize broker
func (n *noopBroker) Init(opts ...Option) error { func (n *noopBroker) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {

View File

@ -7,12 +7,13 @@ import (
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/tracer" "github.com/unistack-org/micro/v3/tracer"
) )
// Options struct // Options struct
type Options struct { type Options struct {
Name string
// Addrs useed by broker // Addrs useed by broker
Addrs []string Addrs []string
// ErrorHandler executed when errors occur processing messages // ErrorHandler executed when errors occur processing messages
@ -27,8 +28,8 @@ type Options struct {
Tracer tracer.Tracer Tracer tracer.Tracer
// TLSConfig for secure communication // TLSConfig for secure communication
TLSConfig *tls.Config TLSConfig *tls.Config
// Registry used for clustering // Register used for clustering
Registry registry.Registry Register register.Register
// Context is used for non default options // Context is used for non default options
Context context.Context Context context.Context
} }
@ -36,7 +37,7 @@ type Options struct {
// NewOptions create new Options // NewOptions create new Options
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Registry: registry.DefaultRegistry, Register: register.DefaultRegister,
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Context: context.Background(), Context: context.Background(),
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
@ -202,10 +203,10 @@ func SubscribeGroup(name string) SubscribeOption {
} }
} }
// Registry sets registry option // Register sets register option
func Registry(r registry.Registry) Option { func Register(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
o.Registry = r o.Register = r
} }
} }
@ -237,6 +238,13 @@ func Meter(m meter.Meter) Option {
} }
} }
// Name sets the name
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
// SubscribeContext set context // SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption { func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) { return func(o *SubscribeOptions) {

View File

@ -18,6 +18,7 @@ var (
// It supports Request/Response via Transport and Publishing via the Broker. // It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests. // It also supports bidirectional streaming of requests.
type Client interface { type Client interface {
Name() string
Init(...Option) error Init(...Option) error
Options() Options Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message NewMessage(topic string, msg interface{}, opts ...MessageOption) Message

View File

@ -49,6 +49,10 @@ func NewClient(opts ...Option) Client {
return &noopClient{opts: NewOptions(opts...)} return &noopClient{opts: NewOptions(opts...)}
} }
func (n *noopClient) Name() string {
return n.opts.Name
}
func (n *noopRequest) Service() string { func (n *noopRequest) Service() string {
return n.service return n.service
} }

View File

@ -9,7 +9,7 @@ import (
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/selector" "github.com/unistack-org/micro/v3/selector"
"github.com/unistack-org/micro/v3/selector/random" "github.com/unistack-org/micro/v3/selector/random"
@ -18,6 +18,7 @@ import (
// Options holds client options // Options holds client options
type Options struct { type Options struct {
Name string
// Used to select codec // Used to select codec
ContentType string ContentType string
// Proxy address to send requests via // Proxy address to send requests via
@ -246,11 +247,11 @@ func Transport(t transport.Transport) Option {
} }
} }
// Registry sets the routers registry // Register sets the routers register
func Registry(r registry.Registry) Option { func Register(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
if o.Router != nil { if o.Router != nil {
o.Router.Init(router.Registry(r)) o.Router.Init(router.Register(r))
} }
} }
} }
@ -291,6 +292,13 @@ func Backoff(fn BackoffFunc) Option {
} }
} }
// Name sets the client name
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
// Lookup sets the lookup function to use for resolving service names // Lookup sets the lookup function to use for resolving service names
func Lookup(l LookupFunc) Option { func Lookup(l LookupFunc) Option {
return func(o *Options) { return func(o *Options) {

22
context.go Normal file
View File

@ -0,0 +1,22 @@
package micro
import "context"
type serviceKey struct{}
// FromContext retrieves a Service from the Context.
func FromContext(ctx context.Context) (Service, bool) {
if ctx == nil {
return nil, false
}
s, ok := ctx.Value(serviceKey{}).(Service)
return s, ok
}
// NewContext returns a new Context with the Service embedded within it.
func NewContext(ctx context.Context, s Service) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, serviceKey{}, s)
}

View File

@ -6,11 +6,22 @@ import (
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
) )
// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
type event struct { type event struct {
c client.Client c client.Client
topic string topic string
} }
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
return &event{c, topic}
}
func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error { func (e *event) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...) return e.c.Publish(ctx, e.c.NewMessage(e.topic, msg), opts...)
} }

View File

@ -33,7 +33,7 @@ type Store interface {
type Event struct { type Event struct {
// ID to uniquely identify the event // ID to uniquely identify the event
ID string ID string
// Topic of event, e.g. "registry.service.created" // Topic of event, e.g. "register.service.created"
Topic string Topic string
// Timestamp of the event // Timestamp of the event
Timestamp time.Time Timestamp time.Time

View File

@ -1,3 +1,5 @@
// +build ignore
package micro package micro
import ( import (
@ -7,11 +9,28 @@ import (
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
) )
// Function is a one time executing Service
type Function interface {
// Inherits Service interface
Service
// Done signals to complete execution
Done() error
// Handle registers an RPC handler
Handle(v interface{}) error
// Subscribe registers a subscriber
Subscribe(topic string, v interface{}) error
}
type function struct { type function struct {
cancel context.CancelFunc cancel context.CancelFunc
Service Service
} }
// NewFunction returns a new Function for a one time executing Service
func NewFunction(opts ...Option) Function {
return newFunction(opts...)
}
func fnHandlerWrapper(f Function) server.HandlerWrapper { func fnHandlerWrapper(f Function) server.HandlerWrapper {
return func(h server.HandlerFunc) server.HandlerFunc { return func(h server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error { return func(ctx context.Context, req server.Request, rsp interface{}) error {
@ -45,7 +64,7 @@ func newFunction(opts ...Option) Function {
// make context the last thing // make context the last thing
fopts = append(fopts, Context(ctx)) fopts = append(fopts, Context(ctx))
service := newService(fopts...) service := &service{opts: NewOptions(opts...)}
fn := &function{ fn := &function{
cancel: cancel, cancel: cancel,

View File

@ -7,18 +7,18 @@ import (
"sync" "sync"
"testing" "testing"
rmemory "github.com/unistack-org/micro-registry-memory" rmemory "github.com/unistack-org/micro-register-memory"
) )
func TestFunction(t *testing.T) { func TestFunction(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)
r := rmemory.NewRegistry() r := rmemory.NewRegister()
// create service // create service
fn := NewFunction( fn := NewFunction(
Registry(r), Register(r),
Name("test.function"), Name("test.function"),
AfterStart(func() error { AfterStart(func() error {
wg.Done() wg.Done()

1
go.sum
View File

@ -52,6 +52,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/unistack-org/micro v1.18.0 h1:EbFiII0bKV0Xcua7o6J30MFmm4/g0Hv3ECOKzsUBihU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=

View File

@ -7,6 +7,11 @@ import (
"sort" "sort"
) )
var (
// HeaderPrefix for all headers passed
HeaderPrefix = "Micro-"
)
type metadataKey struct{} type metadataKey struct{}
// Metadata is our way of representing request headers internally. // Metadata is our way of representing request headers internally.

View File

@ -1,22 +0,0 @@
metrics
=======
The metrics package provides a simple metrics "Reporter" interface which allows the user to submit counters, gauges and timings (along with key/value tags).
Implementations
---------------
* Prometheus (pull): will be first
* Prometheus (push): certainly achievable
* InfluxDB: could quite easily be done
* Telegraf: almost identical to the InfluxDB implementation
* Micro: Could we provide metrics over Micro's server interface?
Todo
----
* Include a handler middleware which uses the Reporter interface to generate per-request level metrics
- Throughput
- Errors
- Duration

View File

@ -24,6 +24,7 @@ var (
// Meter is an interface for collecting and instrumenting metrics // Meter is an interface for collecting and instrumenting metrics
type Meter interface { type Meter interface {
Name() string
Init(...Option) error Init(...Option) error
Counter(string, ...Option) Counter Counter(string, ...Option) Counter
FloatCounter(string, ...Option) FloatCounter FloatCounter(string, ...Option) FloatCounter

View File

@ -35,20 +35,20 @@ func TestLabelsAppend(t *testing.T) {
ls.vals = []string{"noop", "http"} ls.vals = []string{"noop", "http"}
var nls Labels var nls Labels
nls.keys = []string{"registry"} nls.keys = []string{"register"}
nls.vals = []string{"gossip"} nls.vals = []string{"gossip"}
ls = ls.Append(nls) ls = ls.Append(nls)
ls.Sort() ls.Sort()
if ls.keys[0] != "registry" || ls.vals[0] != "gossip" { if ls.keys[0] != "register" || ls.vals[0] != "gossip" {
t.Fatalf("append error: %v", ls) t.Fatalf("append error: %v", ls)
} }
} }
func TestIterator(t *testing.T) { func TestIterator(t *testing.T) {
var ls Labels var ls Labels
ls.keys = []string{"type", "server", "registry"} ls.keys = []string{"type", "server", "register"}
ls.vals = []string{"noop", "http", "gossip"} ls.vals = []string{"noop", "http", "gossip"}
iter := ls.Iter() iter := ls.Iter()

View File

@ -16,6 +16,10 @@ func NewMeter(opts ...Option) Meter {
return &noopMeter{opts: NewOptions(opts...)} return &noopMeter{opts: NewOptions(opts...)}
} }
func (r *noopMeter) Name() string {
return r.opts.Name
}
// Init initialize options // Init initialize options
func (r *noopMeter) Init(opts ...Option) error { func (r *noopMeter) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {

View File

@ -11,6 +11,7 @@ type Option func(*Options)
// Options for metrics implementations: // Options for metrics implementations:
type Options struct { type Options struct {
Name string
Address string Address string
Path string Path string
Labels Labels Labels Labels

232
meter/wrapper/wrapper.go Normal file
View File

@ -0,0 +1,232 @@
// +build ignore
package wrapper
import (
"context"
"fmt"
"time"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/server"
)
type Options struct {
Meter meter.Meter
Name string
Version string
ID string
}
type Option func(*Options)
func ServiceName(name string) Option {
return func(o *Options) {
o.Name = name
}
}
func ServiceVersion(version string) Option {
return func(o *Options) {
o.Version = version
}
}
func ServiceID(id string) Option {
return func(o *Options) {
o.ID = id
}
}
func Meter(m meter.Meter) Option {
return func(o *Options) {
o.Meter = m
}
}
type wrapper struct {
options Options
callFunc client.CallFunc
client.Client
}
func NewClientWrapper(opts ...Option) client.Wrapper {
return func(c client.Client) client.Client {
handler := &wrapper{
labels: labels,
Client: c,
}
return handler
}
}
func NewCallWrapper(opts ...Option) client.CallWrapper {
labels := getLabels(opts...)
return func(fn client.CallFunc) client.CallFunc {
handler := &wrapper{
labels: labels,
callFunc: fn,
}
return handler.CallFunc
}
}
func (w *wrapper) CallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now()
err := w.callFunc(ctx, addr, req, rsp, opts)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
} else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
}
return err
}
func (w *wrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now()
err := w.Client.Call(ctx, req, rsp, opts...)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
} else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
}
return err
}
func (w *wrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint())
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("client_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("client_request_duration_seconds", wlabels))
ts := time.Now()
stream, err := w.Client.Stream(ctx, req, opts...)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
if err == nil {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
} else {
metrics.GetOrCreateCounter(getName("client_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
}
return stream, err
}
func (w *wrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
endpoint := p.Topic()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("publish_message_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("publish_message_duration_seconds", wlabels))
ts := time.Now()
err := w.Client.Publish(ctx, p, opts...)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
if err == nil {
metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
} else {
metrics.GetOrCreateCounter(getName("publish_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
}
return err
}
func NewHandlerWrapper(opts ...Option) server.HandlerWrapper {
labels := getLabels(opts...)
handler := &wrapper{
labels: labels,
}
return handler.HandlerFunc
}
func (w *wrapper) HandlerFunc(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
endpoint := req.Endpoint()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("server_request_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("server_request_duration_seconds", wlabels))
ts := time.Now()
err := fn(ctx, req, rsp)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
if err == nil {
metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
} else {
metrics.GetOrCreateCounter(getName("server_request_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
}
return err
}
}
func NewSubscriberWrapper(opts ...Option) server.SubscriberWrapper {
labels := getLabels(opts...)
handler := &wrapper{
labels: labels,
}
return handler.SubscriberFunc
}
func (w *wrapper) SubscriberFunc(fn server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Message) error {
endpoint := msg.Topic()
wlabels := append(w.labels, fmt.Sprintf(`%sendpoint="%s"`, DefaultLabelPrefix, endpoint))
timeCounterSummary := metrics.GetOrCreateSummary(getName("subscribe_message_latency_microseconds", wlabels))
timeCounterHistogram := metrics.GetOrCreateSummary(getName("subscribe_message_duration_seconds", wlabels))
ts := time.Now()
err := fn(ctx, msg)
te := time.Since(ts)
timeCounterSummary.Update(float64(te.Seconds()))
timeCounterHistogram.Update(te.Seconds())
if err == nil {
metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="success"`, DefaultLabelPrefix)))).Inc()
} else {
metrics.GetOrCreateCounter(getName("subscribe_message_total", append(wlabels, fmt.Sprintf(`%sstatus="failure"`, DefaultLabelPrefix)))).Inc()
}
return err
}
}

119
micro.go
View File

@ -1,119 +0,0 @@
// Package micro is a pluggable framework for microservices
package micro
import (
"context"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/server"
)
type serviceKey struct{}
// Service is an interface that wraps the lower level libraries
// within micro. Its a convenience method for building
// and initialising services.
type Service interface {
// The service name
Name() string
// Init initialises options
Init(...Option) error
// Options returns the current options
Options() Options
// Client is used to call services
Client() client.Client
// Server is for handling requests and events
Server() server.Server
// Broker is for broker usage
Broker() broker.Broker
// Run the service
Run() error
// The service implementation
String() string
}
// Function is a one time executing Service
type Function interface {
// Inherits Service interface
Service
// Done signals to complete execution
Done() error
// Handle registers an RPC handler
Handle(v interface{}) error
// Subscribe registers a subscriber
Subscribe(topic string, v interface{}) error
}
/*
// Type Event is a future type for acting on asynchronous events
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
// Subscribe to the event
Subscribe(ctx context.Context, v in
}
// Resource is a future type for defining dependencies
type Resource interface {
// Name of the resource
Name() string
// Type of resource
Type() string
// Method of creation
Create() error
}
*/
// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
var (
// HeaderPrefix for all headers passed
HeaderPrefix = "Micro-"
)
// NewService creates and returns a new Service based on the packages within.
func NewService(opts ...Option) Service {
return newService(opts...)
}
// FromContext retrieves a Service from the Context.
func FromContext(ctx context.Context) (Service, bool) {
if ctx == nil {
return nil, false
}
s, ok := ctx.Value(serviceKey{}).(Service)
return s, ok
}
// NewContext returns a new Context with the Service embedded within it.
func NewContext(ctx context.Context, s Service) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, serviceKey{}, s)
}
// NewFunction returns a new Function for a one time executing Service
func NewFunction(opts ...Option) Function {
return newFunction(opts...)
}
// NewEvent creates a new event publisher
func NewEvent(topic string, c client.Client) Event {
return &event{c, topic}
}
// RegisterHandler is syntactic sugar for registering a handler
func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOption) error {
return s.Handle(s.NewHandler(h, opts...))
}
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}

View File

@ -41,6 +41,10 @@ func (t *tunBroker) Init(opts ...broker.Option) error {
return nil return nil
} }
func (t *tunBroker) Name() string {
return t.opts.Name
}
func (t *tunBroker) Options() broker.Options { func (t *tunBroker) Options() broker.Options {
return t.opts return t.opts
} }

View File

@ -2,40 +2,44 @@ package micro
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/unistack-org/micro/v3/auth" "github.com/unistack-org/micro/v3/auth"
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/config" "github.com/unistack-org/micro/v3/config"
"github.com/unistack-org/micro/v3/debug/profile"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/runtime"
"github.com/unistack-org/micro/v3/selector"
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/store" "github.com/unistack-org/micro/v3/store"
"github.com/unistack-org/micro/v3/tracer" "github.com/unistack-org/micro/v3/tracer"
// "github.com/unistack-org/micro/v3/debug/profile"
// "github.com/unistack-org/micro/v3/runtime"
) )
// Options for micro service // Options for micro service
type Options struct { type Options struct {
Auth auth.Auth Name string
Broker broker.Broker Version string
Logger logger.Logger Metadata metadata.Metadata
Meter meter.Meter
Auths []auth.Auth
Brokers []broker.Broker
Loggers []logger.Logger
Meters []meter.Meter
Configs []config.Config Configs []config.Config
Client client.Client Clients []client.Client
Server server.Server Servers []server.Server
Store store.Store Stores []store.Store
Registry registry.Registry Registers []register.Register
Tracer tracer.Tracer Tracers []tracer.Tracer
Router router.Router Routers []router.Router
Runtime runtime.Runtime // Runtime runtime.Runtime
Profile profile.Profile // Profile profile.Profile
// Before and After funcs // Before and After funcs
BeforeStart []func(context.Context) error BeforeStart []func(context.Context) error
@ -52,17 +56,17 @@ type Options struct {
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Context: context.Background(), Context: context.Background(),
Server: server.DefaultServer, Servers: []server.Server{server.DefaultServer},
Client: client.DefaultClient, Clients: []client.Client{client.DefaultClient},
Broker: broker.DefaultBroker, Brokers: []broker.Broker{broker.DefaultBroker},
Registry: registry.DefaultRegistry, Registers: []register.Register{register.DefaultRegister},
Router: router.DefaultRouter, Routers: []router.Router{router.DefaultRouter},
Auth: auth.DefaultAuth, Auths: []auth.Auth{auth.DefaultAuth},
Logger: logger.DefaultLogger, Loggers: []logger.Logger{logger.DefaultLogger},
Tracer: tracer.DefaultTracer, Tracers: []tracer.Tracer{tracer.DefaultTracer},
Meter: meter.DefaultMeter, Meters: []meter.Meter{meter.DefaultMeter},
Configs: []config.Config{config.DefaultConfig}, Configs: []config.Config{config.DefaultConfig},
Store: store.DefaultStore, Stores: []store.Store{store.DefaultStore},
//Runtime runtime.Runtime //Runtime runtime.Runtime
//Profile profile.Profile //Profile profile.Profile
} }
@ -75,274 +79,559 @@ func NewOptions(opts ...Option) Options {
} }
// Option func // Option func
type Option func(*Options) type Option func(*Options) error
// Broker to be used for service // Broker to be used for client and server
func Broker(b broker.Broker) Option { func Broker(b broker.Broker, opts ...BrokerOption) Option {
return func(o *Options) { return func(o *Options) error {
o.Broker = b var err error
if o.Client != nil { bopts := brokerOptions{}
// Update Client and Server for _, opt := range opts {
o.Client.Init(client.Broker(b)) opt(&bopts)
} }
if o.Server != nil { all := false
o.Server.Init(server.Broker(b)) if len(opts) == 0 {
all = true
} }
for _, srv := range o.Servers {
for _, os := range bopts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.Broker(b)); err != nil {
return err
}
}
}
}
for _, cli := range o.Clients {
for _, oc := range bopts.clients {
if cli.Name() == oc || all {
if err = cli.Init(client.Broker(b)); err != nil {
return err
}
}
}
}
return nil
} }
} }
// Client to be used for service type brokerOptions struct {
func Client(c client.Client) Option { servers []string
return func(o *Options) { clients []string
o.Client = c }
type BrokerOption func(*brokerOptions)
func BrokerClient(n string) BrokerOption {
return func(o *brokerOptions) {
o.clients = append(o.clients, n)
}
}
func BrokerServer(n string) BrokerOption {
return func(o *brokerOptions) {
o.servers = append(o.servers, n)
}
}
// Clients to be used for service
func Clients(c ...client.Client) Option {
return func(o *Options) error {
o.Clients = c
return nil
} }
} }
// Context specifies a context for the service. // Context specifies a context for the service.
// Can be used to signal shutdown of the service and for extra option values. // Can be used to signal shutdown of the service and for extra option values.
func Context(ctx context.Context) Option { func Context(ctx context.Context) Option {
return func(o *Options) { return func(o *Options) error {
// TODO: Pass context to underline stuff ?
o.Context = ctx o.Context = ctx
return nil
} }
} }
/*
// Profile to be used for debug profile // Profile to be used for debug profile
func Profile(p profile.Profile) Option { func Profile(p profile.Profile) Option {
return func(o *Options) { return func(o *Options) {
o.Profile = p o.Profile = p
} }
} }
*/
// Server to be used for service // Servers to be used for service
func Server(s server.Server) Option { func Servers(s ...server.Server) Option {
return func(o *Options) { return func(o *Options) error {
o.Server = s o.Servers = s
return nil
} }
} }
// Store sets the store to use // Stores sets the store to use
func Store(s store.Store) Option { func Stores(s ...store.Store) Option {
return func(o *Options) { return func(o *Options) error {
o.Store = s o.Stores = s
return nil
} }
} }
// Logger set the logger to use // Logger set the logger to use
func Logger(l logger.Logger) Option { func Logger(l logger.Logger, opts ...LoggerOption) Option {
return func(o *Options) { return func(o *Options) error {
o.Logger = l var err error
lopts := loggerOptions{}
for _, opt := range opts {
opt(&lopts)
}
all := false
if len(opts) == 0 {
all = true
}
for _, srv := range o.Servers {
for _, os := range lopts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.Logger(l)); err != nil {
return err
}
}
}
}
for _, cli := range o.Clients {
for _, oc := range lopts.clients {
if cli.Name() == oc || all {
if err = cli.Init(client.Logger(l)); err != nil {
return err
}
}
}
}
for _, brk := range o.Brokers {
for _, ob := range lopts.brokers {
if brk.Name() == ob || all {
if err = brk.Init(broker.Logger(l)); err != nil {
return err
}
}
}
}
for _, reg := range o.Registers {
for _, or := range lopts.registers {
if reg.Name() == or || all {
if err = reg.Init(register.Logger(l)); err != nil {
return err
}
}
}
}
for _, str := range o.Stores {
for _, or := range lopts.stores {
if str.Name() == or || all {
if err = str.Init(store.Logger(l)); err != nil {
return err
}
}
}
}
for _, mtr := range o.Meters {
for _, or := range lopts.meters {
if mtr.Name() == or || all {
if err = mtr.Init(meter.Logger(l)); err != nil {
return err
}
}
}
}
for _, trc := range o.Tracers {
for _, ot := range lopts.tracers {
if trc.Name() == ot || all {
if err = trc.Init(tracer.Logger(l)); err != nil {
return err
}
}
}
}
return nil
} }
} }
// Meter set the meter to use type LoggerOption func(*loggerOptions)
func Meter(m meter.Meter) Option {
return func(o *Options) { type loggerOptions struct {
o.Meter = m servers []string
clients []string
brokers []string
registers []string
stores []string
meters []string
tracers []string
}
/*
func LoggerServer(n string) LoggerOption {
}
*/
// Meters set the meter to use
func Meters(m ...meter.Meter) Option {
return func(o *Options) error {
o.Meters = m
return nil
} }
} }
// Registry sets the registry for the service // Register sets the register for the service
// and the underlying components // and the underlying components
func Registry(r registry.Registry) Option { func Register(r register.Register, opts ...RegisterOption) Option {
return func(o *Options) { return func(o *Options) error {
o.Registry = r var err error
if o.Router != nil { ropts := registerOptions{}
// Update router for _, opt := range opts {
o.Router.Init(router.Registry(r)) opt(&ropts)
} }
if o.Server != nil { all := false
// Update server if len(opts) == 0 {
o.Server.Init(server.Registry(r)) all = true
} }
if o.Broker != nil { for _, rtr := range o.Routers {
// Update Broker for _, os := range ropts.routers {
o.Broker.Init(broker.Registry(r)) if rtr.Name() == os || all {
if err = rtr.Init(router.Register(r)); err != nil {
return err
} }
} }
}
}
for _, srv := range o.Servers {
for _, os := range ropts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.Register(r)); err != nil {
return err
}
}
}
}
for _, brk := range o.Brokers {
for _, os := range ropts.brokers {
if brk.Name() == os || all {
if err = brk.Init(broker.Register(r)); err != nil {
return err
}
}
}
}
return nil
}
} }
// Tracer sets the tracer for the service type registerOptions struct {
func Tracer(t tracer.Tracer) Option { routers []string
return func(o *Options) { servers []string
o.Tracer = t brokers []string
if o.Server != nil { }
o.Server.Init(server.Tracer(t))
} type RegisterOption func(*registerOptions)
if o.Client != nil {
o.Client.Init(client.Tracer(t)) func RegisterRouter(n string) RegisterOption {
} return func(o *registerOptions) {
o.routers = append(o.routers, n)
} }
} }
func RegisterServer(n string) RegisterOption {
return func(o *registerOptions) {
o.servers = append(o.servers, n)
}
}
func RegisterBroker(n string) RegisterOption {
return func(o *registerOptions) {
o.brokers = append(o.brokers, n)
}
}
func Tracer(t tracer.Tracer, opts ...TracerOption) Option {
return func(o *Options) error {
var err error
topts := tracerOptions{}
for _, opt := range opts {
opt(&topts)
}
all := false
if len(opts) == 0 {
all = true
}
for _, srv := range o.Servers {
for _, os := range topts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.Tracer(t)); err != nil {
return err
}
}
}
}
for _, cli := range o.Clients {
for _, os := range topts.clients {
if cli.Name() == os || all {
if err = cli.Init(client.Tracer(t)); err != nil {
return err
}
}
}
}
for _, str := range o.Stores {
for _, os := range topts.stores {
if str.Name() == os || all {
if err = str.Init(store.Tracer(t)); err != nil {
return err
}
}
}
}
for _, brk := range o.Brokers {
for _, os := range topts.brokers {
if brk.Name() == os || all {
if err = brk.Init(broker.Tracer(t)); err != nil {
return err
}
}
}
}
return nil
}
}
type tracerOptions struct {
clients []string
servers []string
brokers []string
stores []string
}
type TracerOption func(*tracerOptions)
func TracerClient(n string) TracerOption {
return func(o *tracerOptions) {
o.clients = append(o.clients, n)
}
}
func TracerServer(n string) TracerOption {
return func(o *tracerOptions) {
o.servers = append(o.servers, n)
}
}
func TracerBroker(n string) TracerOption {
return func(o *tracerOptions) {
o.brokers = append(o.brokers, n)
}
}
func TracerStore(n string) TracerOption {
return func(o *tracerOptions) {
o.stores = append(o.stores, n)
}
}
/*
// Auth sets the auth for the service // Auth sets the auth for the service
func Auth(a auth.Auth) Option { func Auth(a auth.Auth) Option {
return func(o *Options) { return func(o *Options) error {
o.Auth = a o.Auth = a
if o.Server != nil { if o.Server != nil {
o.Server.Init(server.Auth(a)) o.Server.Init(server.Auth(a))
} }
return nil
} }
} }
*/
// Configs sets the configs for the service // Configs sets the configs for the service
func Configs(c ...config.Config) Option { func Configs(c ...config.Config) Option {
return func(o *Options) { return func(o *Options) error {
o.Configs = c o.Configs = c
return nil
} }
} }
/*
// Selector sets the selector for the service client // Selector sets the selector for the service client
func Selector(s selector.Selector) Option { func Selector(s selector.Selector) Option {
return func(o *Options) { return func(o *Options) error {
if o.Client != nil { if o.Client != nil {
o.Client.Init(client.Selector(s)) o.Client.Init(client.Selector(s))
} }
return nil
} }
} }
*/
/*
// Runtime sets the runtime // Runtime sets the runtime
func Runtime(r runtime.Runtime) Option { func Runtime(r runtime.Runtime) Option {
return func(o *Options) { return func(o *Options) {
o.Runtime = r o.Runtime = r
} }
} }
*/
// Router sets the router // Router sets the router
func Router(r router.Router) Option { func Router(r router.Router, opts ...RouterOption) Option {
return func(o *Options) { return func(o *Options) error {
o.Router = r var err error
// Update client ropts := routerOptions{}
if o.Client != nil { for _, opt := range opts {
o.Client.Init(client.Router(r)) opt(&ropts)
} }
all := false
if len(opts) == 0 {
all = true
}
for _, cli := range o.Clients {
for _, os := range ropts.clients {
if cli.Name() == os || all {
if err = cli.Init(client.Router(r)); err != nil {
return err
}
}
}
}
return nil
}
}
type routerOptions struct {
clients []string
}
type RouterOption func(*routerOptions)
func RouterClient(n string) RouterOption {
return func(o *routerOptions) {
o.clients = append(o.clients, n)
} }
} }
// Address sets the address of the server // Address sets the address of the server
func Address(addr string) Option { func Address(addr string) Option {
return func(o *Options) { return func(o *Options) error {
if o.Server != nil { switch len(o.Servers) {
o.Server.Init(server.Address(addr)) case 0:
return fmt.Errorf("cant set address on nil server")
case 1:
break
default:
return fmt.Errorf("cant set same address for multiple servers")
} }
return o.Servers[0].Init(server.Address(addr))
} }
} }
// Name of the service // Name of the service
func Name(n string) Option { func Name(n string) Option {
return func(o *Options) { return func(o *Options) error {
if o.Server != nil { o.Name = n
o.Server.Init(server.Name(n)) return nil
}
} }
} }
// Version of the service // Version of the service
func Version(v string) Option { func Version(v string) Option {
return func(o *Options) { return func(o *Options) error {
if o.Server != nil { o.Version = v
o.Server.Init(server.Version(v)) return nil
}
} }
} }
// Metadata associated with the service // Metadata associated with the service
func Metadata(md metadata.Metadata) Option { func Metadata(md metadata.Metadata) Option {
return func(o *Options) { return func(o *Options) error {
if o.Server != nil { o.Metadata = metadata.Copy(md)
o.Server.Init(server.Metadata(md)) return nil
}
} }
} }
// RegisterTTL specifies the TTL to use when registering the service // RegisterTTL specifies the TTL to use when registering the service
func RegisterTTL(t time.Duration) Option { func RegisterTTL(td time.Duration, opts ...RegisterOption) Option {
return func(o *Options) { return func(o *Options) error {
if o.Server != nil { var err error
o.Server.Init(server.RegisterTTL(t)) ropts := registerOptions{}
for _, opt := range opts {
opt(&ropts)
} }
all := false
if len(opts) == 0 {
all = true
}
for _, srv := range o.Servers {
for _, os := range ropts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.RegisterTTL(td)); err != nil {
return err
}
}
}
}
return nil
} }
} }
// RegisterInterval specifies the interval on which to re-register // RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option { func RegisterInterval(td time.Duration, opts ...RegisterOption) Option {
return func(o *Options) { return func(o *Options) error {
if o.Server != nil { var err error
o.Server.Init(server.RegisterInterval(t)) ropts := registerOptions{}
for _, opt := range opts {
opt(&ropts)
}
all := false
if len(opts) == 0 {
all = true
}
for _, srv := range o.Servers {
for _, os := range ropts.servers {
if srv.Name() == os || all {
if err = srv.Init(server.RegisterInterval(td)); err != nil {
return err
} }
} }
}
// WrapClient is a convenience method for wrapping a Client with
// some middleware component. A list of wrappers can be provided.
// Wrappers are applied in reverse order so the last is executed first.
func WrapClient(w ...client.Wrapper) Option {
return func(o *Options) {
// apply in reverse
for i := len(w); i > 0; i-- {
o.Client = w[i-1](o.Client)
} }
} }
} return nil
// WrapCall is a convenience method for wrapping a Client CallFunc
func WrapCall(w ...client.CallWrapper) Option {
return func(o *Options) {
o.Client.Init(client.WrapCall(w...))
}
}
// WrapHandler adds a handler Wrapper to a list of options passed into the server
func WrapHandler(w ...server.HandlerWrapper) Option {
return func(o *Options) {
var wrappers []server.Option
for _, wrap := range w {
wrappers = append(wrappers, server.WrapHandler(wrap))
}
// Init once
o.Server.Init(wrappers...)
}
}
// WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server
func WrapSubscriber(w ...server.SubscriberWrapper) Option {
return func(o *Options) {
var wrappers []server.Option
for _, wrap := range w {
wrappers = append(wrappers, server.WrapSubscriber(wrap))
}
// Init once
o.Server.Init(wrappers...)
} }
} }
// BeforeStart run funcs before service starts // BeforeStart run funcs before service starts
func BeforeStart(fn func(context.Context) error) Option { func BeforeStart(fn func(context.Context) error) Option {
return func(o *Options) { return func(o *Options) error {
o.BeforeStart = append(o.BeforeStart, fn) o.BeforeStart = append(o.BeforeStart, fn)
return nil
} }
} }
// BeforeStop run funcs before service stops // BeforeStop run funcs before service stops
func BeforeStop(fn func(context.Context) error) Option { func BeforeStop(fn func(context.Context) error) Option {
return func(o *Options) { return func(o *Options) error {
o.BeforeStop = append(o.BeforeStop, fn) o.BeforeStop = append(o.BeforeStop, fn)
return nil
} }
} }
// AfterStart run funcs after service starts // AfterStart run funcs after service starts
func AfterStart(fn func(context.Context) error) Option { func AfterStart(fn func(context.Context) error) Option {
return func(o *Options) { return func(o *Options) error {
o.AfterStart = append(o.AfterStart, fn) o.AfterStart = append(o.AfterStart, fn)
return nil
} }
} }
// AfterStop run funcs after service stops // AfterStop run funcs after service stops
func AfterStop(fn func(context.Context) error) Option { func AfterStop(fn func(context.Context) error) Option {
return func(o *Options) { return func(o *Options) error {
o.AfterStop = append(o.AfterStop, fn) o.AfterStop = append(o.AfterStop, fn)
return nil
} }
} }

View File

@ -1,26 +1,26 @@
package registry package register
import ( import (
"context" "context"
) )
type registryKey struct{} type registerKey struct{}
// FromContext get registry from context // FromContext get register from context
func FromContext(ctx context.Context) (Registry, bool) { func FromContext(ctx context.Context) (Register, bool) {
if ctx == nil { if ctx == nil {
return nil, false return nil, false
} }
c, ok := ctx.Value(registryKey{}).(Registry) c, ok := ctx.Value(registerKey{}).(Register)
return c, ok return c, ok
} }
// NewContext put registry in context // NewContext put register in context
func NewContext(ctx context.Context, c Registry) context.Context { func NewContext(ctx context.Context, c Register) context.Context {
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()
} }
return context.WithValue(ctx, registryKey{}, c) return context.WithValue(ctx, registerKey{}, c)
} }
// SetOption returns a function to setup a context with given value // SetOption returns a function to setup a context with given value

View File

@ -1,4 +1,4 @@
package registry package register
import ( import (
"fmt" "fmt"

View File

@ -1,4 +1,4 @@
package registry package register
import ( import (
"context" "context"

85
register/noop.go Normal file
View File

@ -0,0 +1,85 @@
package register
import (
"context"
)
type noopRegister struct {
opts Options
}
func (n *noopRegister) Name() string {
return n.opts.Name
}
// Init initialize register
func (n *noopRegister) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
return nil
}
// Options returns options struct
func (n *noopRegister) Options() Options {
return n.opts
}
// Connect opens connection to register
func (n *noopRegister) Connect(ctx context.Context) error {
return nil
}
// Disconnect close connection to register
func (n *noopRegister) Disconnect(ctx context.Context) error {
return nil
}
// Register registers service
func (n *noopRegister) Register(ctx context.Context, svc *Service, opts ...RegisterOption) error {
return nil
}
// Deregister deregisters service
func (n *noopRegister) Deregister(ctx context.Context, svc *Service, opts ...DeregisterOption) error {
return nil
}
// LookupService returns servive info
func (n *noopRegister) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) {
return []*Service{}, nil
}
// ListServices listing services
func (n *noopRegister) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) {
return []*Service{}, nil
}
// Watch is used to watch for service changes
func (n *noopRegister) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
return &noopWatcher{done: make(chan struct{}), opts: NewWatchOptions(opts...)}, nil
}
// String returns register string representation
func (n *noopRegister) String() string {
return "noop"
}
type noopWatcher struct {
opts WatchOptions
done chan struct{}
}
func (n *noopWatcher) Next() (*Result, error) {
<-n.done
return nil, ErrWatcherStopped
}
func (n *noopWatcher) Stop() {
close(n.done)
}
// NewRegister returns a new noop register
func NewRegister(opts ...Option) Register {
return &noopRegister{opts: NewOptions(opts...)}
}

View File

@ -1,4 +1,4 @@
package registry package register
import ( import (
"context" "context"
@ -11,6 +11,7 @@ import (
) )
type Options struct { type Options struct {
Name string
Addrs []string Addrs []string
Timeout time.Duration Timeout time.Duration
TLSConfig *tls.Config TLSConfig *tls.Config
@ -102,14 +103,14 @@ func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions {
return options return options
} }
type GetOptions struct { type LookupOptions struct {
Context context.Context Context context.Context
// Domain to scope the request to // Domain to scope the request to
Domain string Domain string
} }
func NewGetOptions(opts ...GetOption) GetOptions { func NewLookupOptions(opts ...LookupOption) LookupOptions {
options := GetOptions{ options := LookupOptions{
Domain: DefaultDomain, Domain: DefaultDomain,
Context: context.Background(), Context: context.Background(),
} }
@ -136,7 +137,7 @@ func NewListOptions(opts ...ListOption) ListOptions {
return options return options
} }
// Addrs is the registry addresses to use // Addrs is the register addresses to use
func Addrs(addrs ...string) Option { func Addrs(addrs ...string) Option {
return func(o *Options) { return func(o *Options) {
o.Addrs = addrs o.Addrs = addrs
@ -245,14 +246,14 @@ func DeregisterDomain(d string) DeregisterOption {
} }
} }
func GetContext(ctx context.Context) GetOption { func GetContext(ctx context.Context) LookupOption {
return func(o *GetOptions) { return func(o *LookupOptions) {
o.Context = ctx o.Context = ctx
} }
} }
func GetDomain(d string) GetOption { func GetDomain(d string) LookupOption {
return func(o *GetOptions) { return func(o *LookupOptions) {
o.Domain = d o.Domain = d
} }
} }

View File

@ -1,5 +1,5 @@
// Package registry is an interface for service discovery // Package register is an interface for service discovery
package registry package register
import ( import (
"context" "context"
@ -16,31 +16,32 @@ const (
) )
var ( var (
// DefaultRegistry is the global default registry // DefaultRegister is the global default register
DefaultRegistry Registry = NewRegistry() DefaultRegister Register = NewRegister()
// ErrNotFound returned when GetService is called and no services found // ErrNotFound returned when LookupService is called and no services found
ErrNotFound = errors.New("service not found") ErrNotFound = errors.New("service not found")
// ErrWatcherStopped returned when when watcher is stopped // ErrWatcherStopped returned when when watcher is stopped
ErrWatcherStopped = errors.New("watcher stopped") ErrWatcherStopped = errors.New("watcher stopped")
) )
// Registry provides an interface for service discovery // Register provides an interface for service discovery
// and an abstraction over varying implementations // and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...} // {consul, etcd, zookeeper, ...}
type Registry interface { type Register interface {
Name() string
Init(...Option) error Init(...Option) error
Options() Options Options() Options
Connect(context.Context) error Connect(context.Context) error
Disconnect(context.Context) error Disconnect(context.Context) error
Register(context.Context, *Service, ...RegisterOption) error Register(context.Context, *Service, ...RegisterOption) error
Deregister(context.Context, *Service, ...DeregisterOption) error Deregister(context.Context, *Service, ...DeregisterOption) error
GetService(context.Context, string, ...GetOption) ([]*Service, error) LookupService(context.Context, string, ...LookupOption) ([]*Service, error)
ListServices(context.Context, ...ListOption) ([]*Service, error) ListServices(context.Context, ...ListOption) ([]*Service, error)
Watch(context.Context, ...WatchOption) (Watcher, error) Watch(context.Context, ...WatchOption) (Watcher, error)
String() string String() string
} }
// Service holds service registry info // Service holds service register info
type Service struct { type Service struct {
Name string `json:"name"` Name string `json:"name"`
Version string `json:"version"` Version string `json:"version"`
@ -49,14 +50,14 @@ type Service struct {
Nodes []*Node `json:"nodes"` Nodes []*Node `json:"nodes"`
} }
// Node holds node registry info // Node holds node register info
type Node struct { type Node struct {
Id string `json:"id"` Id string `json:"id"`
Address string `json:"address"` Address string `json:"address"`
Metadata metadata.Metadata `json:"metadata"` Metadata metadata.Metadata `json:"metadata"`
} }
// Endpoint holds endpoint registry info // Endpoint holds endpoint register info
type Endpoint struct { type Endpoint struct {
Name string `json:"name"` Name string `json:"name"`
Request *Value `json:"request"` Request *Value `json:"request"`
@ -83,8 +84,8 @@ type WatchOption func(*WatchOptions)
// DeregisterOption option is used to deregister service // DeregisterOption option is used to deregister service
type DeregisterOption func(*DeregisterOptions) type DeregisterOption func(*DeregisterOptions)
// GetOption option is used to get service // LookupOption option is used to get service
type GetOption func(*GetOptions) type LookupOption func(*LookupOptions)
// ListOption option is used to list services // ListOption option is used to list services
type ListOption func(*ListOptions) type ListOption func(*ListOptions)

View File

@ -1,9 +1,9 @@
package registry package register
import "time" import "time"
// Watcher is an interface that returns updates // Watcher is an interface that returns updates
// about services within the registry. // about services within the register.
type Watcher interface { type Watcher interface {
// Next is a blocking call // Next is a blocking call
Next() (*Result, error) Next() (*Result, error)
@ -17,7 +17,7 @@ type Result struct {
Service *Service Service *Service
} }
// EventType defines registry event type // EventType defines register event type
type EventType int type EventType int
const ( const (
@ -43,14 +43,14 @@ func (t EventType) String() string {
} }
} }
// Event is registry event // Event is register event
type Event struct { type Event struct {
// Id is registry id // Id is register id
Id string Id string
// Type defines type of event // Type defines type of event
Type EventType Type EventType
// Timestamp is event timestamp // Timestamp is event timestamp
Timestamp time.Time Timestamp time.Time
// Service is registry service // Service is register service
Service *Service Service *Service
} }

View File

@ -1,81 +0,0 @@
package registry
import (
"context"
)
type noopRegistry struct {
opts Options
}
// Init initialize registry
func (n *noopRegistry) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
return nil
}
// Options returns options struct
func (n *noopRegistry) Options() Options {
return n.opts
}
// Connect opens connection to registry
func (n *noopRegistry) Connect(ctx context.Context) error {
return nil
}
// Disconnect close connection to registry
func (n *noopRegistry) Disconnect(ctx context.Context) error {
return nil
}
// Register registers service
func (n *noopRegistry) Register(ctx context.Context, svc *Service, opts ...RegisterOption) error {
return nil
}
// Deregister deregisters service
func (n *noopRegistry) Deregister(ctx context.Context, svc *Service, opts ...DeregisterOption) error {
return nil
}
// GetService returns servive info
func (n *noopRegistry) GetService(ctx context.Context, name string, opts ...GetOption) ([]*Service, error) {
return []*Service{}, nil
}
// ListServices listing services
func (n *noopRegistry) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) {
return []*Service{}, nil
}
// Watch is used to watch for service changes
func (n *noopRegistry) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) {
return &noopWatcher{done: make(chan struct{}), opts: NewWatchOptions(opts...)}, nil
}
// String returns registry string representation
func (n *noopRegistry) String() string {
return "noop"
}
type noopWatcher struct {
opts WatchOptions
done chan struct{}
}
func (n *noopWatcher) Next() (*Result, error) {
<-n.done
return nil, ErrWatcherStopped
}
func (n *noopWatcher) Stop() {
close(n.done)
}
// NewRegistry returns a new noop registry
func NewRegistry(opts ...Option) Registry {
return &noopRegistry{opts: NewOptions(opts...)}
}

View File

@ -1,22 +1,22 @@
// Package registry resolves names using the micro registry // Package register resolves names using the micro register
package registry package register
import ( import (
"context" "context"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/resolver" "github.com/unistack-org/micro/v3/resolver"
) )
// Resolver is a registry network resolver // Resolver is a register network resolver
type Resolver struct { type Resolver struct {
// Registry is the registry to use otherwise we use the defaul // Register is the register to use otherwise we use the defaul
Registry registry.Registry Register register.Register
} }
// Resolve assumes ID is a domain name e.g micro.mu // Resolve assumes ID is a domain name e.g micro.mu
func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) { func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
services, err := r.Registry.GetService(context.TODO(), name) services, err := r.Register.LookupService(context.TODO(), name)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -5,11 +5,12 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
// Options are router options // Options are router options
type Options struct { type Options struct {
Name string
// Id is router id // Id is router id
Id string Id string
// Address is router address // Address is router address
@ -18,8 +19,8 @@ type Options struct {
Gateway string Gateway string
// Network is network address // Network is network address
Network string Network string
// Registry is the local registry // Register is the local register
Registry registry.Registry Register register.Register
// Precache routes // Precache routes
Precache bool Precache bool
// Logger // Logger
@ -63,10 +64,10 @@ func Logger(l logger.Logger) Option {
} }
} }
// Registry sets the local registry // Register sets the local register
func Registry(r registry.Registry) Option { func Register(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
o.Registry = r o.Register = r
} }
} }
@ -77,12 +78,19 @@ func Precache() Option {
} }
} }
// Name of the router
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
// NewOptions returns router default options // NewOptions returns router default options
func NewOptions(opts ...Option) Options { func NewOptions(opts ...Option) Options {
options := Options{ options := Options{
Id: uuid.New().String(), Id: uuid.New().String(),
Network: DefaultNetwork, Network: DefaultNetwork,
Registry: registry.DefaultRegistry, Register: register.DefaultRegister,
Logger: logger.DefaultLogger, Logger: logger.DefaultLogger,
Context: context.Background(), Context: context.Background(),
} }

View File

@ -18,6 +18,7 @@ var (
// Router is an interface for a routing control plane // Router is an interface for a routing control plane
type Router interface { type Router interface {
Name() string
// Init initializes the router with options // Init initializes the router with options
Init(...Option) error Init(...Option) error
// Options returns the router options // Options returns the router options

View File

@ -3,13 +3,13 @@ package server
import ( import (
"reflect" "reflect"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
type rpcHandler struct { type rpcHandler struct {
name string name string
handler interface{} handler interface{}
endpoints []*registry.Endpoint endpoints []*register.Endpoint
opts HandlerOptions opts HandlerOptions
} }
@ -20,10 +20,10 @@ func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
hdlr := reflect.ValueOf(handler) hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name() name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*registry.Endpoint var endpoints []*register.Endpoint
for m := 0; m < typ.NumMethod(); m++ { for m := 0; m < typ.NumMethod(); m++ {
if e := registry.ExtractEndpoint(typ.Method(m)); e != nil { if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] { for k, v := range options.Metadata[e.Name] {
@ -50,7 +50,7 @@ func (r *rpcHandler) Handler() interface{} {
return r.handler return r.handler
} }
func (r *rpcHandler) Endpoints() []*registry.Endpoint { func (r *rpcHandler) Endpoints() []*register.Endpoint {
return r.endpoints return r.endpoints
} }

View File

@ -13,7 +13,7 @@ import (
"github.com/unistack-org/micro/v3/broker" "github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
var ( var (
@ -34,7 +34,7 @@ const (
type noopServer struct { type noopServer struct {
h Handler h Handler
opts Options opts Options
rsvc *registry.Service rsvc *register.Service
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
registered bool registered bool
@ -64,6 +64,10 @@ func (n *noopServer) Handle(handler Handler) error {
return nil return nil
} }
func (n *noopServer) Name() string {
return n.opts.Name
}
func (n *noopServer) Subscribe(sb Subscriber) error { func (n *noopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber) sub, ok := sb.(*subscriber)
if !ok { if !ok {
@ -137,10 +141,10 @@ func (n *noopServer) Register() error {
} }
var err error var err error
var service *registry.Service var service *register.Service
var cacheService bool var cacheService bool
service, err = NewRegistryService(n) service, err = NewRegisterService(n)
if err != nil { if err != nil {
return err return err
} }
@ -168,7 +172,7 @@ func (n *noopServer) Register() error {
return subscriberList[i].topic > subscriberList[j].topic return subscriberList[i].topic > subscriberList[j].topic
}) })
endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, h := range handlerList { for _, h := range handlerList {
endpoints = append(endpoints, n.handlers[h].Endpoints()...) endpoints = append(endpoints, n.handlers[h].Endpoints()...)
} }
@ -187,7 +191,7 @@ func (n *noopServer) Register() error {
if !registered { if !registered {
if config.Logger.V(logger.InfoLevel) { if config.Logger.V(logger.InfoLevel) {
config.Logger.Infof(n.opts.Context, "registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id) config.Logger.Infof(n.opts.Context, "register [%s] Registering node: %s", config.Register.String(), service.Nodes[0].Id)
} }
} }
@ -244,7 +248,7 @@ func (n *noopServer) Deregister() error {
config := n.opts config := n.opts
n.RUnlock() n.RUnlock()
service, err := NewRegistryService(n) service, err := NewRegisterService(n)
if err != nil { if err != nil {
return err return err
} }

View File

@ -14,7 +14,7 @@ import (
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/meter" "github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/network/transport" "github.com/unistack-org/micro/v3/network/transport"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/tracer" "github.com/unistack-org/micro/v3/tracer"
) )
@ -25,7 +25,7 @@ type Option func(*Options)
type Options struct { type Options struct {
Codecs map[string]codec.Codec Codecs map[string]codec.Codec
Broker broker.Broker Broker broker.Broker
Registry registry.Registry Register register.Register
Tracer tracer.Tracer Tracer tracer.Tracer
Auth auth.Auth Auth auth.Auth
Logger logger.Logger Logger logger.Logger
@ -83,7 +83,7 @@ func NewOptions(opts ...Option) Options {
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Broker: broker.DefaultBroker, Broker: broker.DefaultBroker,
Registry: registry.DefaultRegistry, Register: register.DefaultRegister,
Transport: transport.DefaultTransport, Transport: transport.DefaultTransport,
Address: DefaultAddress, Address: DefaultAddress,
Name: DefaultName, Name: DefaultName,
@ -178,10 +178,10 @@ func Context(ctx context.Context) Option {
} }
} }
// Registry used for discovery // Register used for discovery
func Registry(r registry.Registry) Option { func Register(r register.Register) Option {
return func(o *Options) { return func(o *Options) {
o.Registry = r o.Register = r
} }
} }
@ -213,7 +213,7 @@ func Metadata(md metadata.Metadata) Option {
} }
} }
// RegisterCheck run func before registry service // RegisterCheck run func before register service
func RegisterCheck(fn func(context.Context) error) Option { func RegisterCheck(fn func(context.Context) error) Option {
return func(o *Options) { return func(o *Options) {
o.RegisterCheck = fn o.RegisterCheck = fn

View File

@ -5,23 +5,23 @@ import (
"time" "time"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/util/addr" "github.com/unistack-org/micro/v3/util/addr"
"github.com/unistack-org/micro/v3/util/backoff" "github.com/unistack-org/micro/v3/util/backoff"
) )
var ( var (
// DefaultRegisterFunc uses backoff to register service // DefaultRegisterFunc uses backoff to register service
DefaultRegisterFunc = func(svc *registry.Service, config Options) error { DefaultRegisterFunc = func(svc *register.Service, config Options) error {
var err error var err error
opts := []registry.RegisterOption{ opts := []register.RegisterOption{
registry.RegisterTTL(config.RegisterTTL), register.RegisterTTL(config.RegisterTTL),
registry.RegisterDomain(config.Namespace), register.RegisterDomain(config.Namespace),
} }
for i := 0; i <= config.RegisterAttempts; i++ { for i := 0; i <= config.RegisterAttempts; i++ {
err = config.Registry.Register(config.Context, svc, opts...) err = config.Register.Register(config.Context, svc, opts...)
if err == nil { if err == nil {
break break
} }
@ -32,15 +32,15 @@ var (
return err return err
} }
// DefaultDeregisterFunc uses backoff to deregister service // DefaultDeregisterFunc uses backoff to deregister service
DefaultDeregisterFunc = func(svc *registry.Service, config Options) error { DefaultDeregisterFunc = func(svc *register.Service, config Options) error {
var err error var err error
opts := []registry.DeregisterOption{ opts := []register.DeregisterOption{
registry.DeregisterDomain(config.Namespace), register.DeregisterDomain(config.Namespace),
} }
for i := 0; i <= config.DeregisterAttempts; i++ { for i := 0; i <= config.DeregisterAttempts; i++ {
err = config.Registry.Deregister(config.Context, svc, opts...) err = config.Register.Deregister(config.Context, svc, opts...)
if err == nil { if err == nil {
break break
} }
@ -52,8 +52,8 @@ var (
} }
) )
// NewRegistryService returns *registry.Service from Server // NewRegisterService returns *register.Service from Server
func NewRegistryService(s Server) (*registry.Service, error) { func NewRegisterService(s Server) (*register.Service, error) {
opts := s.Options() opts := s.Options()
advt := opts.Address advt := opts.Address
@ -71,7 +71,7 @@ func NewRegistryService(s Server) (*registry.Service, error) {
addr = host addr = host
} }
node := &registry.Node{ node := &register.Node{
Id: opts.Name + "-" + opts.Id, Id: opts.Name + "-" + opts.Id,
Address: net.JoinHostPort(addr, port), Address: net.JoinHostPort(addr, port),
} }
@ -79,12 +79,12 @@ func NewRegistryService(s Server) (*registry.Service, error) {
node.Metadata["server"] = s.String() node.Metadata["server"] = s.String()
node.Metadata["broker"] = opts.Broker.String() node.Metadata["broker"] = opts.Broker.String()
node.Metadata["registry"] = opts.Registry.String() node.Metadata["register"] = opts.Register.String()
return &registry.Service{ return &register.Service{
Name: opts.Name, Name: opts.Name,
Version: opts.Version, Version: opts.Version,
Nodes: []*registry.Node{node}, Nodes: []*register.Node{node},
Metadata: metadata.New(0), Metadata: metadata.New(0),
}, nil }, nil
} }

View File

@ -8,7 +8,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
var ( var (
@ -29,7 +29,7 @@ var (
DefaultRegisterCheck = func(context.Context) error { return nil } DefaultRegisterCheck = func(context.Context) error { return nil }
// DefaultRegisterInterval holds interval for register // DefaultRegisterInterval holds interval for register
DefaultRegisterInterval = time.Second * 30 DefaultRegisterInterval = time.Second * 30
// DefaultRegisterTTL holds registry record ttl, must be multiple of DefaultRegisterInterval // DefaultRegisterTTL holds register record ttl, must be multiple of DefaultRegisterInterval
DefaultRegisterTTL = time.Second * 90 DefaultRegisterTTL = time.Second * 90
// DefaultNamespace will be used if no namespace passed // DefaultNamespace will be used if no namespace passed
DefaultNamespace = "micro" DefaultNamespace = "micro"
@ -43,6 +43,8 @@ var (
// Server is a simple micro server abstraction // Server is a simple micro server abstraction
type Server interface { type Server interface {
// Name returns server name
Name() string
// Initialise options // Initialise options
Init(...Option) error Init(...Option) error
// Retrieve the options // Retrieve the options
@ -147,7 +149,7 @@ type Stream interface {
type Handler interface { type Handler interface {
Name() string Name() string
Handler() interface{} Handler() interface{}
Endpoints() []*registry.Endpoint Endpoints() []*register.Endpoint
Options() HandlerOptions Options() HandlerOptions
} }
@ -157,6 +159,6 @@ type Handler interface {
type Subscriber interface { type Subscriber interface {
Topic() string Topic() string
Subscriber() interface{} Subscriber() interface{}
Endpoints() []*registry.Endpoint Endpoints() []*register.Endpoint
Options() SubscriberOptions Options() SubscriberOptions
} }

View File

@ -14,7 +14,7 @@ import (
"github.com/unistack-org/micro/v3/errors" "github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata" "github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
const ( const (
@ -39,7 +39,7 @@ type subscriber struct {
typ reflect.Type typ reflect.Type
subscriber interface{} subscriber interface{}
handlers []*handler handlers []*handler
endpoints []*registry.Endpoint endpoints []*register.Endpoint
opts SubscriberOptions opts SubscriberOptions
} }
@ -115,7 +115,7 @@ func ValidateSubscriber(sub Subscriber) error {
} }
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var endpoints []*registry.Endpoint var endpoints []*register.Endpoint
var handlers []*handler var handlers []*handler
options := NewSubscriberOptions(opts...) options := NewSubscriberOptions(opts...)
@ -134,9 +134,9 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
} }
handlers = append(handlers, h) handlers = append(handlers, h)
ep := &registry.Endpoint{ ep := &register.Endpoint{
Name: "Func", Name: "Func",
Request: registry.ExtractSubValue(typ), Request: register.ExtractSubValue(typ),
Metadata: metadata.New(2), Metadata: metadata.New(2),
} }
ep.Metadata.Set("topic", topic) ep.Metadata.Set("topic", topic)
@ -161,9 +161,9 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
} }
handlers = append(handlers, h) handlers = append(handlers, h)
ep := &registry.Endpoint{ ep := &register.Endpoint{
Name: name + "." + method.Name, Name: name + "." + method.Name,
Request: registry.ExtractSubValue(method.Type), Request: register.ExtractSubValue(method.Type),
Metadata: metadata.New(2), Metadata: metadata.New(2),
} }
ep.Metadata.Set("topic", topic) ep.Metadata.Set("topic", topic)
@ -304,7 +304,7 @@ func (s *subscriber) Subscriber() interface{} {
return s.subscriber return s.subscriber
} }
func (s *subscriber) Endpoints() []*registry.Endpoint { func (s *subscriber) Endpoints() []*register.Endpoint {
return s.endpoints return s.endpoints
} }

View File

@ -1,8 +1,8 @@
// Package micro is a pluggable framework for microservices
package micro package micro
import ( import (
"fmt" "fmt"
rtime "runtime"
"sync" "sync"
"github.com/unistack-org/micro/v3/auth" "github.com/unistack-org/micro/v3/auth"
@ -10,31 +10,86 @@ import (
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/config" "github.com/unistack-org/micro/v3/config"
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/meter"
"github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/router"
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
"github.com/unistack-org/micro/v3/store" "github.com/unistack-org/micro/v3/store"
"github.com/unistack-org/micro/v3/tracer"
) )
// Service is an interface that wraps the lower level components.
// Its works as container with building blocks for service.
type Service interface {
// The service name
Name() string
// Init initialises options
Init(...Option) error
// Options returns the current options
Options() Options
// Auth is for handling auth
Auth(...string) auth.Auth
// Logger is for logs
Logger(...string) logger.Logger
// Config if for config
Config(...string) config.Config
// Client is for calling services
Client(...string) client.Client
// Broker is for sending and receiving events
Broker(...string) broker.Broker
// Server is for handling requests and events
Server(...string) server.Server
// Store is for key/val store
Store(...string) store.Store
// Register
Register(...string) register.Register
// Tracer
Tracer(...string) tracer.Tracer
// Router
Router(...string) router.Router
// Meter
Meter(...string) meter.Meter
// Runtime
// Runtime(string) (runtime.Runtime, bool)
// Profile
// Profile(string) (profile.Profile, bool)
// Run the service
Run() error
// The service implementation
String() string
}
// RegisterHandler is syntactic sugar for registering a handler
func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOption) error {
return s.Handle(s.NewHandler(h, opts...))
}
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}
type service struct { type service struct {
opts Options opts Options
sync.RWMutex sync.RWMutex
// once sync.Once // once sync.Once
} }
func newService(opts ...Option) Service { // NewService creates and returns a new Service based on the packages within.
service := &service{opts: NewOptions(opts...)} func NewService(opts ...Option) Service {
return service return &service{opts: NewOptions(opts...)}
} }
func (s *service) Name() string { func (s *service) Name() string {
return s.opts.Server.Options().Name return s.opts.Name
} }
// Init initialises options. Additionally it calls cmd.Init // Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called // which parses command line flags. cmd.Init is only called
// on first Init. // on first Init.
func (s *service) Init(opts ...Option) error { func (s *service) Init(opts ...Option) error {
var err error
// process options // process options
for _, o := range opts { for _, o := range opts {
o(&s.opts) o(&s.opts)
@ -45,60 +100,48 @@ func (s *service) Init(opts ...Option) error {
// skip config as the struct not passed // skip config as the struct not passed
continue continue
} }
if err := cfg.Init(config.Context(s.opts.Context)); err != nil { if err = cfg.Init(config.Context(s.opts.Context)); err != nil {
return err return err
} }
if err := cfg.Load(s.opts.Context); err != nil { if err = cfg.Load(s.opts.Context); err != nil {
return err return err
} }
} }
if s.opts.Logger != nil { for _, log := range s.opts.Loggers {
if err := s.opts.Logger.Init( if err = log.Init(logger.WithContext(s.opts.Context)); err != nil {
logger.WithContext(s.opts.Context),
); err != nil {
return err return err
} }
} }
if s.opts.Registry != nil { for _, reg := range s.opts.Registers {
if err := s.opts.Registry.Init( if err = reg.Init(register.Context(s.opts.Context)); err != nil {
registry.Context(s.opts.Context),
); err != nil {
return err return err
} }
} }
if s.opts.Broker != nil { for _, brk := range s.opts.Brokers {
if err := s.opts.Broker.Init( if err = brk.Init(broker.Context(s.opts.Context)); err != nil {
broker.Context(s.opts.Context),
); err != nil {
return err return err
} }
} }
if s.opts.Store != nil { for _, str := range s.opts.Stores {
if err := s.opts.Store.Init( if err = str.Init(store.Context(s.opts.Context)); err != nil {
store.Context(s.opts.Context),
); err != nil {
return err return err
} }
} }
if s.opts.Server != nil { for _, srv := range s.opts.Servers {
if err := s.opts.Server.Init( if err = srv.Init(server.Context(s.opts.Context)); err != nil {
server.Context(s.opts.Context),
); err != nil {
return err return err
} }
} }
if s.opts.Client != nil { for _, cli := range s.opts.Clients {
if err := s.opts.Client.Init( if err = cli.Init(client.Context(s.opts.Context)); err != nil {
client.Context(s.opts.Context),
); err != nil {
return err return err
} }
} }
@ -110,36 +153,93 @@ func (s *service) Options() Options {
return s.opts return s.opts
} }
func (s *service) Broker() broker.Broker { func (s *service) Broker(names ...string) broker.Broker {
return s.opts.Broker idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Brokers)
}
return s.opts.Brokers[idx]
} }
func (s *service) Client() client.Client { func (s *service) Tracer(names ...string) tracer.Tracer {
return s.opts.Client idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Tracers)
}
return s.opts.Tracers[idx]
} }
func (s *service) Server() server.Server { func (s *service) Config(names ...string) config.Config {
return s.opts.Server idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Configs)
}
return s.opts.Configs[idx]
} }
func (s *service) Store() store.Store { func (s *service) Client(names ...string) client.Client {
return s.opts.Store idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Clients)
}
return s.opts.Clients[idx]
} }
func (s *service) Registry() registry.Registry { func (s *service) Server(names ...string) server.Server {
return s.opts.Registry idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Servers)
}
return s.opts.Servers[idx]
} }
func (s *service) Logger() logger.Logger { func (s *service) Store(names ...string) store.Store {
return s.opts.Logger idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Stores)
}
return s.opts.Stores[idx]
} }
func (s *service) Auth() auth.Auth { func (s *service) Register(names ...string) register.Register {
return s.opts.Auth idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Registers)
}
return s.opts.Registers[idx]
} }
func (s *service) Router() router.Router { func (s *service) Logger(names ...string) logger.Logger {
return s.opts.Router idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Loggers)
}
return s.opts.Loggers[idx]
}
func (s *service) Auth(names ...string) auth.Auth {
idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Auths)
}
return s.opts.Auths[idx]
}
func (s *service) Router(names ...string) router.Router {
idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Routers)
}
return s.opts.Routers[idx]
}
func (s *service) Meter(names ...string) meter.Meter {
idx := 0
if len(names) == 1 {
idx = getNameIndex(names[0], s.opts.Meters)
}
return s.opts.Meters[idx]
} }
func (s *service) String() string { func (s *service) String() string {
@ -153,8 +253,8 @@ func (s *service) Start() error {
config := s.opts config := s.opts
s.RUnlock() s.RUnlock()
if config.Logger.V(logger.InfoLevel) { if config.Loggers[0].V(logger.InfoLevel) {
config.Logger.Infof(s.opts.Context, "starting [service] %s", s.Name()) config.Loggers[0].Infof(s.opts.Context, "starting [service] %s", s.Name())
} }
for _, fn := range s.opts.BeforeStart { for _, fn := range s.opts.BeforeStart {
@ -169,36 +269,38 @@ func (s *service) Start() error {
continue continue
} }
if err := cfg.Load(s.opts.Context); err != nil { if err = cfg.Load(s.opts.Context); err != nil {
return err return err
} }
} }
if s.opts.Server == nil { if len(s.opts.Servers) == 0 {
return fmt.Errorf("cant start nil server") return fmt.Errorf("cant start nil server")
} }
if s.opts.Registry != nil { for _, reg := range s.opts.Registers {
if err := s.opts.Registry.Connect(s.opts.Context); err != nil { if err = reg.Connect(s.opts.Context); err != nil {
return err return err
} }
} }
if s.opts.Broker != nil { for _, brk := range s.opts.Brokers {
if err := s.opts.Broker.Connect(s.opts.Context); err != nil { if err = brk.Connect(s.opts.Context); err != nil {
return err return err
} }
} }
if s.opts.Store != nil { for _, str := range s.opts.Stores {
if err := s.opts.Store.Connect(s.opts.Context); err != nil { if err = str.Connect(s.opts.Context); err != nil {
return err return err
} }
} }
if err = s.opts.Server.Start(); err != nil { for _, srv := range s.opts.Servers {
if err = srv.Start(); err != nil {
return err return err
} }
}
for _, fn := range s.opts.AfterStart { for _, fn := range s.opts.AfterStart {
if err = fn(s.opts.Context); err != nil { if err = fn(s.opts.Context); err != nil {
@ -214,8 +316,8 @@ func (s *service) Stop() error {
config := s.opts config := s.opts
s.RUnlock() s.RUnlock()
if config.Logger.V(logger.InfoLevel) { if config.Loggers[0].V(logger.InfoLevel) {
config.Logger.Infof(s.opts.Context, "stoppping [service] %s", s.Name()) config.Loggers[0].Infof(s.opts.Context, "stoppping [service] %s", s.Name())
} }
var err error var err error
@ -225,9 +327,11 @@ func (s *service) Stop() error {
} }
} }
if err = s.opts.Server.Stop(); err != nil { for _, srv := range s.opts.Servers {
if err = srv.Stop(); err != nil {
return err return err
} }
}
for _, fn := range s.opts.AfterStop { for _, fn := range s.opts.AfterStop {
if err = fn(s.opts.Context); err != nil { if err = fn(s.opts.Context); err != nil {
@ -235,20 +339,20 @@ func (s *service) Stop() error {
} }
} }
if s.opts.Registry != nil { for _, reg := range s.opts.Registers {
if err := s.opts.Registry.Disconnect(s.opts.Context); err != nil { if err = reg.Disconnect(s.opts.Context); err != nil {
return err return err
} }
} }
if s.opts.Broker != nil { for _, brk := range s.opts.Brokers {
if err := s.opts.Broker.Disconnect(s.opts.Context); err != nil { if err = brk.Disconnect(s.opts.Context); err != nil {
return err return err
} }
} }
if s.opts.Store != nil { for _, str := range s.opts.Stores {
if err := s.opts.Store.Disconnect(s.opts.Context); err != nil { if err = str.Disconnect(s.opts.Context); err != nil {
return err return err
} }
} }
@ -258,6 +362,7 @@ func (s *service) Stop() error {
func (s *service) Run() error { func (s *service) Run() error {
// start the profiler // start the profiler
/*
if s.opts.Profile != nil { if s.opts.Profile != nil {
// to view mutex contention // to view mutex contention
rtime.SetMutexProfileFraction(5) rtime.SetMutexProfileFraction(5)
@ -269,7 +374,7 @@ func (s *service) Run() error {
} }
defer s.opts.Profile.Stop() defer s.opts.Profile.Stop()
} }
*/
if err := s.Start(); err != nil { if err := s.Start(); err != nil {
return err return err
} }
@ -279,3 +384,16 @@ func (s *service) Run() error {
return s.Stop() return s.Stop()
} }
type nameIface interface {
Name() string
}
func getNameIndex(n string, ifaces ...interface{}) int {
for idx, iface := range ifaces {
if ifc, ok := iface.(nameIface); ok && ifc.Name() == n {
return idx
}
}
return 0
}

View File

@ -23,6 +23,11 @@ func (n *noopStore) Options() Options {
return n.opts return n.opts
} }
// Name
func (n *noopStore) Name() string {
return n.opts.Name
}
// String returns string representation // String returns string representation
func (n *noopStore) String() string { func (n *noopStore) String() string {
return "noop" return "noop"

View File

@ -14,6 +14,7 @@ import (
// Options contains configuration for the Store // Options contains configuration for the Store
type Options struct { type Options struct {
Name string
// Nodes contains the addresses or other connection information of the backing storage. // Nodes contains the addresses or other connection information of the backing storage.
// For example, an etcd implementation would contain the nodes of the cluster. // For example, an etcd implementation would contain the nodes of the cluster.
// A SQL implementation could contain one or more connection strings. // A SQL implementation could contain one or more connection strings.

View File

@ -20,6 +20,7 @@ var (
// Store is a data storage interface // Store is a data storage interface
type Store interface { type Store interface {
Name() string
// Init initialises the store // Init initialises the store
Init(opts ...Option) error Init(opts ...Option) error
// Connect is used when store needs to be connected // Connect is used when store needs to be connected

View File

@ -6,6 +6,10 @@ type noopTracer struct {
opts Options opts Options
} }
func (n *noopTracer) Name() string {
return n.opts.Name
}
// Init initilize tracer // Init initilize tracer
func (n *noopTracer) Init(opts ...Option) error { func (n *noopTracer) Init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
@ -19,6 +23,11 @@ func (n *noopTracer) Start(ctx context.Context, name string) (context.Context, *
return nil, nil return nil, nil
} }
// Lookup get span from context
func (n *noopTracer) Lookup(ctx context.Context) (*Span, error) {
return nil, nil
}
// Finish finishes span // Finish finishes span
func (n *noopTracer) Finish(*Span) error { func (n *noopTracer) Finish(*Span) error {
return nil return nil

View File

@ -9,6 +9,7 @@ var (
// Options struct // Options struct
type Options struct { type Options struct {
Name string
// Logger is the logger for messages // Logger is the logger for messages
Logger logger.Logger Logger logger.Logger
// Size is the size of ring buffer // Size is the size of ring buffer

View File

@ -15,10 +15,14 @@ var (
// Tracer is an interface for distributed tracing // Tracer is an interface for distributed tracing
type Tracer interface { type Tracer interface {
Name() string
Init(...Option) error
// Start a trace // Start a trace
Start(ctx context.Context, name string) (context.Context, *Span) Start(ctx context.Context, name string) (context.Context, *Span)
// Finish the trace // Finish the trace
Finish(*Span) error Finish(*Span) error
// Lookup get span from context
Lookup(ctx context.Context) (*Span, error)
// Read the traces // Read the traces
Read(...ReadOption) ([]*Span, error) Read(...ReadOption) ([]*Span, error)
} }

View File

@ -8,15 +8,15 @@ import (
"net/http" "net/http"
"testing" "testing"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/registry/memory" "github.com/unistack-org/micro/v3/register/memory"
"github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/router"
regRouter "github.com/unistack-org/micro/v3/router/registry" regRouter "github.com/unistack-org/micro/v3/router/register"
) )
func TestRoundTripper(t *testing.T) { func TestRoundTripper(t *testing.T) {
m := memory.NewRegistry() m := memory.NewRegister()
r := regRouter.NewRouter(router.Registry(m)) r := regRouter.NewRouter(router.Register(m))
rt := NewRoundTripper(WithRouter(r)) rt := NewRoundTripper(WithRouter(r))
@ -32,9 +32,9 @@ func TestRoundTripper(t *testing.T) {
go http.Serve(l, nil) go http.Serve(l, nil)
m.Register(&registry.Service{ m.Register(&register.Service{
Name: "example.com", Name: "example.com",
Nodes: []*registry.Node{ Nodes: []*register.Node{
{ {
Id: "1", Id: "1",
Address: l.Addr().String(), Address: l.Addr().String(),

View File

@ -1,11 +1,11 @@
package registry package register
import ( import (
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
func addNodes(old, neu []*registry.Node) []*registry.Node { func addNodes(old, neu []*register.Node) []*register.Node {
nodes := make([]*registry.Node, len(neu)) nodes := make([]*register.Node, len(neu))
// add all new nodes // add all new nodes
for i, n := range neu { for i, n := range neu {
node := *n node := *n
@ -35,8 +35,8 @@ func addNodes(old, neu []*registry.Node) []*registry.Node {
return nodes return nodes
} }
func delNodes(old, del []*registry.Node) []*registry.Node { func delNodes(old, del []*register.Node) []*register.Node {
var nodes []*registry.Node var nodes []*register.Node
for _, o := range old { for _, o := range old {
var rem bool var rem bool
for _, n := range del { for _, n := range del {
@ -53,24 +53,24 @@ func delNodes(old, del []*registry.Node) []*registry.Node {
} }
// CopyService make a copy of service // CopyService make a copy of service
func CopyService(service *registry.Service) *registry.Service { func CopyService(service *register.Service) *register.Service {
// copy service // copy service
s := &registry.Service{} s := &register.Service{}
*s = *service *s = *service
// copy nodes // copy nodes
nodes := make([]*registry.Node, len(service.Nodes)) nodes := make([]*register.Node, len(service.Nodes))
for j, node := range service.Nodes { for j, node := range service.Nodes {
n := &registry.Node{} n := &register.Node{}
*n = *node *n = *node
nodes[j] = n nodes[j] = n
} }
s.Nodes = nodes s.Nodes = nodes
// copy endpoints // copy endpoints
eps := make([]*registry.Endpoint, len(service.Endpoints)) eps := make([]*register.Endpoint, len(service.Endpoints))
for j, ep := range service.Endpoints { for j, ep := range service.Endpoints {
e := &registry.Endpoint{} e := &register.Endpoint{}
*e = *ep *e = *ep
eps[j] = e eps[j] = e
} }
@ -79,8 +79,8 @@ func CopyService(service *registry.Service) *registry.Service {
} }
// Copy makes a copy of services // Copy makes a copy of services
func Copy(current []*registry.Service) []*registry.Service { func Copy(current []*register.Service) []*register.Service {
services := make([]*registry.Service, len(current)) services := make([]*register.Service, len(current))
for i, service := range current { for i, service := range current {
services[i] = CopyService(service) services[i] = CopyService(service)
} }
@ -88,14 +88,14 @@ func Copy(current []*registry.Service) []*registry.Service {
} }
// Merge merges two lists of services and returns a new copy // Merge merges two lists of services and returns a new copy
func Merge(olist []*registry.Service, nlist []*registry.Service) []*registry.Service { func Merge(olist []*register.Service, nlist []*register.Service) []*register.Service {
var srv []*registry.Service var srv []*register.Service
for _, n := range nlist { for _, n := range nlist {
var seen bool var seen bool
for _, o := range olist { for _, o := range olist {
if o.Version == n.Version { if o.Version == n.Version {
sp := &registry.Service{} sp := &register.Service{}
// make copy // make copy
*sp = *o *sp = *o
// set nodes // set nodes
@ -106,25 +106,25 @@ func Merge(olist []*registry.Service, nlist []*registry.Service) []*registry.Ser
srv = append(srv, sp) srv = append(srv, sp)
break break
} else { } else {
sp := &registry.Service{} sp := &register.Service{}
// make copy // make copy
*sp = *o *sp = *o
srv = append(srv, sp) srv = append(srv, sp)
} }
} }
if !seen { if !seen {
srv = append(srv, Copy([]*registry.Service{n})...) srv = append(srv, Copy([]*register.Service{n})...)
} }
} }
return srv return srv
} }
// Remove removes services and returns a new copy // Remove removes services and returns a new copy
func Remove(old, del []*registry.Service) []*registry.Service { func Remove(old, del []*register.Service) []*register.Service {
var services []*registry.Service var services []*register.Service
for _, o := range old { for _, o := range old {
srv := &registry.Service{} srv := &register.Service{}
*srv = *o *srv = *o
var rem bool var rem bool

View File

@ -1,18 +1,18 @@
package registry package register
import ( import (
"os" "os"
"testing" "testing"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
) )
func TestRemove(t *testing.T) { func TestRemove(t *testing.T) {
services := []*registry.Service{ services := []*register.Service{
{ {
Name: "foo", Name: "foo",
Version: "1.0.0", Version: "1.0.0",
Nodes: []*registry.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", Id: "foo-123",
Address: "localhost:9999", Address: "localhost:9999",
@ -22,7 +22,7 @@ func TestRemove(t *testing.T) {
{ {
Name: "foo", Name: "foo",
Version: "1.0.0", Version: "1.0.0",
Nodes: []*registry.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", Id: "foo-123",
Address: "localhost:6666", Address: "localhost:6666",
@ -31,7 +31,7 @@ func TestRemove(t *testing.T) {
}, },
} }
servs := Remove([]*registry.Service{services[0]}, []*registry.Service{services[1]}) servs := Remove([]*register.Service{services[0]}, []*register.Service{services[1]})
if i := len(servs); i > 0 { if i := len(servs); i > 0 {
t.Errorf("Expected 0 nodes, got %d: %+v", i, servs) t.Errorf("Expected 0 nodes, got %d: %+v", i, servs)
} }
@ -41,11 +41,11 @@ func TestRemove(t *testing.T) {
} }
func TestRemoveNodes(t *testing.T) { func TestRemoveNodes(t *testing.T) {
services := []*registry.Service{ services := []*register.Service{
{ {
Name: "foo", Name: "foo",
Version: "1.0.0", Version: "1.0.0",
Nodes: []*registry.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", Id: "foo-123",
Address: "localhost:9999", Address: "localhost:9999",
@ -59,7 +59,7 @@ func TestRemoveNodes(t *testing.T) {
{ {
Name: "foo", Name: "foo",
Version: "1.0.0", Version: "1.0.0",
Nodes: []*registry.Node{ Nodes: []*register.Node{
{ {
Id: "foo-123", Id: "foo-123",
Address: "localhost:6666", Address: "localhost:6666",

View File

@ -1,7 +1,7 @@
package router package router
import ( import (
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/register"
"github.com/unistack-org/micro/v3/router" "github.com/unistack-org/micro/v3/router"
) )
@ -19,7 +19,7 @@ func (r *apiRouter) String() string {
} }
// New router is a hack for API routing // New router is a hack for API routing
func New(srvs []*registry.Service) router.Router { func New(srvs []*register.Service) router.Router {
var routes []router.Route var routes []router.Route
for _, srv := range srvs { for _, srv := range srvs {

View File

@ -78,6 +78,11 @@ func (c *syncStore) Init(opts ...store.Option) error {
return nil return nil
} }
// Name returns the store name
func (c *syncStore) Name() string {
return c.storeOpts.Name
}
// Options returns the sync's store options // Options returns the sync's store options
func (c *syncStore) Options() store.Options { func (c *syncStore) Options() store.Options {
return c.storeOpts return c.storeOpts