From 6751060d05756437828926801f2e9f8c8a87730d Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Fri, 12 Feb 2021 16:33:16 +0300 Subject: [PATCH] move memory implementations to core micro repo Signed-off-by: Vasiliy Tolstov --- broker/memory.go | 247 ++++++++++++++ broker/memory_test.go | 50 +++ broker/noop.go | 81 ----- client/noop.go | 11 +- events/events.go | 49 --- events/options.go | 124 ------- go.mod | 1 + go.sum | 2 + network/transport/memory.go | 263 +++++++++++++++ network/transport/memory_test.go | 93 ++++++ network/transport/noop.go | 77 ----- register/memory.go | 541 +++++++++++++++++++++++++++++++ register/memory_test.go | 313 ++++++++++++++++++ register/noop.go | 85 ----- server/noop.go | 12 +- store/memory.go | 199 ++++++++++++ store/memory_test.go | 67 ++++ store/noop.go | 69 ---- sync/memory.go | 199 ++++++++++++ tracer/memory.go | 98 ++++++ tracer/noop.go | 44 --- 21 files changed, 2094 insertions(+), 531 deletions(-) create mode 100644 broker/memory.go create mode 100644 broker/memory_test.go delete mode 100644 broker/noop.go delete mode 100644 events/events.go delete mode 100644 events/options.go create mode 100644 network/transport/memory.go create mode 100644 network/transport/memory_test.go delete mode 100644 network/transport/noop.go create mode 100644 register/memory.go create mode 100644 register/memory_test.go delete mode 100644 register/noop.go create mode 100644 store/memory.go create mode 100644 store/memory_test.go delete mode 100644 store/noop.go create mode 100644 sync/memory.go create mode 100644 tracer/memory.go delete mode 100644 tracer/noop.go diff --git a/broker/memory.go b/broker/memory.go new file mode 100644 index 00000000..f6b8d82d --- /dev/null +++ b/broker/memory.go @@ -0,0 +1,247 @@ +package broker + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/google/uuid" + "github.com/unistack-org/micro/v3/logger" + maddr "github.com/unistack-org/micro/v3/util/addr" + mnet "github.com/unistack-org/micro/v3/util/net" +) + +type memoryBroker struct { + opts Options + + addr string + sync.RWMutex + connected bool + Subscribers map[string][]*memorySubscriber +} + +type memoryEvent struct { + opts Options + topic string + err error + message interface{} +} + +type memorySubscriber struct { + id string + topic string + exit chan bool + handler Handler + opts SubscribeOptions + ctx context.Context +} + +func (m *memoryBroker) Options() Options { + return m.opts +} + +func (m *memoryBroker) Address() string { + return m.addr +} + +func (m *memoryBroker) Connect(ctx context.Context) error { + m.Lock() + defer m.Unlock() + + if m.connected { + return nil + } + + // use 127.0.0.1 to avoid scan of all network interfaces + addr, err := maddr.Extract("127.0.0.1") + if err != nil { + return err + } + i := rand.Intn(20000) + // set addr with port + addr = mnet.HostPort(addr, 10000+i) + + m.addr = addr + m.connected = true + + return nil +} + +func (m *memoryBroker) Disconnect(ctx context.Context) error { + m.Lock() + defer m.Unlock() + + if !m.connected { + return nil + } + + m.connected = false + + return nil +} + +func (m *memoryBroker) Init(opts ...Option) error { + for _, o := range opts { + o(&m.opts) + } + return nil +} + +func (m *memoryBroker) Publish(ctx context.Context, topic string, msg *Message, opts ...PublishOption) error { + m.RLock() + if !m.connected { + m.RUnlock() + return errors.New("not connected") + } + + subs, ok := m.Subscribers[topic] + m.RUnlock() + if !ok { + return nil + } + + var v interface{} + if m.opts.Codec != nil { + buf, err := m.opts.Codec.Marshal(msg) + if err != nil { + return err + } + v = buf + } else { + v = msg + } + + p := &memoryEvent{ + topic: topic, + message: v, + opts: m.opts, + } + + eh := m.opts.ErrorHandler + + for _, sub := range subs { + if err := sub.handler(p); err != nil { + p.err = err + if sub.opts.ErrorHandler != nil { + eh = sub.opts.ErrorHandler + } + if eh != nil { + eh(p) + } else { + if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, err.Error()) + } + } + continue + } + } + + return nil +} + +func (m *memoryBroker) Subscribe(ctx context.Context, topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { + m.RLock() + if !m.connected { + m.RUnlock() + return nil, errors.New("not connected") + } + m.RUnlock() + + options := NewSubscribeOptions(opts...) + + id, err := uuid.NewRandom() + if err != nil { + return nil, err + } + + sub := &memorySubscriber{ + exit: make(chan bool, 1), + id: id.String(), + topic: topic, + handler: handler, + opts: options, + ctx: ctx, + } + + m.Lock() + m.Subscribers[topic] = append(m.Subscribers[topic], sub) + m.Unlock() + + go func() { + <-sub.exit + m.Lock() + var newSubscribers []*memorySubscriber + for _, sb := range m.Subscribers[topic] { + if sb.id == sub.id { + continue + } + newSubscribers = append(newSubscribers, sb) + } + m.Subscribers[topic] = newSubscribers + m.Unlock() + }() + + return sub, nil +} + +func (m *memoryBroker) String() string { + return "memory" +} + +func (m *memoryBroker) Name() string { + return m.opts.Name +} + +func (m *memoryEvent) Topic() string { + return m.topic +} + +func (m *memoryEvent) Message() *Message { + switch v := m.message.(type) { + case *Message: + return v + case []byte: + msg := &Message{} + if err := m.opts.Codec.Unmarshal(v, msg); err != nil { + if m.opts.Logger.V(logger.ErrorLevel) { + m.opts.Logger.Error(m.opts.Context, "[memory]: failed to unmarshal: %v", err) + } + return nil + } + return msg + } + + return nil +} + +func (m *memoryEvent) Ack() error { + return nil +} + +func (m *memoryEvent) Error() error { + return m.err +} + +func (m *memorySubscriber) Options() SubscribeOptions { + return m.opts +} + +func (m *memorySubscriber) Topic() string { + return m.topic +} + +func (m *memorySubscriber) Unsubscribe(ctx context.Context) error { + m.exit <- true + return nil +} + +func NewBroker(opts ...Option) Broker { + rand.Seed(time.Now().UnixNano()) + + return &memoryBroker{ + opts: NewOptions(opts...), + Subscribers: make(map[string][]*memorySubscriber), + } +} diff --git a/broker/memory_test.go b/broker/memory_test.go new file mode 100644 index 00000000..25c7cfe1 --- /dev/null +++ b/broker/memory_test.go @@ -0,0 +1,50 @@ +package broker + +import ( + "context" + "fmt" + "testing" +) + +func TestMemoryBroker(t *testing.T) { + b := NewBroker() + ctx := context.Background() + + if err := b.Connect(ctx); err != nil { + t.Fatalf("Unexpected connect error %v", err) + } + + topic := "test" + count := 10 + + fn := func(p Event) error { + return nil + } + + sub, err := b.Subscribe(ctx, topic, fn) + if err != nil { + t.Fatalf("Unexpected error subscribing %v", err) + } + + for i := 0; i < count; i++ { + message := &Message{ + Header: map[string]string{ + "foo": "bar", + "id": fmt.Sprintf("%d", i), + }, + Body: []byte(`hello world`), + } + + if err := b.Publish(ctx, topic, message); err != nil { + t.Fatalf("Unexpected error publishing %d", i) + } + } + + if err := sub.Unsubscribe(ctx); err != nil { + t.Fatalf("Unexpected error unsubscribing from %s: %v", topic, err) + } + + if err := b.Disconnect(ctx); err != nil { + t.Fatalf("Unexpected connect error %v", err) + } +} diff --git a/broker/noop.go b/broker/noop.go deleted file mode 100644 index 0ab3fb99..00000000 --- a/broker/noop.go +++ /dev/null @@ -1,81 +0,0 @@ -package broker - -import "context" - -type noopBroker struct { - opts Options -} - -type noopSubscriber struct { - topic string - opts SubscribeOptions -} - -// NewBroker returns new noop broker -func NewBroker(opts ...Option) Broker { - return &noopBroker{opts: NewOptions(opts...)} -} - -func (n *noopBroker) Name() string { - return n.opts.Name -} - -// Init initialize broker -func (n *noopBroker) Init(opts ...Option) error { - for _, o := range opts { - o(&n.opts) - } - - return nil -} - -// Options returns broker Options -func (n *noopBroker) Options() Options { - return n.opts -} - -// Address returns broker address -func (n *noopBroker) Address() string { - return "" -} - -// Connect connects to broker -func (n *noopBroker) Connect(ctx context.Context) error { - return nil -} - -// Disconnect disconnects from broker -func (n *noopBroker) Disconnect(ctx context.Context) error { - return nil -} - -// Publish publishes message to broker -func (n *noopBroker) Publish(ctx context.Context, topic string, m *Message, opts ...PublishOption) error { - return nil -} - -// Subscribe subscribes to broker topic -func (n *noopBroker) Subscribe(ctx context.Context, topic string, h Handler, opts ...SubscribeOption) (Subscriber, error) { - options := NewSubscribeOptions(opts...) - return &noopSubscriber{topic: topic, opts: options}, nil -} - -// String return broker string representation -func (n *noopBroker) String() string { - return "noop" -} - -// Options returns subscriber options -func (n *noopSubscriber) Options() SubscribeOptions { - return n.opts -} - -// TOpic returns subscriber topic -func (n *noopSubscriber) Topic() string { - return n.topic -} - -// Unsubscribe unsbscribes from broker topic -func (n *noopSubscriber) Unsubscribe(ctx context.Context) error { - return nil -} diff --git a/client/noop.go b/client/noop.go index e24d216a..2d285f8d 100644 --- a/client/noop.go +++ b/client/noop.go @@ -46,7 +46,16 @@ type noopRequest struct { // NewClient returns new noop client func NewClient(opts ...Option) Client { - return &noopClient{opts: NewOptions(opts...)} + nc := &noopClient{opts: NewOptions(opts...)} + // wrap in reverse + + c := Client(nc) + + for i := len(nc.opts.Wrappers); i > 0; i-- { + c = nc.opts.Wrappers[i-1](c) + } + + return c } func (n *noopClient) Name() string { diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 333bfd7e..00000000 --- a/events/events.go +++ /dev/null @@ -1,49 +0,0 @@ -// Package events contains interfaces for managing events within distributed systems -package events - -import ( - "context" - "encoding/json" - "errors" - "time" - - "github.com/unistack-org/micro/v3/metadata" -) - -var ( - // ErrMissingTopic is returned if a blank topic was provided to publish - ErrMissingTopic = errors.New("Missing topic") - // ErrEncodingMessage is returned from publish if there was an error encoding the message option - ErrEncodingMessage = errors.New("Error encoding message") -) - -// Stream of events -type Stream interface { - Publish(ctx context.Context, topic string, msg interface{}, opts ...PublishOption) error - Subscribe(ctx context.Context, topic string, opts ...SubscribeOption) (<-chan Event, error) -} - -// Store of events -type Store interface { - Read(ctx context.Context, opts ...ReadOption) ([]*Event, error) - Write(ctx context.Context, event *Event, opts ...WriteOption) error -} - -// Event is the object returned by the broker when you subscribe to a topic -type Event struct { - // ID to uniquely identify the event - ID string - // Topic of event, e.g. "register.service.created" - Topic string - // Timestamp of the event - Timestamp time.Time - // Metadata contains the encoded event was indexed by - Metadata metadata.Metadata - // Payload contains the encoded message - Payload []byte -} - -// Unmarshal the events message into an object -func (e *Event) Unmarshal(v interface{}) error { - return json.Unmarshal(e.Payload, v) -} diff --git a/events/options.go b/events/options.go deleted file mode 100644 index 4a14a9e5..00000000 --- a/events/options.go +++ /dev/null @@ -1,124 +0,0 @@ -package events - -import ( - "time" - - "github.com/unistack-org/micro/v3/metadata" -) - -// PublishOptions contains all the options which can be provided when publishing an event -type PublishOptions struct { - // Metadata contains any keys which can be used to query the data, for example a customer id - Metadata metadata.Metadata - // Timestamp to set for the event, if the timestamp is a zero value, the current time will be used - Timestamp time.Time -} - -// PublishOption sets attributes on PublishOptions -type PublishOption func(o *PublishOptions) - -// WithMetadata sets the Metadata field on PublishOptions -func WithMetadata(md metadata.Metadata) PublishOption { - return func(o *PublishOptions) { - o.Metadata = metadata.Copy(md) - } -} - -// WithTimestamp sets the timestamp field on PublishOptions -func WithTimestamp(t time.Time) PublishOption { - return func(o *PublishOptions) { - o.Timestamp = t - } -} - -// SubscribeOptions contains all the options which can be provided when subscribing to a topic -type SubscribeOptions struct { - // Queue is the name of the subscribers queue, if two subscribers have the same queue the message - // should only be published to one of them - Queue string - // StartAtTime is the time from which the messages should be consumed from. If not provided then - // the messages will be consumed starting from the moment the Subscription starts. - StartAtTime time.Time -} - -// SubscribeOption sets attributes on SubscribeOptions -type SubscribeOption func(o *SubscribeOptions) - -// WithQueue sets the Queue fielf on SubscribeOptions to the value provided -func WithQueue(q string) SubscribeOption { - return func(o *SubscribeOptions) { - o.Queue = q - } -} - -// WithStartAtTime sets the StartAtTime field on SubscribeOptions to the value provided -func WithStartAtTime(t time.Time) SubscribeOption { - return func(o *SubscribeOptions) { - o.StartAtTime = t - } -} - -// WriteOptions contains all the options which can be provided when writing an event to a store -type WriteOptions struct { - // TTL is the duration the event should be recorded for, a zero value TTL indicates the event should - // be stored indefinitely - TTL time.Duration -} - -// WriteOption sets attributes on WriteOptions -type WriteOption func(o *WriteOptions) - -// WithTTL sets the TTL attribute on WriteOptions -func WithTTL(d time.Duration) WriteOption { - return func(o *WriteOptions) { - o.TTL = d - } -} - -// ReadOptions contains all the options which can be provided when reading events from a store -type ReadOptions struct { - // Topic to read events from, if no topic is provided events from all topics will be returned - Topic string - // Query to filter the results using. The store will query the metadata provided when the event - // was written to the store - Query map[string]string - // Limit the number of results to return - Limit int - // Offset the results by this number, useful for paginated queries - Offset int -} - -// ReadOption sets attributes on ReadOptions -type ReadOption func(o *ReadOptions) - -// ReadTopic sets the topic attribute on ReadOptions -func ReadTopic(t string) ReadOption { - return func(o *ReadOptions) { - o.Topic = t - } -} - -// ReadFilter sets a key and value in the query -func ReadFilter(key, value string) ReadOption { - return func(o *ReadOptions) { - if o.Query == nil { - o.Query = map[string]string{key: value} - } else { - o.Query[key] = value - } - } -} - -// ReadLimit sets the limit attribute on ReadOptions -func ReadLimit(l int) ReadOption { - return func(o *ReadOptions) { - o.Limit = 1 - } -} - -// ReadOffset sets the offset attribute on ReadOptions -func ReadOffset(l int) ReadOption { - return func(o *ReadOptions) { - o.Offset = 1 - } -} diff --git a/go.mod b/go.mod index 4ab34bbe..caa0b3f7 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/miekg/dns v1.1.38 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.7.0 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect golang.org/x/net v0.0.0-20210119194325-5f4716e94777 diff --git a/go.sum b/go.sum index 516770f7..354ca6e2 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/network/transport/memory.go b/network/transport/memory.go new file mode 100644 index 00000000..a723356f --- /dev/null +++ b/network/transport/memory.go @@ -0,0 +1,263 @@ +package transport + +import ( + "context" + "errors" + "fmt" + "math/rand" + "net" + "sync" + "time" + + maddr "github.com/unistack-org/micro/v3/util/addr" + mnet "github.com/unistack-org/micro/v3/util/net" +) + +type memorySocket struct { + recv chan *Message + send chan *Message + // sock exit + exit chan bool + // listener exit + lexit chan bool + + local string + remote string + + // for send/recv transport.Timeout + timeout time.Duration + ctx context.Context + sync.RWMutex +} + +type memoryClient struct { + *memorySocket + opts DialOptions +} + +type memoryListener struct { + addr string + exit chan bool + conn chan *memorySocket + lopts ListenOptions + topts Options + sync.RWMutex + ctx context.Context +} + +type memoryTransport struct { + opts Options + sync.RWMutex + listeners map[string]*memoryListener +} + +func (ms *memorySocket) Recv(m *Message) error { + ms.RLock() + defer ms.RUnlock() + + ctx := ms.ctx + if ms.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout) + defer cancel() + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ms.exit: + return errors.New("connection closed") + case <-ms.lexit: + return errors.New("server connection closed") + case cm := <-ms.recv: + *m = *cm + } + return nil +} + +func (ms *memorySocket) Local() string { + return ms.local +} + +func (ms *memorySocket) Remote() string { + return ms.remote +} + +func (ms *memorySocket) Send(m *Message) error { + ms.RLock() + defer ms.RUnlock() + + ctx := ms.ctx + if ms.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ms.ctx, ms.timeout) + defer cancel() + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-ms.exit: + return errors.New("connection closed") + case <-ms.lexit: + return errors.New("server connection closed") + case ms.send <- m: + } + return nil +} + +func (ms *memorySocket) Close() error { + ms.Lock() + defer ms.Unlock() + select { + case <-ms.exit: + return nil + default: + close(ms.exit) + } + return nil +} + +func (m *memoryListener) Addr() string { + return m.addr +} + +func (m *memoryListener) Close() error { + m.Lock() + defer m.Unlock() + select { + case <-m.exit: + return nil + default: + close(m.exit) + } + return nil +} + +func (m *memoryListener) Accept(fn func(Socket)) error { + for { + select { + case <-m.exit: + return nil + case c := <-m.conn: + go fn(&memorySocket{ + lexit: c.lexit, + exit: c.exit, + send: c.recv, + recv: c.send, + local: c.Remote(), + remote: c.Local(), + timeout: m.topts.Timeout, + ctx: m.topts.Context, + }) + } + } +} + +func (m *memoryTransport) Dial(ctx context.Context, addr string, opts ...DialOption) (Client, error) { + m.RLock() + defer m.RUnlock() + + listener, ok := m.listeners[addr] + if !ok { + return nil, errors.New("could not dial " + addr) + } + + options := NewDialOptions(opts...) + + client := &memoryClient{ + &memorySocket{ + send: make(chan *Message), + recv: make(chan *Message), + exit: make(chan bool), + lexit: listener.exit, + local: addr, + remote: addr, + timeout: m.opts.Timeout, + ctx: m.opts.Context, + }, + options, + } + + // pseudo connect + select { + case <-listener.exit: + return nil, errors.New("connection error") + case listener.conn <- client.memorySocket: + } + + return client, nil +} + +func (m *memoryTransport) Listen(ctx context.Context, addr string, opts ...ListenOption) (Listener, error) { + m.Lock() + defer m.Unlock() + + options := NewListenOptions(opts...) + + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + + addr, err = maddr.Extract(host) + if err != nil { + return nil, err + } + + // if zero port then randomly assign one + if len(port) > 0 && port == "0" { + i := rand.Intn(20000) + port = fmt.Sprintf("%d", 10000+i) + } + + // set addr with port + addr = mnet.HostPort(addr, port) + + if _, ok := m.listeners[addr]; ok { + return nil, errors.New("already listening on " + addr) + } + + listener := &memoryListener{ + lopts: options, + topts: m.opts, + addr: addr, + conn: make(chan *memorySocket), + exit: make(chan bool), + ctx: m.opts.Context, + } + + m.listeners[addr] = listener + + return listener, nil +} + +func (m *memoryTransport) Init(opts ...Option) error { + for _, o := range opts { + o(&m.opts) + } + return nil +} + +func (m *memoryTransport) Options() Options { + return m.opts +} + +func (m *memoryTransport) String() string { + return "memory" +} + +func (m *memoryTransport) Name() string { + return m.opts.Name +} + +func NewTransport(opts ...Option) Transport { + options := NewOptions(opts...) + + rand.Seed(time.Now().UnixNano()) + + return &memoryTransport{ + opts: options, + listeners: make(map[string]*memoryListener), + } +} diff --git a/network/transport/memory_test.go b/network/transport/memory_test.go new file mode 100644 index 00000000..939c4a01 --- /dev/null +++ b/network/transport/memory_test.go @@ -0,0 +1,93 @@ +package transport + +import ( + "context" + "os" + "testing" +) + +func TestMemoryTransport(t *testing.T) { + tr := NewTransport() + ctx := context.Background() + // bind / listen + l, err := tr.Listen(ctx, "127.0.0.1:8080") + if err != nil { + t.Fatalf("Unexpected error listening %v", err) + } + defer l.Close() + + // accept + go func() { + if err := l.Accept(func(sock Socket) { + for { + var m Message + if err := sock.Recv(&m); err != nil { + return + } + if len(os.Getenv("INTEGRATION_TESTS")) == 0 { + t.Logf("Server Received %s", string(m.Body)) + } + if err := sock.Send(&Message{ + Body: []byte(`pong`), + }); err != nil { + return + } + } + }); err != nil { + t.Fatalf("Unexpected error accepting %v", err) + } + }() + + // dial + c, err := tr.Dial(ctx, "127.0.0.1:8080") + if err != nil { + t.Fatalf("Unexpected error dialing %v", err) + } + defer c.Close() + + // send <=> receive + for i := 0; i < 3; i++ { + if err := c.Send(&Message{ + Body: []byte(`ping`), + }); err != nil { + return + } + var m Message + if err := c.Recv(&m); err != nil { + return + } + if len(os.Getenv("INTEGRATION_TESTS")) == 0 { + t.Logf("Client Received %s", string(m.Body)) + } + } + +} + +func TestListener(t *testing.T) { + tr := NewTransport() + ctx := context.Background() + // bind / listen on random port + l, err := tr.Listen(ctx, ":0") + if err != nil { + t.Fatalf("Unexpected error listening %v", err) + } + defer l.Close() + + // try again + l2, err := tr.Listen(ctx, ":0") + if err != nil { + t.Fatalf("Unexpected error listening %v", err) + } + defer l2.Close() + + // now make sure it still fails + l3, err := tr.Listen(ctx, ":8080") + if err != nil { + t.Fatalf("Unexpected error listening %v", err) + } + defer l3.Close() + + if _, err := tr.Listen(ctx, ":8080"); err == nil { + t.Fatal("Expected error binding to :8080 got nil") + } +} diff --git a/network/transport/noop.go b/network/transport/noop.go deleted file mode 100644 index e5fac7b4..00000000 --- a/network/transport/noop.go +++ /dev/null @@ -1,77 +0,0 @@ -package transport - -import "context" - -type noopTransport struct { - opts Options -} - -// NewTransport creates new noop transport -func NewTransport(opts ...Option) Transport { - return &noopTransport{opts: NewOptions(opts...)} -} - -func (t *noopTransport) Init(opts ...Option) error { - for _, o := range opts { - o(&t.opts) - } - return nil -} - -func (t *noopTransport) Options() Options { - return t.opts -} - -func (t *noopTransport) Dial(ctx context.Context, addr string, opts ...DialOption) (Client, error) { - options := NewDialOptions(opts...) - return &noopClient{opts: options}, nil -} - -func (t *noopTransport) Listen(ctx context.Context, addr string, opts ...ListenOption) (Listener, error) { - options := NewListenOptions(opts...) - return &noopListener{opts: options}, nil -} - -func (t *noopTransport) String() string { - return "noop" -} - -type noopClient struct { - opts DialOptions -} - -func (c *noopClient) Close() error { - return nil -} - -func (c *noopClient) Local() string { - return "" -} - -func (c *noopClient) Remote() string { - return "" -} - -func (c *noopClient) Recv(*Message) error { - return nil -} - -func (c *noopClient) Send(*Message) error { - return nil -} - -type noopListener struct { - opts ListenOptions -} - -func (l *noopListener) Addr() string { - return "" -} - -func (l *noopListener) Accept(fn func(Socket)) error { - return nil -} - -func (l *noopListener) Close() error { - return nil -} diff --git a/register/memory.go b/register/memory.go new file mode 100644 index 00000000..a9b27e00 --- /dev/null +++ b/register/memory.go @@ -0,0 +1,541 @@ +package register + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/google/uuid" + "github.com/unistack-org/micro/v3/logger" +) + +var ( + sendEventTime = 10 * time.Millisecond + ttlPruneTime = time.Second +) + +type node struct { + *Node + TTL time.Duration + LastSeen time.Time +} + +type record struct { + Name string + Version string + Metadata map[string]string + Nodes map[string]*node + Endpoints []*Endpoint +} + +type memory struct { + opts Options + // records is a KV map with domain name as the key and a services map as the value + records map[string]services + watchers map[string]*watcher + sync.RWMutex +} + +// services is a KV map with service name as the key and a map of records as the value +type services map[string]map[string]*record + +// NewRegister returns an initialized in-memory register +func NewRegister(opts ...Option) Register { + r := &memory{ + opts: NewOptions(opts...), + records: make(map[string]services), + watchers: make(map[string]*watcher), + } + + go r.ttlPrune() + + return r +} + +func (m *memory) ttlPrune() { + prune := time.NewTicker(ttlPruneTime) + defer prune.Stop() + + for { + select { + case <-prune.C: + m.Lock() + for domain, services := range m.records { + for service, versions := range services { + for version, record := range versions { + for id, n := range record.Nodes { + if n.TTL != 0 && time.Since(n.LastSeen) > n.TTL { + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Register TTL expired for node %s of service %s", n.Id, service) + } + delete(m.records[domain][service][version].Nodes, id) + } + } + } + } + } + m.Unlock() + } + } +} + +func (m *memory) sendEvent(r *Result) { + m.RLock() + watchers := make([]*watcher, 0, len(m.watchers)) + for _, w := range m.watchers { + watchers = append(watchers, w) + } + m.RUnlock() + + for _, w := range watchers { + select { + case <-w.exit: + m.Lock() + delete(m.watchers, w.id) + m.Unlock() + default: + select { + case w.res <- r: + case <-time.After(sendEventTime): + } + } + } +} + +func (m *memory) Connect(ctx context.Context) error { + return nil +} + +func (m *memory) Disconnect(ctx context.Context) error { + return nil +} + +func (m *memory) Init(opts ...Option) error { + for _, o := range opts { + o(&m.opts) + } + + // add services + m.Lock() + defer m.Unlock() + + return nil +} + +func (m *memory) Options() Options { + return m.opts +} + +func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOption) error { + m.Lock() + defer m.Unlock() + + options := NewRegisterOptions(opts...) + + // get the services for this domain from the register + srvs, ok := m.records[options.Domain] + if !ok { + srvs = make(services) + } + + // domain is set in metadata so it can be passed to watchers + if s.Metadata == nil { + s.Metadata = map[string]string{"domain": options.Domain} + } else { + s.Metadata["domain"] = options.Domain + } + + // ensure the service name exists + r := serviceToRecord(s, options.TTL) + if _, ok := srvs[s.Name]; !ok { + srvs[s.Name] = make(map[string]*record) + } + + if _, ok := srvs[s.Name][s.Version]; !ok { + srvs[s.Name][s.Version] = r + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Register added new service: %s, version: %s", s.Name, s.Version) + } + m.records[options.Domain] = srvs + go m.sendEvent(&Result{Action: "create", Service: s}) + } + + var addedNodes bool + + for _, n := range s.Nodes { + // check if already exists + if _, ok := srvs[s.Name][s.Version].Nodes[n.Id]; ok { + continue + } + + metadata := make(map[string]string) + + // make copy of metadata + for k, v := range n.Metadata { + metadata[k] = v + } + + // set the domain + metadata["domain"] = options.Domain + + // add the node + srvs[s.Name][s.Version].Nodes[n.Id] = &node{ + Node: &Node{ + Id: n.Id, + Address: n.Address, + Metadata: metadata, + }, + TTL: options.TTL, + LastSeen: time.Now(), + } + + addedNodes = true + } + + if addedNodes { + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Register added new node to service: %s, version: %s", s.Name, s.Version) + } + go m.sendEvent(&Result{Action: "update", Service: s}) + } else { + // refresh TTL and timestamp + for _, n := range s.Nodes { + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Updated registration for service: %s, version: %s", s.Name, s.Version) + } + srvs[s.Name][s.Version].Nodes[n.Id].TTL = options.TTL + srvs[s.Name][s.Version].Nodes[n.Id].LastSeen = time.Now() + } + } + + m.records[options.Domain] = srvs + return nil +} + +func (m *memory) Deregister(ctx context.Context, s *Service, opts ...DeregisterOption) error { + m.Lock() + defer m.Unlock() + + options := NewDeregisterOptions(opts...) + + // domain is set in metadata so it can be passed to watchers + if s.Metadata == nil { + s.Metadata = map[string]string{"domain": options.Domain} + } else { + s.Metadata["domain"] = options.Domain + } + + // if the domain doesn't exist, there is nothing to deregister + services, ok := m.records[options.Domain] + if !ok { + return nil + } + + // if no services with this name and version exist, there is nothing to deregister + versions, ok := services[s.Name] + if !ok { + return nil + } + + version, ok := versions[s.Version] + if !ok { + return nil + } + + // deregister all of the service nodes from this version + for _, n := range s.Nodes { + if _, ok := version.Nodes[n.Id]; ok { + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Register removed node from service: %s, version: %s", s.Name, s.Version) + } + delete(version.Nodes, n.Id) + } + } + + // if the nodes not empty, we replace the version in the store and exist, the rest of the logic + // is cleanup + if len(version.Nodes) > 0 { + m.records[options.Domain][s.Name][s.Version] = version + go m.sendEvent(&Result{Action: "update", Service: s}) + return nil + } + + // if this version was the only version of the service, we can remove the whole service from the + // register and exit + if len(versions) == 1 { + delete(m.records[options.Domain], s.Name) + go m.sendEvent(&Result{Action: "delete", Service: s}) + + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s", s.Name) + } + return nil + } + + // there are other versions of the service running, so only remove this version of it + delete(m.records[options.Domain][s.Name], s.Version) + go m.sendEvent(&Result{Action: "delete", Service: s}) + if m.opts.Logger.V(logger.DebugLevel) { + m.opts.Logger.Debugf(m.opts.Context, "Register removed service: %s, version: %s", s.Name, s.Version) + } + + return nil +} + +func (m *memory) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) { + options := NewLookupOptions(opts...) + + // if it's a wildcard domain, return from all domains + if options.Domain == WildcardDomain { + m.RLock() + recs := m.records + m.RUnlock() + + var services []*Service + + for domain := range recs { + srvs, err := m.LookupService(ctx, name, append(opts, LookupDomain(domain))...) + if err == ErrNotFound { + continue + } else if err != nil { + return nil, err + } + services = append(services, srvs...) + } + + if len(services) == 0 { + return nil, ErrNotFound + } + return services, nil + } + + m.RLock() + defer m.RUnlock() + + // check the domain exists + services, ok := m.records[options.Domain] + if !ok { + return nil, ErrNotFound + } + + // check the service exists + versions, ok := services[name] + if !ok || len(versions) == 0 { + return nil, ErrNotFound + } + + // serialize the response + result := make([]*Service, len(versions)) + + var i int + + for _, r := range versions { + result[i] = recordToService(r, options.Domain) + i++ + } + + return result, nil +} + +func (m *memory) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { + options := NewListOptions(opts...) + + // if it's a wildcard domain, list from all domains + if options.Domain == WildcardDomain { + m.RLock() + recs := m.records + m.RUnlock() + + var services []*Service + + for domain := range recs { + srvs, err := m.ListServices(ctx, append(opts, ListDomain(domain))...) + if err != nil { + return nil, err + } + services = append(services, srvs...) + } + + return services, nil + } + + m.RLock() + defer m.RUnlock() + + // ensure the domain exists + services, ok := m.records[options.Domain] + if !ok { + return make([]*Service, 0), nil + } + + // serialize the result, each version counts as an individual service + var result []*Service + + for domain, service := range services { + for _, version := range service { + result = append(result, recordToService(version, domain)) + } + } + + return result, nil +} + +func (m *memory) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { + wo := NewWatchOptions(opts...) + + // construct the watcher + w := &watcher{ + exit: make(chan bool), + res: make(chan *Result), + id: uuid.New().String(), + wo: wo, + } + + m.Lock() + m.watchers[w.id] = w + m.Unlock() + + return w, nil +} + +func (m *memory) Name() string { + return m.opts.Name +} + +func (m *memory) String() string { + return "memory" +} + +type watcher struct { + id string + wo WatchOptions + res chan *Result + exit chan bool +} + +func (m *watcher) Next() (*Result, error) { + for { + select { + case r := <-m.res: + if r.Service == nil { + continue + } + + if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name { + continue + } + + // extract domain from service metadata + var domain string + if r.Service.Metadata != nil && len(r.Service.Metadata["domain"]) > 0 { + domain = r.Service.Metadata["domain"] + } else { + domain = DefaultDomain + } + + // only send the event if watching the wildcard or this specific domain + if m.wo.Domain == WildcardDomain || m.wo.Domain == domain { + return r, nil + } + case <-m.exit: + return nil, errors.New("watcher stopped") + } + } +} + +func (m *watcher) Stop() { + select { + case <-m.exit: + return + default: + close(m.exit) + } +} + +func serviceToRecord(s *Service, ttl time.Duration) *record { + metadata := make(map[string]string, len(s.Metadata)) + for k, v := range s.Metadata { + metadata[k] = v + } + + nodes := make(map[string]*node, len(s.Nodes)) + for _, n := range s.Nodes { + nodes[n.Id] = &node{ + Node: n, + TTL: ttl, + LastSeen: time.Now(), + } + } + + endpoints := make([]*Endpoint, len(s.Endpoints)) + for i, e := range s.Endpoints { + endpoints[i] = e + } + + return &record{ + Name: s.Name, + Version: s.Version, + Metadata: metadata, + Nodes: nodes, + Endpoints: endpoints, + } +} + +func recordToService(r *record, domain string) *Service { + metadata := make(map[string]string, len(r.Metadata)) + for k, v := range r.Metadata { + metadata[k] = v + } + + // set the domain in metadata so it can be determined when a wildcard query is performed + metadata["domain"] = domain + + endpoints := make([]*Endpoint, len(r.Endpoints)) + for i, e := range r.Endpoints { + request := new(Value) + if e.Request != nil { + *request = *e.Request + } + response := new(Value) + if e.Response != nil { + *response = *e.Response + } + + metadata := make(map[string]string, len(e.Metadata)) + for k, v := range e.Metadata { + metadata[k] = v + } + + endpoints[i] = &Endpoint{ + Name: e.Name, + Request: request, + Response: response, + Metadata: metadata, + } + } + + nodes := make([]*Node, len(r.Nodes)) + i := 0 + for _, n := range r.Nodes { + metadata := make(map[string]string, len(n.Metadata)) + for k, v := range n.Metadata { + metadata[k] = v + } + + nodes[i] = &Node{ + Id: n.Id, + Address: n.Address, + Metadata: metadata, + } + i++ + } + + return &Service{ + Name: r.Name, + Version: r.Version, + Metadata: metadata, + Endpoints: endpoints, + Nodes: nodes, + } +} diff --git a/register/memory_test.go b/register/memory_test.go new file mode 100644 index 00000000..e04e9478 --- /dev/null +++ b/register/memory_test.go @@ -0,0 +1,313 @@ +package register + +import ( + "context" + "fmt" + "os" + "testing" + "time" +) + +var ( + testData = map[string][]*Service{ + "foo": { + { + Name: "foo", + Version: "1.0.0", + Nodes: []*Node{ + { + Id: "foo-1.0.0-123", + Address: "localhost:9999", + }, + { + Id: "foo-1.0.0-321", + Address: "localhost:9999", + }, + }, + }, + { + Name: "foo", + Version: "1.0.1", + Nodes: []*Node{ + { + Id: "foo-1.0.1-321", + Address: "localhost:6666", + }, + }, + }, + { + Name: "foo", + Version: "1.0.3", + Nodes: []*Node{ + { + Id: "foo-1.0.3-345", + Address: "localhost:8888", + }, + }, + }, + }, + "bar": { + { + Name: "bar", + Version: "default", + Nodes: []*Node{ + { + Id: "bar-1.0.0-123", + Address: "localhost:9999", + }, + { + Id: "bar-1.0.0-321", + Address: "localhost:9999", + }, + }, + }, + { + Name: "bar", + Version: "latest", + Nodes: []*Node{ + { + Id: "bar-1.0.1-321", + Address: "localhost:6666", + }, + }, + }, + }, + } +) + +func TestMemoryRegistry(t *testing.T) { + ctx := context.TODO() + m := NewRegister() + + fn := func(k string, v []*Service) { + services, err := m.LookupService(ctx, k) + if err != nil { + t.Errorf("Unexpected error getting service %s: %v", k, err) + } + + if len(services) != len(v) { + t.Errorf("Expected %d services for %s, got %d", len(v), k, len(services)) + } + + for _, service := range v { + var seen bool + for _, s := range services { + if s.Version == service.Version { + seen = true + break + } + } + if !seen { + t.Errorf("expected to find version %s", service.Version) + } + } + } + + // register data + for _, v := range testData { + serviceCount := 0 + for _, service := range v { + if err := m.Register(ctx, service); err != nil { + t.Errorf("Unexpected register error: %v", err) + } + serviceCount++ + // after the service has been registered we should be able to query it + services, err := m.LookupService(ctx, service.Name) + if err != nil { + t.Errorf("Unexpected error getting service %s: %v", service.Name, err) + } + if len(services) != serviceCount { + t.Errorf("Expected %d services for %s, got %d", serviceCount, service.Name, len(services)) + } + } + } + + // using test data + for k, v := range testData { + fn(k, v) + } + + services, err := m.ListServices(ctx) + if err != nil { + t.Errorf("Unexpected error when listing services: %v", err) + } + + totalServiceCount := 0 + for _, testSvc := range testData { + for range testSvc { + totalServiceCount++ + } + } + + if len(services) != totalServiceCount { + t.Errorf("Expected total service count: %d, got: %d", totalServiceCount, len(services)) + } + + // deregister + for _, v := range testData { + for _, service := range v { + if err := m.Deregister(ctx, service); err != nil { + t.Errorf("Unexpected deregister error: %v", err) + } + } + } + + // after all the service nodes have been deregistered we should not get any results + for _, v := range testData { + for _, service := range v { + services, err := m.LookupService(ctx, service.Name) + if err != ErrNotFound { + t.Errorf("Expected error: %v, got: %v", ErrNotFound, err) + } + if len(services) != 0 { + t.Errorf("Expected %d services for %s, got %d", 0, service.Name, len(services)) + } + } + } +} + +func TestMemoryRegistryTTL(t *testing.T) { + m := NewRegister() + ctx := context.TODO() + + for _, v := range testData { + for _, service := range v { + if err := m.Register(ctx, service, RegisterTTL(time.Millisecond)); err != nil { + t.Fatal(err) + } + } + } + + time.Sleep(ttlPruneTime * 2) + + for name := range testData { + svcs, err := m.LookupService(ctx, name) + if err != nil { + t.Fatal(err) + } + + for _, svc := range svcs { + if len(svc.Nodes) > 0 { + t.Fatalf("Service %q still has nodes registered", name) + } + } + } +} + +func TestMemoryRegistryTTLConcurrent(t *testing.T) { + concurrency := 1000 + waitTime := ttlPruneTime * 2 + m := NewRegister() + ctx := context.TODO() + for _, v := range testData { + for _, service := range v { + if err := m.Register(ctx, service, RegisterTTL(waitTime/2)); err != nil { + t.Fatal(err) + } + } + } + + if len(os.Getenv("IN_TRAVIS_CI")) == 0 { + t.Logf("test will wait %v, then check TTL timeouts", waitTime) + } + + errChan := make(chan error, concurrency) + syncChan := make(chan struct{}) + + for i := 0; i < concurrency; i++ { + go func() { + <-syncChan + for name := range testData { + svcs, err := m.LookupService(ctx, name) + if err != nil { + errChan <- err + return + } + + for _, svc := range svcs { + if len(svc.Nodes) > 0 { + errChan <- fmt.Errorf("Service %q still has nodes registered", name) + return + } + } + } + + errChan <- nil + }() + } + + time.Sleep(waitTime) + close(syncChan) + + for i := 0; i < concurrency; i++ { + if err := <-errChan; err != nil { + t.Fatal(err) + } + } +} + +func TestMemoryWildcard(t *testing.T) { + m := NewRegister() + ctx := context.TODO() + + testSrv := &Service{Name: "foo", Version: "1.0.0"} + + if err := m.Register(ctx, testSrv, RegisterDomain("one")); err != nil { + t.Fatalf("Register err: %v", err) + } + if err := m.Register(ctx, testSrv, RegisterDomain("two")); err != nil { + t.Fatalf("Register err: %v", err) + } + + if recs, err := m.ListServices(ctx, ListDomain("one")); err != nil { + t.Errorf("List err: %v", err) + } else if len(recs) != 1 { + t.Errorf("Expected 1 record, got %v", len(recs)) + } + + if recs, err := m.ListServices(ctx, ListDomain("*")); err != nil { + t.Errorf("List err: %v", err) + } else if len(recs) != 2 { + t.Errorf("Expected 2 records, got %v", len(recs)) + } + + if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("one")); err != nil { + t.Errorf("Lookup err: %v", err) + } else if len(recs) != 1 { + t.Errorf("Expected 1 record, got %v", len(recs)) + } + + if recs, err := m.LookupService(ctx, testSrv.Name, LookupDomain("*")); err != nil { + t.Errorf("Lookup err: %v", err) + } else if len(recs) != 2 { + t.Errorf("Expected 2 records, got %v", len(recs)) + } +} + +func TestWatcher(t *testing.T) { + w := &watcher{ + id: "test", + res: make(chan *Result), + exit: make(chan bool), + wo: WatchOptions{ + Domain: WildcardDomain, + }, + } + + go func() { + w.res <- &Result{ + Service: &Service{Name: "foo"}, + } + }() + + _, err := w.Next() + if err != nil { + t.Fatal("unexpected err", err) + } + + w.Stop() + + if _, err := w.Next(); err == nil { + t.Fatal("expected error on Next()") + } +} diff --git a/register/noop.go b/register/noop.go deleted file mode 100644 index d3433cab..00000000 --- a/register/noop.go +++ /dev/null @@ -1,85 +0,0 @@ -package register - -import ( - "context" -) - -type noopRegister struct { - opts Options -} - -func (n *noopRegister) Name() string { - return n.opts.Name -} - -// Init initialize register -func (n *noopRegister) Init(opts ...Option) error { - for _, o := range opts { - o(&n.opts) - } - return nil -} - -// Options returns options struct -func (n *noopRegister) Options() Options { - return n.opts -} - -// Connect opens connection to register -func (n *noopRegister) Connect(ctx context.Context) error { - return nil -} - -// Disconnect close connection to register -func (n *noopRegister) Disconnect(ctx context.Context) error { - return nil -} - -// Register registers service -func (n *noopRegister) Register(ctx context.Context, svc *Service, opts ...RegisterOption) error { - return nil -} - -// Deregister deregisters service -func (n *noopRegister) Deregister(ctx context.Context, svc *Service, opts ...DeregisterOption) error { - return nil -} - -// LookupService returns servive info -func (n *noopRegister) LookupService(ctx context.Context, name string, opts ...LookupOption) ([]*Service, error) { - return []*Service{}, nil -} - -// ListServices listing services -func (n *noopRegister) ListServices(ctx context.Context, opts ...ListOption) ([]*Service, error) { - return []*Service{}, nil -} - -// Watch is used to watch for service changes -func (n *noopRegister) Watch(ctx context.Context, opts ...WatchOption) (Watcher, error) { - return &noopWatcher{done: make(chan struct{}), opts: NewWatchOptions(opts...)}, nil -} - -// String returns register string representation -func (n *noopRegister) String() string { - return "noop" -} - -type noopWatcher struct { - opts WatchOptions - done chan struct{} -} - -func (n *noopWatcher) Next() (*Result, error) { - <-n.done - return nil, ErrWatcherStopped -} - -func (n *noopWatcher) Stop() { - close(n.done) -} - -// NewRegister returns a new noop register -func NewRegister(opts ...Option) Register { - return &noopRegister{opts: NewOptions(opts...)} -} diff --git a/server/noop.go b/server/noop.go index 9e651dbe..41c70238 100644 --- a/server/noop.go +++ b/server/noop.go @@ -46,7 +46,17 @@ type noopServer struct { // NewServer returns new noop server func NewServer(opts ...Option) Server { - return &noopServer{opts: NewOptions(opts...)} + n := &noopServer{opts: NewOptions(opts...)} + if n.handlers == nil { + n.handlers = make(map[string]Handler) + } + if n.subscribers == nil { + n.subscribers = make(map[*subscriber][]broker.Subscriber) + } + if n.exit == nil { + n.exit = make(chan chan error) + } + return n } func (n *noopServer) newCodec(contentType string) (codec.Codec, error) { diff --git a/store/memory.go b/store/memory.go new file mode 100644 index 00000000..294b64e3 --- /dev/null +++ b/store/memory.go @@ -0,0 +1,199 @@ +package store + +import ( + "context" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/patrickmn/go-cache" +) + +// NewStore returns a memory store +func NewStore(opts ...Option) Store { + return &memoryStore{ + opts: NewOptions(opts...), + store: cache.New(cache.NoExpiration, 5*time.Minute), + } +} + +func (m *memoryStore) Connect(ctx context.Context) error { + return nil +} + +func (m *memoryStore) Disconnect(ctx context.Context) error { + m.store.Flush() + return nil +} + +type memoryStore struct { + opts Options + store *cache.Cache +} + +func (m *memoryStore) key(prefix, key string) string { + return filepath.Join(prefix, key) +} + +func (m *memoryStore) prefix(database, table string) string { + if len(database) == 0 { + database = m.opts.Database + } + if len(table) == 0 { + table = m.opts.Table + } + return filepath.Join(database, table) +} + +func (m *memoryStore) exists(prefix, key string) error { + key = m.key(prefix, key) + + _, found := m.store.Get(key) + if !found { + return ErrNotFound + } + + return nil +} + +func (m *memoryStore) get(prefix, key string, val interface{}) error { + key = m.key(prefix, key) + + r, found := m.store.Get(key) + if !found { + return ErrNotFound + } + + buf, ok := r.([]byte) + if !ok { + return ErrNotFound + } + + if err := m.opts.Codec.Unmarshal(buf, val); err != nil { + return err + } + + return nil +} + +func (m *memoryStore) delete(prefix, key string) { + key = m.key(prefix, key) + m.store.Delete(key) +} + +func (m *memoryStore) list(prefix string, limit, offset uint) []string { + allItems := m.store.Items() + allKeys := make([]string, len(allItems)) + i := 0 + + for k := range allItems { + if !strings.HasPrefix(k, prefix+"/") { + continue + } + allKeys[i] = strings.TrimPrefix(k, prefix+"/") + i++ + } + + if limit != 0 || offset != 0 { + sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) + sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) + end := len(allKeys) + if limit > 0 { + calcLimit := int(offset + limit) + if calcLimit < end { + end = calcLimit + } + } + + if int(offset) >= end { + return nil + } + return allKeys[offset:end] + } + + return allKeys +} + +func (m *memoryStore) Init(opts ...Option) error { + for _, o := range opts { + o(&m.opts) + } + return nil +} + +func (m *memoryStore) String() string { + return "memory" +} + +func (m *memoryStore) Name() string { + return m.opts.Name +} + +func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error { + prefix := m.prefix(m.opts.Database, m.opts.Table) + return m.exists(prefix, key) +} + +func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { + readOpts := NewReadOptions(opts...) + prefix := m.prefix(readOpts.Database, readOpts.Table) + return m.get(prefix, key, val) +} + +func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { + writeOpts := NewWriteOptions(opts...) + + prefix := m.prefix(writeOpts.Database, writeOpts.Table) + + key = m.key(prefix, key) + + buf, err := m.opts.Codec.Marshal(val) + if err != nil { + return err + } + + m.store.Set(key, buf, writeOpts.TTL) + return nil +} + +func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { + deleteOptions := NewDeleteOptions(opts...) + + prefix := m.prefix(deleteOptions.Database, deleteOptions.Table) + m.delete(prefix, key) + return nil +} + +func (m *memoryStore) Options() Options { + return m.opts +} + +func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { + listOptions := NewListOptions(opts...) + + prefix := m.prefix(listOptions.Database, listOptions.Table) + keys := m.list(prefix, listOptions.Limit, listOptions.Offset) + + if len(listOptions.Prefix) > 0 { + var prefixKeys []string + for _, k := range keys { + if strings.HasPrefix(k, listOptions.Prefix) { + prefixKeys = append(prefixKeys, k) + } + } + keys = prefixKeys + } + + if len(listOptions.Suffix) > 0 { + var suffixKeys []string + for _, k := range keys { + if strings.HasSuffix(k, listOptions.Suffix) { + suffixKeys = append(suffixKeys, k) + } + } + keys = suffixKeys + } + + return keys, nil +} diff --git a/store/memory_test.go b/store/memory_test.go new file mode 100644 index 00000000..cc1fb3c6 --- /dev/null +++ b/store/memory_test.go @@ -0,0 +1,67 @@ +package store_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/unistack-org/micro/v3/store" +) + +func TestMemoryReInit(t *testing.T) { + s := store.NewStore(store.Table("aaa")) + s.Init(store.Table("")) + if len(s.Options().Table) > 0 { + t.Error("Init didn't reinitialise the store") + } +} + +func TestMemoryBasic(t *testing.T) { + s := store.NewStore() + s.Init() + basictest(s, t) +} + +func TestMemoryPrefix(t *testing.T) { + s := store.NewStore() + s.Init(store.Table("some-prefix")) + basictest(s, t) +} + +func TestMemoryNamespace(t *testing.T) { + s := store.NewStore() + s.Init(store.Database("some-namespace")) + basictest(s, t) +} + +func TestMemoryNamespacePrefix(t *testing.T) { + s := store.NewStore() + s.Init(store.Table("some-prefix"), store.Database("some-namespace")) + basictest(s, t) +} + +func basictest(s store.Store, t *testing.T) { + ctx := context.Background() + if len(os.Getenv("IN_TRAVIS_CI")) == 0 { + t.Logf("Testing store %s, with options %#+v\n", s.String(), s.Options()) + } + // Read and Write an expiring Record + if err := s.Write(ctx, "Hello", "World", store.WriteTTL(time.Millisecond*100)); err != nil { + t.Error(err) + } + var val []byte + if err := s.Read(ctx, "Hello", &val); err != nil { + t.Error(err) + } else { + if string(val) != "World" { + t.Errorf("Expected %s, got %s", "World", val) + } + } + time.Sleep(time.Millisecond * 200) + if err := s.Read(ctx, "Hello", &val); err != store.ErrNotFound { + t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err) + } + + s.Disconnect(ctx) // reset the store +} diff --git a/store/noop.go b/store/noop.go deleted file mode 100644 index 2313c10a..00000000 --- a/store/noop.go +++ /dev/null @@ -1,69 +0,0 @@ -package store - -import "context" - -type noopStore struct { - opts Options -} - -func NewStore(opts ...Option) Store { - return &noopStore{opts: NewOptions(opts...)} -} - -// Init initialize store -func (n *noopStore) Init(opts ...Option) error { - for _, o := range opts { - o(&n.opts) - } - return nil -} - -// Options returns Options struct -func (n *noopStore) Options() Options { - return n.opts -} - -// Name -func (n *noopStore) Name() string { - return n.opts.Name -} - -// String returns string representation -func (n *noopStore) String() string { - return "noop" -} - -// Read reads store value by key -func (n *noopStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error { - return ErrNotFound -} - -// Read reads store value by key -func (n *noopStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { - return ErrNotFound -} - -// Write writes store record -func (n *noopStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { - return nil -} - -// Delete removes store value by key -func (n *noopStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { - return nil -} - -// List lists store -func (n *noopStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { - return []string{}, nil -} - -// Connect connects to store -func (n *noopStore) Connect(ctx context.Context) error { - return nil -} - -// Disconnect disconnects from store -func (n *noopStore) Disconnect(ctx context.Context) error { - return nil -} diff --git a/sync/memory.go b/sync/memory.go new file mode 100644 index 00000000..e510bcc8 --- /dev/null +++ b/sync/memory.go @@ -0,0 +1,199 @@ +package sync + +import ( + gosync "sync" + "time" +) + +type memorySync struct { + options Options + + mtx gosync.RWMutex + locks map[string]*memoryLock +} + +type memoryLock struct { + id string + time time.Time + ttl time.Duration + release chan bool +} + +type memoryLeader struct { + opts LeaderOptions + id string + resign func(id string) error + status chan bool +} + +func (m *memoryLeader) Resign() error { + return m.resign(m.id) +} + +func (m *memoryLeader) Status() chan bool { + return m.status +} + +func (m *memorySync) Leader(id string, opts ...LeaderOption) (Leader, error) { + var once gosync.Once + var options LeaderOptions + for _, o := range opts { + o(&options) + } + + // acquire a lock for the id + if err := m.Lock(id); err != nil { + return nil, err + } + + // return the leader + return &memoryLeader{ + opts: options, + id: id, + resign: func(id string) error { + once.Do(func() { + m.Unlock(id) + }) + return nil + }, + // TODO: signal when Unlock is called + status: make(chan bool, 1), + }, nil +} + +func (m *memorySync) Init(opts ...Option) error { + for _, o := range opts { + o(&m.options) + } + return nil +} + +func (m *memorySync) Options() Options { + return m.options +} + +func (m *memorySync) Lock(id string, opts ...LockOption) error { + // lock our access + m.mtx.Lock() + + var options LockOptions + for _, o := range opts { + o(&options) + } + + lk, ok := m.locks[id] + if !ok { + m.locks[id] = &memoryLock{ + id: id, + time: time.Now(), + ttl: options.TTL, + release: make(chan bool), + } + // unlock + m.mtx.Unlock() + return nil + } + + m.mtx.Unlock() + + // set wait time + var wait <-chan time.Time + var ttl <-chan time.Time + + // decide if we should wait + if options.Wait > time.Duration(0) { + wait = time.After(options.Wait) + } + + // check the ttl of the lock + if lk.ttl > time.Duration(0) { + // time lived for the lock + live := time.Since(lk.time) + + // set a timer for the leftover ttl + if live > lk.ttl { + // release the lock if it expired + _ = m.Unlock(id) + } else { + ttl = time.After(live) + } + } + +lockLoop: + for { + // wait for the lock to be released + select { + case <-lk.release: + m.mtx.Lock() + + // someone locked before us + lk, ok = m.locks[id] + if ok { + m.mtx.Unlock() + continue + } + + // got chance to lock + m.locks[id] = &memoryLock{ + id: id, + time: time.Now(), + ttl: options.TTL, + release: make(chan bool), + } + + m.mtx.Unlock() + + break lockLoop + case <-ttl: + // ttl exceeded + _ = m.Unlock(id) + // TODO: check the ttl again above + ttl = nil + // try acquire + continue + case <-wait: + return ErrLockTimeout + } + } + + return nil +} + +func (m *memorySync) Unlock(id string) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + lk, ok := m.locks[id] + // no lock exists + if !ok { + return nil + } + + // delete the lock + delete(m.locks, id) + + select { + case <-lk.release: + return nil + default: + close(lk.release) + } + + return nil +} + +func (m *memorySync) String() string { + return "memory" +} + +func NewSync(opts ...Option) Sync { + var options Options + for _, o := range opts { + o(&options) + } + + return &memorySync{ + options: options, + locks: make(map[string]*memoryLock), + } +} diff --git a/tracer/memory.go b/tracer/memory.go new file mode 100644 index 00000000..266aa206 --- /dev/null +++ b/tracer/memory.go @@ -0,0 +1,98 @@ +package tracer + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/unistack-org/micro/v3/util/ring" +) + +type tracer struct { + opts Options + // ring buffer of traces + buffer *ring.Buffer +} + +func (t *tracer) Read(opts ...ReadOption) ([]*Span, error) { + var options ReadOptions + for _, o := range opts { + o(&options) + } + + sp := t.buffer.Get(t.buffer.Size()) + + spans := make([]*Span, 0, len(sp)) + + for _, span := range sp { + val := span.Value.(*Span) + // skip if trace id is specified and doesn't match + if len(options.Trace) > 0 && val.Trace != options.Trace { + continue + } + spans = append(spans, val) + } + + return spans, nil +} + +func (t *tracer) Start(ctx context.Context, name string) (context.Context, *Span) { + span := &Span{ + Name: name, + Trace: uuid.New().String(), + Id: uuid.New().String(), + Started: time.Now(), + Metadata: make(map[string]string), + } + + // return span if no context + if ctx == nil { + return NewContext(context.Background(), span.Trace, span.Id), span + } + traceID, parentSpanID, ok := FromContext(ctx) + // If the trace can not be found in the header, + // that means this is where the trace is created. + if !ok { + return NewContext(ctx, span.Trace, span.Id), span + } + + // set trace id + span.Trace = traceID + // set parent + span.Parent = parentSpanID + + // return the span + return NewContext(ctx, span.Trace, span.Id), span +} + +func (t *tracer) Finish(s *Span) error { + // set finished time + s.Duration = time.Since(s.Started) + // save the span + t.buffer.Put(s) + + return nil +} + +func (t *tracer) Init(opts ...Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *tracer) Lookup(ctx context.Context) (*Span, error) { + return nil, nil +} + +func (t *tracer) Name() string { + return t.opts.Name +} + +func NewTracer(opts ...Option) Tracer { + return &tracer{ + opts: NewOptions(opts...), + // the last 256 requests + buffer: ring.New(256), + } +} diff --git a/tracer/noop.go b/tracer/noop.go deleted file mode 100644 index 935cbdb2..00000000 --- a/tracer/noop.go +++ /dev/null @@ -1,44 +0,0 @@ -package tracer - -import "context" - -type noopTracer struct { - opts Options -} - -func (n *noopTracer) Name() string { - return n.opts.Name -} - -// Init initilize tracer -func (n *noopTracer) Init(opts ...Option) error { - for _, o := range opts { - o(&n.opts) - } - return nil -} - -// Start starts new span -func (n *noopTracer) Start(ctx context.Context, name string) (context.Context, *Span) { - return nil, nil -} - -// Lookup get span from context -func (n *noopTracer) Lookup(ctx context.Context) (*Span, error) { - return nil, nil -} - -// Finish finishes span -func (n *noopTracer) Finish(*Span) error { - return nil -} - -// Read reads span -func (n *noopTracer) Read(...ReadOption) ([]*Span, error) { - return nil, nil -} - -// NewTracer returns new noop tracer -func NewTracer(opts ...Option) Tracer { - return &noopTracer{opts: NewOptions(opts...)} -}