register: improvements
* change domain to namespace * lower go.mod deps Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
ca1d7437b8
commit
2cf49e93e8
6
go.mod
6
go.mod
@ -1,12 +1,12 @@
|
||||
module go.unistack.org/micro/v3
|
||||
|
||||
go 1.22.2
|
||||
go 1.22.0
|
||||
|
||||
require (
|
||||
dario.cat/mergo v1.0.1
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/KimMachineGun/automemlimit v0.6.1
|
||||
github.com/ash3in/uuidv8 v1.0.1
|
||||
github.com/ash3in/uuidv8 v1.2.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/matoous/go-nanoid v1.5.1
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
|
9
go.sum
9
go.sum
@ -1,11 +1,11 @@
|
||||
dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s=
|
||||
dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU=
|
||||
github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8=
|
||||
github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY=
|
||||
github.com/ash3in/uuidv8 v1.0.1 h1:dIq1XRkWT8lGA7N5s7WRTB4V3k49WTBLvILz7aCLp80=
|
||||
github.com/ash3in/uuidv8 v1.0.1/go.mod h1:EoyUgCtxNBnrnpc9efw5rVN1cQ+LFGCoJiFuD6maOMw=
|
||||
github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI=
|
||||
github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4=
|
||||
github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok=
|
||||
github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE=
|
||||
github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4=
|
||||
@ -35,6 +35,7 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL
|
||||
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
|
||||
github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM=
|
||||
github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE=
|
||||
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
|
@ -1,12 +1,9 @@
|
||||
package register
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
)
|
||||
|
||||
// ExtractValue from reflect.Type from specified depth
|
||||
@ -38,53 +35,6 @@ func ExtractValue(v reflect.Type, d int) string {
|
||||
return v.Name()
|
||||
}
|
||||
|
||||
// ExtractEndpoint extract *Endpoint from reflect.Method
|
||||
func ExtractEndpoint(method reflect.Method) *Endpoint {
|
||||
if method.PkgPath != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var rspType, reqType reflect.Type
|
||||
var stream bool
|
||||
mt := method.Type
|
||||
|
||||
switch mt.NumIn() {
|
||||
case 3:
|
||||
reqType = mt.In(1)
|
||||
rspType = mt.In(2)
|
||||
case 4:
|
||||
reqType = mt.In(2)
|
||||
rspType = mt.In(3)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
// are we dealing with a stream?
|
||||
switch rspType.Kind() {
|
||||
case reflect.Func, reflect.Interface:
|
||||
stream = true
|
||||
}
|
||||
|
||||
request := ExtractValue(reqType, 0)
|
||||
response := ExtractValue(rspType, 0)
|
||||
if request == "" || response == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
ep := &Endpoint{
|
||||
Name: method.Name,
|
||||
Request: request,
|
||||
Response: response,
|
||||
Metadata: metadata.New(0),
|
||||
}
|
||||
|
||||
if stream {
|
||||
ep.Metadata.Set("stream", fmt.Sprintf("%v", stream))
|
||||
}
|
||||
|
||||
return ep
|
||||
}
|
||||
|
||||
// ExtractSubValue exctact *Value from reflect.Type
|
||||
func ExtractSubValue(typ reflect.Type) string {
|
||||
var reqType reflect.Type
|
||||
|
@ -2,8 +2,6 @@ package register
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type TestHandler struct{}
|
||||
@ -15,40 +13,3 @@ type TestResponse struct{}
|
||||
func (t *TestHandler) Test(ctx context.Context, req *TestRequest, rsp *TestResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestExtractEndpoint(t *testing.T) {
|
||||
handler := &TestHandler{}
|
||||
typ := reflect.TypeOf(handler)
|
||||
|
||||
var endpoints []*Endpoint
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
if e := ExtractEndpoint(typ.Method(m)); e != nil {
|
||||
endpoints = append(endpoints, e)
|
||||
}
|
||||
}
|
||||
|
||||
if i := len(endpoints); i != 1 {
|
||||
t.Fatalf("Expected 1 endpoint, have %d", i)
|
||||
}
|
||||
|
||||
if endpoints[0].Name != "Test" {
|
||||
t.Fatalf("Expected handler Test, got %s", endpoints[0].Name)
|
||||
}
|
||||
|
||||
if endpoints[0].Request == "" {
|
||||
t.Fatal("Expected non nil Request")
|
||||
}
|
||||
|
||||
if endpoints[0].Response == "" {
|
||||
t.Fatal("Expected non nil Request")
|
||||
}
|
||||
|
||||
if endpoints[0].Request != "TestRequest" {
|
||||
t.Fatalf("Expected TestRequest got %s", endpoints[0].Request)
|
||||
}
|
||||
|
||||
if endpoints[0].Response != "TestResponse" {
|
||||
t.Fatalf("Expected TestResponse got %s", endpoints[0].Response)
|
||||
}
|
||||
}
|
||||
|
@ -23,11 +23,10 @@ type node struct {
|
||||
}
|
||||
|
||||
type record struct {
|
||||
Name string
|
||||
Version string
|
||||
Metadata map[string]string
|
||||
Nodes map[string]*node
|
||||
Endpoints []*register.Endpoint
|
||||
Name string
|
||||
Version string
|
||||
Metadata map[string]string
|
||||
Nodes map[string]*node
|
||||
}
|
||||
|
||||
type memory struct {
|
||||
@ -59,7 +58,7 @@ func (m *memory) ttlPrune() {
|
||||
|
||||
for range prune.C {
|
||||
m.Lock()
|
||||
for domain, services := range m.records {
|
||||
for namespace, services := range m.records {
|
||||
for service, versions := range services {
|
||||
for version, record := range versions {
|
||||
for id, n := range record.Nodes {
|
||||
@ -67,7 +66,7 @@ func (m *memory) ttlPrune() {
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register TTL expired for node %s of service %s", n.ID, service))
|
||||
}
|
||||
delete(m.records[domain][service][version].Nodes, id)
|
||||
delete(m.records[namespace][service][version].Nodes, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -131,17 +130,12 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
|
||||
options := register.NewRegisterOptions(opts...)
|
||||
|
||||
// get the services for this domain from the register
|
||||
srvs, ok := m.records[options.Domain]
|
||||
srvs, ok := m.records[options.Namespace]
|
||||
if !ok {
|
||||
srvs = make(services)
|
||||
}
|
||||
|
||||
// domain is set in metadata so it can be passed to watchers
|
||||
if s.Metadata == nil {
|
||||
s.Metadata = map[string]string{"domain": options.Domain}
|
||||
} else {
|
||||
s.Metadata["domain"] = options.Domain
|
||||
}
|
||||
s.Namespace = options.Namespace
|
||||
|
||||
// ensure the service name exists
|
||||
r := serviceToRecord(s, options.TTL)
|
||||
@ -154,7 +148,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version))
|
||||
}
|
||||
m.records[options.Domain] = srvs
|
||||
m.records[options.Namespace] = srvs
|
||||
go m.sendEvent(®ister.Result{Action: "create", Service: s})
|
||||
}
|
||||
|
||||
@ -173,9 +167,6 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
// set the domain
|
||||
metadata["domain"] = options.Domain
|
||||
|
||||
// add the node
|
||||
srvs[s.Name][s.Version].Nodes[n.ID] = &node{
|
||||
Node: ®ister.Node{
|
||||
@ -206,7 +197,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
|
||||
}
|
||||
}
|
||||
|
||||
m.records[options.Domain] = srvs
|
||||
m.records[options.Namespace] = srvs
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -216,15 +207,8 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
|
||||
|
||||
options := register.NewDeregisterOptions(opts...)
|
||||
|
||||
// domain is set in metadata so it can be passed to watchers
|
||||
if s.Metadata == nil {
|
||||
s.Metadata = map[string]string{"domain": options.Domain}
|
||||
} else {
|
||||
s.Metadata["domain"] = options.Domain
|
||||
}
|
||||
|
||||
// if the domain doesn't exist, there is nothing to deregister
|
||||
services, ok := m.records[options.Domain]
|
||||
services, ok := m.records[options.Namespace]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
@ -253,7 +237,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
|
||||
// if the nodes not empty, we replace the version in the store and exist, the rest of the logic
|
||||
// is cleanup
|
||||
if len(version.Nodes) > 0 {
|
||||
m.records[options.Domain][s.Name][s.Version] = version
|
||||
m.records[options.Namespace][s.Name][s.Version] = version
|
||||
go m.sendEvent(®ister.Result{Action: "update", Service: s})
|
||||
return nil
|
||||
}
|
||||
@ -261,7 +245,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
|
||||
// if this version was the only version of the service, we can remove the whole service from the
|
||||
// register and exit
|
||||
if len(versions) == 1 {
|
||||
delete(m.records[options.Domain], s.Name)
|
||||
delete(m.records[options.Namespace], s.Name)
|
||||
go m.sendEvent(®ister.Result{Action: "delete", Service: s})
|
||||
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
@ -271,7 +255,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
|
||||
}
|
||||
|
||||
// there are other versions of the service running, so only remove this version of it
|
||||
delete(m.records[options.Domain][s.Name], s.Version)
|
||||
delete(m.records[options.Namespace][s.Name], s.Version)
|
||||
go m.sendEvent(®ister.Result{Action: "delete", Service: s})
|
||||
if m.opts.Logger.V(logger.DebugLevel) {
|
||||
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version))
|
||||
@ -284,15 +268,15 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
|
||||
options := register.NewLookupOptions(opts...)
|
||||
|
||||
// if it's a wildcard domain, return from all domains
|
||||
if options.Domain == register.WildcardDomain {
|
||||
if options.Namespace == register.WildcardNamespace {
|
||||
m.RLock()
|
||||
recs := m.records
|
||||
m.RUnlock()
|
||||
|
||||
var services []*register.Service
|
||||
|
||||
for domain := range recs {
|
||||
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...)
|
||||
for namespace := range recs {
|
||||
srvs, err := m.LookupService(ctx, name, append(opts, register.LookupNamespace(namespace))...)
|
||||
if err == register.ErrNotFound {
|
||||
continue
|
||||
} else if err != nil {
|
||||
@ -311,7 +295,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
|
||||
defer m.RUnlock()
|
||||
|
||||
// check the domain exists
|
||||
services, ok := m.records[options.Domain]
|
||||
services, ok := m.records[options.Namespace]
|
||||
if !ok {
|
||||
return nil, register.ErrNotFound
|
||||
}
|
||||
@ -328,7 +312,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe
|
||||
var i int
|
||||
|
||||
for _, r := range versions {
|
||||
result[i] = recordToService(r, options.Domain)
|
||||
result[i] = recordToService(r, options.Namespace)
|
||||
i++
|
||||
}
|
||||
|
||||
@ -339,15 +323,15 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
|
||||
options := register.NewListOptions(opts...)
|
||||
|
||||
// if it's a wildcard domain, list from all domains
|
||||
if options.Domain == register.WildcardDomain {
|
||||
if options.Namespace == register.WildcardNamespace {
|
||||
m.RLock()
|
||||
recs := m.records
|
||||
m.RUnlock()
|
||||
|
||||
var services []*register.Service
|
||||
|
||||
for domain := range recs {
|
||||
srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...)
|
||||
for namespace := range recs {
|
||||
srvs, err := m.ListServices(ctx, append(opts, register.ListNamespace(namespace))...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -361,7 +345,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
|
||||
defer m.RUnlock()
|
||||
|
||||
// ensure the domain exists
|
||||
services, ok := m.records[options.Domain]
|
||||
services, ok := m.records[options.Namespace]
|
||||
if !ok {
|
||||
return make([]*register.Service, 0), nil
|
||||
}
|
||||
@ -371,7 +355,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption)
|
||||
|
||||
for _, service := range services {
|
||||
for _, version := range service {
|
||||
result = append(result, recordToService(version, options.Domain))
|
||||
result = append(result, recordToService(version, options.Namespace))
|
||||
}
|
||||
}
|
||||
|
||||
@ -426,16 +410,13 @@ func (m *watcher) Next() (*register.Result, error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// extract domain from service metadata
|
||||
var domain string
|
||||
if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 {
|
||||
domain = r.Service.Metadata["domain"]
|
||||
} else {
|
||||
domain = register.DefaultDomain
|
||||
namespace := register.DefaultNamespace
|
||||
if r.Service.Namespace != "" {
|
||||
namespace = r.Service.Namespace
|
||||
}
|
||||
|
||||
// only send the event if watching the wildcard or this specific domain
|
||||
if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain {
|
||||
if m.wo.Namespace == register.WildcardNamespace || m.wo.Namespace == namespace {
|
||||
return r, nil
|
||||
}
|
||||
case <-m.exit:
|
||||
@ -454,11 +435,6 @@ func (m *watcher) Stop() {
|
||||
}
|
||||
|
||||
func serviceToRecord(s *register.Service, ttl time.Duration) *record {
|
||||
metadata := make(map[string]string, len(s.Metadata))
|
||||
for k, v := range s.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
nodes := make(map[string]*node, len(s.Nodes))
|
||||
for _, n := range s.Nodes {
|
||||
nodes[n.ID] = &node{
|
||||
@ -468,42 +444,19 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record {
|
||||
}
|
||||
}
|
||||
|
||||
endpoints := make([]*register.Endpoint, len(s.Endpoints))
|
||||
copy(endpoints, s.Endpoints)
|
||||
|
||||
return &record{
|
||||
Name: s.Name,
|
||||
Version: s.Version,
|
||||
Metadata: metadata,
|
||||
Nodes: nodes,
|
||||
Endpoints: endpoints,
|
||||
Name: s.Name,
|
||||
Version: s.Version,
|
||||
Nodes: nodes,
|
||||
}
|
||||
}
|
||||
|
||||
func recordToService(r *record, domain string) *register.Service {
|
||||
func recordToService(r *record, namespace string) *register.Service {
|
||||
metadata := make(map[string]string, len(r.Metadata))
|
||||
for k, v := range r.Metadata {
|
||||
metadata[k] = v
|
||||
}
|
||||
|
||||
// set the domain in metadata so it can be determined when a wildcard query is performed
|
||||
metadata["domain"] = domain
|
||||
|
||||
endpoints := make([]*register.Endpoint, len(r.Endpoints))
|
||||
for i, e := range r.Endpoints {
|
||||
md := make(map[string]string, len(e.Metadata))
|
||||
for k, v := range e.Metadata {
|
||||
md[k] = v
|
||||
}
|
||||
|
||||
endpoints[i] = ®ister.Endpoint{
|
||||
Name: e.Name,
|
||||
Request: e.Request,
|
||||
Response: e.Response,
|
||||
Metadata: md,
|
||||
}
|
||||
}
|
||||
|
||||
nodes := make([]*register.Node, len(r.Nodes))
|
||||
i := 0
|
||||
for _, n := range r.Nodes {
|
||||
@ -523,8 +476,7 @@ func recordToService(r *record, domain string) *register.Service {
|
||||
return ®ister.Service{
|
||||
Name: r.Name,
|
||||
Version: r.Version,
|
||||
Metadata: metadata,
|
||||
Endpoints: endpoints,
|
||||
Nodes: nodes,
|
||||
Namespace: namespace,
|
||||
}
|
||||
}
|
||||
|
@ -253,32 +253,32 @@ func TestMemoryWildcard(t *testing.T) {
|
||||
|
||||
testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"}
|
||||
|
||||
if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil {
|
||||
if err := m.Register(ctx, testSrv, register.RegisterNamespace("one")); err != nil {
|
||||
t.Fatalf("Register err: %v", err)
|
||||
}
|
||||
if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil {
|
||||
if err := m.Register(ctx, testSrv, register.RegisterNamespace("two")); err != nil {
|
||||
t.Fatalf("Register err: %v", err)
|
||||
}
|
||||
|
||||
if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil {
|
||||
if recs, err := m.ListServices(ctx, register.ListNamespace("one")); err != nil {
|
||||
t.Errorf("List err: %v", err)
|
||||
} else if len(recs) != 1 {
|
||||
t.Errorf("Expected 1 record, got %v", len(recs))
|
||||
}
|
||||
|
||||
if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil {
|
||||
if recs, err := m.ListServices(ctx, register.ListNamespace("*")); err != nil {
|
||||
t.Errorf("List err: %v", err)
|
||||
} else if len(recs) != 2 {
|
||||
t.Errorf("Expected 2 records, got %v", len(recs))
|
||||
}
|
||||
|
||||
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil {
|
||||
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupNamespace("one")); err != nil {
|
||||
t.Errorf("Lookup err: %v", err)
|
||||
} else if len(recs) != 1 {
|
||||
t.Errorf("Expected 1 record, got %v", len(recs))
|
||||
}
|
||||
|
||||
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil {
|
||||
if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupNamespace("*")); err != nil {
|
||||
t.Errorf("Lookup err: %v", err)
|
||||
} else if len(recs) != 2 {
|
||||
t.Errorf("Expected 2 records, got %v", len(recs))
|
||||
|
@ -46,17 +46,17 @@ func NewOptions(opts ...Option) Options {
|
||||
|
||||
// RegisterOptions holds options for register method
|
||||
type RegisterOptions struct { // nolint: golint,revive
|
||||
Context context.Context
|
||||
Domain string
|
||||
TTL time.Duration
|
||||
Attempts int
|
||||
Context context.Context
|
||||
Namespace string
|
||||
TTL time.Duration
|
||||
Attempts int
|
||||
}
|
||||
|
||||
// NewRegisterOptions returns register options struct filled by opts
|
||||
func NewRegisterOptions(opts ...RegisterOption) RegisterOptions {
|
||||
options := RegisterOptions{
|
||||
Domain: DefaultDomain,
|
||||
Context: context.Background(),
|
||||
Namespace: DefaultNamespace,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@ -72,15 +72,15 @@ type WatchOptions struct {
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
// Domain to watch
|
||||
Domain string
|
||||
// Namespace to watch
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// NewWatchOptions returns watch options filled by opts
|
||||
func NewWatchOptions(opts ...WatchOption) WatchOptions {
|
||||
options := WatchOptions{
|
||||
Domain: DefaultDomain,
|
||||
Context: context.Background(),
|
||||
Namespace: DefaultNamespace,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@ -91,8 +91,8 @@ func NewWatchOptions(opts ...WatchOption) WatchOptions {
|
||||
// DeregisterOptions holds options for deregister method
|
||||
type DeregisterOptions struct {
|
||||
Context context.Context
|
||||
// Domain the service was registered in
|
||||
Domain string
|
||||
// Namespace the service was registered in
|
||||
Namespace string
|
||||
// Atempts specify max attempts for deregister
|
||||
Attempts int
|
||||
}
|
||||
@ -100,8 +100,8 @@ type DeregisterOptions struct {
|
||||
// NewDeregisterOptions returns options for deregister filled by opts
|
||||
func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions {
|
||||
options := DeregisterOptions{
|
||||
Domain: DefaultDomain,
|
||||
Context: context.Background(),
|
||||
Namespace: DefaultNamespace,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@ -112,15 +112,15 @@ func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions {
|
||||
// LookupOptions holds lookup options
|
||||
type LookupOptions struct {
|
||||
Context context.Context
|
||||
// Domain to scope the request to
|
||||
Domain string
|
||||
// Namespace to scope the request to
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// NewLookupOptions returns lookup options filled by opts
|
||||
func NewLookupOptions(opts ...LookupOption) LookupOptions {
|
||||
options := LookupOptions{
|
||||
Domain: DefaultDomain,
|
||||
Context: context.Background(),
|
||||
Namespace: DefaultNamespace,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@ -131,15 +131,15 @@ func NewLookupOptions(opts ...LookupOption) LookupOptions {
|
||||
// ListOptions holds the list options for list method
|
||||
type ListOptions struct {
|
||||
Context context.Context
|
||||
// Domain to scope the request to
|
||||
Domain string
|
||||
// Namespace to scope the request to
|
||||
Namespace string
|
||||
}
|
||||
|
||||
// NewListOptions returns list options filled by opts
|
||||
func NewListOptions(opts ...ListOption) ListOptions {
|
||||
options := ListOptions{
|
||||
Domain: DefaultDomain,
|
||||
Context: context.Background(),
|
||||
Namespace: DefaultNamespace,
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
@ -217,10 +217,10 @@ func RegisterContext(ctx context.Context) RegisterOption { // nolint: golint,rev
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterDomain secifies register domain
|
||||
func RegisterDomain(d string) RegisterOption { // nolint: golint,revive
|
||||
// RegisterNamespace secifies register Namespace
|
||||
func RegisterNamespace(d string) RegisterOption { // nolint: golint,revive
|
||||
return func(o *RegisterOptions) {
|
||||
o.Domain = d
|
||||
o.Namespace = d
|
||||
}
|
||||
}
|
||||
|
||||
@ -238,10 +238,10 @@ func WatchContext(ctx context.Context) WatchOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WatchDomain sets the domain for watch
|
||||
func WatchDomain(d string) WatchOption {
|
||||
// WatchNamespace sets the Namespace for watch
|
||||
func WatchNamespace(d string) WatchOption {
|
||||
return func(o *WatchOptions) {
|
||||
o.Domain = d
|
||||
o.Namespace = d
|
||||
}
|
||||
}
|
||||
|
||||
@ -259,10 +259,10 @@ func DeregisterContext(ctx context.Context) DeregisterOption {
|
||||
}
|
||||
}
|
||||
|
||||
// DeregisterDomain specifies deregister domain
|
||||
func DeregisterDomain(d string) DeregisterOption {
|
||||
// DeregisterNamespace specifies deregister Namespace
|
||||
func DeregisterNamespace(d string) DeregisterOption {
|
||||
return func(o *DeregisterOptions) {
|
||||
o.Domain = d
|
||||
o.Namespace = d
|
||||
}
|
||||
}
|
||||
|
||||
@ -273,10 +273,10 @@ func LookupContext(ctx context.Context) LookupOption {
|
||||
}
|
||||
}
|
||||
|
||||
// LookupDomain sets the domain for lookup
|
||||
func LookupDomain(d string) LookupOption {
|
||||
// LookupNamespace sets the Namespace for lookup
|
||||
func LookupNamespace(d string) LookupOption {
|
||||
return func(o *LookupOptions) {
|
||||
o.Domain = d
|
||||
o.Namespace = d
|
||||
}
|
||||
}
|
||||
|
||||
@ -287,10 +287,10 @@ func ListContext(ctx context.Context) ListOption {
|
||||
}
|
||||
}
|
||||
|
||||
// ListDomain sets the domain for list method
|
||||
func ListDomain(d string) ListOption {
|
||||
// ListNamespace sets the Namespace for list method
|
||||
func ListNamespace(d string) ListOption {
|
||||
return func(o *ListOptions) {
|
||||
o.Domain = d
|
||||
o.Namespace = d
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,12 +9,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// WildcardDomain indicates any domain
|
||||
WildcardDomain = "*"
|
||||
// WildcardNamespace indicates any Namespace
|
||||
WildcardNamespace = "*"
|
||||
)
|
||||
|
||||
// DefaultDomain to use if none was provided in options
|
||||
var DefaultDomain = "micro"
|
||||
// DefaultNamespace to use if none was provided in options
|
||||
var DefaultNamespace = "micro"
|
||||
|
||||
var (
|
||||
// DefaultRegister is the global default register
|
||||
@ -59,26 +59,17 @@ type Register interface {
|
||||
|
||||
// Service holds service register info
|
||||
type Service struct {
|
||||
Name string `json:"name"`
|
||||
Version string `json:"version"`
|
||||
Metadata metadata.Metadata `json:"metadata"`
|
||||
Endpoints []*Endpoint `json:"endpoints"`
|
||||
Nodes []*Node `json:"nodes"`
|
||||
Name string `json:"name,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
Nodes []*Node `json:"nodes,omitempty"`
|
||||
Namespace string `json:"namespace,omitempty"`
|
||||
}
|
||||
|
||||
// Node holds node register info
|
||||
type Node struct {
|
||||
Metadata metadata.Metadata `json:"metadata"`
|
||||
ID string `json:"id"`
|
||||
Address string `json:"address"`
|
||||
}
|
||||
|
||||
// Endpoint holds endpoint register info
|
||||
type Endpoint struct {
|
||||
Request string `json:"request"`
|
||||
Response string `json:"response"`
|
||||
Metadata metadata.Metadata `json:"metadata"`
|
||||
Name string `json:"name"`
|
||||
Metadata metadata.Metadata `json:"metadata,omitempty"`
|
||||
ID string `json:"id,omitempty"`
|
||||
Address string `json:"address,omitempty"`
|
||||
}
|
||||
|
||||
// Option func signature
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -32,38 +31,21 @@ const (
|
||||
)
|
||||
|
||||
type rpcHandler struct {
|
||||
opts HandlerOptions
|
||||
handler interface{}
|
||||
name string
|
||||
endpoints []*register.Endpoint
|
||||
opts HandlerOptions
|
||||
handler interface{}
|
||||
name string
|
||||
}
|
||||
|
||||
func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler {
|
||||
options := NewHandlerOptions(opts...)
|
||||
|
||||
typ := reflect.TypeOf(handler)
|
||||
hdlr := reflect.ValueOf(handler)
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
var endpoints []*register.Endpoint
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
if e := register.ExtractEndpoint(typ.Method(m)); e != nil {
|
||||
e.Name = name + "." + e.Name
|
||||
|
||||
for k, v := range options.Metadata[e.Name] {
|
||||
e.Metadata[k] = v
|
||||
}
|
||||
|
||||
endpoints = append(endpoints, e)
|
||||
}
|
||||
}
|
||||
|
||||
return &rpcHandler{
|
||||
name: name,
|
||||
handler: handler,
|
||||
endpoints: endpoints,
|
||||
opts: options,
|
||||
name: name,
|
||||
handler: handler,
|
||||
opts: options,
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,10 +57,6 @@ func (r *rpcHandler) Handler() interface{} {
|
||||
return r.handler
|
||||
}
|
||||
|
||||
func (r *rpcHandler) Endpoints() []*register.Endpoint {
|
||||
return r.endpoints
|
||||
}
|
||||
|
||||
func (r *rpcHandler) Options() HandlerOptions {
|
||||
return r.opts
|
||||
}
|
||||
@ -249,35 +227,6 @@ func (n *noopServer) Register() error {
|
||||
return err
|
||||
}
|
||||
|
||||
n.RLock()
|
||||
handlerList := make([]string, 0, len(n.handlers))
|
||||
for n := range n.handlers {
|
||||
handlerList = append(handlerList, n)
|
||||
}
|
||||
|
||||
sort.Strings(handlerList)
|
||||
|
||||
subscriberList := make([]*subscriber, 0, len(n.subscribers))
|
||||
for e := range n.subscribers {
|
||||
subscriberList = append(subscriberList, e)
|
||||
}
|
||||
sort.Slice(subscriberList, func(i, j int) bool {
|
||||
return subscriberList[i].topic > subscriberList[j].topic
|
||||
})
|
||||
|
||||
endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList))
|
||||
for _, h := range handlerList {
|
||||
endpoints = append(endpoints, n.handlers[h].Endpoints()...)
|
||||
}
|
||||
for _, e := range subscriberList {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
}
|
||||
n.RUnlock()
|
||||
|
||||
service.Nodes[0].Metadata["protocol"] = "noop"
|
||||
service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"]
|
||||
service.Endpoints = endpoints
|
||||
|
||||
n.RLock()
|
||||
registered := n.registered
|
||||
n.RUnlock()
|
||||
@ -576,7 +525,6 @@ func (n *noopServer) Stop() error {
|
||||
}
|
||||
|
||||
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
|
||||
var endpoints []*register.Endpoint
|
||||
var handlers []*handler
|
||||
|
||||
options := NewSubscriberOptions(opts...)
|
||||
@ -595,18 +543,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
}
|
||||
|
||||
handlers = append(handlers, h)
|
||||
ep := ®ister.Endpoint{
|
||||
Name: "Func",
|
||||
Request: register.ExtractSubValue(typ),
|
||||
Metadata: metadata.New(2),
|
||||
}
|
||||
ep.Metadata.Set("topic", topic)
|
||||
ep.Metadata.Set("subscriber", "true")
|
||||
endpoints = append(endpoints, ep)
|
||||
} else {
|
||||
hdlr := reflect.ValueOf(sub)
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
|
||||
for m := 0; m < typ.NumMethod(); m++ {
|
||||
method := typ.Method(m)
|
||||
h := &handler{
|
||||
@ -622,14 +559,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
}
|
||||
|
||||
handlers = append(handlers, h)
|
||||
ep := ®ister.Endpoint{
|
||||
Name: name + "." + method.Name,
|
||||
Request: register.ExtractSubValue(method.Type),
|
||||
Metadata: metadata.New(2),
|
||||
}
|
||||
ep.Metadata.Set("topic", topic)
|
||||
ep.Metadata.Set("subscriber", "true")
|
||||
endpoints = append(endpoints, ep)
|
||||
}
|
||||
}
|
||||
|
||||
@ -639,7 +568,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs
|
||||
topic: topic,
|
||||
subscriber: sub,
|
||||
handlers: handlers,
|
||||
endpoints: endpoints,
|
||||
opts: options,
|
||||
}
|
||||
}
|
||||
@ -766,10 +694,6 @@ func (s *subscriber) Subscriber() interface{} {
|
||||
return s.subscriber
|
||||
}
|
||||
|
||||
func (s *subscriber) Endpoints() []*register.Endpoint {
|
||||
return s.endpoints
|
||||
}
|
||||
|
||||
func (s *subscriber) Options() SubscriberOptions {
|
||||
return s.opts
|
||||
}
|
||||
@ -780,8 +704,7 @@ type subscriber struct {
|
||||
typ reflect.Type
|
||||
subscriber interface{}
|
||||
|
||||
endpoints []*register.Endpoint
|
||||
handlers []*handler
|
||||
handlers []*handler
|
||||
|
||||
rcvr reflect.Value
|
||||
opts SubscriberOptions
|
||||
|
@ -17,7 +17,7 @@ var (
|
||||
|
||||
opts := []register.RegisterOption{
|
||||
register.RegisterTTL(config.RegisterTTL),
|
||||
register.RegisterDomain(config.Namespace),
|
||||
register.RegisterNamespace(config.Namespace),
|
||||
}
|
||||
|
||||
for i := 0; i <= config.RegisterAttempts; i++ {
|
||||
@ -36,7 +36,7 @@ var (
|
||||
var err error
|
||||
|
||||
opts := []register.DeregisterOption{
|
||||
register.DeregisterDomain(config.Namespace),
|
||||
register.DeregisterNamespace(config.Namespace),
|
||||
}
|
||||
|
||||
for i := 0; i <= config.DeregisterAttempts; i++ {
|
||||
@ -82,9 +82,8 @@ func NewRegisterService(s Server) (*register.Service, error) {
|
||||
node.Metadata["register"] = opts.Register.String()
|
||||
|
||||
return ®ister.Service{
|
||||
Name: opts.Name,
|
||||
Version: opts.Version,
|
||||
Nodes: []*register.Node{node},
|
||||
Metadata: metadata.New(0),
|
||||
Name: opts.Name,
|
||||
Version: opts.Version,
|
||||
Nodes: []*register.Node{node},
|
||||
}, nil
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
)
|
||||
|
||||
// DefaultServer default server
|
||||
@ -170,7 +169,6 @@ type Stream interface {
|
||||
type Handler interface {
|
||||
Name() string
|
||||
Handler() interface{}
|
||||
Endpoints() []*register.Endpoint
|
||||
Options() HandlerOptions
|
||||
}
|
||||
|
||||
@ -180,6 +178,5 @@ type Handler interface {
|
||||
type Subscriber interface {
|
||||
Topic() string
|
||||
Subscriber() interface{}
|
||||
Endpoints() []*register.Endpoint
|
||||
Options() SubscriberOptions
|
||||
}
|
||||
|
@ -71,14 +71,6 @@ func CopyService(service *register.Service) *register.Service {
|
||||
}
|
||||
s.Nodes = nodes
|
||||
|
||||
// copy endpoints
|
||||
eps := make([]*register.Endpoint, len(service.Endpoints))
|
||||
for j, ep := range service.Endpoints {
|
||||
e := ®ister.Endpoint{}
|
||||
*e = *ep
|
||||
eps[j] = e
|
||||
}
|
||||
s.Endpoints = eps
|
||||
return s
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user