diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..c9bf4575 --- /dev/null +++ b/events/events.go @@ -0,0 +1,46 @@ +// Package events contains interfaces for managing events within distributed systems +package events + +import ( + "encoding/json" + "errors" + "time" +) + +var ( + // ErrMissingTopic is returned if a blank topic was provided to publish + ErrMissingTopic = errors.New("Missing topic") + // ErrEncodingMessage is returned from publish if there was an error encoding the message option + ErrEncodingMessage = errors.New("Error encoding message") +) + +// Stream of events +type Stream interface { + Publish(topic string, opts ...PublishOption) error + Subscribe(opts ...SubscribeOption) (<-chan Event, error) +} + +// Store of events +type Store interface { + Read(opts ...ReadOption) ([]*Event, error) + Write(event *Event, opts ...WriteOption) error +} + +// Event is the object returned by the broker when you subscribe to a topic +type Event struct { + // ID to uniquely identify the event + ID string + // Topic of event, e.g. "registry.service.created" + Topic string + // Timestamp of the event + Timestamp time.Time + // Metadata contains the encoded event was indexed by + Metadata map[string]string + // Payload contains the encoded message + Payload []byte +} + +// Unmarshal the events message into an object +func (e *Event) Unmarshal(v interface{}) error { + return json.Unmarshal(e.Payload, v) +} diff --git a/events/memory/memory.go b/events/memory/memory.go new file mode 100644 index 00000000..a67aaa4d --- /dev/null +++ b/events/memory/memory.go @@ -0,0 +1,181 @@ +package memory + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/v3/events" + "github.com/micro/go-micro/v3/logger" + "github.com/micro/go-micro/v3/store" + "github.com/micro/go-micro/v3/store/memory" + "github.com/pkg/errors" +) + +// NewStream returns an initialized memory stream +func NewStream(opts ...Option) (events.Stream, error) { + // parse the options + var options Options + for _, o := range opts { + o(&options) + } + if options.Store == nil { + options.Store = memory.NewStore() + } + + return &mem{store: options.Store}, nil +} + +type subscriber struct { + Queue string + Topic string + Channel chan events.Event +} + +type mem struct { + store store.Store + + subs []*subscriber + sync.RWMutex +} + +func (m *mem) Publish(topic string, opts ...events.PublishOption) error { + // validate the topic + if len(topic) == 0 { + return events.ErrMissingTopic + } + + // parse the options + options := events.PublishOptions{ + Timestamp: time.Now(), + } + for _, o := range opts { + o(&options) + } + + // encode the message if it's not already encoded + var payload []byte + if p, ok := options.Payload.([]byte); ok { + payload = p + } else { + p, err := json.Marshal(options.Payload) + if err != nil { + return events.ErrEncodingMessage + } + payload = p + } + + // construct the event + event := &events.Event{ + ID: uuid.New().String(), + Topic: topic, + Timestamp: options.Timestamp, + Metadata: options.Metadata, + Payload: payload, + } + + // serialize the event to bytes + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error encoding event") + } + + // write to the store + key := fmt.Sprintf("%v/%v", event.Topic, event.ID) + if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil { + return errors.Wrap(err, "Error writing event to store") + } + + // send to the subscribers async + go m.handleEvent(event) + + return nil +} + +func (m *mem) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, error) { + // parse the options + options := events.SubscribeOptions{ + Queue: uuid.New().String(), + } + for _, o := range opts { + o(&options) + } + + // setup the subscriber + sub := &subscriber{ + Channel: make(chan events.Event), + Topic: options.Topic, + Queue: options.Queue, + } + + // register the subscriber + m.Lock() + m.subs = append(m.subs, sub) + m.Unlock() + + // lookup previous events if the start time option was passed + if options.StartAtTime.Unix() > 0 { + go m.lookupPreviousEvents(sub, options.StartAtTime) + } + + // return the channel + return sub.Channel, nil +} + +// lookupPreviousEvents finds events for a subscriber which occured before a given time and sends +// them into the subscribers channel +func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { + var prefix string + if len(sub.Topic) > 0 { + prefix = sub.Topic + "/" + } + + // lookup all events which match the topic (a blank topic will return all results) + recs, err := m.store.Read(prefix, store.ReadPrefix()) + if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error looking up previous events: %v", err) + return + } else if err != nil { + return + } + + // loop through the records and send it to the channel if it matches + for _, r := range recs { + var ev events.Event + if err := json.Unmarshal(r.Value, &ev); err != nil { + continue + } + if ev.Timestamp.Unix() < startTime.Unix() { + continue + } + + sub.Channel <- ev + } +} + +// handleEvents sends the event to any registered subscribers. +func (m *mem) handleEvent(ev *events.Event) { + m.RLock() + subs := m.subs + m.RUnlock() + + // filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message + // being sent to two subscribers with the same queue. + filteredSubs := map[string]*subscriber{} + + // filter down to subscribers who are interested in this topic + for _, sub := range subs { + if len(sub.Topic) == 0 || sub.Topic == ev.Topic { + filteredSubs[sub.Queue] = sub + } + } + + // send the message to each channel async (since one channel might be blocked) + for _, sub := range subs { + go func(s *subscriber) { + s.Channel <- *ev + }(sub) + } +} diff --git a/events/memory/memory_test.go b/events/memory/memory_test.go new file mode 100644 index 00000000..af6a7da3 --- /dev/null +++ b/events/memory/memory_test.go @@ -0,0 +1,185 @@ +package memory + +import ( + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/v3/events" + "github.com/stretchr/testify/assert" +) + +type testPayload struct { + Message string +} + +func TestStream(t *testing.T) { + stream, err := NewStream() + assert.Nilf(t, err, "NewStream should not return an error") + assert.NotNilf(t, stream, "NewStream should return a stream object") + + // TestMissingTopic will test the topic validation on publish + t.Run("TestMissingTopic", func(t *testing.T) { + err := stream.Publish("") + assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") + }) + + // TestFirehose will publish a message to the test topic. The subscriber will subscribe to the + // firehose topic (indicated by a lack of the topic option). + t.Run("TestFirehose", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Subscribe() + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", + events.WithPayload(payload), + events.WithMetadata(metadata), + ) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the + // same test topic. + t.Run("TestSubscribeTopic", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Subscribe(events.WithTopic("test")) + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", + events.WithPayload(payload), + events.WithMetadata(metadata), + ) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume + // the message from the firehose topic with different queues. The second subscriber will be registered + // after the message is published to test durability. + t.Run("TestSubscribeQueue", func(t *testing.T) { + topic := uuid.New().String() + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the first subscriber + evChan1, err := stream.Subscribe(events.WithTopic(topic)) + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan1: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish(topic, + events.WithPayload(payload), + events.WithMetadata(metadata), + ) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(2) + + // create the second subscriber + evChan2, err := stream.Subscribe( + events.WithTopic(topic), + events.WithQueue("second_queue"), + events.WithStartAtTime(time.Now().Add(time.Minute*-1)), + ) + assert.Nilf(t, err, "Subscribe should not return an error") + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan2: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) +} diff --git a/events/memory/options.go b/events/memory/options.go new file mode 100644 index 00000000..8f9f1cad --- /dev/null +++ b/events/memory/options.go @@ -0,0 +1,18 @@ +package memory + +import "github.com/micro/go-micro/v3/store" + +// Options which are used to configure the in-memory stream +type Options struct { + Store store.Store +} + +// Option is a function which configures options +type Option func(o *Options) + +// Store sets the store to use +func Store(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} diff --git a/events/nats/nats.go b/events/nats/nats.go new file mode 100644 index 00000000..8c241815 --- /dev/null +++ b/events/nats/nats.go @@ -0,0 +1,154 @@ +package nats + +import ( + "encoding/json" + "time" + + "github.com/google/uuid" + stan "github.com/nats-io/stan.go" + "github.com/pkg/errors" + + "github.com/micro/go-micro/v3/events" + "github.com/micro/go-micro/v3/logger" +) + +const ( + defaultClusterID = "micro" + eventsTopic = "events" +) + +// NewStream returns an initialized nats stream or an error if the connection to the nats +// server could not be established +func NewStream(opts ...Option) (events.Stream, error) { + // parse the options + options := Options{ + ClientID: uuid.New().String(), + ClusterID: defaultClusterID, + } + for _, o := range opts { + o(&options) + } + + // pass the address as an option if it was set + var cOpts []stan.Option + if len(options.Address) > 0 { + cOpts = append(cOpts, stan.NatsURL(options.Address)) + } + + // connect to the cluster + conn, err := stan.Connect(options.ClusterID, options.ClientID, cOpts...) + if err != nil { + return nil, errors.Wrap(err, "Error connecting to nats") + } + + return &stream{conn}, nil +} + +type stream struct { + conn stan.Conn +} + +// Publish a message to a topic +func (s *stream) Publish(topic string, opts ...events.PublishOption) error { + // validate the topic + if len(topic) == 0 { + return events.ErrMissingTopic + } + + // parse the options + options := events.PublishOptions{ + Timestamp: time.Now(), + } + for _, o := range opts { + o(&options) + } + + // encode the message if it's not already encoded + var payload []byte + if p, ok := options.Payload.([]byte); ok { + payload = p + } else { + p, err := json.Marshal(options.Payload) + if err != nil { + return events.ErrEncodingMessage + } + payload = p + } + + // construct the event + event := &events.Event{ + ID: uuid.New().String(), + Topic: topic, + Timestamp: options.Timestamp, + Metadata: options.Metadata, + Payload: payload, + } + + // serialize the event to bytes + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error encoding event") + } + + // publish the event to the events channel + if _, err := s.conn.PublishAsync(eventsTopic, bytes, nil); err != nil { + return errors.Wrap(err, "Error publishing message to events") + } + + // publish the event to the topic's channel + if _, err := s.conn.PublishAsync(event.Topic, bytes, nil); err != nil { + return errors.Wrap(err, "Error publishing message to topic") + } + + return nil +} + +// Subscribe to a topic +func (s *stream) Subscribe(opts ...events.SubscribeOption) (<-chan events.Event, error) { + // parse the options + options := events.SubscribeOptions{ + Topic: eventsTopic, + Queue: uuid.New().String(), + } + for _, o := range opts { + o(&options) + } + + // setup the subscriber + c := make(chan events.Event) + handleMsg := func(m *stan.Msg) { + // decode the message + var evt events.Event + if err := json.Unmarshal(m.Data, &evt); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error decoding message: %v", err) + } + // not ackknowledging the message is the way to indicate an error occured + return + } + + // push onto the channel and wait for the consumer to take the event off before we acknowledge it. + c <- evt + + if err := m.Ack(); err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error acknowledging message: %v", err) + } + } + + // setup the options + subOpts := []stan.SubscriptionOption{ + stan.DurableName(options.Topic), + stan.SetManualAckMode(), + } + if options.StartAtTime.Unix() > 0 { + stan.StartAtTime(options.StartAtTime) + } + + // connect the subscriber + _, err := s.conn.QueueSubscribe(options.Topic, options.Queue, handleMsg, subOpts...) + if err != nil { + return nil, errors.Wrap(err, "Error subscribing to topic") + } + + return c, nil +} diff --git a/events/nats/nats_test.go b/events/nats/nats_test.go new file mode 100644 index 00000000..daddd7fc --- /dev/null +++ b/events/nats/nats_test.go @@ -0,0 +1,200 @@ +package nats + +import ( + "net" + "os/exec" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/micro/go-micro/v3/events" + "github.com/stretchr/testify/assert" +) + +type testPayload struct { + Message string +} + +func TestStream(t *testing.T) { + _, err := exec.LookPath("nats-streaming-server") + if err != nil { + t.Skipf("Skipping nats test, nats-streaming-server binary is not detected") + } + + conn, err := net.DialTimeout("tcp", ":4222", time.Millisecond*100) + if err != nil { + t.Skipf("Skipping nats test, could not connect to cluster on port 4222: %v", err) + } + if err := conn.Close(); err != nil { + t.Fatalf("Error closing test tcp connection to nats cluster") + } + + stream, err := NewStream(ClusterID("test-cluster")) + assert.Nilf(t, err, "NewStream should not return an error") + assert.NotNilf(t, stream, "NewStream should return a stream object") + + // TestMissingTopic will test the topic validation on publish + t.Run("TestMissingTopic", func(t *testing.T) { + err := stream.Publish("") + assert.Equalf(t, err, events.ErrMissingTopic, "Publishing to a blank topic should return an error") + }) + + // TestFirehose will publish a message to the test topic. The subscriber will subscribe to the + // firehose topic (indicated by a lack of the topic option). + t.Run("TestFirehose", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Subscribe() + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", + events.WithPayload(payload), + events.WithMetadata(metadata), + ) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestSubscribeTopic will publish a message to the test topic. The subscriber will subscribe to the + // same test topic. + t.Run("TestSubscribeTopic", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Subscribe(events.WithTopic("test")) + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", + events.WithPayload(payload), + events.WithMetadata(metadata), + ) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestSubscribeQueue will publish a message to a random topic. Two subscribers will then consume + // the message from the firehose topic with different queues. The second subscriber will be registered + // after the message is published to test durability. + t.Run("TestSubscribeQueue", func(t *testing.T) { + topic := uuid.New().String() + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the first subscriber + evChan1, err := stream.Subscribe(events.WithTopic(topic)) + assert.Nilf(t, err, "Subscribe should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan1: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish(topic, + events.WithPayload(payload), + events.WithMetadata(metadata), + ) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(2) + + // create the second subscriber + evChan2, err := stream.Subscribe( + events.WithTopic(topic), + events.WithQueue("second_queue"), + events.WithStartAtTime(time.Now().Add(time.Minute*-1)), + ) + assert.Nilf(t, err, "Subscribe should not return an error") + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan2: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) +} diff --git a/events/nats/options.go b/events/nats/options.go new file mode 100644 index 00000000..81b41942 --- /dev/null +++ b/events/nats/options.go @@ -0,0 +1,32 @@ +package nats + +// Options which are used to configure the nats stream +type Options struct { + ClusterID string + ClientID string + Address string +} + +// Option is a function which configures options +type Option func(o *Options) + +// ClusterID sets the cluster id for the nats connection +func ClusterID(id string) Option { + return func(o *Options) { + o.ClusterID = id + } +} + +// ClientID sets the client id for the nats connection +func ClientID(id string) Option { + return func(o *Options) { + o.ClientID = id + } +} + +// Address of the nats cluster +func Address(addr string) Option { + return func(o *Options) { + o.Address = addr + } +} diff --git a/events/options.go b/events/options.go new file mode 100644 index 00000000..fe3096e8 --- /dev/null +++ b/events/options.go @@ -0,0 +1,140 @@ +package events + +import "time" + +// PublishOptions contains all the options which can be provided when publishing an event +type PublishOptions struct { + // Metadata contains any keys which can be used to query the data, for example a customer id + Metadata map[string]string + // Payload contains any additonal data which is relevent to the event but does not need to be + // indexed such as structured data + Payload interface{} + // Timestamp to set for the event, if the timestamp is a zero value, the current time will be used + Timestamp time.Time +} + +// PublishOption sets attributes on PublishOptions +type PublishOption func(o *PublishOptions) + +// WithMetadata sets the Metadata field on PublishOptions +func WithMetadata(md map[string]string) PublishOption { + return func(o *PublishOptions) { + o.Metadata = md + } +} + +// WithPayload sets the payload field on PublishOptions +func WithPayload(p interface{}) PublishOption { + return func(o *PublishOptions) { + o.Payload = p + } +} + +// WithTimestamp sets the timestamp field on PublishOptions +func WithTimestamp(t time.Time) PublishOption { + return func(o *PublishOptions) { + o.Timestamp = t + } +} + +// SubscribeOptions contains all the options which can be provided when subscribing to a topic +type SubscribeOptions struct { + // Queue is the name of the subscribers queue, if two subscribers have the same queue the message + // should only be published to one of them + Queue string + // Topic to subscribe to, if left blank the consumer will be subscribed to the firehouse topic which + // recieves all events + Topic string + // StartAtTime is the time from which the messages should be consumed from. If not provided then + // the messages will be consumed starting from the moment the Subscription starts. + StartAtTime time.Time +} + +// SubscribeOption sets attributes on SubscribeOptions +type SubscribeOption func(o *SubscribeOptions) + +// WithQueue sets the Queue fielf on SubscribeOptions to the value provided +func WithQueue(q string) SubscribeOption { + return func(o *SubscribeOptions) { + o.Queue = q + } +} + +// WithTopic sets the topic to subscribe to +func WithTopic(t string) SubscribeOption { + return func(o *SubscribeOptions) { + o.Topic = t + } +} + +// WithStartAtTime sets the StartAtTime field on SubscribeOptions to the value provided +func WithStartAtTime(t time.Time) SubscribeOption { + return func(o *SubscribeOptions) { + o.StartAtTime = t + } +} + +// WriteOptions contains all the options which can be provided when writing an event to a store +type WriteOptions struct { + // TTL is the duration the event should be recorded for, a zero value TTL indicates the event should + // be stored indefinately + TTL time.Duration +} + +// WriteOption sets attributes on WriteOptions +type WriteOption func(o *WriteOptions) + +// WithTTL sets the TTL attribute on WriteOptions +func WithTTL(d time.Duration) WriteOption { + return func(o *WriteOptions) { + o.TTL = d + } +} + +// ReadOptions contains all the options which can be provided when reading events from a store +type ReadOptions struct { + // Topic to read events from, if no topic is provided events from all topics will be returned + Topic string + // Query to filter the results using. The store will query the metadata provided when the event + // was written to the store + Query map[string]string + // Limit the number of results to return + Limit int + // Offset the results by this number, useful for paginated queries + Offset int +} + +// ReadOption sets attributes on ReadOptions +type ReadOption func(o *ReadOptions) + +// ReadTopic sets the topic attribute on ReadOptions +func ReadTopic(t string) ReadOption { + return func(o *ReadOptions) { + o.Topic = t + } +} + +// ReadFilter sets a key and value in the query +func ReadFilter(key, value string) ReadOption { + return func(o *ReadOptions) { + if o.Query == nil { + o.Query = map[string]string{key: value} + } else { + o.Query[key] = value + } + } +} + +// ReadLimit sets the limit attribute on ReadOptions +func ReadLimit(l int) ReadOption { + return func(o *ReadOptions) { + o.Limit = 1 + } +} + +// ReadOffset sets the offset attribute on ReadOptions +func ReadOffset(l int) ReadOption { + return func(o *ReadOptions) { + o.Offset = 1 + } +} diff --git a/go.mod b/go.mod index cb0cc423..86ca5741 100644 --- a/go.mod +++ b/go.mod @@ -40,12 +40,13 @@ require ( github.com/imdario/mergo v0.3.9 github.com/jonboulle/clockwork v0.1.0 // indirect github.com/kr/pretty v0.2.0 - github.com/lib/pq v1.3.0 + github.com/lib/pq v1.7.0 github.com/lucas-clemente/quic-go v0.14.1 github.com/miekg/dns v1.1.27 github.com/mitchellh/hashstructure v1.0.0 - github.com/nats-io/nats-server/v2 v2.1.6 // indirect - github.com/nats-io/nats.go v1.9.2 + github.com/nats-io/nats-streaming-server v0.18.0 // indirect + github.com/nats-io/nats.go v1.10.0 + github.com/nats-io/stan.go v0.7.0 github.com/onsi/ginkgo v1.12.0 // indirect github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c github.com/patrickmn/go-cache v2.1.0+incompatible @@ -56,7 +57,7 @@ require ( github.com/teris-io/shortid v0.0.0-20171029131806-771a37caa5cf github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect - go.etcd.io/bbolt v1.3.4 + go.etcd.io/bbolt v1.3.5 go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 diff --git a/go.sum b/go.sum index 4ca8f478..a09f0eaa 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,7 @@ github.com/Azure/go-autorest/tracing v0.1.0/go.mod h1:ROEEAFwXycQw7Sn3DXNtEedEvd github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5 h1:ygIc8M6trr62pF5DucadTWGdEB4mEyvzi0e2nbcmcyA= github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw= github.com/Microsoft/hcsshim v0.8.7-0.20191101173118-65519b62243c h1:YMP6olTU903X3gxQJckdmiP8/zkSMq4kN3uipsU9XjU= @@ -50,6 +51,8 @@ github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 h1:kFOfPq6dUM1hTo4JG6LR5AXSUEsOjtdm0kw0FtQtMJA= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= +github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/aws/aws-sdk-go v1.23.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= @@ -63,6 +66,7 @@ github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngE github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/caddyserver/certmagic v0.10.6 h1:sCya6FmfaN74oZE46kqfaFOVoROD/mF36rTQfjN7TZc= @@ -75,6 +79,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cheekybits/genny v1.0.0 h1:uGGa4nei+j20rOSeDeP5Of12XVm7TGUd4dJA9RDitfE= github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wXkRAgjxjQ= +github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= +github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/cloudflare/cloudflare-go v0.10.2/go.mod h1:qhVI5MKwBGhdNU89ZRz2plgYutcJ5PCekLxXn56w6SY= github.com/containerd/cgroups v0.0.0-20190919134610-bf292b21730f/go.mod h1:OApqhQ4XNSNC13gXIwDjhOQxjWa/NxkwZXJ1EvqT0ko= github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= @@ -158,6 +164,8 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee h1:s+21KNqlpePfkah2I+gwHF8xmJWRjooY+5248k6m4A0= github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= @@ -171,6 +179,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -184,7 +194,6 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -233,12 +242,26 @@ github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1 github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v0.9.1 h1:9PZfAcVEvez4yhLH2TBU64/h/z4xlFI80cWXRrxuKuM= +github.com/hashicorp/go-hclog v0.9.1/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= +github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= +github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.3 h1:YPkqC67at8FYaadspW/6uE0COsBxS2656RLEr8Bppgk= github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/raft v1.1.2 h1:oxEL5DDeurYxLd3UbcY/hccgSPhLLpiBZ1YxtWEq59c= +github.com/hashicorp/raft v1.1.2/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= +github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iij/doapi v0.0.0-20190504054126-0bbf12d6d7df/go.mod h1:QMZY7/J/KSQEhKWFeDesPjMj+wCHReeknARU3wqlyN4= @@ -261,6 +284,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd h1:Coekwdh0v2wtGp9Gmz1Ze3eVRAWJMLokvN3QjdzCHLY= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= @@ -279,8 +303,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labbsr0x/bindman-dns-webhook v1.0.2/go.mod h1:p6b+VCXIR8NYKpDr8/dg1HKfQoRHCdcsROXKvmoehKA= github.com/labbsr0x/goh v1.0.1/go.mod h1:8K2UhVoaWXcCU7Lxoa2omWnC8gyW8px7/lmO61c027w= -github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= -github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY= +github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/linode/linodego v0.10.0/go.mod h1:cziNP7pbvE3mXIPneHj0oRY8L1WtGEIKlZ8LANE4eXA= github.com/liquidweb/liquidweb-go v1.6.0/go.mod h1:UDcVnAMDkZxpw4Y7NOHkqoeiGacVLEIG/i5J9cyixzQ= github.com/lucas-clemente/quic-go v0.14.1 h1:c1aKoBZKOPA+49q96B1wGkibyPP0AxYh45WuAoq+87E= @@ -317,15 +341,19 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/nats-server/v2 v2.1.6 h1:qAaHZaS8pRRNQLFaiBA1rq5WynyEGp9DFgmMfoaiXGY= -github.com/nats-io/nats-server/v2 v2.1.6/go.mod h1:BL1NOtaBQ5/y97djERRVWNouMW7GT3gxnmbE/eC8u8A= -github.com/nats-io/nats.go v1.9.2 h1:oDeERm3NcZVrPpdR/JpGdWHMv3oJ8yY30YwxKq+DU2s= -github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= +github.com/nats-io/nats-server/v2 v2.1.7 h1:jCoQwDvRYJy3OpOTHeYfvIPLP46BMeDmH7XEJg/r42I= +github.com/nats-io/nats-server/v2 v2.1.7/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE= +github.com/nats-io/nats-streaming-server v0.18.0 h1:+RDozeN9scwCm0Wc2fYlvGcP144hvxvSOtxZ8FE21ME= +github.com/nats-io/nats-streaming-server v0.18.0/go.mod h1:Y9Aiif2oANuoKazQrs4wXtF3jqt6p97ODQg68lR5TnY= +github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= +github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nats-io/stan.go v0.7.0 h1:sMVHD9RkxPOl6PJfDVBQd+gbxWkApeYl6GrH+10msO4= +github.com/nats-io/stan.go v0.7.0/go.mod h1:Ci6mUIpGQTjl++MqK2XzkWI/0vF+Bl72uScx7ejSYmU= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= @@ -359,6 +387,8 @@ github.com/oracle/oci-go-sdk v7.0.0+incompatible/go.mod h1:VQb79nF8Z2cwLkLS35ukw github.com/ovh/go-ovh v0.0.0-20181109152953-ba5adb4cf014/go.mod h1:joRatxRJaZBsY3JAOEMcoOp05CnZzsx4scTxi95DHyQ= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c h1:rp5dCmg/yLR3mgFuSOe4oEnDDmGLROTvMragMUXpTQw= github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c/go.mod h1:X07ZCGwUbLaax7L0S3Tw4hpejzu63ZrrQiUe6W0hcy0= +github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= +github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= @@ -369,6 +399,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.1.0 h1:BQ53HtBmfOitExawJ6LokA4x8ov/z0SYYb0+HxJfRI8= @@ -382,6 +413,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCb github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkpEo= @@ -389,11 +421,13 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+ github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8= github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= +github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/rainycape/memcache v0.0.0-20150622160815-1031fa0ce2f2/go.mod h1:7tZKcyumwBO6qip7RNQ5r77yrssm9bfCowcLEBcU5IA= @@ -432,6 +466,7 @@ github.com/timewasted/linode v0.0.0-20160829202747-37e84520dcf7/go.mod h1:imsgLp github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc h1:yUaosFVTJwnltaHbSNC3i82I92quFs+OFPRl8kNMVwo= github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/transip/gotransip v0.0.0-20190812104329-6d8d9179b66f/go.mod h1:i0f4R4o2HM0m3DZYQWsj6/MEowD57VzoH0v3d7igeFY= +github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -444,8 +479,8 @@ github.com/xeipuuv/gojsonschema v0.0.0-20180618132009-1d523034197f/go.mod h1:5yf github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= -go.etcd.io/bbolt v1.3.4 h1:hi1bXHMVrlQh6WwxAy+qZCV/SYIlqo+Ushwdpa4tAKg= -go.etcd.io/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= +go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= +go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= @@ -504,6 +539,7 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -549,6 +585,7 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190514135907-3a4b5fb9f71f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -574,6 +611,7 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqG golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=