registry: gossip unify registry option passing, optimize

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2019-01-30 15:39:57 +03:00
parent cead99ac44
commit 422e2002a0
7 changed files with 367 additions and 138 deletions

View File

@@ -7,9 +7,11 @@ import (
"io/ioutil"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/protobuf/proto"
@@ -21,10 +23,35 @@ import (
"github.com/mitchellh/hashstructure"
)
// use registry.Result int32 values after it switches from string to int32 types
// type actionType int32
// type updateType int32
const (
addAction = "update"
delAction = "delete"
syncAction = "sync"
actionTypeInvalid int32 = iota
actionTypeCreate
actionTypeDelete
actionTypeUpdate
actionTypeSync
)
func actionTypeString(t int32) string {
switch t {
case actionTypeCreate:
return "create"
case actionTypeDelete:
return "delete"
case actionTypeUpdate:
return "update"
case actionTypeSync:
return "sync"
}
return "invalid"
}
const (
updateTypeInvalid int32 = iota
updateTypeService
)
type broadcast struct {
@@ -93,23 +120,18 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// shutdown old member
if g.member != nil {
g.member.Shutdown()
g.Stop()
}
// replace addresses
curAddrs = newAddrs
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// create a new default config
c := memberlist.DefaultLocalConfig()
// log to dev null
c.LogOutput = ioutil.Discard
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig
}
@@ -145,15 +167,6 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// set the name
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// log to dev null
c.LogOutput = ioutil.Discard
// set a secret key if secure
if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
@@ -164,6 +177,20 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k
}
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// create the memberlist
m, err := memberlist.Create(c)
if err != nil {
@@ -187,29 +214,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
return nil
}
func (*broadcast) UniqueBroadcast() {}
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
up := new(pb.Update)
if err := proto.Unmarshal(other.Message(), up); err != nil {
return false
}
// ids do not match
if b.update.Id == up.Id {
return false
}
// timestamps do not match
if b.update.Timestamp != up.Timestamp {
return false
}
// type does not match
if b.update.Type != up.Type {
return false
}
// invalidates
return true
return false
}
func (b *broadcast) Message() []byte {
@@ -242,7 +250,7 @@ func (d *delegate) NotifyMsg(b []byte) {
}
// only process service action
if up.Type != "service" {
if up.Type != updateTypeService {
return
}
@@ -280,7 +288,7 @@ func (d *delegate) LocalState(join bool) []byte {
d.updates <- &update{
Update: &pb.Update{
Action: syncAction,
Action: actionTypeSync,
},
sync: syncCh,
}
@@ -309,7 +317,7 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
for _, service := range services {
for _, srv := range service {
d.updates <- &update{
Update: &pb.Update{Action: addAction},
Update: &pb.Update{Action: actionTypeCreate},
Service: srv,
sync: nil,
}
@@ -350,6 +358,31 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
return next, exit
}
func (g *gossipRegistry) wait() {
ctx := g.options.Context
if c, ok := ctx.Value(contextContext{}).(context.Context); ok && c != nil {
ctx = c
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-ctx.Done():
}
g.Stop()
}
func (g *gossipRegistry) Stop() {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
}
func (g *gossipRegistry) run() {
var mtx sync.Mutex
updates := map[uint64]*update{}
@@ -367,11 +400,11 @@ func (g *gossipRegistry) run() {
// process all the updates
for k, v := range updates {
// check if expiry time has passed
if d := (v.Update.Timestamp + v.Update.Expires); d < now {
if d := (v.Update.Expires); d < now {
// delete from records
delete(updates, k)
// set to delete
v.Update.Action = delAction
v.Update.Action = actionTypeDelete
// fire a new update
g.updates <- v
}
@@ -384,7 +417,7 @@ func (g *gossipRegistry) run() {
// process the updates
for u := range g.updates {
switch u.Update.Action {
case addAction:
case actionTypeCreate:
g.Lock()
if service, ok := g.services[u.Service.Name]; !ok {
g.services[u.Service.Name] = []*registry.Service{u.Service}
@@ -395,7 +428,7 @@ func (g *gossipRegistry) run() {
g.Unlock()
// publish update to watchers
go g.publish(addAction, []*registry.Service{u.Service})
go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service})
// we need to expire the node at some point in the future
if u.Update.Expires > 0 {
@@ -406,7 +439,7 @@ func (g *gossipRegistry) run() {
mtx.Unlock()
}
}
case delAction:
case actionTypeDelete:
g.Lock()
if service, ok := g.services[u.Service.Name]; ok {
if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 {
@@ -418,7 +451,7 @@ func (g *gossipRegistry) run() {
g.Unlock()
// publish update to watchers
go g.publish(delAction, []*registry.Service{u.Service})
go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service})
// delete from expiry checks
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
@@ -426,7 +459,7 @@ func (g *gossipRegistry) run() {
delete(updates, hash)
mtx.Unlock()
}
case syncAction:
case actionTypeSync:
// no sync channel provided
if u.sync == nil {
continue
@@ -441,7 +474,7 @@ func (g *gossipRegistry) run() {
}
// publish to watchers
go g.publish(addAction, service)
go g.publish(actionTypeString(actionTypeCreate), service)
}
g.RUnlock()
@@ -480,11 +513,9 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
}
up := &pb.Update{
Id: uuid.New().String(),
Timestamp: uint64(time.Now().UnixNano()),
Expires: uint64(options.TTL.Nanoseconds()),
Action: "update",
Type: "service",
Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
Action: actionTypeCreate,
Type: updateTypeService,
Metadata: map[string]string{
"Content-Type": "application/json",
},
@@ -519,10 +550,8 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error {
g.Unlock()
up := &pb.Update{
Id: uuid.New().String(),
Timestamp: uint64(time.Now().UnixNano()),
Action: "delete",
Type: "service",
Action: actionTypeDelete,
Type: updateTypeService,
Metadata: map[string]string{
"Content-Type": "application/json",
},
@@ -590,5 +619,7 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
// wait for setup
<-time.After(gossip.interval * 2)
go gossip.wait()
return gossip
}