Remove all the external plugins except grpc (#1988)
* Remove all the external plugins except grpc * strip cockroach * strip nats test * fix build
This commit is contained in:
@@ -1,190 +0,0 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
stan "github.com/nats-io/stan.go"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/micro/go-micro/v3/events"
|
||||
"github.com/micro/go-micro/v3/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultClusterID = "micro"
|
||||
)
|
||||
|
||||
// NewStream returns an initialized nats stream or an error if the connection to the nats
|
||||
// server could not be established
|
||||
func NewStream(opts ...Option) (events.Stream, error) {
|
||||
// parse the options
|
||||
options := Options{
|
||||
ClientID: uuid.New().String(),
|
||||
ClusterID: defaultClusterID,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// connect to nats
|
||||
nopts := nats.GetDefaultOptions()
|
||||
if options.TLSConfig != nil {
|
||||
nopts.Secure = true
|
||||
nopts.TLSConfig = options.TLSConfig
|
||||
}
|
||||
if len(options.Address) > 0 {
|
||||
nopts.Servers = []string{options.Address}
|
||||
}
|
||||
conn, err := nopts.Connect()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)
|
||||
}
|
||||
|
||||
// connect to the cluster
|
||||
clusterConn, err := stan.Connect(options.ClusterID, options.ClientID, stan.NatsConn(conn))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error connecting to nats cluster %v: %v", options.ClusterID, err)
|
||||
}
|
||||
|
||||
return &stream{clusterConn}, nil
|
||||
}
|
||||
|
||||
type stream struct {
|
||||
conn stan.Conn
|
||||
}
|
||||
|
||||
// Publish a message to a topic
|
||||
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return events.ErrMissingTopic
|
||||
}
|
||||
|
||||
// parse the options
|
||||
options := events.PublishOptions{
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// encode the message if it's not already encoded
|
||||
var payload []byte
|
||||
if p, ok := msg.([]byte); ok {
|
||||
payload = p
|
||||
} else {
|
||||
p, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return events.ErrEncodingMessage
|
||||
}
|
||||
payload = p
|
||||
}
|
||||
|
||||
// construct the event
|
||||
event := &events.Event{
|
||||
ID: uuid.New().String(),
|
||||
Topic: topic,
|
||||
Timestamp: options.Timestamp,
|
||||
Metadata: options.Metadata,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
// serialize the event to bytes
|
||||
bytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Error encoding event")
|
||||
}
|
||||
|
||||
// publish the event to the topic's channel
|
||||
if _, err := s.conn.PublishAsync(event.Topic, bytes, nil); err != nil {
|
||||
return errors.Wrap(err, "Error publishing message to topic")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe to a topic
|
||||
func (s *stream) Subscribe(topic string, opts ...events.SubscribeOption) (<-chan events.Event, error) {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return nil, events.ErrMissingTopic
|
||||
}
|
||||
|
||||
// parse the options
|
||||
options := events.SubscribeOptions{
|
||||
Queue: uuid.New().String(),
|
||||
AutoAck: true,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// setup the subscriber
|
||||
c := make(chan events.Event)
|
||||
handleMsg := func(m *stan.Msg) {
|
||||
// poison message handling
|
||||
if options.GetRetryLimit() > -1 && m.Redelivered && int(m.RedeliveryCount) > options.GetRetryLimit() {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("Message retry limit reached, discarding: %v", m.Sequence)
|
||||
}
|
||||
m.Ack() // ignoring error
|
||||
return
|
||||
}
|
||||
|
||||
// decode the message
|
||||
var evt events.Event
|
||||
if err := json.Unmarshal(m.Data, &evt); err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("Error decoding message: %v", err)
|
||||
}
|
||||
// not acknowledging the message is the way to indicate an error occurred
|
||||
return
|
||||
}
|
||||
|
||||
if !options.AutoAck {
|
||||
// set up the ack funcs
|
||||
evt.SetAckFunc(func() error {
|
||||
return m.Ack()
|
||||
})
|
||||
evt.SetNackFunc(func() error {
|
||||
// noop. not acknowledging the message is the way to indicate an error occurred
|
||||
// we have to wait for the ack wait to kick in before the message is resent
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// push onto the channel and wait for the consumer to take the event off before we acknowledge it.
|
||||
c <- evt
|
||||
|
||||
if !options.AutoAck {
|
||||
return
|
||||
}
|
||||
if err := m.Ack(); err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("Error acknowledging message: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// setup the options
|
||||
subOpts := []stan.SubscriptionOption{
|
||||
stan.DurableName(topic),
|
||||
stan.SetManualAckMode(),
|
||||
}
|
||||
if options.StartAtTime.Unix() > 0 {
|
||||
subOpts = append(subOpts, stan.StartAtTime(options.StartAtTime))
|
||||
}
|
||||
if options.AckWait > 0 {
|
||||
subOpts = append(subOpts, stan.AckWait(options.AckWait))
|
||||
}
|
||||
|
||||
// connect the subscriber
|
||||
_, err := s.conn.QueueSubscribe(topic, options.Queue, handleMsg, subOpts...)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error subscribing to topic")
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
@@ -1,3 +0,0 @@
|
||||
// +build nats
|
||||
|
||||
package nats
|
@@ -1,42 +0,0 @@
|
||||
package nats
|
||||
|
||||
import "crypto/tls"
|
||||
|
||||
// Options which are used to configure the nats stream
|
||||
type Options struct {
|
||||
ClusterID string
|
||||
ClientID string
|
||||
Address string
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
|
||||
// Option is a function which configures options
|
||||
type Option func(o *Options)
|
||||
|
||||
// ClusterID sets the cluster id for the nats connection
|
||||
func ClusterID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.ClusterID = id
|
||||
}
|
||||
}
|
||||
|
||||
// ClientID sets the client id for the nats connection
|
||||
func ClientID(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.ClientID = id
|
||||
}
|
||||
}
|
||||
|
||||
// Address of the nats cluster
|
||||
func Address(addr string) Option {
|
||||
return func(o *Options) {
|
||||
o.Address = addr
|
||||
}
|
||||
}
|
||||
|
||||
// TLSConfig to use when connecting to the cluster
|
||||
func TLSConfig(t *tls.Config) Option {
|
||||
return func(o *Options) {
|
||||
o.TLSConfig = t
|
||||
}
|
||||
}
|
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
"github.com/micro/go-micro/v3/events"
|
||||
"github.com/micro/go-micro/v3/events/stream/memory"
|
||||
"github.com/micro/go-micro/v3/events/stream/nats"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -27,13 +26,7 @@ type testCase struct {
|
||||
func TestStream(t *testing.T) {
|
||||
tcs := []testCase{}
|
||||
|
||||
// NATS specific setup
|
||||
stream, err := nats.NewStream(nats.ClusterID("test-cluster"))
|
||||
assert.Nilf(t, err, "NewStream should not return an error")
|
||||
assert.NotNilf(t, stream, "NewStream should return a stream object")
|
||||
tcs = append(tcs, testCase{str: stream, name: "nats"})
|
||||
|
||||
stream, err = memory.NewStream()
|
||||
stream, err := memory.NewStream()
|
||||
assert.Nilf(t, err, "NewStream should not return an error")
|
||||
assert.NotNilf(t, stream, "NewStream should return a stream object")
|
||||
tcs = append(tcs, testCase{str: stream, name: "memory"})
|
||||
|
Reference in New Issue
Block a user