Merge branch 'master' of github.com:micro/go-micro

This commit is contained in:
Asim Aslam 2019-02-04 10:29:26 +00:00
commit baf7de76bf
8 changed files with 378 additions and 147 deletions

View File

@ -1,6 +1,6 @@
# Gossip Registry # Gossip Registry
Gossip is a zero dependency registry which uses hashicorp/memberlist to broadcast registry information Gossip is a zero dependency registry which uses github.com/hashicorp/memberlist to broadcast registry information
via the SWIM protocol. via the SWIM protocol.
## Usage ## Usage

View File

@ -0,0 +1,17 @@
package gossip
import (
"context"
"github.com/micro/go-micro/registry"
)
// setRegistryOption returns a function to setup a context with given value
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@ -7,9 +7,11 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"os" "os"
"os/signal"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -21,10 +23,35 @@ import (
"github.com/mitchellh/hashstructure" "github.com/mitchellh/hashstructure"
) )
// use registry.Result int32 values after it switches from string to int32 types
// type actionType int32
// type updateType int32
const ( const (
addAction = "update" actionTypeInvalid int32 = iota
delAction = "delete" actionTypeCreate
syncAction = "sync" actionTypeDelete
actionTypeUpdate
actionTypeSync
)
func actionTypeString(t int32) string {
switch t {
case actionTypeCreate:
return "create"
case actionTypeDelete:
return "delete"
case actionTypeUpdate:
return "update"
case actionTypeSync:
return "sync"
}
return "invalid"
}
const (
updateTypeInvalid int32 = iota
updateTypeService
) )
type broadcast struct { type broadcast struct {
@ -93,23 +120,18 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// shutdown old member // shutdown old member
if g.member != nil { if g.member != nil {
g.member.Shutdown() g.Stop()
} }
// replace addresses // replace addresses
curAddrs = newAddrs curAddrs = newAddrs
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// create a new default config // create a new default config
c := memberlist.DefaultLocalConfig() c := memberlist.DefaultLocalConfig()
// log to dev null
c.LogOutput = ioutil.Discard
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil { if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig c = optConfig
} }
@ -145,15 +167,6 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// set the name // set the name
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// log to dev null
c.LogOutput = ioutil.Discard
// set a secret key if secure // set a secret key if secure
if g.options.Secure { if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte) k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
@ -164,6 +177,20 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k c.SecretKey = k
} }
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// create the memberlist // create the memberlist
m, err := memberlist.Create(c) m, err := memberlist.Create(c)
if err != nil { if err != nil {
@ -187,31 +214,12 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
return nil return nil
} }
func (*broadcast) UniqueBroadcast() {}
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
up := new(pb.Update)
if err := proto.Unmarshal(other.Message(), up); err != nil {
return false return false
} }
// ids do not match
if b.update.Id == up.Id {
return false
}
// timestamps do not match
if b.update.Timestamp != up.Timestamp {
return false
}
// type does not match
if b.update.Type != up.Type {
return false
}
// invalidates
return true
}
func (b *broadcast) Message() []byte { func (b *broadcast) Message() []byte {
up, err := proto.Marshal(b.update) up, err := proto.Marshal(b.update)
if err != nil { if err != nil {
@ -242,7 +250,7 @@ func (d *delegate) NotifyMsg(b []byte) {
} }
// only process service action // only process service action
if up.Type != "service" { if up.Type != updateTypeService {
return return
} }
@ -280,7 +288,7 @@ func (d *delegate) LocalState(join bool) []byte {
d.updates <- &update{ d.updates <- &update{
Update: &pb.Update{ Update: &pb.Update{
Action: syncAction, Action: actionTypeSync,
}, },
sync: syncCh, sync: syncCh,
} }
@ -309,7 +317,7 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
for _, service := range services { for _, service := range services {
for _, srv := range service { for _, srv := range service {
d.updates <- &update{ d.updates <- &update{
Update: &pb.Update{Action: addAction}, Update: &pb.Update{Action: actionTypeCreate},
Service: srv, Service: srv,
sync: nil, sync: nil,
} }
@ -350,6 +358,31 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
return next, exit return next, exit
} }
func (g *gossipRegistry) wait() {
ctx := g.options.Context
if c, ok := ctx.Value(contextContext{}).(context.Context); ok && c != nil {
ctx = c
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-ctx.Done():
}
g.Stop()
}
func (g *gossipRegistry) Stop() {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
}
func (g *gossipRegistry) run() { func (g *gossipRegistry) run() {
var mtx sync.Mutex var mtx sync.Mutex
updates := map[uint64]*update{} updates := map[uint64]*update{}
@ -367,11 +400,11 @@ func (g *gossipRegistry) run() {
// process all the updates // process all the updates
for k, v := range updates { for k, v := range updates {
// check if expiry time has passed // check if expiry time has passed
if d := (v.Update.Timestamp + v.Update.Expires); d < now { if d := (v.Update.Expires); d < now {
// delete from records // delete from records
delete(updates, k) delete(updates, k)
// set to delete // set to delete
v.Update.Action = delAction v.Update.Action = actionTypeDelete
// fire a new update // fire a new update
g.updates <- v g.updates <- v
} }
@ -384,7 +417,7 @@ func (g *gossipRegistry) run() {
// process the updates // process the updates
for u := range g.updates { for u := range g.updates {
switch u.Update.Action { switch u.Update.Action {
case addAction: case actionTypeCreate:
g.Lock() g.Lock()
if service, ok := g.services[u.Service.Name]; !ok { if service, ok := g.services[u.Service.Name]; !ok {
g.services[u.Service.Name] = []*registry.Service{u.Service} g.services[u.Service.Name] = []*registry.Service{u.Service}
@ -395,7 +428,7 @@ func (g *gossipRegistry) run() {
g.Unlock() g.Unlock()
// publish update to watchers // publish update to watchers
go g.publish(addAction, []*registry.Service{u.Service}) go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service})
// we need to expire the node at some point in the future // we need to expire the node at some point in the future
if u.Update.Expires > 0 { if u.Update.Expires > 0 {
@ -406,7 +439,7 @@ func (g *gossipRegistry) run() {
mtx.Unlock() mtx.Unlock()
} }
} }
case delAction: case actionTypeDelete:
g.Lock() g.Lock()
if service, ok := g.services[u.Service.Name]; ok { if service, ok := g.services[u.Service.Name]; ok {
if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 {
@ -418,7 +451,7 @@ func (g *gossipRegistry) run() {
g.Unlock() g.Unlock()
// publish update to watchers // publish update to watchers
go g.publish(delAction, []*registry.Service{u.Service}) go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service})
// delete from expiry checks // delete from expiry checks
if hash, err := hashstructure.Hash(u.Service, nil); err == nil { if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
@ -426,7 +459,7 @@ func (g *gossipRegistry) run() {
delete(updates, hash) delete(updates, hash)
mtx.Unlock() mtx.Unlock()
} }
case syncAction: case actionTypeSync:
// no sync channel provided // no sync channel provided
if u.sync == nil { if u.sync == nil {
continue continue
@ -441,7 +474,7 @@ func (g *gossipRegistry) run() {
} }
// publish to watchers // publish to watchers
go g.publish(addAction, service) go g.publish(actionTypeString(actionTypeCreate), service)
} }
g.RUnlock() g.RUnlock()
@ -480,11 +513,9 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
} }
up := &pb.Update{ up := &pb.Update{
Id: uuid.New().String(), Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
Timestamp: uint64(time.Now().UnixNano()), Action: actionTypeCreate,
Expires: uint64(options.TTL.Nanoseconds()), Type: updateTypeService,
Action: "update",
Type: "service",
Metadata: map[string]string{ Metadata: map[string]string{
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
@ -519,10 +550,8 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error {
g.Unlock() g.Unlock()
up := &pb.Update{ up := &pb.Update{
Id: uuid.New().String(), Action: actionTypeDelete,
Timestamp: uint64(time.Now().UnixNano()), Type: updateTypeService,
Action: "delete",
Type: "service",
Metadata: map[string]string{ Metadata: map[string]string{
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
@ -590,5 +619,7 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
// wait for setup // wait for setup
<-time.After(gossip.interval * 2) <-time.After(gossip.interval * 2)
go gossip.wait()
return gossip return gossip
} }

View File

@ -0,0 +1,199 @@
package gossip_test
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
micro "github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/gossip"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
)
var (
r1 registry.Registry
r2 registry.Registry
mu sync.Mutex
)
func newConfig() *memberlist.Config {
wc := memberlist.DefaultLANConfig()
wc.DisableTcpPings = false
wc.GossipVerifyIncoming = false
wc.GossipVerifyOutgoing = false
wc.EnableCompression = false
wc.LogOutput = os.Stderr
wc.ProtocolVersion = 4
wc.Name = uuid.New().String()
return wc
}
func newRegistries() {
mu.Lock()
defer mu.Unlock()
if r1 != nil && r2 != nil {
return
}
wc1 := newConfig()
wc2 := newConfig()
rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")}
rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")}
r1 = gossip.NewRegistry(rops1...) // first started without members
r2 = gossip.NewRegistry(rops2...) // second started joining
}
func TestRegistryBroadcast(t *testing.T) {
newRegistries()
svc1 := &registry.Service{Name: "r1-svc", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "r2-svc", Version: "0.0.0.2"}
<-time.After(1 * time.Second)
if err := r1.Register(svc1); err != nil {
t.Fatal(err)
}
<-time.After(1 * time.Second)
if err := r2.Register(svc2); err != nil {
t.Fatal(err)
}
var found bool
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r2-svc" {
found = true
}
}
if !found {
t.Fatalf("r2-svc not found in r1, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r1-svc" {
found = true
}
}
if !found {
t.Fatalf("r1-svc not found in r2, broadcast not work")
}
if err := r1.Deregister(svc1); err != nil {
t.Fatal(err)
}
if err := r2.Deregister(svc2); err != nil {
t.Fatal(err)
}
}
func TestServerRegistry(t *testing.T) {
newRegistries()
_, err := newServer("s1", r1, t)
if err != nil {
t.Fatal(err)
}
_, err = newServer("s2", r2, t)
if err != nil {
t.Fatal(err)
}
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
if len(svcs) < 1 {
t.Fatalf("r1 svcs unknown %#+v\n", svcs)
}
found := false
for _, svc := range svcs {
if svc.Name == "s2" {
found = true
}
}
if !found {
t.Fatalf("r1 does not have s2, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "s1" {
found = true
}
}
if !found {
t.Fatalf("r2 does not have s1, broadcast not work")
}
}
type testServer struct{}
func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error {
return nil
}
func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) {
h := &testServer{}
var wg sync.WaitGroup
wg.Add(1)
sopts := []server.Option{
server.Name(n),
server.Registry(r),
}
copts := []client.Option{
client.Selector(selector.NewSelector(selector.Registry(r))),
client.Registry(r),
}
srv := micro.NewService(
micro.Server(server.NewServer(sopts...)),
micro.Client(client.NewClient(copts...)),
micro.AfterStart(func() error {
wg.Done()
return nil
}),
)
srv.Server().NewHandler(h)
go func() {
t.Fatal(srv.Run())
}()
wg.Wait()
return srv, nil
}

View File

@ -12,34 +12,35 @@ type contextSecretKey struct{}
// Secret specifies an encryption key. The value should be either // Secret specifies an encryption key. The value should be either
// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256. // 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
func Secret(k []byte) registry.Option { func Secret(k []byte) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextSecretKey{}, k)
o.Context = context.WithValue(o.Context, contextSecretKey{}, k)
}
} }
type contextAddress struct{} type contextAddress struct{}
// Address to bind to - host:port // Address to bind to - host:port
func Address(a string) registry.Option { func Address(a string) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextAddress{}, a)
o.Context = context.WithValue(o.Context, contextAddress{}, a)
}
} }
type contextConfig struct{} type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip // Config allow to inject a *memberlist.Config struct for configuring gossip
func Config(c *memberlist.Config) registry.Option { func Config(c *memberlist.Config) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextConfig{}, c)
o.Context = context.WithValue(o.Context, contextConfig{}, c)
}
} }
type contextAdvertise struct{} type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port // The address to advertise for other gossip members - host:port
func Advertise(a string) registry.Option { func Advertise(a string) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextAdvertise{}, a)
o.Context = context.WithValue(o.Context, contextAdvertise{}, a)
} }
type contextContext struct{}
// Context specifies a context for the registry.
// Can be used to signal shutdown of the registry.
// Can be used for extra option values.
func Context(ctx context.Context) registry.Option {
return setRegistryOption(contextContext{}, ctx)
} }

View File

@ -3,9 +3,12 @@
package gossip package gossip
import proto "github.com/golang/protobuf/proto" import (
import fmt "fmt" fmt "fmt"
import math "math" math "math"
proto "github.com/golang/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
@ -20,16 +23,12 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Update is the message broadcast // Update is the message broadcast
type Update struct { type Update struct {
// unique id of update
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// unix nano timestamp of update
Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// time to live for entry // time to live for entry
Expires uint64 `protobuf:"varint,3,opt,name=expires,proto3" json:"expires,omitempty"` Expires uint64 `protobuf:"varint,1,opt,name=expires,proto3" json:"expires,omitempty"`
// type of update; service // type of update
Type string `protobuf:"bytes,4,opt,name=type,proto3" json:"type,omitempty"` Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"`
// what action is taken; add, del, put // what action is taken
Action string `protobuf:"bytes,5,opt,name=action,proto3" json:"action,omitempty"` Action int32 `protobuf:"varint,3,opt,name=action,proto3" json:"action,omitempty"`
// any other associated metadata about the data // any other associated metadata about the data
Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// the payload data; // the payload data;
@ -43,16 +42,17 @@ func (m *Update) Reset() { *m = Update{} }
func (m *Update) String() string { return proto.CompactTextString(m) } func (m *Update) String() string { return proto.CompactTextString(m) }
func (*Update) ProtoMessage() {} func (*Update) ProtoMessage() {}
func (*Update) Descriptor() ([]byte, []int) { func (*Update) Descriptor() ([]byte, []int) {
return fileDescriptor_gossip_fd1eb378131a5d12, []int{0} return fileDescriptor_18cba623e76e57f3, []int{0}
} }
func (m *Update) XXX_Unmarshal(b []byte) error { func (m *Update) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Update.Unmarshal(m, b) return xxx_messageInfo_Update.Unmarshal(m, b)
} }
func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Update.Marshal(b, m, deterministic) return xxx_messageInfo_Update.Marshal(b, m, deterministic)
} }
func (dst *Update) XXX_Merge(src proto.Message) { func (m *Update) XXX_Merge(src proto.Message) {
xxx_messageInfo_Update.Merge(dst, src) xxx_messageInfo_Update.Merge(m, src)
} }
func (m *Update) XXX_Size() int { func (m *Update) XXX_Size() int {
return xxx_messageInfo_Update.Size(m) return xxx_messageInfo_Update.Size(m)
@ -63,20 +63,6 @@ func (m *Update) XXX_DiscardUnknown() {
var xxx_messageInfo_Update proto.InternalMessageInfo var xxx_messageInfo_Update proto.InternalMessageInfo
func (m *Update) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *Update) GetTimestamp() uint64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *Update) GetExpires() uint64 { func (m *Update) GetExpires() uint64 {
if m != nil { if m != nil {
return m.Expires return m.Expires
@ -84,18 +70,18 @@ func (m *Update) GetExpires() uint64 {
return 0 return 0
} }
func (m *Update) GetType() string { func (m *Update) GetType() int32 {
if m != nil { if m != nil {
return m.Type return m.Type
} }
return "" return 0
} }
func (m *Update) GetAction() string { func (m *Update) GetAction() int32 {
if m != nil { if m != nil {
return m.Action return m.Action
} }
return "" return 0
} }
func (m *Update) GetMetadata() map[string]string { func (m *Update) GetMetadata() map[string]string {
@ -118,25 +104,24 @@ func init() {
} }
func init() { func init() {
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_gossip_fd1eb378131a5d12) proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3)
} }
var fileDescriptor_gossip_fd1eb378131a5d12 = []byte{ var fileDescriptor_18cba623e76e57f3 = []byte{
// 251 bytes of a gzipped FileDescriptorProto // 227 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xcf, 0x4a, 0xc4, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8f, 0xc1, 0x4a, 0x03, 0x31,
0x10, 0x87, 0x69, 0xb6, 0x9b, 0xb5, 0xe3, 0x1f, 0x64, 0x10, 0x09, 0xb2, 0x87, 0xe2, 0xa9, 0x17, 0x14, 0x45, 0x49, 0xa7, 0x4d, 0xed, 0x53, 0x41, 0x1e, 0x22, 0x41, 0x5c, 0x0c, 0xae, 0x66, 0xe3,
0x5b, 0xd0, 0xcb, 0xa2, 0x5e, 0x3d, 0x7a, 0x09, 0xf8, 0x00, 0xd9, 0x36, 0xd4, 0xa0, 0xd9, 0x84, 0x0c, 0xe8, 0xa6, 0xa8, 0x5b, 0x97, 0x6e, 0x02, 0x7e, 0x40, 0x3a, 0x0d, 0x31, 0xe8, 0x34, 0x21,
0x64, 0x56, 0xec, 0x13, 0xf8, 0xda, 0xb2, 0x69, 0x54, 0xbc, 0x7d, 0xdf, 0xcc, 0x24, 0x99, 0x5f, 0x79, 0x15, 0xf3, 0xa9, 0xfe, 0x8d, 0x34, 0x89, 0x42, 0x77, 0xe7, 0x24, 0x37, 0xdc, 0x1b, 0x78,
0xe0, 0x71, 0x34, 0xf4, 0xba, 0xdf, 0xb6, 0xbd, 0xb3, 0x9d, 0x35, 0x7d, 0x70, 0xdd, 0xe8, 0x6e, 0x36, 0x96, 0xde, 0xf7, 0x9b, 0x7e, 0x74, 0xd3, 0x30, 0xd9, 0x31, 0xb8, 0xc1, 0xb8, 0xbb, 0x02,
0x66, 0x08, 0x7a, 0x34, 0x91, 0xc2, 0xd4, 0x8d, 0x2e, 0x46, 0xe3, 0x3b, 0x1f, 0x1c, 0xb9, 0x2c, 0x41, 0x1b, 0x1b, 0x29, 0xa4, 0xc1, 0xb8, 0x18, 0xad, 0x1f, 0x7c, 0x70, 0xe4, 0xaa, 0xf4, 0x59,
0x6d, 0x12, 0xe4, 0xb3, 0x5d, 0x7f, 0x31, 0xe0, 0x2f, 0x7e, 0x50, 0xa4, 0xf1, 0x0c, 0x98, 0x19, 0x90, 0x17, 0xbb, 0xfd, 0x61, 0xc0, 0xdf, 0xfc, 0x56, 0x91, 0x46, 0x01, 0x4b, 0xfd, 0xed, 0x6d,
0x44, 0x51, 0x17, 0x4d, 0x25, 0x99, 0x19, 0x70, 0x0d, 0x15, 0x19, 0xab, 0x23, 0x29, 0xeb, 0x05, 0xd0, 0x51, 0xb0, 0x96, 0x75, 0x73, 0xf9, 0xa7, 0x88, 0x30, 0xa7, 0xe4, 0xb5, 0x98, 0xb5, 0xac,
0xab, 0x8b, 0xa6, 0x94, 0x7f, 0x05, 0x14, 0xb0, 0xd2, 0x9f, 0xde, 0x04, 0x1d, 0xc5, 0x22, 0xf5, 0x5b, 0xc8, 0xcc, 0x78, 0x05, 0x5c, 0x8d, 0x64, 0xdd, 0x4e, 0x34, 0xf9, 0xb4, 0x1a, 0xae, 0xe1,
0x7e, 0x14, 0x11, 0x4a, 0x9a, 0xbc, 0x16, 0x65, 0xba, 0x29, 0x31, 0x5e, 0x02, 0x57, 0x3d, 0x19, 0x64, 0xd2, 0xa4, 0xb6, 0x8a, 0x94, 0xe0, 0x6d, 0xd3, 0x9d, 0xde, 0xdf, 0xf4, 0xb5, 0xb9, 0xf4,
0xb7, 0x13, 0xcb, 0x54, 0xcd, 0x86, 0x1b, 0x38, 0xb2, 0x9a, 0xd4, 0xa0, 0x48, 0x09, 0x5e, 0x2f, 0xf4, 0xaf, 0xf5, 0xfa, 0x65, 0x47, 0x21, 0xc9, 0xff, 0xf4, 0xa1, 0x25, 0xbf, 0x5a, 0xb6, 0xac,
0x9a, 0xe3, 0xdb, 0x75, 0x9b, 0xf7, 0x9c, 0xb7, 0x6a, 0x9f, 0x73, 0xfb, 0x69, 0x47, 0x61, 0x92, 0x3b, 0x93, 0x99, 0xaf, 0x9f, 0xe0, 0xfc, 0x28, 0x8e, 0x17, 0xd0, 0x7c, 0xe8, 0x94, 0x07, 0xae,
0xbf, 0xd3, 0x87, 0x57, 0xd2, 0xa9, 0x55, 0x5d, 0x34, 0x27, 0x32, 0xf1, 0xd5, 0x03, 0x9c, 0xfe, 0xe4, 0x01, 0xf1, 0x12, 0x16, 0x5f, 0xea, 0x73, 0x5f, 0xd6, 0xad, 0x64, 0x91, 0xc7, 0xd9, 0x9a,
0x1b, 0xc7, 0x73, 0x58, 0xbc, 0xe9, 0x29, 0x67, 0x3a, 0x20, 0x5e, 0xc0, 0xf2, 0x43, 0xbd, 0xef, 0x6d, 0x78, 0xfe, 0xea, 0xc3, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x63, 0x7b, 0x1b, 0x2a,
0x75, 0x0a, 0x54, 0xc9, 0x59, 0xee, 0xd9, 0xa6, 0xd8, 0xf2, 0xf4, 0x31, 0x77, 0xdf, 0x01, 0x00, 0x01, 0x00, 0x00,
0x00, 0xff, 0xff, 0x06, 0x6e, 0x00, 0x3c, 0x58, 0x01, 0x00, 0x00,
} }

View File

@ -4,16 +4,12 @@ package gossip;
// Update is the message broadcast // Update is the message broadcast
message Update { message Update {
// unique id of update
string id = 1;
// unix nano timestamp of update
uint64 timestamp = 2;
// time to live for entry // time to live for entry
uint64 expires = 3; uint64 expires = 1;
// type of update; service // type of update
string type = 4; int32 type = 2;
// what action is taken; add, del, put // what action is taken
string action = 5; int32 action = 3;
// any other associated metadata about the data // any other associated metadata about the data
map<string, string> metadata = 6; map<string, string> metadata = 6;
// the payload data; // the payload data;

View File

@ -157,6 +157,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// TODO: handle error better // TODO: handle error better
if err := handler(ctx, request, response); err != nil { if err := handler(ctx, request, response); err != nil {
if err != lastStreamResponseError {
// write an error response // write an error response
err = rcodec.Write(&codec.Message{ err = rcodec.Write(&codec.Message{
Header: msg.Header, Header: msg.Header,
@ -167,6 +168,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
if err != nil { if err != nil {
log.Logf("rpc: unable to write error response: %v", err) log.Logf("rpc: unable to write error response: %v", err)
} }
}
s.wg.Done() s.wg.Done()
return return
} }