Compare commits

..

1 Commits
v3 ... register

Author SHA1 Message Date
8db52549e4 register: add ListName option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 19:10:46 +03:00
11 changed files with 90 additions and 182 deletions

View File

@ -26,24 +26,24 @@ jobs:
- name: test coverage - name: test coverage
run: | run: |
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./... go test -v -cover ./... -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out go tool cover -func coverage.out -o coverage.out
- name: coverage badge - name: coverage badge
uses: tj-actions/coverage-badge-go@v2 uses: tj-actions/coverage-badge-go@v1
with: with:
green: 80 green: 80
filename: coverage.out filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4 - uses: stefanzweifel/git-auto-commit-action@v4
name: autocommit id: auto-commit-action
with: with:
commit_message: Apply Code Coverage Badge commit_message: Apply Code Coverage Badge
skip_fetch: true skip_fetch: true
skip_checkout: true skip_checkout: true
file_pattern: ./README.md file_pattern: ./README.md
- name: push - name: Push Changes
if: steps.auto-commit-action.outputs.changes_detected == 'true' if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master uses: ad-m/github-push-action@master
with: with:

View File

@ -1,9 +1,5 @@
# Micro # Micro
![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow) ![Coverage](https://img.shields.io/badge/Coverage-45.1%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
[![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3)
Micro is a standard library for microservices. Micro is a standard library for microservices.
@ -15,20 +11,30 @@ Micro provides the core requirements for distributed systems development includi
Micro abstracts away the details of distributed systems. Here are the main features. Micro abstracts away the details of distributed systems. Here are the main features.
- **Authentication** - Auth is built in as a first class citizen. Authentication and authorization enable secure
zero trust networking by providing every service an identity and certificates. This additionally includes rule
based access control.
- **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application - **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application
level config from any source such as env vars, cmdline, file, consul, vault... You can merge the sources and even define fallbacks. level config from any source such as env vars, file, etcd. You can merge the sources and even define fallbacks.
- **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and - **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and
s3. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework. CockroachDB by default. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
- **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service - **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service
development. When service A needs to speak to service B it needs the location of that service. development. When service A needs to speak to service B it needs the location of that service.
- **Load Balancing** - Client side load balancing built on service discovery. Once we have the addresses of any number of instances
of a service we now need a way to decide which node to route to. We use random hashed load balancing to provide even distribution
across the services and retry a different node if there's a problem.
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type - **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default. and server handle this by default.
- **Async Messaging** - Pub/Sub is built in as a first class citizen for asynchronous communication and event driven architectures. - **Transport** - gRPC or http based request/response with support for bidirectional streaming. We provide an abstraction for synchronous communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed.
- **Async Messaging** - PubSub is built in as a first class citizen for asynchronous communication and event driven architectures.
Event notifications are a core pattern in micro service development. Event notifications are a core pattern in micro service development.
- **Synchronization** - Distributed systems are often built in an eventually consistent manner. Support for distributed locking and - **Synchronization** - Distributed systems are often built in an eventually consistent manner. Support for distributed locking and
@ -37,6 +43,10 @@ leadership are built in as a Sync interface. When using an eventually consistent
- **Pluggable Interfaces** - Micro makes use of Go interfaces for each system abstraction. Because of this these interfaces - **Pluggable Interfaces** - Micro makes use of Go interfaces for each system abstraction. Because of this these interfaces
are pluggable and allows Micro to be runtime agnostic. are pluggable and allows Micro to be runtime agnostic.
## Getting Started
To be created.
## License ## License
Micro is Apache 2.0 licensed. Micro is Apache 2.0 licensed.

View File

@ -588,6 +588,7 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
for _, p := range ps { for _, p := range ps {
md := metadata.Copy(omd) md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic() topic := p.Topic()
if len(exchange) > 0 { if len(exchange) > 0 {
topic = exchange topic = exchange
@ -599,8 +600,6 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
md.Set(k, v) md.Set(k, v)
} }
md[metadata.HeaderContentType] = p.ContentType()
var body []byte var body []byte
// passed in raw data // passed in raw data

View File

@ -14,7 +14,6 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/util/buffer"
) )
// always first to have proper check // always first to have proper check
@ -31,30 +30,11 @@ func TestStacktrace(t *testing.T) {
l.Error(ctx, "msg1", errors.New("err")) l.Error(ctx, "msg1", errors.New("err"))
if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:32`)) { if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:31`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes()) t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
} }
} }
func TestDelayedBuffer(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
dbuf := buffer.NewDelayedBuffer(100, 100*time.Millisecond, buf)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(dbuf),
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
)
if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
t.Fatal(err)
}
l.Error(ctx, "msg1", errors.New("err"))
time.Sleep(120 * time.Millisecond)
if !bytes.Contains(buf.Bytes(), []byte(`key1=val1`)) {
t.Fatalf("logger delayed buffer not works, buf contains: %s", buf.Bytes())
}
}
func TestTime(t *testing.T) { func TestTime(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
buf := bytes.NewBuffer(nil) buf := bytes.NewBuffer(nil)

View File

@ -149,7 +149,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version)) m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version))
} }
m.records[options.Namespace] = srvs m.records[options.Namespace] = srvs
go m.sendEvent(&register.Result{Action: register.EventCreate, Service: s}) go m.sendEvent(&register.Result{Action: "create", Service: s})
} }
var addedNodes bool var addedNodes bool
@ -185,7 +185,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
if m.opts.Logger.V(logger.DebugLevel) { if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new node to service: %s, version: %s", s.Name, s.Version)) m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new node to service: %s, version: %s", s.Name, s.Version))
} }
go m.sendEvent(&register.Result{Action: register.EventUpdate, Service: s}) go m.sendEvent(&register.Result{Action: "update", Service: s})
} else { } else {
// refresh TTL and timestamp // refresh TTL and timestamp
for _, n := range s.Nodes { for _, n := range s.Nodes {
@ -238,7 +238,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// is cleanup // is cleanup
if len(version.Nodes) > 0 { if len(version.Nodes) > 0 {
m.records[options.Namespace][s.Name][s.Version] = version m.records[options.Namespace][s.Name][s.Version] = version
go m.sendEvent(&register.Result{Action: register.EventUpdate, Service: s}) go m.sendEvent(&register.Result{Action: "update", Service: s})
return nil return nil
} }
@ -246,7 +246,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// register and exit // register and exit
if len(versions) == 1 { if len(versions) == 1 {
delete(m.records[options.Namespace], s.Name) delete(m.records[options.Namespace], s.Name)
go m.sendEvent(&register.Result{Action: register.EventDelete, Service: s}) go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) { if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s", s.Name)) m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s", s.Name))
@ -256,7 +256,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// there are other versions of the service running, so only remove this version of it // there are other versions of the service running, so only remove this version of it
delete(m.records[options.Namespace][s.Name], s.Version) delete(m.records[options.Namespace][s.Name], s.Version)
go m.sendEvent(&register.Result{Action: register.EventDelete, Service: s}) go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) { if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version)) m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version))
} }

