fixup for never micro

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-10-16 16:20:19 +03:00
parent 3c26142d45
commit 74b29ac236
3 changed files with 23 additions and 14 deletions

2
go.mod
View File

@ -2,4 +2,4 @@ module github.com/unistack-org/micro-server-http
go 1.13 go 1.13
require github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 require github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f

4
go.sum
View File

@ -289,8 +289,8 @@ github.com/unistack-org/micro-config-cmd v0.0.0-20200920140133-0853deb2e5dc/go.m
github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4= github.com/unistack-org/micro/v3 v3.0.0-20200827083227-aa99378adc6e/go.mod h1:rPQbnry3nboAnMczj8B1Gzlcyv/HYoMZLgd3/3nttJ4=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200909210629-caec730248b1/go.mod h1:mmqHR9WelHUXqg2mELjsQ+FJHcWs6mNmXg+wEYO2T3c=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20200920135754-1cbd1d2bad83/go.mod h1:HUzMG4Mcy97958VxWTg8zuazZgwQ/aoLZ8wtBVONwRE=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47 h1:3d/HgT7Iq/UIw5OGyzfUeZPJwydhBohh9shyGJH14EA= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f h1:9PLnkfb9vdn1yHlKLIGo5AiSNzqerZscsm9R+uW+DAw=
github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201009135158-2fc47782cf47/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM= github.com/unistack-org/micro/v3 v3.0.0-gamma.0.20201016063857-14c97d59c15f/go.mod h1:aL+8VhSXpx0SuEeXPOWUo5BgS7kyvWYobeXFay90UUM=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=

31
http.go
View File

@ -17,7 +17,6 @@ import (
"github.com/unistack-org/micro/v3/logger" "github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry" "github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/server" "github.com/unistack-org/micro/v3/server"
regutil "github.com/unistack-org/micro/v3/util/registry"
) )
var ( var (
@ -137,14 +136,11 @@ func (h *httpServer) Register() error {
eps := h.hd.Endpoints() eps := h.hd.Endpoints()
h.Unlock() h.Unlock()
service, err := regutil.NewService(h) service, err := server.NewRegistryService(h)
if err != nil { if err != nil {
return err return err
} }
if len(service.Metadata) == 0 { service.Nodes[0].Metadata["protocol"] = "http"
service.Metadata = make(map[string]string, 5)
}
service.Metadata["protocol"] = "http"
service.Endpoints = eps service.Endpoints = eps
h.Lock() h.Lock()
@ -183,6 +179,8 @@ func (h *httpServer) Register() error {
} }
h.registered = true h.registered = true
subCtx := h.opts.Context
for sb := range h.subscribers { for sb := range h.subscribers {
handler := h.createSubHandler(sb, opts) handler := h.createSubHandler(sb, opts)
var subOpts []broker.SubscribeOption var subOpts []broker.SubscribeOption
@ -190,11 +188,15 @@ func (h *httpServer) Register() error {
subOpts = append(subOpts, broker.Queue(queue)) subOpts = append(subOpts, broker.Queue(queue))
} }
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
if !sb.Options().AutoAck { if !sb.Options().AutoAck {
subOpts = append(subOpts, broker.DisableAutoAck()) subOpts = append(subOpts, broker.DisableAutoAck())
} }
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...) sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...)
if err != nil { if err != nil {
return err return err
} }
@ -210,7 +212,7 @@ func (h *httpServer) Deregister() error {
logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id) logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
service, err := regutil.NewService(h) service, err := server.NewRegistryService(h)
if err != nil { if err != nil {
return err return err
} }
@ -226,10 +228,17 @@ func (h *httpServer) Deregister() error {
} }
h.registered = false h.registered = false
subCtx := h.opts.Context
for sb, subs := range h.subscribers { for sb, subs := range h.subscribers {
for _, sub := range subs { for _, sub := range subs {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
logger.Infof("Unsubscribing from topic: %s", sub.Topic()) logger.Infof("Unsubscribing from topic: %s", sub.Topic())
sub.Unsubscribe() if err := sub.Unsubscribe(subCtx); err != nil {
logger.Errorf("failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
return err
}
} }
h.subscribers[sb] = nil h.subscribers[sb] = nil
} }
@ -261,7 +270,7 @@ func (h *httpServer) Start() error {
return errors.New("Server required http.Handler") return errors.New("Server required http.Handler")
} }
if err = opts.Broker.Connect(); err != nil { if err = opts.Broker.Connect(h.opts.Context); err != nil {
return err return err
} }
@ -334,7 +343,7 @@ func (h *httpServer) Start() error {
// deregister // deregister
h.Deregister() h.Deregister()
opts.Broker.Disconnect() opts.Broker.Disconnect(h.opts.Context)
}() }()
return nil return nil