move implementations to external repos (#17)

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-08-25 13:44:41 +03:00
committed by GitHub
parent c4a303190a
commit 0f4b1435d9
238 changed files with 151 additions and 37364 deletions

View File

@@ -1,181 +0,0 @@
package memory
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/unistack-org/micro/v3/events"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/store"
"github.com/unistack-org/micro/v3/store/memory"
)
// NewStream returns an initialized memory stream
func NewStream(opts ...Option) (events.Stream, error) {
// parse the options
var options Options
for _, o := range opts {
o(&options)
}
if options.Store == nil {
options.Store = memory.NewStore()
}
return &mem{store: options.Store}, nil
}
type subscriber struct {
Queue string
Topic string
Channel chan events.Event
}
type mem struct {
store store.Store
subs []*subscriber
sync.RWMutex
}
func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// validate the topic
if len(topic) == 0 {
return events.ErrMissingTopic
}
// parse the options
options := events.PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}
// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return events.ErrEncodingMessage
}
payload = p
}
// construct the event
event := &events.Event{
ID: uuid.New().String(),
Topic: topic,
Timestamp: options.Timestamp,
Metadata: options.Metadata,
Payload: payload,
}
// serialize the event to bytes
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error encoding event")
}
// write to the store
key := fmt.Sprintf("%v/%v", event.Topic, event.ID)
if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil {
return errors.Wrap(err, "Error writing event to store")
}
// send to the subscribers async
go m.handleEvent(event)
return nil
}
func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, events.ErrMissingTopic
}
// parse the options
options := events.SubscribeOptions{
Queue: uuid.New().String(),
}
for _, o := range opts {
o(&options)
}
// setup the subscriber
sub := &subscriber{
Channel: make(chan events.Event),
Topic: topic,
Queue: options.Queue,
}
// register the subscriber
m.Lock()
m.subs = append(m.subs, sub)
m.Unlock()
// lookup previous events if the start time option was passed
if options.StartAtTime.Unix() > 0 {
go m.lookupPreviousEvents(sub, options.StartAtTime)
}
// return the channel
return sub.Channel, nil
}
// lookupPreviousEvents finds events for a subscriber which occured before a given time and sends
// them into the subscribers channel
func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
// lookup all events which match the topic (a blank topic will return all results)
recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix())
if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error looking up previous events: %v", err)
return
} else if err != nil {
return
}
// loop through the records and send it to the channel if it matches
for _, r := range recs {
var ev events.Event
if err := json.Unmarshal(r.Value, &ev); err != nil {
continue
}
if ev.Timestamp.Unix() < startTime.Unix() {
continue
}
sub.Channel <- ev
}
}
// handleEvents sends the event to any registered subscribers.
func (m *mem) handleEvent(ev *events.Event) {
m.RLock()
subs := m.subs
m.RUnlock()
// filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message
// being sent to two subscribers with the same queue.
filteredSubs := map[string]*subscriber{}
// filter down to subscribers who are interested in this topic
for _, sub := range subs {
if len(sub.Topic) == 0 || sub.Topic == ev.Topic {
filteredSubs[sub.Queue] = sub
}
}
// send the message to each channel async (since one channel might be blocked)
for _, sub := range subs {
go func(s *subscriber) {
s.Channel <- *ev
}(sub)
}
}

View File