View File

@ -5,7 +5,6 @@ import (
"crypto/tls" "crypto/tls"
"time" "time"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter" "go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/tracer" "go.unistack.org/micro/v3/tracer"
@ -27,8 +26,6 @@ type Options struct {
Name string Name string
// Addrs specifies register addrs // Addrs specifies register addrs
Addrs []string Addrs []string
// Codec used to marshal/unmarshal data in register
Codec codec.Codec
// Timeout specifies timeout // Timeout specifies timeout
Timeout time.Duration Timeout time.Duration
} }
@ -40,7 +37,6 @@ func NewOptions(opts ...Option) Options {
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Context: context.Background(), Context: context.Background(),
Codec: codec.NewCodec(),
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
@ -138,6 +134,8 @@ type ListOptions struct {
Context context.Context Context context.Context
// Namespace to scope the request to // Namespace to scope the request to
Namespace string Namespace string
// Name filter services by name
Name string
} }
// NewListOptions returns list options filled by opts // NewListOptions returns list options filled by opts
@ -299,15 +297,16 @@ func ListNamespace(d string) ListOption {
} }
} }
// ListName sets the name for list method to filter needed services
func ListName(n string) ListOption {
return func(o *ListOptions) {
o.Name = n
}
}
// Name sets the name // Name sets the name
func Name(n string) Option { func Name(n string) Option {
return func(o *Options) { return func(o *Options) {
o.Name = n o.Name = n
} }
} }
func Codec(c codec.Codec) Option {
return func(o *Options) {
o.Codec = c
}
}

View File

