diff --git a/http.go b/http.go index de40ed3..c5633f5 100644 --- a/http.go +++ b/http.go @@ -9,12 +9,12 @@ import ( "sort" "sync" - "github.com/micro/go-log" + log "github.com/micro/go-log" "github.com/micro/go-micro/broker" - "github.com/micro/go-micro/server" "github.com/micro/go-micro/cmd" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/server" "github.com/micro/go-plugins/codec/jsonrpc" "github.com/micro/go-plugins/codec/protorpc" ) @@ -28,6 +28,7 @@ var ( "application/octet-stream": protorpc.NewCodec, } ) + type httpServer struct { sync.Mutex opts server.Options @@ -35,6 +36,8 @@ type httpServer struct { exit chan chan error registerOnce sync.Once subscribers map[*httpSubscriber][]broker.Subscriber + // used for first registration + registered bool } func init() { @@ -162,23 +165,33 @@ func (h *httpServer) Register() error { h.registerOnce.Do(func() { log.Logf("Registering node: %s", opts.Name+"-"+opts.Id) - - for sb, _ := range h.subscribers { - handler := h.createSubHandler(sb, opts) - var subOpts []broker.SubscribeOption - if queue := sb.Options().Queue; len(queue) > 0 { - subOpts = append(subOpts, broker.Queue(queue)) - } - sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...) - if err != nil { - log.Logf("Registering subscriber: %s, err: %s", sb.Topic, err) - return - } - h.subscribers[sb] = []broker.Subscriber{sub} - } }) - return opts.Registry.Register(service, rOpts...) + if err := opts.Registry.Register(service, rOpts...); err != nil { + return err + } + + h.Lock() + defer h.Unlock() + + if h.registered { + return nil + } + h.registered = true + + for sb, _ := range h.subscribers { + handler := h.createSubHandler(sb, opts) + var subOpts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + subOpts = append(subOpts, broker.Queue(queue)) + } + sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...) + if err != nil { + return err + } + h.subscribers[sb] = []broker.Subscriber{sub} + } + return nil } func (h *httpServer) Deregister() error { @@ -194,6 +207,12 @@ func (h *httpServer) Deregister() error { } h.Lock() + if !h.registered { + h.Unlock() + return nil + } + h.registered = false + for sb, subs := range h.subscribers { for _, sub := range subs { log.Logf("Unsubscribing from topic: %s", sub.Topic()) @@ -249,8 +268,8 @@ func (h *httpServer) String() string { func newServer(opts ...server.Option) server.Server { return &httpServer{ - opts: newOptions(opts...), - exit: make(chan chan error), + opts: newOptions(opts...), + exit: make(chan chan error), subscribers: make(map[*httpSubscriber][]broker.Subscriber), } }