Compare commits

..

1 Commits
v3 ... register

Author SHA1 Message Date
8db52549e4 register: add ListName option
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2024-12-27 19:10:46 +03:00
11 changed files with 90 additions and 182 deletions

View File

@ -26,24 +26,24 @@ jobs:
- name: test coverage
run: |
go test -v -cover ./... -covermode=count -coverprofile coverage.out -coverpkg ./...
go test -v -cover ./... -coverprofile coverage.out -coverpkg ./...
go tool cover -func coverage.out -o coverage.out
- name: coverage badge
uses: tj-actions/coverage-badge-go@v2
uses: tj-actions/coverage-badge-go@v1
with:
green: 80
filename: coverage.out
- uses: stefanzweifel/git-auto-commit-action@v4
name: autocommit
id: auto-commit-action
with:
commit_message: Apply Code Coverage Badge
skip_fetch: true
skip_checkout: true
file_pattern: ./README.md
- name: push
- name: Push Changes
if: steps.auto-commit-action.outputs.changes_detected == 'true'
uses: ad-m/github-push-action@master
with:

View File

@ -1,9 +1,5 @@
# Micro
![Coverage](https://img.shields.io/badge/Coverage-44.6%25-yellow)
[![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Doc](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go.unistack.org/micro/v3?tab=overview)
[![Status](https://git.unistack.org/unistack-org/micro/actions/workflows/job_tests.yml/badge.svg?branch=v3)](https://git.unistack.org/unistack-org/micro/actions?query=workflow%3Abuild+branch%3Av3+event%3Apush)
[![Lint](https://goreportcard.com/badge/go.unistack.org/micro/v3)](https://goreportcard.com/report/go.unistack.org/micro/v3)
![Coverage](https://img.shields.io/badge/Coverage-45.1%25-yellow)
Micro is a standard library for microservices.
@ -15,20 +11,30 @@ Micro provides the core requirements for distributed systems development includi
Micro abstracts away the details of distributed systems. Here are the main features.
- **Authentication** - Auth is built in as a first class citizen. Authentication and authorization enable secure
zero trust networking by providing every service an identity and certificates. This additionally includes rule
based access control.
- **Dynamic Config** - Load and hot reload dynamic config from anywhere. The config interface provides a way to load application
level config from any source such as env vars, cmdline, file, consul, vault... You can merge the sources and even define fallbacks.
level config from any source such as env vars, file, etcd. You can merge the sources and even define fallbacks.
- **Data Storage** - A simple data store interface to read, write and delete records. It includes support for memory, file and
s3. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
CockroachDB by default. State and persistence becomes a core requirement beyond prototyping and Micro looks to build that into the framework.
- **Service Discovery** - Automatic service registration and name resolution. Service discovery is at the core of micro service
development. When service A needs to speak to service B it needs the location of that service.
- **Load Balancing** - Client side load balancing built on service discovery. Once we have the addresses of any number of instances
of a service we now need a way to decide which node to route to. We use random hashed load balancing to provide even distribution
across the services and retry a different node if there's a problem.
- **Message Encoding** - Dynamic message encoding based on content-type. The client and server will use codecs along with content-type
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default.
- **Async Messaging** - Pub/Sub is built in as a first class citizen for asynchronous communication and event driven architectures.
- **Transport** - gRPC or http based request/response with support for bidirectional streaming. We provide an abstraction for synchronous communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed.
- **Async Messaging** - PubSub is built in as a first class citizen for asynchronous communication and event driven architectures.
Event notifications are a core pattern in micro service development.
- **Synchronization** - Distributed systems are often built in an eventually consistent manner. Support for distributed locking and
@ -37,6 +43,10 @@ leadership are built in as a Sync interface. When using an eventually consistent
- **Pluggable Interfaces** - Micro makes use of Go interfaces for each system abstraction. Because of this these interfaces
are pluggable and allows Micro to be runtime agnostic.
## Getting Started
To be created.
## License
Micro is Apache 2.0 licensed.

View File

@ -588,6 +588,7 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
for _, p := range ps {
md := metadata.Copy(omd)
md[metadata.HeaderContentType] = p.ContentType()
topic := p.Topic()
if len(exchange) > 0 {
topic = exchange
@ -599,8 +600,6 @@ func (n *noopClient) publish(ctx context.Context, ps []Message, opts ...PublishO
md.Set(k, v)
}
md[metadata.HeaderContentType] = p.ContentType()
var body []byte
// passed in raw data

View File

@ -14,7 +14,6 @@ import (
"github.com/google/uuid"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/util/buffer"
)
// always first to have proper check
@ -31,30 +30,11 @@ func TestStacktrace(t *testing.T) {
l.Error(ctx, "msg1", errors.New("err"))
if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:32`)) {
if !bytes.Contains(buf.Bytes(), []byte(`slog_test.go:31`)) {
t.Fatalf("logger error not works, buf contains: %s", buf.Bytes())
}
}
func TestDelayedBuffer(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
dbuf := buffer.NewDelayedBuffer(100, 100*time.Millisecond, buf)
l := NewLogger(logger.WithLevel(logger.ErrorLevel), logger.WithOutput(dbuf),
WithHandlerFunc(slog.NewTextHandler),
logger.WithAddStacktrace(true),
)
if err := l.Init(logger.WithFields("key1", "val1")); err != nil {
t.Fatal(err)
}
l.Error(ctx, "msg1", errors.New("err"))
time.Sleep(120 * time.Millisecond)
if !bytes.Contains(buf.Bytes(), []byte(`key1=val1`)) {
t.Fatalf("logger delayed buffer not works, buf contains: %s", buf.Bytes())
}
}
func TestTime(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)

View File

@ -149,7 +149,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new service: %s, version: %s", s.Name, s.Version))
}
m.records[options.Namespace] = srvs
go m.sendEvent(&register.Result{Action: register.EventCreate, Service: s})
go m.sendEvent(&register.Result{Action: "create", Service: s})
}
var addedNodes bool
@ -185,7 +185,7 @@ func (m *memory) Register(_ context.Context, s *register.Service, opts ...regist
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register added new node to service: %s, version: %s", s.Name, s.Version))
}
go m.sendEvent(&register.Result{Action: register.EventUpdate, Service: s})
go m.sendEvent(&register.Result{Action: "update", Service: s})
} else {
// refresh TTL and timestamp
for _, n := range s.Nodes {
@ -238,7 +238,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// is cleanup
if len(version.Nodes) > 0 {
m.records[options.Namespace][s.Name][s.Version] = version
go m.sendEvent(&register.Result{Action: register.EventUpdate, Service: s})
go m.sendEvent(&register.Result{Action: "update", Service: s})
return nil
}
@ -246,7 +246,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// register and exit
if len(versions) == 1 {
delete(m.records[options.Namespace], s.Name)
go m.sendEvent(&register.Result{Action: register.EventDelete, Service: s})
go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s", s.Name))
@ -256,7 +256,7 @@ func (m *memory) Deregister(ctx context.Context, s *register.Service, opts ...re
// there are other versions of the service running, so only remove this version of it
delete(m.records[options.Namespace][s.Name], s.Version)
go m.sendEvent(&register.Result{Action: register.EventDelete, Service: s})
go m.sendEvent(&register.Result{Action: "delete", Service: s})
if m.opts.Logger.V(logger.DebugLevel) {
m.opts.Logger.Debug(m.opts.Context, fmt.Sprintf("Register removed service: %s, version: %s", s.Name, s.Version))
}

View File

@ -5,7 +5,6 @@ import (
"crypto/tls"
"time"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/logger"
"go.unistack.org/micro/v3/meter"
"go.unistack.org/micro/v3/tracer"
@ -27,8 +26,6 @@ type Options struct {
Name string
// Addrs specifies register addrs
Addrs []string
// Codec used to marshal/unmarshal data in register
Codec codec.Codec
// Timeout specifies timeout
Timeout time.Duration
}
@ -40,7 +37,6 @@ func NewOptions(opts ...Option) Options {
Meter: meter.DefaultMeter,
Tracer: tracer.DefaultTracer,
Context: context.Background(),
Codec: codec.NewCodec(),
}
for _, o := range opts {
o(&options)
@ -138,6 +134,8 @@ type ListOptions struct {
Context context.Context
// Namespace to scope the request to
Namespace string
// Name filter services by name
Name string
}
// NewListOptions returns list options filled by opts
@ -299,15 +297,16 @@ func ListNamespace(d string) ListOption {
}
}
// ListName sets the name for list method to filter needed services
func ListName(n string) ListOption {
return func(o *ListOptions) {
o.Name = n
}
}
// Name sets the name
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
func Codec(c codec.Codec) Option {
return func(o *Options) {
o.Codec = c
}
}

View File

@ -15,31 +15,31 @@ type Watcher interface {
// the watcher. Actions can be create, update, delete
type Result struct {
// Service holds register service
Service *Service `json:"service,omitempty"`
Service *Service
// Action holds the action
Action EventType `json:"action,omitempty"`
Action string
}
// EventType defines register event type
type EventType int
const (
// EventCreate is emitted when a new service is registered
EventCreate EventType = iota
// EventDelete is emitted when an existing service is deregistered
EventDelete
// EventUpdate is emitted when an existing service is updated
EventUpdate
// Create is emitted when a new service is registered
Create EventType = iota
// Delete is emitted when an existing service is deregistered
Delete
// Update is emitted when an existing service is updated
Update
)
// String returns human readable event type
func (t EventType) String() string {
switch t {
case EventCreate:
case Create:
return "create"
case EventDelete:
case Delete:
return "delete"
case EventUpdate:
case Update:
return "update"
default:
return "unknown"
@ -49,11 +49,11 @@ func (t EventType) String() string {
// Event is register event
type Event struct {
// Timestamp is event timestamp
Timestamp time.Time `json:"timestamp,omitempty"`
Timestamp time.Time
// Service is register service
Service *Service `json:"service,omitempty"`
Service *Service
// ID is register id
ID string `json:"id,omitempty"`
ID string
// Type defines type of event
Type EventType `json:"type,omitempty"`
Type EventType
}

27
util/buf/buf.go Normal file
View File

@ -0,0 +1,27 @@
package buf
import (
"bytes"
"io"
)
var _ io.Closer = &Buffer{}
// Buffer bytes.Buffer wrapper to satisfie io.Closer interface
type Buffer struct {
*bytes.Buffer
}
// Close reset buffer contents
func (b *Buffer) Close() error {
b.Buffer.Reset()
return nil
}
// New creates new buffer that satisfies Closer interface
func New(b *bytes.Buffer) *Buffer {
if b == nil {
b = bytes.NewBuffer(nil)
}
return &Buffer{b}
}

View File

@ -1,85 +0,0 @@
package buffer
import (
"io"
"sync"
"time"
)
var _ io.WriteCloser = (*DelayedBuffer)(nil)
// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached
type DelayedBuffer struct {
mu sync.Mutex
maxWait time.Duration
flushTime time.Time
buffer chan []byte
ticker *time.Ticker
w io.Writer
err error
}
func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer {
b := &DelayedBuffer{
buffer: make(chan []byte, size),
ticker: time.NewTicker(maxWait),
w: w,
flushTime: time.Now(),
maxWait: maxWait,
}
b.loop()
return b
}
func (b *DelayedBuffer) loop() {
go func() {
for range b.ticker.C {
b.mu.Lock()
if time.Since(b.flushTime) > b.maxWait {
b.flush()
}
b.mu.Unlock()
}
}()
}
func (b *DelayedBuffer) flush() {
bufLen := len(b.buffer)
if bufLen > 0 {
tmp := make([][]byte, bufLen)
for i := 0; i < bufLen; i++ {
tmp[i] = <-b.buffer
}
for _, t := range tmp {
_, b.err = b.w.Write(t)
}
b.flushTime = time.Now()
}
}
func (b *DelayedBuffer) Put(items ...[]byte) {
b.mu.Lock()
for _, item := range items {
select {
case b.buffer <- item:
default:
b.flush()
b.buffer <- item
}
}
b.mu.Unlock()
}
func (b *DelayedBuffer) Close() error {
b.mu.Lock()
b.flush()
close(b.buffer)
b.ticker.Stop()
b.mu.Unlock()
return b.err
}
func (b *DelayedBuffer) Write(data []byte) (int, error) {
b.Put(data)
return len(data), b.err
}

View File

@ -1,22 +0,0 @@
package buffer
import (
"bytes"
"testing"
"time"
)
func TestTimedBuffer(t *testing.T) {
buf := bytes.NewBuffer(nil)
b := NewDelayedBuffer(100, 300*time.Millisecond, buf)
for i := 0; i < 100; i++ {
_, _ = b.Write([]byte(`test`))
}
if buf.Len() != 0 {
t.Fatal("delayed write not worked")
}
time.Sleep(400 * time.Millisecond)
if buf.Len() == 0 {
t.Fatal("delayed write not worked")
}
}

View File

@ -35,11 +35,11 @@ func TestUnmarshalYAML(t *testing.T) {
t.Fatalf("invalid duration %v != 10000000", v.TTL)
}
err = yaml.Unmarshal([]byte(`{"ttl":"1d"}`), v)
err = yaml.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil {
t.Fatal(err)
} else if *(v.TTL) != 86400000000000 {
t.Fatalf("invalid duration %v != 86400000000000", *v.TTL)
} else if *(v.TTL) != 31622400000000000 {
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
}
}
@ -68,11 +68,11 @@ func TestUnmarshalJSON(t *testing.T) {
t.Fatalf("invalid duration %v != 10000000", v.TTL)
}
err = json.Unmarshal([]byte(`{"ttl":"1d"}`), v)
err = json.Unmarshal([]byte(`{"ttl":"1y"}`), v)
if err != nil {
t.Fatal(err)
} else if v.TTL != 86400000000000 {
t.Fatalf("invalid duration %v != 86400000000000", v.TTL)
} else if v.TTL != 31622400000000000 {
t.Fatalf("invalid duration %v != 31622400000000000", v.TTL)
}
}
@ -87,11 +87,11 @@ func TestParseDuration(t *testing.T) {
if td.String() != "340h0m0s" {
t.Fatalf("ParseDuration 14d != 340h0m0s : %s", td.String())
}
td, err = ParseDuration("1d")
td, err = ParseDuration("1y")
if err != nil {
t.Fatalf("ParseDuration error: %v", err)
}
if td.String() != "24h0m0s" {
t.Fatalf("ParseDuration 1d != 24h0m0s : %s", td.String())
if td.String() != "8784h0m0s" {
t.Fatalf("ParseDuration 1y != 8784h0m0s : %s", td.String())
}
}