@@ -1,135 +0,0 @@
package memory
import (
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/unistack-org/micro/v3/events"
)
type testPayload struct {
Message string
}
func TestStream(t *testing.T) {
stream, err := NewStream()
assert.Nilf(t, err, "NewStream should not return an error")
assert.NotNilf(t, stream, "NewStream should return a stream object")
// TestMissingTopic will test the topic validation on publish
t.Run("TestMissingTopic", func(t *testing.T) {
err := stream.Publish("", nil)
assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error")
})
// TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the
// same test topic.
t.Run("TestSubscribeTopic", func(t *testing.T) {
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the subscriber
evChan, err := stream.Subscribe("test")
assert.Nilf(t, err, "Subscribe should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
err = stream.Publish("test", payload, events.WithMetadata(metadata))
assert.Nil(t, err, "Publishing a valid message should not return an error")
wg.Add(1)
// wait for the subscriber to recieve the message or timeout
wg.Wait()
})
// TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume
// the message from the firehose topic with different queues. The second subscriber will be registered
// after the message is published to test durability.
t.Run("TestSubscribeQueue", func(t *testing.T) {
topic := uuid.New().String()
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the first subscriber
evChan1, err := stream.Subscribe(topic)
assert.Nilf(t, err, "Subscribe should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan1:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
err = stream.Publish(topic, payload, events.WithMetadata(metadata))
assert.Nil(t, err, "Publishing a valid message should not return an error")
wg.Add(2)
// create the second subscriber
evChan2, err := stream.Subscribe(topic,
events.WithQueue("second_queue"),
events.WithStartAtTime(time.Now().Add(time.Minute*-1)),
)
assert.Nilf(t, err, "Subscribe should not return an error")
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan2:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
// wait for the subscriber to recieve the message or timeout
wg.Wait()
})
}

View File

@@ -1,18 +0,0 @@
package memory
import "github.com/unistack-org/micro/v3/store"
// Options which are used to configure the in-memory stream
type Options struct {
Store store.Store
}
// Option is a function which configures options
type Option func(o *Options)
// Store sets the store to use
func Store(s store.Store) Option {
return func(o *Options) {
o.Store = s
}
}

View File

@@ -1,162 +0,0 @@
package nats
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
stan "github.com/nats-io/stan.go"
"github.com/pkg/errors"
"github.com/unistack-org/micro/v3/events"
"github.com/unistack-org/micro/v3/logger"
)
const (
defaultClusterID = "micro"
)
// NewStream returns an initialized nats stream or an error if the connection to the nats
// server could not be established
func NewStream(opts ...Option) (events.Stream, error) {
// parse the options
options := Options{
ClientID: uuid.New().String(),
ClusterID: defaultClusterID,
}
for _, o := range opts {
o(&options)
}
// connect to nats
nopts := nats.GetDefaultOptions()
if options.TLSConfig != nil {
nopts.Secure = true
nopts.TLSConfig = options.TLSConfig
}
if len(options.Address) > 0 {
nopts.Servers = []string{options.Address}
}
conn, err := nopts.Connect()
if err != nil {
return nil, fmt.Errorf("Error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)
}
// connect to the cluster
clusterConn, err := stan.Connect(options.ClusterID, options.ClientID, stan.NatsConn(conn))
if err != nil {
return nil, fmt.Errorf("Error connecting to nats cluster %v: %v", options.ClusterID, err)
}
return &stream{clusterConn}, nil
}
type stream struct {
conn stan.Conn
}
// Publish a message to a topic
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// validate the topic
if len(topic) == 0 {
return events.ErrMissingTopic
}
// parse the options
options := events.PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}
// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return events.ErrEncodingMessage
}
payload = p
}
// construct the event
event := &events.Event{
ID: uuid.New().String(),
Topic: topic,
Timestamp: options.Timestamp,
Metadata: options.Metadata,
Payload: payload,
}
// serialize the event to bytes
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error encoding event")
}
// publish the event to the topic's channel
if _, err := s.conn.PublishAsync(event.Topic, bytes, nil); err != nil {
return errors.Wrap(err, "Error publishing message to topic")
}
return nil
}
// Subscribe to a topic
func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, events.ErrMissingTopic
}
// parse the options
options := events.SubscribeOptions{
Queue: uuid.New().String(),
}
for _, o := range opts {
o(&options)
}
// setup the subscriber
c := make(chan events.Event)
handleMsg := func(m *stan.Msg) {
// decode the message
var evt events.Event
if err := json.Unmarshal(m.Data, &evt); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error decoding message: %v", err)
}
// not ackknowledging the message is the way to indicate an error occured
return
}
// push onto the channel and wait for the consumer to take the event off before we acknowledge it.
c <- evt
if err := m.Ack(); err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error acknowledging message: %v", err)
}
}
// setup the options
subOpts := []stan.SubscriptionOption{
stan.DurableName(topic),
stan.SetManualAckMode(),
}
if options.StartAtTime.Unix() > 0 {
stan.StartAtTime(options.StartAtTime)
}
// connect the subscriber
_, err := s.conn.QueueSubscribe(topic, options.Queue, handleMsg, subOpts...)
if err != nil {
return nil, errors.Wrap(err, "Error subscribing to topic")
}
return c, nil
}