@ -15,31 +15,31 @@ type Watcher interface {
// the watcher. Actions can be create, update, delete // the watcher. Actions can be create, update, delete
type Result struct { type Result struct {
// Service holds register service // Service holds register service
Service *Service `json:"service,omitempty"` Service *Service
// Action holds the action // Action holds the action
Action EventType `json:"action,omitempty"` Action string
} }
// EventType defines register event type // EventType defines register event type
type EventType int type EventType int
const ( const (
// EventCreate is emitted when a new service is registered // Create is emitted when a new service is registered
EventCreate EventType = iota Create EventType = iota
// EventDelete is emitted when an existing service is deregistered // Delete is emitted when an existing service is deregistered
EventDelete Delete
// EventUpdate is emitted when an existing service is updated // Update is emitted when an existing service is updated
EventUpdate Update
) )
// String returns human readable event type // String returns human readable event type
func (t EventType) String() string { func (t EventType) String() string {
switch t { switch t {
case EventCreate: case Create:
return "create" return "create"
case EventDelete: case Delete:
return "delete" return "delete"
case EventUpdate: case Update:
return "update" return "update"
default: default:
return "unknown" return "unknown"
@ -49,11 +49,11 @@ func (t EventType) String() string {
// Event is register event // Event is register event
type Event struct { type Event struct {
// Timestamp is event timestamp // Timestamp is event timestamp
Timestamp time.Time `json:"timestamp,omitempty"` Timestamp time.Time
// Service is register service // Service is register service
Service *Service `json:"service,omitempty"` Service *Service
// ID is register id // ID is register id
ID string `json:"id,omitempty"` ID string
// Type defines type of event // Type defines type of event
Type EventType `json:"type,omitempty"` Type EventType
} }

27
util/buf/buf.go Normal file
View File

@ -0,0 +1,27 @@
package buf
import (
"bytes"
"io"
)
var _ io.Closer = &Buffer{}
// Buffer bytes.Buffer wrapper to satisfie io.Closer interface
type Buffer struct {
*bytes.Buffer
}
// Close reset buffer contents
func (b *Buffer) Close() error {
b.Buffer.Reset()
return nil
}
// New creates new buffer that satisfies Closer interface
func New(b *bytes.Buffer) *Buffer {
if b == nil {
b = bytes.NewBuffer(nil)
}
return &Buffer{b}
}

View File

@ -1,85 +0,0 @@
package buffer
import (
"io"
"sync"
"time"
)
var _ io.WriteCloser = (*DelayedBuffer)(nil)
// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached
type DelayedBuffer struct {
mu sync.Mutex
maxWait time.Duration
flushTime time.Time
buffer chan []byte
ticker *time.Ticker
w io.Writer
err error
}
func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer {
b := &DelayedBuffer{
buffer: make(chan []byte, size),
ticker: time.NewTicker(maxWait),
w: w,
flushTime: time.Now(),
maxWait: maxWait,
}
b.loop()
return b
}
func (b *DelayedBuffer) loop() {
go func() {
for range b.ticker.C {
b.mu.Lock()
if time.Since(b.flushTime) > b.maxWait {
b.flush()
}
b.mu.Unlock()
}
}()
}
func (b *DelayedBuffer) flush() {
bufLen := len(b.buffer)
if bufLen > 0 {
tmp := make([][]byte, bufLen)
for i := 0; i < bufLen; i++ {
tmp[i] = <-b.buffer
}
for _, t := range tmp {
_, b.err = b.w.Write(t)
}
b.flushTime = time.Now()
}
}
func (b *DelayedBuffer) Put(items ...[]byte) {
b.mu.Lock()
for _, item := range items {
select {
case b.buffer <- item:
default:
b.flush()
b.buffer <- item
}
}
b.mu.Unlock()
}
func (b *DelayedBuffer) Close() error {
b.mu.Lock()
b.flush()
close(b.buffer)
b.ticker.Stop()
b.mu.Unlock()
return b.err
}
func (b *DelayedBuffer) Write(data []byte) (int, error) {
b.Put(data)
return len(data), b.err
}

View File

@ -1,22 +0,0 @@
package buffer
import (
"bytes"
"testing"
"time"
)
func TestTimedBuffer(t *testing.T) {
buf := bytes.NewBuffer(nil)
b := NewDelayedBuffer(100, 300*time.Millisecond, buf)
for i := 0; i < 100; i++ {
_, _ = b.Write([]byte(`test`))
}
if buf.Len() != 0 {
t.Fatal("delayed write not worked")
}
time.Sleep(400 * time.Millisecond)
if buf.Len() == 0 {
t.Fatal("delayed write not worked")
}
}

View File

@ -35,11 +35,11 @@ func TestUnmarshalYAML(t *testing.T) {
t.Fatalf("invalid duration %v != 10000000", v.TTL) t.Fatalf("invalid duration %v != 10000000", v.TTL)
} }
err = yaml.Unmarshal([]byte(`{"ttl":"1d"}`), v) err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else if *(v.TTL) != 86400000000000 { } else if *(v.TTL) != 31622400000000000 {
t.Fatalf("invalid duration %v != 86400000000000", *v.TTL) t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
} }
} }
@ -68,11 +68,11 @@ func TestUnmarshalJSON(t *testing.T) {
t.Fatalf("invalid duration %v != 10000000", v.TTL) t.Fatalf("invalid duration %v != 10000000", v.TTL)
} }
err = json.Unmarshal([]byte(`{"ttl":"1d"}`), v) err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} else if v.TTL != 86400000000000 { } else if v.TTL != 31622400000000000 {
t.Fatalf("invalid duration %v != 86400000000000", v.TTL) t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
} }
} }
@ -87,11 +87,11 @@ func TestParseDuration(t *testing.T) {
if td.String() != "340h0m0s" { if td.String() != "340h0m0s" {
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String()) t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
} }
td, err = ParseDuration("1d") td, err = ParseDuration("1y")
if err != nil { if err != nil {
t.Fatalf("ParseDuration error: %v", err) t.Fatalf("ParseDuration error: %v", err)
} }
if td.String() != "24h0m0s" { if td.String() != "8784h0m0s" {
t.Fatalf("ParseDuration 1d != 24h0m0s : %s", td.String()) t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String())
} }
} }