394 lines
8.1 KiB
Go
394 lines
8.1 KiB
Go
|
package redis
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/docker/registry"
|
||
|
goredis "github.com/redis/go-redis/v9"
|
||
|
"go.unistack.org/micro/v3/codec"
|
||
|
"go.unistack.org/micro/v3/register"
|
||
|
"go.unistack.org/micro/v3/util/id"
|
||
|
pool "go.unistack.org/micro/v3/util/xpool"
|
||
|
)
|
||
|
|
||
|
// separator delimits the segments of the keys built by getKey and precedes
// the "events" suffix of the pub/sub channel names used in this file.
const separator = "/"
|
||
|
|
||
|
var (
|
||
|
_ register.Register = (*Redis)(nil)
|
||
|
_ register.Watcher = (*Watcher)(nil)
|
||
|
)
|
||
|
|
||
|
type Redis struct {
|
||
|
cli goredis.UniversalClient
|
||
|
opts register.Options
|
||
|
watchers map[string]*Watcher
|
||
|
services map[string][]*registry.Service
|
||
|
db int
|
||
|
pool *pool.StringsPool
|
||
|
sync.RWMutex
|
||
|
}
|
||
|
|
||
|
// NewRegister returns an initialized in-memory register
|
||
|
func NewRegister(opts ...register.Option) *Redis {
|
||
|
r := &Redis{
|
||
|
opts: register.NewOptions(opts...),
|
||
|
watchers: make(map[string]*Watcher),
|
||
|
services: make(map[string][]*registry.Service),
|
||
|
}
|
||
|
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Connect(ctx context.Context) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Disconnect(ctx context.Context) error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Init(opts ...register.Option) error {
|
||
|
for _, o := range opts {
|
||
|
o(&r.opts)
|
||
|
}
|
||
|
|
||
|
redisOptions := DefaultOptions
|
||
|
|
||
|
if r.opts.Context != nil {
|
||
|
if c, ok := r.opts.Context.Value(configKey{}).(*goredis.UniversalOptions); ok && c != nil {
|
||
|
redisOptions = c
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if len(r.opts.Addrs) > 0 {
|
||
|
redisOptions.Addrs = r.opts.Addrs
|
||
|
}
|
||
|
|
||
|
if r.opts.TLSConfig != nil {
|
||
|
redisOptions.TLSConfig = r.opts.TLSConfig
|
||
|
}
|
||
|
|
||
|
c := goredis.NewUniversalClient(redisOptions)
|
||
|
setTracing(c, r.opts.Tracer)
|
||
|
r.pool = pool.NewStringsPool(50)
|
||
|
r.db = redisOptions.DB
|
||
|
r.cli = c
|
||
|
r.statsMeter()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Options() register.Options {
|
||
|
return r.opts
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Register(ctx context.Context, s *register.Service, opts ...register.RegisterOption) error {
|
||
|
options := register.NewRegisterOptions(opts...)
|
||
|
b := r.pool.Get()
|
||
|
defer func() {
|
||
|
b.Reset()
|
||
|
r.pool.Put(b)
|
||
|
}()
|
||
|
|
||
|
for _, n := range s.Nodes {
|
||
|
buf, err := r.opts.Codec.Marshal(n)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if err = r.cli.Set(ctx, r.getKey(b, options.Namespace, s.Name, s.Version, n.ID), buf, options.TTL).Err(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
evt := ®ister.Event{
|
||
|
Timestamp: time.Now(),
|
||
|
Type: register.EventCreate,
|
||
|
Service: s,
|
||
|
ID: id.MustNew(),
|
||
|
}
|
||
|
|
||
|
buf, err := r.opts.Codec.Marshal(evt)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if err = r.cli.Publish(ctx, options.Namespace+separator+"events", buf).Err(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Deregister(ctx context.Context, s *register.Service, opts ...register.DeregisterOption) error {
|
||
|
options := register.NewDeregisterOptions(opts...)
|
||
|
b := r.pool.Get()
|
||
|
defer func() {
|
||
|
b.Reset()
|
||
|
r.pool.Put(b)
|
||
|
}()
|
||
|
var err error
|
||
|
for _, n := range s.Nodes {
|
||
|
if err = r.cli.Del(ctx, r.getKey(b, options.Namespace, s.Name, s.Version, n.ID)).Err(); err != nil && err != goredis.Nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
evt := ®ister.Event{
|
||
|
Timestamp: time.Now(),
|
||
|
Type: register.EventDelete,
|
||
|
Service: s,
|
||
|
ID: id.MustNew(),
|
||
|
}
|
||
|
|
||
|
buf, err := r.opts.Codec.Marshal(evt)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if err = r.cli.Publish(ctx, options.Namespace+separator+"events", buf).Err(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) LookupService(ctx context.Context, name string, opts ...register.LookupOption) ([]*register.Service, error) {
|
||
|
options := register.NewLookupOptions(opts...)
|
||
|
b := r.pool.Get()
|
||
|
defer func() {
|
||
|
b.Reset()
|
||
|
r.pool.Put(b)
|
||
|
}()
|
||
|
|
||
|
keys, err := r.cli.Keys(ctx, r.getKey(b, options.Namespace, name, "*", "")).Result()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
if len(keys) == 0 {
|
||
|
return nil, register.ErrNotFound
|
||
|
}
|
||
|
|
||
|
vals, err := r.cli.MGet(ctx, keys...).Result()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
if len(vals) == 0 {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
servicesMap := make(map[string]*register.Service)
|
||
|
for idx := range keys {
|
||
|
eidx := strings.LastIndex(keys[idx], separator)
|
||
|
sidx := strings.LastIndex(keys[idx][:eidx], separator)
|
||
|
if string(keys[idx][sidx]) == separator {
|
||
|
sidx++
|
||
|
}
|
||
|
name := keys[idx][sidx:eidx]
|
||
|
svc, ok := servicesMap[name]
|
||
|
if !ok {
|
||
|
p := strings.Split(name, "-")
|
||
|
svc = ®ister.Service{Name: p[0], Version: p[1]}
|
||
|
}
|
||
|
|
||
|
switch v := vals[idx].(type) {
|
||
|
case string:
|
||
|
node := ®ister.Node{}
|
||
|
if err = r.opts.Codec.Unmarshal([]byte(v), node); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
svc.Nodes = append(svc.Nodes, node)
|
||
|
case []byte:
|
||
|
node := ®ister.Node{}
|
||
|
if err = r.opts.Codec.Unmarshal(v, node); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
svc.Nodes = append(svc.Nodes, node)
|
||
|
}
|
||
|
|
||
|
servicesMap[name] = svc
|
||
|
}
|
||
|
|
||
|
svcs := make([]*register.Service, 0, len(servicesMap))
|
||
|
for _, svc := range servicesMap {
|
||
|
svcs = append(svcs, svc)
|
||
|
}
|
||
|
|
||
|
return svcs, nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) ListServices(ctx context.Context, opts ...register.ListOption) ([]*register.Service, error) {
|
||
|
options := register.NewListOptions(opts...)
|
||
|
b := r.pool.Get()
|
||
|
defer func() {
|
||
|
b.Reset()
|
||
|
r.pool.Put(b)
|
||
|
}()
|
||
|
|
||
|
// TODO: replace Keys with Scan
|
||
|
keys, err := r.cli.Keys(ctx, r.getKey(b, options.Namespace, "*", "", "")).Result()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
if len(keys) == 0 {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
vals, err := r.cli.MGet(ctx, keys...).Result()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
if len(vals) == 0 {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
servicesMap := make(map[string]*register.Service)
|
||
|
for idx := range keys {
|
||
|
eidx := strings.LastIndex(keys[idx], separator)
|
||
|
sidx := strings.LastIndex(keys[idx][:eidx], separator)
|
||
|
if string(keys[idx][sidx]) == separator {
|
||
|
sidx++
|
||
|
}
|
||
|
name := keys[idx][sidx:eidx]
|
||
|
svc, ok := servicesMap[name]
|
||
|
if !ok {
|
||
|
p := strings.Split(name, "-")
|
||
|
svc = ®ister.Service{Name: p[0], Version: p[1]}
|
||
|
}
|
||
|
|
||
|
switch v := vals[idx].(type) {
|
||
|
case string:
|
||
|
node := ®ister.Node{}
|
||
|
if err = r.opts.Codec.Unmarshal([]byte(v), node); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
svc.Nodes = append(svc.Nodes, node)
|
||
|
case []byte:
|
||
|
node := ®ister.Node{}
|
||
|
if err = r.opts.Codec.Unmarshal(v, node); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
svc.Nodes = append(svc.Nodes, node)
|
||
|
}
|
||
|
|
||
|
servicesMap[name] = svc
|
||
|
}
|
||
|
|
||
|
svcs := make([]*register.Service, 0, len(servicesMap))
|
||
|
for _, svc := range servicesMap {
|
||
|
svcs = append(svcs, svc)
|
||
|
}
|
||
|
|
||
|
return svcs, nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Watch(ctx context.Context, opts ...register.WatchOption) (register.Watcher, error) {
|
||
|
id, err := id.New()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
wo := register.NewWatchOptions(opts...)
|
||
|
|
||
|
w := &Watcher{
|
||
|
exit: make(chan bool),
|
||
|
res: make(chan *register.Result),
|
||
|
id: id,
|
||
|
wo: wo,
|
||
|
sub: r.cli.Subscribe(ctx, wo.Namespace+separator+"events"),
|
||
|
codec: r.opts.Codec,
|
||
|
}
|
||
|
|
||
|
r.Lock()
|
||
|
r.watchers[w.id] = w
|
||
|
r.Unlock()
|
||
|
|
||
|
return w, nil
|
||
|
}
|
||
|
|
||
|
func (r *Redis) Name() string {
|
||
|
return r.opts.Name
|
||
|
}
|
||
|
|
||
|
func (r *Redis) String() string {
|
||
|
return "redis"
|
||
|
}
|
||
|
|
||
|
type Watcher struct {
|
||
|
res chan *register.Result
|
||
|
exit chan bool
|
||
|
wo register.WatchOptions
|
||
|
id string
|
||
|
codec codec.Codec
|
||
|
sub *goredis.PubSub
|
||
|
}
|
||
|
|
||
|
func (w *Watcher) Next() (*register.Result, error) {
|
||
|
var err error
|
||
|
for {
|
||
|
select {
|
||
|
case msg := <-w.sub.Channel():
|
||
|
evt := ®ister.Event{}
|
||
|
if err = w.codec.Unmarshal([]byte(msg.Payload), evt); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if evt.Service == nil {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if len(w.wo.Service) > 0 && w.wo.Service != evt.Service.Name {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
namespace := register.DefaultNamespace
|
||
|
if evt.Service.Namespace != "" {
|
||
|
namespace = evt.Service.Namespace
|
||
|
}
|
||
|
|
||
|
// only send the event if watching the wildcard or this specific domain
|
||
|
if w.wo.Namespace == register.WildcardNamespace || w.wo.Namespace == namespace {
|
||
|
return ®ister.Result{Service: evt.Service, Action: evt.Type}, nil
|
||
|
}
|
||
|
|
||
|
case <-w.exit:
|
||
|
return nil, register.ErrWatcherStopped
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (w *Watcher) Stop() {
|
||
|
select {
|
||
|
case <-w.exit:
|
||
|
return
|
||
|
default:
|
||
|
w.sub.Close()
|
||
|
close(w.exit)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *Redis) getKey(b *strings.Builder, opNamespace string, name string, version string, nid string) string {
|
||
|
if opNamespace != "" {
|
||
|
b.WriteString(opNamespace)
|
||
|
b.WriteString(separator)
|
||
|
}
|
||
|
|
||
|
if name != "" {
|
||
|
b.WriteString(name)
|
||
|
}
|
||
|
|
||
|
if version != "" {
|
||
|
b.WriteString("-")
|
||
|
b.WriteString(version)
|
||
|
}
|
||
|
|
||
|
if nid != "" {
|
||
|
b.WriteString(separator)
|
||
|
b.WriteString(nid)
|
||
|
}
|
||
|
return b.String()
|
||
|
}
|