2020-08-18 18:19:53 +03:00
|
|
|
package memory
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/micro/go-micro/v3/events"
|
|
|
|
"github.com/micro/go-micro/v3/logger"
|
|
|
|
"github.com/micro/go-micro/v3/store"
|
|
|
|
"github.com/micro/go-micro/v3/store/memory"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
)
|
|
|
|
|
|
|
|
// NewStream returns an initialized memory stream
|
|
|
|
func NewStream(opts ...Option) (events.Stream, error) {
|
|
|
|
// parse the options
|
|
|
|
var options Options
|
|
|
|
for _, o := range opts {
|
|
|
|
o(&options)
|
|
|
|
}
|
|
|
|
if options.Store == nil {
|
|
|
|
options.Store = memory.NewStore()
|
|
|
|
}
|
|
|
|
|
|
|
|
return &mem{store: options.Store}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type subscriber struct {
|
|
|
|
Queue string
|
|
|
|
Topic string
|
|
|
|
Channel chan events.Event
|
2020-09-02 15:28:54 +03:00
|
|
|
|
|
|
|
sync.RWMutex
|
2020-09-04 10:31:49 +03:00
|
|
|
retryMap map[string]int
|
|
|
|
retryLimit int
|
|
|
|
autoAck bool
|
|
|
|
ackWait time.Duration
|
2020-08-18 18:19:53 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
type mem struct {
|
|
|
|
store store.Store
|
|
|
|
|
|
|
|
subs []*subscriber
|
|
|
|
sync.RWMutex
|
|
|
|
}
|
|
|
|
|
2020-08-20 11:29:29 +03:00
|
|
|
func (m *mem) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
|
2020-08-18 18:19:53 +03:00
|
|
|
// validate the topic
|
|
|
|
if len(topic) == 0 {
|
|
|
|
return events.ErrMissingTopic
|
|
|
|
}
|
|
|
|
|
|
|
|
// parse the options
|
|
|
|
options := events.PublishOptions{
|
|
|
|
Timestamp: time.Now(),
|
|
|
|
}
|
|
|
|
for _, o := range opts {
|
|
|
|
o(&options)
|
|
|
|
}
|
|
|
|
|
|
|
|
// encode the message if it's not already encoded
|
|
|
|
var payload []byte
|
2020-08-20 11:29:29 +03:00
|
|
|
if p, ok := msg.([]byte); ok {
|
2020-08-18 18:19:53 +03:00
|
|
|
payload = p
|
|
|
|
} else {
|
2020-08-20 11:29:29 +03:00
|
|
|
p, err := json.Marshal(msg)
|
2020-08-18 18:19:53 +03:00
|
|
|
if err != nil {
|
|
|
|
return events.ErrEncodingMessage
|
|
|
|
}
|
|
|
|
payload = p
|
|
|
|
}
|
|
|
|
|
|
|
|
// construct the event
|
|
|
|
event := &events.Event{
|
|
|
|
ID: uuid.New().String(),
|
|
|
|
Topic: topic,
|
|
|
|
Timestamp: options.Timestamp,
|
|
|
|
Metadata: options.Metadata,
|
|
|
|
Payload: payload,
|
|
|
|
}
|
|
|
|
|
|
|
|
// serialize the event to bytes
|
|
|
|
bytes, err := json.Marshal(event)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "Error encoding event")
|
|
|
|
}
|
|
|
|
|
|
|
|
// write to the store
|
|
|
|
key := fmt.Sprintf("%v/%v", event.Topic, event.ID)
|
|
|
|
if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil {
|
|
|
|
return errors.Wrap(err, "Error writing event to store")
|
|
|
|
}
|
|
|
|
|
|
|
|
// send to the subscribers async
|
|
|
|
go m.handleEvent(event)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-08-20 11:29:29 +03:00
|
|
|
func (m *mem) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) {
|
|
|
|
// validate the topic
|
|
|
|
if len(topic) == 0 {
|
|
|
|
return nil, events.ErrMissingTopic
|
|
|
|
}
|
|
|
|
|
2020-08-18 18:19:53 +03:00
|
|
|
// parse the options
|
|
|
|
options := events.SubscribeOptions{
|
2020-09-02 15:28:54 +03:00
|
|
|
Queue: uuid.New().String(),
|
|
|
|
AutoAck: true,
|
2020-08-18 18:19:53 +03:00
|
|
|
}
|
|
|
|
for _, o := range opts {
|
|
|
|
o(&options)
|
|
|
|
}
|
2020-09-02 15:28:54 +03:00
|
|
|
// TODO RetryLimit
|
2020-08-18 18:19:53 +03:00
|
|
|
|
|
|
|
// setup the subscriber
|
|
|
|
sub := &subscriber{
|
2020-09-04 10:31:49 +03:00
|
|
|
Channel: make(chan events.Event),
|
|
|
|
Topic: topic,
|
|
|
|
Queue: options.Queue,
|
|
|
|
retryMap: map[string]int{},
|
|
|
|
autoAck: true,
|
|
|
|
retryLimit: options.GetRetryLimit(),
|
2020-09-02 15:28:54 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
if !options.AutoAck {
|
2020-09-04 10:31:49 +03:00
|
|
|
if options.AckWait == 0 {
|
|
|
|
return nil, fmt.Errorf("invalid AckWait passed, should be positive integer")
|
|
|
|
}
|
2020-09-02 15:28:54 +03:00
|
|
|
sub.autoAck = options.AutoAck
|
|
|
|
sub.ackWait = options.AckWait
|
2020-08-18 18:19:53 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// register the subscriber
|
|
|
|
m.Lock()
|
|
|
|
m.subs = append(m.subs, sub)
|
|
|
|
m.Unlock()
|
|
|
|
|
|
|
|
// lookup previous events if the start time option was passed
|
|
|
|
if options.StartAtTime.Unix() > 0 {
|
|
|
|
go m.lookupPreviousEvents(sub, options.StartAtTime)
|
|
|
|
}
|
|
|
|
|
|
|
|
// return the channel
|
|
|
|
return sub.Channel, nil
|
|
|
|
}
|
|
|
|
|
2020-09-02 15:28:54 +03:00
|
|
|
// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends
|
2020-08-18 18:19:53 +03:00
|
|
|
// them into the subscribers channel
|
|
|
|
func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
|
|
|
|
// lookup all events which match the topic (a blank topic will return all results)
|
2020-08-20 11:29:29 +03:00
|
|
|
recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix())
|
2020-08-18 18:19:53 +03:00
|
|
|
if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
|
|
logger.Errorf("Error looking up previous events: %v", err)
|
|
|
|
return
|
|
|
|
} else if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// loop through the records and send it to the channel if it matches
|
|
|
|
for _, r := range recs {
|
|
|
|
var ev events.Event
|
|
|
|
if err := json.Unmarshal(r.Value, &ev); err != nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if ev.Timestamp.Unix() < startTime.Unix() {
|
|
|
|
continue
|
|
|
|
}
|
2020-09-02 15:28:54 +03:00
|
|
|
sendEvent(&ev, sub)
|
2020-08-18 18:19:53 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleEvents sends the event to any registered subscribers.
|
|
|
|
func (m *mem) handleEvent(ev *events.Event) {
|
|
|
|
m.RLock()
|
|
|
|
subs := m.subs
|
|
|
|
m.RUnlock()
|
|
|
|
|
|
|
|
// filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message
|
|
|
|
// being sent to two subscribers with the same queue.
|
|
|
|
filteredSubs := map[string]*subscriber{}
|
|
|
|
|
|
|
|
// filter down to subscribers who are interested in this topic
|
|
|
|
for _, sub := range subs {
|
|
|
|
if len(sub.Topic) == 0 || sub.Topic == ev.Topic {
|
|
|
|
filteredSubs[sub.Queue] = sub
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// send the message to each channel async (since one channel might be blocked)
|
2020-09-04 10:31:49 +03:00
|
|
|
for _, sub := range filteredSubs {
|
2020-09-02 15:28:54 +03:00
|
|
|
sendEvent(ev, sub)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func sendEvent(ev *events.Event, sub *subscriber) {
|
|
|
|
go func(s *subscriber) {
|
|
|
|
evCopy := *ev
|
|
|
|
if s.autoAck {
|
|
|
|
s.Channel <- evCopy
|
|
|
|
return
|
|
|
|
}
|
|
|
|
evCopy.SetAckFunc(ackFunc(s, evCopy))
|
|
|
|
evCopy.SetNackFunc(nackFunc(s, evCopy))
|
|
|
|
s.retryMap[evCopy.ID] = 0
|
|
|
|
tick := time.NewTicker(s.ackWait)
|
|
|
|
defer tick.Stop()
|
|
|
|
for range tick.C {
|
|
|
|
s.Lock()
|
|
|
|
count, ok := s.retryMap[evCopy.ID]
|
|
|
|
s.Unlock()
|
|
|
|
if !ok {
|
|
|
|
// success
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2020-09-04 10:31:49 +03:00
|
|
|
if s.retryLimit > -1 && count > s.retryLimit {
|
2020-09-02 15:28:54 +03:00
|
|
|
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
|
|
logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit)
|
|
|
|
}
|
|
|
|
s.Lock()
|
|
|
|
delete(s.retryMap, evCopy.ID)
|
|
|
|
s.Unlock()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
s.Channel <- evCopy
|
|
|
|
s.Lock()
|
|
|
|
s.retryMap[evCopy.ID] = count + 1
|
|
|
|
s.Unlock()
|
|
|
|
}
|
|
|
|
}(sub)
|
|
|
|
}
|
|
|
|
|
|
|
|
func ackFunc(s *subscriber, evCopy events.Event) func() error {
|
|
|
|
return func() error {
|
|
|
|
s.Lock()
|
|
|
|
delete(s.retryMap, evCopy.ID)
|
|
|
|
s.Unlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func nackFunc(s *subscriber, evCopy events.Event) func() error {
|
|
|
|
return func() error {
|
|
|
|
return nil
|
2020-08-18 18:19:53 +03:00
|
|
|
}
|
|
|
|
}
|