View File

@@ -1,151 +0,0 @@
package nats
import (
"net"
"os/exec"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/unistack-org/micro/v3/events"
)
type testPayload struct {
Message string
}
func TestStream(t *testing.T) {
t.Skip()
_, err := exec.LookPath("nats-streaming-server")
if err != nil {
t.Skipf("Skipping nats test, nats-streaming-server binary is not detected")
}
conn, err := net.DialTimeout("tcp", ":4222", time.Millisecond*100)
if err != nil {
t.Skipf("Skipping nats test, could not connect to cluster on port 4222: %v", err)
}
if err := conn.Close(); err != nil {
t.Fatalf("Error closing test tcp connection to nats cluster")
}
stream, err := NewStream(ClusterID("test-cluster"))
assert.Nilf(t, err, "NewStream should not return an error")
assert.NotNilf(t, stream, "NewStream should return a stream object")
// TestMissingTopic will test the topic validation on publish
t.Run("TestMissingTopic", func(t *testing.T) {
err := stream.Publish("", nil)
assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error")
})
// TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the
// same test topic.
t.Run("TestSubscribeTopic", func(t *testing.T) {
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the subscriber
evChan, err := stream.Subscribe("test")
assert.Nilf(t, err, "Subscribe should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
err = stream.Publish("test", payload, events.WithMetadata(metadata))
assert.Nil(t, err, "Publishing a valid message should not return an error")
wg.Add(1)
// wait for the subscriber to recieve the message or timeout
wg.Wait()
})
// TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume
// the message from the firehose topic with different queues. The second subscriber will be registered
// after the message is published to test durability.
t.Run("TestSubscribeQueue", func(t *testing.T) {
topic := uuid.New().String()
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the first subscriber
evChan1, err := stream.Subscribe(topic)
assert.Nilf(t, err, "Subscribe should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan1:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
err = stream.Publish(topic, payload, events.WithMetadata(metadata))
assert.Nil(t, err, "Publishing a valid message should not return an error")
wg.Add(2)
// create the second subscriber
evChan2, err := stream.Subscribe(topic,
events.WithQueue("second_queue"),
events.WithStartAtTime(time.Now().Add(time.Minute*-1)),
)
assert.Nilf(t, err, "Subscribe should not return an error")
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan2:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
// wait for the subscriber to recieve the message or timeout
wg.Wait()
})
}

View File

@@ -1,42 +0,0 @@
package nats
import "crypto/tls"
// Options which are used to configure the nats stream
type Options struct {
ClusterID string
ClientID string
Address string
TLSConfig *tls.Config
}
// Option is a function which configures options
type Option func(o *Options)
// ClusterID sets the cluster id for the nats connection
func ClusterID(id string) Option {
return func(o *Options) {
o.ClusterID = id
}
}
// ClientID sets the client id for the nats connection
func ClientID(id string) Option {
return func(o *Options) {
o.ClientID = id
}
}
// Address of the nats cluster
func Address(addr string) Option {
return func(o *Options) {
o.Address = addr
}
}
// TLSConfig to use when connecting to the cluster
func TLSConfig(t *tls.Config) Option {
return func(o *Options) {
o.TLSConfig = t
}
}