From d2911028776c66c17fb98d708d140795608d58a0 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Dec 2024 01:08:00 +0300 Subject: [PATCH 01/19] register: improvements * change domain to namespace * lower go.mod deps Signed-off-by: Vasiliy Tolstov --- go.mod | 6 +- go.sum | 9 +-- register/extractor.go | 50 --------------- register/extractor_test.go | 39 ----------- register/memory/memory.go | 114 ++++++++++----------------------- register/memory/memory_test.go | 12 ++-- register/options.go | 74 ++++++++++----------- register/register.go | 31 ++++----- server/noop.go | 91 ++------------------------ server/registry.go | 11 ++-- server/server.go | 3 - util/register/util.go | 8 --- 12 files changed, 107 insertions(+), 341 deletions(-) diff --git a/go.mod b/go.mod index b164472b..6af8b0b9 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,12 @@ module go.unistack.org/micro/v3 -go 1.22.2 +go 1.22.0 require ( dario.cat/mergo v1.0.1 - github.com/DATA-DOG/go-sqlmock v1.5.0 + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/KimMachineGun/automemlimit v0.6.1 - github.com/ash3in/uuidv8 v1.0.1 + github.com/ash3in/uuidv8 v1.2.0 github.com/google/uuid v1.6.0 github.com/matoous/go-nanoid v1.5.1 github.com/patrickmn/go-cache v2.1.0+incompatible diff --git a/go.sum b/go.sum index 59f72f82..72be286c 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,11 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= -github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= -github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8= github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY= -github.com/ash3in/uuidv8 v1.0.1 h1:dIq1XRkWT8lGA7N5s7WRTB4V3k49WTBLvILz7aCLp80= -github.com/ash3in/uuidv8 v1.0.1/go.mod h1:EoyUgCtxNBnrnpc9efw5rVN1cQ+LFGCoJiFuD6maOMw= +github.com/ash3in/uuidv8 v1.2.0 h1:2oogGdtCPwaVtyvPPGin4TfZLtOGE5F+W++E880G6SI= +github.com/ash3in/uuidv8 v1.2.0/go.mod h1:BnU0wJBxnzdEKmVg4xckBkD+VZuecTFTUP3M0dWgyY4= github.com/cilium/ebpf v0.16.0 h1:+BiEnHL6Z7lXnlGUsXQPPAE7+kenAd4ES8MQ5min0Ok= github.com/cilium/ebpf v0.16.0/go.mod h1:L7u2Blt2jMM/vLAVgjxluxtBKlz3/GWjB0dMOEngfwE= github.com/containerd/cgroups/v3 v3.0.4 h1:2fs7l3P0Qxb1nKWuJNFiwhp2CqiKzho71DQkDrHJIo4= @@ -35,6 +35,7 @@ github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtL github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/jsimonetti/rtnetlink/v2 v2.0.1 h1:xda7qaHDSVOsADNouv7ukSuicKZO7GgVUCXxpaIEIlM= github.com/jsimonetti/rtnetlink/v2 v2.0.1/go.mod h1:7MoNYNbb3UaDHtF8udiJo/RH6VsTKP1pqKLUTVCvToE= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/register/extractor.go b/register/extractor.go index 923729af..08cd170a 100644 --- a/register/extractor.go +++ b/register/extractor.go @@ -1,12 +1,9 @@ package register import ( - "fmt" "reflect" "unicode" "unicode/utf8" - - "go.unistack.org/micro/v3/metadata" ) // ExtractValue from reflect.Type from specified depth @@ -38,53 +35,6 @@ func ExtractValue(v reflect.Type, d int) string { return v.Name() } -// ExtractEndpoint extract *Endpoint from reflect.Method -func ExtractEndpoint(method reflect.Method) *Endpoint { - if method.PkgPath != "" { - return nil - } - - var rspType, reqType reflect.Type - var stream bool - mt := method.Type - - switch mt.NumIn() { - case 3: - reqType = mt.In(1) - rspType = mt.In(2) - case 4: - reqType = mt.In(2) - rspType = mt.In(3) - default: - return nil - } - - // are we dealing with a stream? - switch rspType.Kind() { - case reflect.Func, reflect.Interface: - stream = true - } - - request := ExtractValue(reqType, 0) - response := ExtractValue(rspType, 0) - if request == "" || response == "" { - return nil - } - - ep := &Endpoint{ - Name: method.Name, - Request: request, - Response: response, - Metadata: metadata.New(0), - } - - if stream { - ep.Metadata.Set("stream", fmt.Sprintf("%v", stream)) - } - - return ep -} - // ExtractSubValue exctact *Value from reflect.Type func ExtractSubValue(typ reflect.Type) string { var reqType reflect.Type diff --git a/register/extractor_test.go b/register/extractor_test.go index d72c65a2..316a0a7a 100644 --- a/register/extractor_test.go +++ b/register/extractor_test.go @@ -2,8 +2,6 @@ package register import ( "context" - "reflect" - "testing" ) type TestHandler struct{} @@ -15,40 +13,3 @@ type TestResponse struct{} func (t *TestHandler) Test(ctx context.Context, req *TestRequest, rsp *TestResponse) error { return nil } - -func TestExtractEndpoint(t *testing.T) { - handler := &TestHandler{} - typ := reflect.TypeOf(handler) - - var endpoints []*Endpoint - - for m := 0; m < typ.NumMethod(); m++ { - if e := ExtractEndpoint(typ.Method(m)); e != nil { - endpoints = append(endpoints, e) - } - } - - if i := len(endpoints); i != 1 { - t.Fatalf("Expected 1 endpoint, have %d", i) - } - - if endpoints[0].Name != "Test" { - t.Fatalf("Expected handler Test, got %s", endpoints[0].Name) - } - - if endpoints[0].Request == "" { - t.Fatal("Expected non nil Request") - } - - if endpoints[0].Response == "" { - t.Fatal("Expected non nil Request") - } - - if endpoints[0].Request != "TestRequest" { - t.Fatalf("Expected TestRequest got %s", endpoints[0].Request) - } - - if endpoints[0].Response != "TestResponse" { - t.Fatalf("Expected TestResponse got %s", endpoints[0].Response) - } -} diff --git a/register/memory/memory.go b/register/memory/memory.go index 8a13a7fb..28d309f9 100644 --- a/register/memory/memory.go +++ b/register/memory/memory.go @@ -23,11 +23,10 @@ type node struct { } type record struct { - Name string - Version string - Metadata map[string]string - Nodes map[string]*node - Endpoints []*register.Endpoint + Name string + Version string + Metadata map[string]string + Nodes map[string]*node } type memory struct { @@ -59,7 +58,7 @@ func (m *memory) ttlPrune() { for range prune.C { m.Lock() - for domain, services := range m.records { + for namespace, services := range m.records { for service, versions := range services { for version, record := range versions { for id, n := range record.Nodes { @@ -67,7 +66,7 @@ func (m *memory) ttlPrune() { if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register TTL expired for node %s of service %s", n.ID, service)) } - delete(m.records[domain][service][version].Nodes, id) + delete(m.records[namespace][service][version].Nodes, id) } } } @@ -131,17 +130,12 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist options := register.NewRegisterOptions(opts...) // get the services for this domain from the register - srvs, ok := m.records[options.Domain] + srvs, ok := m.records[options.Namespace] if !ok { srvs = make(services) } - // domain is set in metadata so it can be passed to watchers - if s.Metadata == nil { - s.Metadata = map[string]string{"domain": options.Domain} - } else { - s.Metadata["domain"] = options.Domain - } + s.Namespace = options.Namespace // ensure the service name exists r := serviceToRecord(s, options.TTL) @@ -154,7 +148,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version)) } - m.records[options.Domain] = srvs + m.records[options.Namespace] = srvs go m.sendEvent(®ister.Result{Action: "create", Service: s}) } @@ -173,9 +167,6 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist metadata[k] = v } - // set the domain - metadata["domain"] = options.Domain - // add the node srvs[s.Name][s.Version].Nodes[n.ID] = &node{ Node: ®ister.Node{ @@ -206,7 +197,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist } } - m.records[options.Domain] = srvs + m.records[options.Namespace] = srvs return nil } @@ -216,15 +207,8 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re options := register.NewDeregisterOptions(opts...) - // domain is set in metadata so it can be passed to watchers - if s.Metadata == nil { - s.Metadata = map[string]string{"domain": options.Domain} - } else { - s.Metadata["domain"] = options.Domain - } - // if the domain doesn't exist, there is nothing to deregister - services, ok := m.records[options.Domain] + services, ok := m.records[options.Namespace] if !ok { return nil } @@ -253,7 +237,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re // if the nodes not empty, we replace the version in the store and exist, the rest of the logic // is cleanup if len(version.Nodes) > 0 { - m.records[options.Domain][s.Name][s.Version] = version + m.records[options.Namespace][s.Name][s.Version] = version go m.sendEvent(®ister.Result{Action: "update", Service: s}) return nil } @@ -261,7 +245,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re // if this version was the only version of the service, we can remove the whole service from the // register and exit if len(versions) == 1 { - delete(m.records[options.Domain], s.Name) + delete(m.records[options.Namespace], s.Name) go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { @@ -271,7 +255,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re } // there are other versions of the service running, so only remove this version of it - delete(m.records[options.Domain][s.Name], s.Version) + delete(m.records[options.Namespace][s.Name], s.Version) go m.sendEvent(®ister.Result{Action: "delete", Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version)) @@ -284,15 +268,15 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe options := register.NewLookupOptions(opts...) // if it's a wildcard domain, return from all domains - if options.Domain == register.WildcardDomain { + if options.Namespace == register.WildcardNamespace { m.RLock() recs := m.records m.RUnlock() var services []*register.Service - for domain := range recs { - srvs, err := m.LookupService(ctx, name, append(opts, register.LookupDomain(domain))...) + for namespace := range recs { + srvs, err := m.LookupService(ctx, name, append(opts, register.LookupNamespace(namespace))...) if err == register.ErrNotFound { continue } else if err != nil { @@ -311,7 +295,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe defer m.RUnlock() // check the domain exists - services, ok := m.records[options.Domain] + services, ok := m.records[options.Namespace] if !ok { return nil, register.ErrNotFound } @@ -328,7 +312,7 @@ func (m *memory) LookupService(ctx context.Context, name string, opts ...registe var i int for _, r := range versions { - result[i] = recordToService(r, options.Domain) + result[i] = recordToService(r, options.Namespace) i++ } @@ -339,15 +323,15 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) options := register.NewListOptions(opts...) // if it's a wildcard domain, list from all domains - if options.Domain == register.WildcardDomain { + if options.Namespace == register.WildcardNamespace { m.RLock() recs := m.records m.RUnlock() var services []*register.Service - for domain := range recs { - srvs, err := m.ListServices(ctx, append(opts, register.ListDomain(domain))...) + for namespace := range recs { + srvs, err := m.ListServices(ctx, append(opts, register.ListNamespace(namespace))...) if err != nil { return nil, err } @@ -361,7 +345,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) defer m.RUnlock() // ensure the domain exists - services, ok := m.records[options.Domain] + services, ok := m.records[options.Namespace] if !ok { return make([]*register.Service, 0), nil } @@ -371,7 +355,7 @@ func (m *memory) ListServices(ctx context.Context, opts ...register.ListOption) for _, service := range services { for _, version := range service { - result = append(result, recordToService(version, options.Domain)) + result = append(result, recordToService(version, options.Namespace)) } } @@ -426,16 +410,13 @@ func (m *watcher) Next() (*register.Result, error) { continue } - // extract domain from service metadata - var domain string - if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 { - domain = r.Service.Metadata["domain"] - } else { - domain = register.DefaultDomain + namespace := register.DefaultNamespace + if r.Service.Namespace != "" { + namespace = r.Service.Namespace } // only send the event if watching the wildcard or this specific domain - if m.wo.Domain == register.WildcardDomain || m.wo.Domain == domain { + if m.wo.Namespace == register.WildcardNamespace || m.wo.Namespace == namespace { return r, nil } case <-m.exit: @@ -454,11 +435,6 @@ func (m *watcher) Stop() { } func serviceToRecord(s *register.Service, ttl time.Duration) *record { - metadata := make(map[string]string, len(s.Metadata)) - for k, v := range s.Metadata { - metadata[k] = v - } - nodes := make(map[string]*node, len(s.Nodes)) for _, n := range s.Nodes { nodes[n.ID] = &node{ @@ -468,42 +444,19 @@ func serviceToRecord(s *register.Service, ttl time.Duration) *record { } } - endpoints := make([]*register.Endpoint, len(s.Endpoints)) - copy(endpoints, s.Endpoints) - return &record{ - Name: s.Name, - Version: s.Version, - Metadata: metadata, - Nodes: nodes, - Endpoints: endpoints, + Name: s.Name, + Version: s.Version, + Nodes: nodes, } } -func recordToService(r *record, domain string) *register.Service { +func recordToService(r *record, namespace string) *register.Service { metadata := make(map[string]string, len(r.Metadata)) for k, v := range r.Metadata { metadata[k] = v } - // set the domain in metadata so it can be determined when a wildcard query is performed - metadata["domain"] = domain - - endpoints := make([]*register.Endpoint, len(r.Endpoints)) - for i, e := range r.Endpoints { - md := make(map[string]string, len(e.Metadata)) - for k, v := range e.Metadata { - md[k] = v - } - - endpoints[i] = ®ister.Endpoint{ - Name: e.Name, - Request: e.Request, - Response: e.Response, - Metadata: md, - } - } - nodes := make([]*register.Node, len(r.Nodes)) i := 0 for _, n := range r.Nodes { @@ -523,8 +476,7 @@ func recordToService(r *record, domain string) *register.Service { return ®ister.Service{ Name: r.Name, Version: r.Version, - Metadata: metadata, - Endpoints: endpoints, Nodes: nodes, + Namespace: namespace, } } diff --git a/register/memory/memory_test.go b/register/memory/memory_test.go index 03928dcb..71f4cd77 100644 --- a/register/memory/memory_test.go +++ b/register/memory/memory_test.go @@ -253,32 +253,32 @@ func TestMemoryWildcard(t *testing.T) { testSrv := ®ister.Service{Name: "foo", Version: "1.0.0"} - if err := m.Register(ctx, testSrv, register.RegisterDomain("one")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterNamespace("one")); err != nil { t.Fatalf("Register err: %v", err) } - if err := m.Register(ctx, testSrv, register.RegisterDomain("two")); err != nil { + if err := m.Register(ctx, testSrv, register.RegisterNamespace("two")); err != nil { t.Fatalf("Register err: %v", err) } - if recs, err := m.ListServices(ctx, register.ListDomain("one")); err != nil { + if recs, err := m.ListServices(ctx, register.ListNamespace("one")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.ListServices(ctx, register.ListDomain("*")); err != nil { + if recs, err := m.ListServices(ctx, register.ListNamespace("*")); err != nil { t.Errorf("List err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("one")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupNamespace("one")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 1 { t.Errorf("Expected 1 record, got %v", len(recs)) } - if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupDomain("*")); err != nil { + if recs, err := m.LookupService(ctx, testSrv.Name, register.LookupNamespace("*")); err != nil { t.Errorf("Lookup err: %v", err) } else if len(recs) != 2 { t.Errorf("Expected 2 records, got %v", len(recs)) diff --git a/register/options.go b/register/options.go index 4e903c7a..05cacb7f 100644 --- a/register/options.go +++ b/register/options.go @@ -46,17 +46,17 @@ func NewOptions(opts ...Option) Options { // RegisterOptions holds options for register method type RegisterOptions struct { // nolint: golint,revive - Context context.Context - Domain string - TTL time.Duration - Attempts int + Context context.Context + Namespace string + TTL time.Duration + Attempts int } // NewRegisterOptions returns register options struct filled by opts func NewRegisterOptions(opts ...RegisterOption) RegisterOptions { options := RegisterOptions{ - Domain: DefaultDomain, - Context: context.Background(), + Namespace: DefaultNamespace, + Context: context.Background(), } for _, o := range opts { o(&options) @@ -72,15 +72,15 @@ type WatchOptions struct { // Other options for implementations of the interface // can be stored in a context Context context.Context - // Domain to watch - Domain string + // Namespace to watch + Namespace string } // NewWatchOptions returns watch options filled by opts func NewWatchOptions(opts ...WatchOption) WatchOptions { options := WatchOptions{ - Domain: DefaultDomain, - Context: context.Background(), + Namespace: DefaultNamespace, + Context: context.Background(), } for _, o := range opts { o(&options) @@ -91,8 +91,8 @@ func NewWatchOptions(opts ...WatchOption) WatchOptions { // DeregisterOptions holds options for deregister method type DeregisterOptions struct { Context context.Context - // Domain the service was registered in - Domain string + // Namespace the service was registered in + Namespace string // Atempts specify max attempts for deregister Attempts int } @@ -100,8 +100,8 @@ type DeregisterOptions struct { // NewDeregisterOptions returns options for deregister filled by opts func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions { options := DeregisterOptions{ - Domain: DefaultDomain, - Context: context.Background(), + Namespace: DefaultNamespace, + Context: context.Background(), } for _, o := range opts { o(&options) @@ -112,15 +112,15 @@ func NewDeregisterOptions(opts ...DeregisterOption) DeregisterOptions { // LookupOptions holds lookup options type LookupOptions struct { Context context.Context - // Domain to scope the request to - Domain string + // Namespace to scope the request to + Namespace string } // NewLookupOptions returns lookup options filled by opts func NewLookupOptions(opts ...LookupOption) LookupOptions { options := LookupOptions{ - Domain: DefaultDomain, - Context: context.Background(), + Namespace: DefaultNamespace, + Context: context.Background(), } for _, o := range opts { o(&options) @@ -131,15 +131,15 @@ func NewLookupOptions(opts ...LookupOption) LookupOptions { // ListOptions holds the list options for list method type ListOptions struct { Context context.Context - // Domain to scope the request to - Domain string + // Namespace to scope the request to + Namespace string } // NewListOptions returns list options filled by opts func NewListOptions(opts ...ListOption) ListOptions { options := ListOptions{ - Domain: DefaultDomain, - Context: context.Background(), + Namespace: DefaultNamespace, + Context: context.Background(), } for _, o := range opts { o(&options) @@ -217,10 +217,10 @@ func RegisterContext(ctx context.Context) RegisterOption { // nolint: golint,rev } } -// RegisterDomain secifies register domain -func RegisterDomain(d string) RegisterOption { // nolint: golint,revive +// RegisterNamespace secifies register Namespace +func RegisterNamespace(d string) RegisterOption { // nolint: golint,revive return func(o *RegisterOptions) { - o.Domain = d + o.Namespace = d } } @@ -238,10 +238,10 @@ func WatchContext(ctx context.Context) WatchOption { } } -// WatchDomain sets the domain for watch -func WatchDomain(d string) WatchOption { +// WatchNamespace sets the Namespace for watch +func WatchNamespace(d string) WatchOption { return func(o *WatchOptions) { - o.Domain = d + o.Namespace = d } } @@ -259,10 +259,10 @@ func DeregisterContext(ctx context.Context) DeregisterOption { } } -// DeregisterDomain specifies deregister domain -func DeregisterDomain(d string) DeregisterOption { +// DeregisterNamespace specifies deregister Namespace +func DeregisterNamespace(d string) DeregisterOption { return func(o *DeregisterOptions) { - o.Domain = d + o.Namespace = d } } @@ -273,10 +273,10 @@ func LookupContext(ctx context.Context) LookupOption { } } -// LookupDomain sets the domain for lookup -func LookupDomain(d string) LookupOption { +// LookupNamespace sets the Namespace for lookup +func LookupNamespace(d string) LookupOption { return func(o *LookupOptions) { - o.Domain = d + o.Namespace = d } } @@ -287,10 +287,10 @@ func ListContext(ctx context.Context) ListOption { } } -// ListDomain sets the domain for list method -func ListDomain(d string) ListOption { +// ListNamespace sets the Namespace for list method +func ListNamespace(d string) ListOption { return func(o *ListOptions) { - o.Domain = d + o.Namespace = d } } diff --git a/register/register.go b/register/register.go index 7f101357..54342520 100644 --- a/register/register.go +++ b/register/register.go @@ -9,12 +9,12 @@ import ( ) const ( - // WildcardDomain indicates any domain - WildcardDomain = "*" + // WildcardNamespace indicates any Namespace + WildcardNamespace = "*" ) -// DefaultDomain to use if none was provided in options -var DefaultDomain = "micro" +// DefaultNamespace to use if none was provided in options +var DefaultNamespace = "micro" var ( // DefaultRegister is the global default register @@ -59,26 +59,17 @@ type Register interface { // Service holds service register info type Service struct { - Name string `json:"name"` - Version string `json:"version"` - Metadata metadata.Metadata `json:"metadata"` - Endpoints []*Endpoint `json:"endpoints"` - Nodes []*Node `json:"nodes"` + Name string `json:"name,omitempty"` + Version string `json:"version,omitempty"` + Nodes []*Node `json:"nodes,omitempty"` + Namespace string `json:"namespace,omitempty"` } // Node holds node register info type Node struct { - Metadata metadata.Metadata `json:"metadata"` - ID string `json:"id"` - Address string `json:"address"` -} - -// Endpoint holds endpoint register info -type Endpoint struct { - Request string `json:"request"` - Response string `json:"response"` - Metadata metadata.Metadata `json:"metadata"` - Name string `json:"name"` + Metadata metadata.Metadata `json:"metadata,omitempty"` + ID string `json:"id,omitempty"` + Address string `json:"address,omitempty"` } // Option func signature diff --git a/server/noop.go b/server/noop.go index 30a1d07b..a8ca50a6 100644 --- a/server/noop.go +++ b/server/noop.go @@ -5,7 +5,6 @@ import ( "fmt" "reflect" "runtime/debug" - "sort" "strings" "sync" "time" @@ -32,38 +31,21 @@ const ( ) type rpcHandler struct { - opts HandlerOptions - handler interface{} - name string - endpoints []*register.Endpoint + opts HandlerOptions + handler interface{} + name string } func newRPCHandler(handler interface{}, opts ...HandlerOption) Handler { options := NewHandlerOptions(opts...) - typ := reflect.TypeOf(handler) hdlr := reflect.ValueOf(handler) name := reflect.Indirect(hdlr).Type().Name() - var endpoints []*register.Endpoint - - for m := 0; m < typ.NumMethod(); m++ { - if e := register.ExtractEndpoint(typ.Method(m)); e != nil { - e.Name = name + "." + e.Name - - for k, v := range options.Metadata[e.Name] { - e.Metadata[k] = v - } - - endpoints = append(endpoints, e) - } - } - return &rpcHandler{ - name: name, - handler: handler, - endpoints: endpoints, - opts: options, + name: name, + handler: handler, + opts: options, } } @@ -75,10 +57,6 @@ func (r *rpcHandler) Handler() interface{} { return r.handler } -func (r *rpcHandler) Endpoints() []*register.Endpoint { - return r.endpoints -} - func (r *rpcHandler) Options() HandlerOptions { return r.opts } @@ -249,35 +227,6 @@ func (n *noopServer) Register() error { return err } - n.RLock() - handlerList := make([]string, 0, len(n.handlers)) - for n := range n.handlers { - handlerList = append(handlerList, n) - } - - sort.Strings(handlerList) - - subscriberList := make([]*subscriber, 0, len(n.subscribers)) - for e := range n.subscribers { - subscriberList = append(subscriberList, e) - } - sort.Slice(subscriberList, func(i, j int) bool { - return subscriberList[i].topic > subscriberList[j].topic - }) - - endpoints := make([]*register.Endpoint, 0, len(handlerList)+len(subscriberList)) - for _, h := range handlerList { - endpoints = append(endpoints, n.handlers[h].Endpoints()...) - } - for _, e := range subscriberList { - endpoints = append(endpoints, e.Endpoints()...) - } - n.RUnlock() - - service.Nodes[0].Metadata["protocol"] = "noop" - service.Nodes[0].Metadata["transport"] = service.Nodes[0].Metadata["protocol"] - service.Endpoints = endpoints - n.RLock() registered := n.registered n.RUnlock() @@ -576,7 +525,6 @@ func (n *noopServer) Stop() error { } func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { - var endpoints []*register.Endpoint var handlers []*handler options := NewSubscriberOptions(opts...) @@ -595,18 +543,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: "Func", - Request: register.ExtractSubValue(typ), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) } else { - hdlr := reflect.ValueOf(sub) - name := reflect.Indirect(hdlr).Type().Name() - for m := 0; m < typ.NumMethod(); m++ { method := typ.Method(m) h := &handler{ @@ -622,14 +559,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs } handlers = append(handlers, h) - ep := ®ister.Endpoint{ - Name: name + "." + method.Name, - Request: register.ExtractSubValue(method.Type), - Metadata: metadata.New(2), - } - ep.Metadata.Set("topic", topic) - ep.Metadata.Set("subscriber", "true") - endpoints = append(endpoints, ep) } } @@ -639,7 +568,6 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs topic: topic, subscriber: sub, handlers: handlers, - endpoints: endpoints, opts: options, } } @@ -766,10 +694,6 @@ func (s *subscriber) Subscriber() interface{} { return s.subscriber } -func (s *subscriber) Endpoints() []*register.Endpoint { - return s.endpoints -} - func (s *subscriber) Options() SubscriberOptions { return s.opts } @@ -780,8 +704,7 @@ type subscriber struct { typ reflect.Type subscriber interface{} - endpoints []*register.Endpoint - handlers []*handler + handlers []*handler rcvr reflect.Value opts SubscriberOptions diff --git a/server/registry.go b/server/registry.go index f9539522..fbd34f66 100644 --- a/server/registry.go +++ b/server/registry.go @@ -17,7 +17,7 @@ var ( opts := []register.RegisterOption{ register.RegisterTTL(config.RegisterTTL), - register.RegisterDomain(config.Namespace), + register.RegisterNamespace(config.Namespace), } for i := 0; i <= config.RegisterAttempts; i++ { @@ -36,7 +36,7 @@ var ( var err error opts := []register.DeregisterOption{ - register.DeregisterDomain(config.Namespace), + register.DeregisterNamespace(config.Namespace), } for i := 0; i <= config.DeregisterAttempts; i++ { @@ -82,9 +82,8 @@ func NewRegisterService(s Server) (*register.Service, error) { node.Metadata["register"] = opts.Register.String() return ®ister.Service{ - Name: opts.Name, - Version: opts.Version, - Nodes: []*register.Node{node}, - Metadata: metadata.New(0), + Name: opts.Name, + Version: opts.Version, + Nodes: []*register.Node{node}, }, nil } diff --git a/server/server.go b/server/server.go index 232ed22b..6d7b0d5b 100644 --- a/server/server.go +++ b/server/server.go @@ -7,7 +7,6 @@ import ( "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/metadata" - "go.unistack.org/micro/v3/register" ) // DefaultServer default server @@ -170,7 +169,6 @@ type Stream interface { type Handler interface { Name() string Handler() interface{} - Endpoints() []*register.Endpoint Options() HandlerOptions } @@ -180,6 +178,5 @@ type Handler interface { type Subscriber interface { Topic() string Subscriber() interface{} - Endpoints() []*register.Endpoint Options() SubscriberOptions } diff --git a/util/register/util.go b/util/register/util.go index f047b989..7163a634 100644 --- a/util/register/util.go +++ b/util/register/util.go @@ -71,14 +71,6 @@ func CopyService(service *register.Service) *register.Service { } s.Nodes = nodes - // copy endpoints - eps := make([]*register.Endpoint, len(service.Endpoints)) - for j, ep := range service.Endpoints { - e := ®ister.Endpoint{} - *e = *ep - eps[j] = e - } - s.Endpoints = eps return s } From f39de15d93f22a37e9c73e5e75a8e07288409082 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Dec 2024 01:12:29 +0300 Subject: [PATCH 02/19] fixup Signed-off-by: Vasiliy Tolstov --- logger/slog/slog_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index d527110c..8f87b226 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -32,7 +32,8 @@ func TestTime(t *testing.T) { l.Error(ctx, "msg1", errors.New("err")) - if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T03:00:00.000000000+03:00`)) { + if !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T03:00:00.000000000+03:00`)) && + !bytes.Contains(buf.Bytes(), []byte(`timestamp=1970-01-01T00:00:00.000000000Z`)) { t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) } } From 74e31d99f6d6cd375fe7acbb9636e6b44e9bdcd5 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Dec 2024 01:16:22 +0300 Subject: [PATCH 03/19] fixup Signed-off-by: Vasiliy Tolstov --- logger/slog/slog_test.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index 8f87b226..e8f4cd43 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -16,6 +16,25 @@ import ( "go.unistack.org/micro/v3/metadata" ) +// always first to have proper check +func TestStacktrace(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), + WithHandlerFunc(slog.NewTextHandler), + logger.WithAddStacktrace(true), + ) + if err := l.Init(logger.WithFields("key1", "val1")); err != nil { + t.Fatal(err) + } + + l.Error(ctx, "msg1", errors.New("err")) + + if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:31`)) { + t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) + } +} + func TestTime(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) @@ -38,24 +57,6 @@ func TestTime(t *testing.T) { } } -func TestStacktrace(t *testing.T) { - ctx := context.TODO() - buf := bytes.NewBuffer(nil) - l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(buf), - WithHandlerFunc(slog.NewTextHandler), - logger.WithAddStacktrace(true), - ) - if err := l.Init(logger.WithFields("key1", "val1")); err != nil { - t.Fatal(err) - } - - l.Error(ctx, "msg1", errors.New("err")) - - if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:51`)) { - t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) - } -} - func TestWithFields(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) From f8c5e10c1d309dc36b31a7c9090702f6e3990bf7 Mon Sep 17 00:00:00 2001 From: vtolstov Date: Thu, 26 Dec 2024 22:21:35 +0000 Subject: [PATCH 04/19] Apply Code Coverage Badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1ed038e3..0999e7dc 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Micro -![Coverage](https://img.shields.io/badge/Coverage-45.1%25-yellow) +![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) Micro is a standard library for microservices. From b8232e02befdaaedb5c497efe810db240ced73f0 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Dec 2024 19:12:57 +0300 Subject: [PATCH 05/19] register: add ListName option Signed-off-by: Vasiliy Tolstov --- register/options.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/register/options.go b/register/options.go index 05cacb7f..a0d13f64 100644 --- a/register/options.go +++ b/register/options.go @@ -130,9 +130,12 @@ func NewLookupOptions(opts ...LookupOption) LookupOptions { // ListOptions holds the list options for list method type ListOptions struct { + // Context used to store additional options Context context.Context // Namespace to scope the request to Namespace string + // Name filter services by name + Name string } // NewListOptions returns list options filled by opts @@ -294,6 +297,13 @@ func ListNamespace(d string) ListOption { } } +// ListName sets the name for list method to filter needed services +func ListName(n string) ListOption { + return func(o *ListOptions) { + o.Name = n + } +} + // Name sets the name func Name(n string) Option { return func(o *Options) { From 470263ff5fdda58ec7f8bc11a97d1d27235a8076 Mon Sep 17 00:00:00 2001 From: vtolstov Date: Fri, 27 Dec 2024 16:14:00 +0000 Subject: [PATCH 06/19] Apply Code Coverage Badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0999e7dc..82a4992c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Micro -![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) +![Coverage](https://img.shields.io/badge/Coverage-44.8%25-yellow) Micro is a standard library for microservices. From 277f04ba19c792f562d90617fce3042546e24835 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Dec 2024 19:33:46 +0300 Subject: [PATCH 07/19] register: add Codec option Signed-off-by: Vasiliy Tolstov --- register/options.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/register/options.go b/register/options.go index a0d13f64..d46c4ced 100644 --- a/register/options.go +++ b/register/options.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "time" + "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/tracer" @@ -26,6 +27,8 @@ type Options struct { Name string // Addrs specifies register addrs Addrs []string + // Codec used to marshal/unmarshal data in register + Codec codec.Codec // Timeout specifies timeout Timeout time.Duration } @@ -37,6 +40,7 @@ func NewOptions(opts ...Option) Options { Meter: meter.DefaultMeter, Tracer: tracer.DefaultTracer, Context: context.Background(), + Codec: codec.NewCodec(), } for _, o := range opts { o(&options) @@ -310,3 +314,11 @@ func Name(n string) Option { o.Name = n } } + +type codecKey struct{} + +func Codec(c codec.Codec) Option { + return func(o *Options) { + o.Codec = c + } +} From 06da500ef4ebfc6fc13139b078dd32b046ceff51 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 27 Dec 2024 23:56:27 +0300 Subject: [PATCH 08/19] register: cleanup Signed-off-by: Vasiliy Tolstov --- register/options.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/register/options.go b/register/options.go index d46c4ced..c2f192ef 100644 --- a/register/options.go +++ b/register/options.go @@ -138,8 +138,6 @@ type ListOptions struct { Context context.Context // Namespace to scope the request to Namespace string - // Name filter services by name - Name string } // NewListOptions returns list options filled by opts @@ -301,13 +299,6 @@ func ListNamespace(d string) ListOption { } } -// ListName sets the name for list method to filter needed services -func ListName(n string) ListOption { - return func(o *ListOptions) { - o.Name = n - } -} - // Name sets the name func Name(n string) Option { return func(o *Options) { @@ -315,8 +306,6 @@ func Name(n string) Option { } } -type codecKey struct{} - func Codec(c codec.Codec) Option { return func(o *Options) { o.Codec = c From a3e8ab249281995b43a350a0fd8dad098bfa3fa5 Mon Sep 17 00:00:00 2001 From: vtolstov Date: Fri, 27 Dec 2024 20:57:08 +0000 Subject: [PATCH 09/19] Apply Code Coverage Badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 82a4992c..0999e7dc 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Micro -![Coverage](https://img.shields.io/badge/Coverage-44.8%25-yellow) +![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) Micro is a standard library for microservices. From a00cf2c8d95ec430ddbb8c7527acbddad53741ea Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 28 Dec 2024 14:51:01 +0300 Subject: [PATCH 10/19] register: watcher fixes Signed-off-by: Vasiliy Tolstov --- register/watcher.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/register/watcher.go b/register/watcher.go index 7c9feb2c..756a46a5 100644 --- a/register/watcher.go +++ b/register/watcher.go @@ -15,31 +15,31 @@ type Watcher interface { // the watcher. Actions can be create, update, delete type Result struct { // Service holds register service - Service *Service + Service *Service `json:"service,omitempty"` // Action holds the action - Action string + Action EventType `json:"action,omitempty"` } // EventType defines register event type type EventType int const ( - // Create is emitted when a new service is registered - Create EventType = iota - // Delete is emitted when an existing service is deregistered - Delete - // Update is emitted when an existing service is updated - Update + // EventCreate is emitted when a new service is registered + EventCreate EventType = iota + // EventDelete is emitted when an existing service is deregistered + EventDelete + // EventUpdate is emitted when an existing service is updated + EventUpdate ) // String returns human readable event type func (t EventType) String() string { switch t { - case Create: + case EventCreate: return "create" - case Delete: + case EventDelete: return "delete" - case Update: + case EventUpdate: return "update" default: return "unknown" @@ -49,11 +49,11 @@ func (t EventType) String() string { // Event is register event type Event struct { // Timestamp is event timestamp - Timestamp time.Time + Timestamp time.Time `json:"timestamp,omitempty"` // Service is register service - Service *Service + Service *Service `json:"service,omitempty"` // ID is register id - ID string + ID string `json:"id,omitempty"` // Type defines type of event - Type EventType + Type EventType `json:"type,omitempty"` } From d7dd6fbeb200b60036149d00b94d28cfa1b8fc28 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 28 Dec 2024 14:55:20 +0300 Subject: [PATCH 11/19] register/memory: fix build Signed-off-by: Vasiliy Tolstov --- register/memory/memory.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/register/memory/memory.go b/register/memory/memory.go index 28d309f9..f0bcf7f8 100644 --- a/register/memory/memory.go +++ b/register/memory/memory.go @@ -149,7 +149,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version)) } m.records[options.Namespace] = srvs - go m.sendEvent(®ister.Result{Action: "create", Service: s}) + go m.sendEvent(®ister.Result{Action: register.EventCreate, Service: s}) } var addedNodes bool @@ -185,7 +185,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new node to service: %s, version: %s", s.Name, s.Version)) } - go m.sendEvent(®ister.Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: register.EventUpdate, Service: s}) } else { // refresh TTL and timestamp for _, n := range s.Nodes { @@ -238,7 +238,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re // is cleanup if len(version.Nodes) > 0 { m.records[options.Namespace][s.Name][s.Version] = version - go m.sendEvent(®ister.Result{Action: "update", Service: s}) + go m.sendEvent(®ister.Result{Action: register.EventUpdate, Service: s}) return nil } @@ -246,7 +246,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re // register and exit if len(versions) == 1 { delete(m.records[options.Namespace], s.Name) - go m.sendEvent(®ister.Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: register.EventDelete, Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s", s.Name)) @@ -256,7 +256,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re // there are other versions of the service running, so only remove this version of it delete(m.records[options.Namespace][s.Name], s.Version) - go m.sendEvent(®ister.Result{Action: "delete", Service: s}) + go m.sendEvent(®ister.Result{Action: register.EventDelete, Service: s}) if m.opts.Logger.V(logger.DebugLevel) { m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version)) } From 558c6f4d7ca9ef549a4d00f4e358faae842e3452 Mon Sep 17 00:00:00 2001 From: vtolstov Date: Sat, 28 Dec 2024 11:56:07 +0000 Subject: [PATCH 12/19] Apply Code Coverage Badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0999e7dc..82a4992c 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Micro -![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) +![Coverage](https://img.shields.io/badge/Coverage-44.8%25-yellow) Micro is a standard library for microservices. From 653bd386cc3d3b7259b218d26feea80301c503ff Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 29 Dec 2024 01:52:53 +0300 Subject: [PATCH 13/19] util/buffer: add DelayedBuffer Signed-off-by: Vasiliy Tolstov --- util/buf/buf.go | 27 ------------ util/buffer/buffer.go | 85 ++++++++++++++++++++++++++++++++++++++ util/buffer/buffer_test.go | 22 ++++++++++ 3 files changed, 107 insertions(+), 27 deletions(-) delete mode 100644 util/buf/buf.go create mode 100644 util/buffer/buffer.go create mode 100644 util/buffer/buffer_test.go diff --git a/util/buf/buf.go b/util/buf/buf.go deleted file mode 100644 index e6b86214..00000000 --- a/util/buf/buf.go +++ /dev/null @@ -1,27 +0,0 @@ -package buf - -import ( - "bytes" - "io" -) - -var _ io.Closer = &Buffer{} - -// Buffer bytes.Buffer wrapper to satisfie io.Closer interface -type Buffer struct { - *bytes.Buffer -} - -// Close reset buffer contents -func (b *Buffer) Close() error { - b.Buffer.Reset() - return nil -} - -// New creates new buffer that satisfies Closer interface -func New(b *bytes.Buffer) *Buffer { - if b == nil { - b = bytes.NewBuffer(nil) - } - return &Buffer{b} -} diff --git a/util/buffer/buffer.go b/util/buffer/buffer.go new file mode 100644 index 00000000..8d93fc3b --- /dev/null +++ b/util/buffer/buffer.go @@ -0,0 +1,85 @@ +package buffer + +import ( + "io" + "sync" + "time" +) + +var _ io.WriteCloser = (*DelayedBuffer)(nil) + +// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached +type DelayedBuffer struct { + mu sync.Mutex + maxWait time.Duration + flushTime time.Time + buffer chan []byte + ticker *time.Ticker + w io.Writer + err error +} + +func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer { + b := &DelayedBuffer{ + buffer: make(chan []byte, size), + ticker: time.NewTicker(maxWait), + w: w, + flushTime: time.Now(), + maxWait: maxWait, + } + b.loop() + return b +} + +func (b *DelayedBuffer) loop() { + go func() { + for range b.ticker.C { + b.mu.Lock() + if time.Since(b.flushTime) > b.maxWait { + b.flush() + } + b.mu.Unlock() + } + }() +} + +func (b *DelayedBuffer) flush() { + bufLen := len(b.buffer) + if bufLen > 0 { + tmp := make([][]byte, bufLen) + for i := 0; i < bufLen; i++ { + tmp[i] = <-b.buffer + } + for _, t := range tmp { + _, b.err = b.w.Write(t) + } + b.flushTime = time.Now() + } +} + +func (b *DelayedBuffer) Put(items ...[]byte) { + b.mu.Lock() + for _, item := range items { + select { + case b.buffer <- item: + default: + b.flush() + b.buffer <- item + } + } + b.mu.Unlock() +} + +func (b *DelayedBuffer) Close() error { + b.mu.Lock() + b.flush() + close(b.buffer) + b.ticker.Stop() + b.mu.Unlock() + return b.err +} + +func (b *DelayedBuffer) Write(data []byte) (int, error) { + b.Put(data) + return len(data), b.err +} diff --git a/util/buffer/buffer_test.go b/util/buffer/buffer_test.go new file mode 100644 index 00000000..8285a82d --- /dev/null +++ b/util/buffer/buffer_test.go @@ -0,0 +1,22 @@ +package buffer + +import ( + "bytes" + "testing" + "time" +) + +func TestTimedBuffer(t *testing.T) { + buf := bytes.NewBuffer(nil) + b := NewDelayedBuffer(100, 300*time.Millisecond, buf) + for i := 0; i < 100; i++ { + _, _ = b.Write([]byte(`test`)) + } + if buf.Len() != 0 { + t.Fatal("delayed write not worked") + } + time.Sleep(400 * time.Millisecond) + if buf.Len() == 0 { + t.Fatal("delayed write not worked") + } +} From bd55a35dc3f6b8787604d85203661459843bb041 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 29 Dec 2024 01:57:37 +0300 Subject: [PATCH 14/19] logger/slog: add delayed buffer test Signed-off-by: Vasiliy Tolstov --- logger/slog/slog_test.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/logger/slog/slog_test.go b/logger/slog/slog_test.go index e8f4cd43..deb87eda 100644 --- a/logger/slog/slog_test.go +++ b/logger/slog/slog_test.go @@ -14,6 +14,7 @@ import ( "github.com/google/uuid" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/metadata" + "go.unistack.org/micro/v3/util/buffer" ) // always first to have proper check @@ -30,11 +31,30 @@ func TestStacktrace(t *testing.T) { l.Error(ctx, "msg1", errors.New("err")) - if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:31`)) { + if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:32`)) { t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) } } +func TestDelayedBuffer(t *testing.T) { + ctx := context.TODO() + buf := bytes.NewBuffer(nil) + dbuf := buffer.NewDelayedBuffer(100, 100*time.Millisecond, buf) + l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(dbuf), + WithHandlerFunc(slog.NewTextHandler), + logger.WithAddStacktrace(true), + ) + if err := l.Init(logger.WithFields("key1", "val1")); err != nil { + t.Fatal(err) + } + + l.Error(ctx, "msg1", errors.New("err")) + time.Sleep(120 * time.Millisecond) + if !bytes.Contains(buf.Bytes(), []byte(`key1=val1`)) { + t.Fatalf("logger delayed buffer not works, buf contains: %s", buf.Bytes()) + } +} + func TestTime(t *testing.T) { ctx := context.TODO() buf := bytes.NewBuffer(nil) From 27fa6e91736b8ed00354fc4ceb49b72d270ccb0f Mon Sep 17 00:00:00 2001 From: vtolstov Date: Sat, 28 Dec 2024 22:58:19 +0000 Subject: [PATCH 15/19] Apply Code Coverage Badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 82a4992c..35d52391 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ # Micro -![Coverage](https://img.shields.io/badge/Coverage-44.8%25-yellow) +![Coverage](https://img.shields.io/badge/Coverage-44.7%25-yellow) Micro is a standard library for microservices. From 5df8f83f450b8a3ad73ef6c10cb9fcd988c21378 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 4 Jan 2025 18:53:57 +0300 Subject: [PATCH 16/19] badges (#392) Signed-off-by: Vasiliy Tolstov Co-authored-by: vtolstov Reviewed-on: https://git.unistack.org/unistack-org/micro/pulls/392 Co-authored-by: Vasiliy Tolstov Co-committed-by: Vasiliy Tolstov --- .gitea/workflows/job_coverage.yml | 8 ++++---- README.md | 8 ++++++-- util/time/duration_test.go | 18 +++++++++--------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/.gitea/workflows/job_coverage.yml b/.gitea/workflows/job_coverage.yml index e176edca..aa281dc2 100644 --- a/.gitea/workflows/job_coverage.yml +++ b/.gitea/workflows/job_coverage.yml @@ -26,24 +26,24 @@ jobs: - name: test coverage run: | - go test -v -cover ./... -coverprofile coverage.out -coverpkg ./... + go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./... go tool cover -func coverage.out -o coverage.out - name: coverage badge - uses: tj-actions/coverage-badge-go@v1 + uses: tj-actions/coverage-badge-go@v2 with: green: 80 filename: coverage.out - uses: stefanzweifel/git-auto-commit-action@v4 - id: auto-commit-action + name: autocommit with: commit_message: Apply Code Coverage Badge skip_fetch: true skip_checkout: true file_pattern: ./README.md - - name: Push Changes + - name: push if: steps.auto-commit-action.outputs.changes_detected == 'true' uses: ad-m/github-push-action@master with: diff --git a/README.md b/README.md index 35d52391..85f7c5ec 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ -# Micro -![Coverage](https://img.shields.io/badge/Coverage-44.7%25-yellow) +# Micro +[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) +[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/unistack-org/micro/v3?tab=overview) +[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush) +[![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3) +![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) Micro is a standard library for microservices. diff --git a/util/time/duration_test.go b/util/time/duration_test.go index af5f1434..97b3997e 100644 --- a/util/time/duration_test.go +++ b/util/time/duration_test.go @@ -35,11 +35,11 @@ func TestUnmarshalYAML(t *testing.T) { t.Fatalf("invalid duration %v != 10000000", v.TTL) } - err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v) + err = yaml.Unmarshal([]byte(`{"ttl":"1d"}`), v) if err != nil { t.Fatal(err) - } else if *(v.TTL) != 31622400000000000 { - t.Fatalf("invalid duration %v != 31622400000000000", v.TTL) + } else if *(v.TTL) != 86400000000000 { + t.Fatalf("invalid duration %v != 86400000000000", *v.TTL) } } @@ -68,11 +68,11 @@ func TestUnmarshalJSON(t *testing.T) { t.Fatalf("invalid duration %v != 10000000", v.TTL) } - err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v) + err = json.Unmarshal([]byte(`{"ttl":"1d"}`), v) if err != nil { t.Fatal(err) - } else if v.TTL != 31622400000000000 { - t.Fatalf("invalid duration %v != 31622400000000000", v.TTL) + } else if v.TTL != 86400000000000 { + t.Fatalf("invalid duration %v != 86400000000000", v.TTL) } } @@ -87,11 +87,11 @@ func TestParseDuration(t *testing.T) { if td.String() != "340h0m0s" { t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String()) } - td, err = ParseDuration("1y") + td, err = ParseDuration("1d") if err != nil { t.Fatalf("ParseDuration error: %v", err) } - if td.String() != "8784h0m0s" { - t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String()) + if td.String() != "24h0m0s" { + t.Fatalf("ParseDuration 1d != 24h0m0s : %s", td.String()) } } From fcc4faff8ae6ac52f71f5fb11d4ebdb2e95799dd Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 4 Jan 2025 18:57:02 +0300 Subject: [PATCH 17/19] fix godoc link Signed-off-by: Vasiliy Tolstov --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 85f7c5ec..a0a7d712 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) -[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/github.com/unistack-org/micro/v3?tab=overview) +[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview) [![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush) [![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3) ![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) From 29d956e74e9ed9e2261c0ea08eb563fe03313cb3 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sat, 4 Jan 2025 19:09:50 +0300 Subject: [PATCH 18/19] fix readme Signed-off-by: Vasiliy Tolstov --- README.md | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index a0a7d712..aead4305 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview) [![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush) [![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3) -![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) +![Coverage](https://img.shields.io/badge/coverage-44.6%25-yellow) Micro is a standard library for microservices. @@ -15,30 +15,20 @@ Micro provides the core requirements for distributed systems development includi Micro abstracts away the details of distributed systems. Here are the main features. -- **Authentication** - Auth is built in as a first class citizen. Authentication and authorization enable secure -zero trust networking by providing every service an identity and certificates. This additionally includes rule -based access control. - - **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application -level config from any source such as env vars, file, etcd. You can merge the sources and even define fallbacks. +level config from any source such as env vars, cmdline, file, consul, vault... You can merge the sources and even define fallbacks. - **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and -CockroachDB by default. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework. +s3. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework. - **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service development. When service A needs to speak to service B it needs the location of that service. -- **Load Balancing** - Client side load balancing built on service discovery. Once we have the addresses of any number of instances -of a service we now need a way to decide which node to route to. We use random hashed load balancing to provide even distribution -across the services and retry a different node if there's a problem. - - **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client and server handle this by default. -- **Transport** - gRPC or http based request/response with support for bidirectional streaming. We provide an abstraction for synchronous communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed. - -- **Async Messaging** - PubSub is built in as a first class citizen for asynchronous communication and event driven architectures. +- **Async Messaging** - Pub/Sub is built in as a first class citizen for asynchronous communication and event driven architectures. Event notifications are a core pattern in micro service development. - **Synchronization** - Distributed systems are often built in an eventually consistent manner. Support for distributed locking and @@ -47,10 +37,6 @@ leadership are built in as a Sync interface. When using an eventually consistent - **Pluggable Interfaces** - Micro makes use of Go interfaces for each system abstraction. Because of this these interfaces are pluggable and allows Micro to be runtime agnostic. -## Getting Started - -To be created. - ## License Micro is Apache 2.0 licensed. From d846044fc69d31f887aa3378d482491f102bada4 Mon Sep 17 00:00:00 2001 From: vtolstov Date: Sat, 4 Jan 2025 16:10:26 +0000 Subject: [PATCH 19/19] Apply Code Coverage Badge --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index aead4305..3fe5d78a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ # Micro +![Coverage](https://img.shields.io/badge/Coverage-44.8%25-yellow) [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview) [![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)