fix: allow subscribers to register and deregister multi times.

This commit is contained in:
武新飞 2018-12-19 18:22:27 +08:00 committed by Vasiliy Tolstov
parent 3f5cbf2bcd
commit 532edc786f

57
http.go
View File

@ -9,12 +9,12 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/micro/go-log" log "github.com/micro/go-log"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/cmd" "github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server"
"github.com/micro/go-plugins/codec/jsonrpc" "github.com/micro/go-plugins/codec/jsonrpc"
"github.com/micro/go-plugins/codec/protorpc" "github.com/micro/go-plugins/codec/protorpc"
) )
@ -28,6 +28,7 @@ var (
"application/octet-stream": protorpc.NewCodec, "application/octet-stream": protorpc.NewCodec,
} }
) )
type httpServer struct { type httpServer struct {
sync.Mutex sync.Mutex
opts server.Options opts server.Options
@ -35,6 +36,8 @@ type httpServer struct {
exit chan chan error exit chan chan error
registerOnce sync.Once registerOnce sync.Once
subscribers map[*httpSubscriber][]broker.Subscriber subscribers map[*httpSubscriber][]broker.Subscriber
// used for first registration
registered bool
} }
func init() { func init() {
@ -162,23 +165,33 @@ func (h *httpServer) Register() error {
h.registerOnce.Do(func() { h.registerOnce.Do(func() {
log.Logf("Registering node: %s", opts.Name+"-"+opts.Id) log.Logf("Registering node: %s", opts.Name+"-"+opts.Id)
for sb, _ := range h.subscribers {
handler := h.createSubHandler(sb, opts)
var subOpts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
subOpts = append(subOpts, broker.Queue(queue))
}
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
if err != nil {
log.Logf("Registering subscriber: %s, err: %s", sb.Topic, err)
return
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
}) })
return opts.Registry.Register(service, rOpts...) if err := opts.Registry.Register(service, rOpts...); err != nil {
return err
}
h.Lock()
defer h.Unlock()
if h.registered {
return nil
}
h.registered = true
for sb, _ := range h.subscribers {
handler := h.createSubHandler(sb, opts)
var subOpts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
subOpts = append(subOpts, broker.Queue(queue))
}
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
if err != nil {
return err
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
} }
func (h *httpServer) Deregister() error { func (h *httpServer) Deregister() error {
@ -194,6 +207,12 @@ func (h *httpServer) Deregister() error {
} }
h.Lock() h.Lock()
if !h.registered {
h.Unlock()
return nil
}
h.registered = false
for sb, subs := range h.subscribers { for sb, subs := range h.subscribers {
for _, sub := range subs { for _, sub := range subs {
log.Logf("Unsubscribing from topic: %s", sub.Topic()) log.Logf("Unsubscribing from topic: %s", sub.Topic())
@ -249,8 +268,8 @@ func (h *httpServer) String() string {
func newServer(opts ...server.Option) server.Server { func newServer(opts ...server.Option) server.Server {
return &httpServer{ return &httpServer{
opts: newOptions(opts...), opts: newOptions(opts...),
exit: make(chan chan error), exit: make(chan chan error),
subscribers: make(map[*httpSubscriber][]broker.Subscriber), subscribers: make(map[*httpSubscriber][]broker.Subscriber),
} }
} }