micro/events/nats/nats.go
2020-08-20 09:29:29 +01:00

163 lines
3.8 KiB
Go

package nats
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
stan "github.com/nats-io/stan.go"
"github.com/pkg/errors"
"github.com/micro/go-micro/v3/events"
"github.com/micro/go-micro/v3/logger"
)
const (
defaultClusterID = "micro"
)
// NewStream returns an initialized nats stream or an error if the connection to the nats
// server could not be established
func NewStream(opts ...Option) (events.Stream, error) {
// parse the options
options := Options{
ClientID: uuid.New().String(),
ClusterID: defaultClusterID,
}
for _, o := range opts {
o(&options)
}
// connect to nats
nopts := nats.GetDefaultOptions()
if options.TLSConfig != nil {
nopts.Secure = true
nopts.TLSConfig = options.TLSConfig
}
if len(options.Address) > 0 {
nopts.Servers = []string{options.Address}
}
conn, err := nopts.Connect()
if err != nil {
return nil, fmt.Errorf("Error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)
}
// connect to the cluster
clusterConn, err := stan.Connect(options.ClusterID, options.ClientID, stan.NatsConn(conn))
if err != nil {
return nil, fmt.Errorf("Error connecting to nats cluster %v: %v", options.ClusterID, err)
}
return &stream{clusterConn}, nil
}
type stream struct {
conn stan.Conn
}
// Publish a message to a topic
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// validate the topic
if len(topic) == 0 {
return events.ErrMissingTopic
}
// parse the options
options := events.PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}
// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return events.ErrEncodingMessage
}
payload = p
}
// construct the event
event := &events.Event{
ID: uuid.New().String(),
Topic: topic,
Timestamp: options.Timestamp,
Metadata: options.Metadata,
Payload: payload,
}
// serialize the event to bytes
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error encoding event")
}
// publish the event to the topic's channel
if _, err := s.conn.PublishAsync(event.Topic, bytes, nil); err != nil {
return errors.Wrap(err, "Error publishing message to topic")
}
return nil
}
// Subscribe to a topic
func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, events.ErrMissingTopic
}
// parse the options
options := events.SubscribeOptions{
Queue: uuid.New().String(),
}
for _, o := range opts {
o(&options)
}
// setup the subscriber
c := make(chan events.Event)
handleMsg := func(m *stan.Msg) {
// decode the message
var evt events.Event
if err := json.Unmarshal(m.Data, &evt); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error decoding message: %v", err)
}
// not ackknowledging the message is the way to indicate an error occured
return
}
// push onto the channel and wait for the consumer to take the event off before we acknowledge it.
c <- evt
if err := m.Ack(); err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error acknowledging message: %v", err)
}
}
// setup the options
subOpts := []stan.SubscriptionOption{
stan.DurableName(topic),
stan.SetManualAckMode(),
}
if options.StartAtTime.Unix() > 0 {
stan.StartAtTime(options.StartAtTime)
}
// connect the subscriber
_, err := s.conn.QueueSubscribe(topic, options.Queue, handleMsg, subOpts...)
if err != nil {
return nil, errors.Wrap(err, "Error subscribing to topic")
}
return c, nil
}