events/nats: add support for TLS config (#1946)
* events/nats: add support for TLS config * events/nats: improve error logging
This commit is contained in:
@@ -2,9 +2,11 @@ package nats
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/nats-io/nats.go"
|
||||
stan "github.com/nats-io/stan.go"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -29,19 +31,27 @@ func NewStream(opts ...Option) (events.Stream, error) {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// pass the address as an option if it was set
|
||||
var cOpts []stan.Option
|
||||
// connect to nats
|
||||
nopts := nats.GetDefaultOptions()
|
||||
if options.TLSConfig != nil {
|
||||
nopts.Secure = true
|
||||
nopts.TLSConfig = options.TLSConfig
|
||||
}
|
||||
if len(options.Address) > 0 {
|
||||
cOpts = append(cOpts, stan.NatsURL(options.Address))
|
||||
nopts.Servers = []string{options.Address}
|
||||
}
|
||||
conn, err := nopts.Connect()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error connecting to nats at %v with tls enabled (%v): %v", options.Address, nopts.TLSConfig != nil, err)
|
||||
}
|
||||
|
||||
// connect to the cluster
|
||||
conn, err := stan.Connect(options.ClusterID, options.ClientID, cOpts...)
|
||||
clusterConn, err := stan.Connect(options.ClusterID, options.ClientID, stan.NatsConn(conn))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error connecting to nats")
|
||||
return nil, fmt.Errorf("Error connecting to nats cluster %v: %v", options.ClusterID, err)
|
||||
}
|
||||
|
||||
return &stream{conn}, nil
|
||||
return &stream{clusterConn}, nil
|
||||
}
|
||||
|
||||
type stream struct {
|
||||
|
@@ -1,10 +1,13 @@
|
||||
package nats
|
||||
|
||||
import "crypto/tls"
|
||||
|
||||
// Options which are used to configure the nats stream
|
||||
type Options struct {
|
||||
ClusterID string
|
||||
ClientID string
|
||||
Address string
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
|
||||
// Option is a function which configures options
|
||||
@@ -30,3 +33,10 @@ func Address(addr string) Option {
|
||||
o.Address = addr
|
||||
}
|
||||
}
|
||||
|
||||
// TLSConfig to use when connecting to the cluster
|
||||
func TLSConfig(t *tls.Config) Option {
|
||||
return func(o *Options) {
|
||||
o.TLSConfig = t
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user