Merge pull request 'fix build' (#161) from fixup into master
Reviewed-on: #161
This commit is contained in:
commit
9926d52f78
100
http.go
100
http.go
@ -9,12 +9,10 @@ import (
|
||||
"net"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.unistack.org/micro/v4/broker"
|
||||
"go.unistack.org/micro/v4/codec"
|
||||
"go.unistack.org/micro/v4/logger"
|
||||
"go.unistack.org/micro/v4/register"
|
||||
@ -30,7 +28,6 @@ type Server struct {
|
||||
rsvc *register.Service
|
||||
handlers map[string]server.Handler
|
||||
exit chan chan error
|
||||
subscribers map[*httpSubscriber][]broker.Subscriber
|
||||
errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int)
|
||||
pathHandlers *rhttp.Trie
|
||||
opts server.Options
|
||||
@ -101,10 +98,6 @@ func (h *Server) Init(opts ...server.Option) error {
|
||||
h.RUnlock()
|
||||
return err
|
||||
}
|
||||
if err := h.opts.Broker.Init(); err != nil {
|
||||
h.RUnlock()
|
||||
return err
|
||||
}
|
||||
if err := h.opts.Tracer.Init(); err != nil {
|
||||
h.RUnlock()
|
||||
return err
|
||||
@ -288,35 +281,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s
|
||||
return hdlr
|
||||
}
|
||||
|
||||
func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
return newSubscriber(topic, handler, opts...)
|
||||
}
|
||||
|
||||
func (h *Server) Subscribe(sb server.Subscriber) error {
|
||||
sub, ok := sb.(*httpSubscriber)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
|
||||
}
|
||||
if len(sub.handlers) == 0 {
|
||||
return fmt.Errorf("invalid subscriber: no handler functions")
|
||||
}
|
||||
|
||||
if err := server.ValidateSubscriber(sb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.RLock()
|
||||
_, ok = h.subscribers[sub]
|
||||
h.RUnlock()
|
||||
if ok {
|
||||
return fmt.Errorf("subscriber %v already exists", h)
|
||||
}
|
||||
h.Lock()
|
||||
h.subscribers[sub] = nil
|
||||
h.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Server) Register() error {
|
||||
var eps []*register.Endpoint
|
||||
h.RLock()
|
||||
@ -342,21 +306,6 @@ func (h *Server) Register() error {
|
||||
service.Nodes[0].Metadata["protocol"] = "http"
|
||||
service.Endpoints = eps
|
||||
|
||||
h.Lock()
|
||||
subscriberList := make([]*httpSubscriber, 0, len(h.subscribers))
|
||||
for e := range h.subscribers {
|
||||
// Only advertise non internal subscribers
|
||||
subscriberList = append(subscriberList, e)
|
||||
}
|
||||
sort.Slice(subscriberList, func(i, j int) bool {
|
||||
return subscriberList[i].topic > subscriberList[j].topic
|
||||
})
|
||||
|
||||
for _, e := range subscriberList {
|
||||
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
|
||||
}
|
||||
h.Unlock()
|
||||
|
||||
h.RLock()
|
||||
registered := h.registered
|
||||
h.RUnlock()
|
||||
@ -378,29 +327,6 @@ func (h *Server) Register() error {
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
for sb := range h.subscribers {
|
||||
handler := h.createSubHandler(sb, config)
|
||||
var opts []broker.SubscribeOption
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
subCtx := config.Context
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||
|
||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||
if err != nil {
|
||||
h.Unlock()
|
||||
return err
|
||||
}
|
||||
h.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
h.registered = true
|
||||
h.rsvc = service
|
||||
h.Unlock()
|
||||
@ -435,23 +361,6 @@ func (h *Server) Deregister() error {
|
||||
}
|
||||
|
||||
h.registered = false
|
||||
|
||||
subCtx := h.opts.Context
|
||||
for sb, subs := range h.subscribers {
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
|
||||
for _, sub := range subs {
|
||||
config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic())
|
||||
if err := sub.Unsubscribe(subCtx); err != nil {
|
||||
h.Unlock()
|
||||
config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
h.subscribers[sb] = nil
|
||||
}
|
||||
h.Unlock()
|
||||
return nil
|
||||
}
|
||||
@ -525,10 +434,6 @@ func (h *Server) Start() error {
|
||||
return fmt.Errorf("cant process with nil handler")
|
||||
}
|
||||
|
||||
if err := config.Broker.Connect(h.opts.Context); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := config.RegisterCheck(h.opts.Context); err != nil {
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err)
|
||||
@ -626,10 +531,6 @@ func (h *Server) Start() error {
|
||||
config.Logger.Errorf(config.Context, "Server deregister error: %s", err)
|
||||
}
|
||||
|
||||
if err := config.Broker.Disconnect(config.Context); err != nil {
|
||||
config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err)
|
||||
}
|
||||
|
||||
ch <- ts.Close()
|
||||
}()
|
||||
|
||||
@ -659,7 +560,6 @@ func NewServer(opts ...server.Option) *Server {
|
||||
return &Server{
|
||||
opts: options,
|
||||
exit: make(chan chan error),
|
||||
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
|
||||
errorHandler: eh,
|
||||
pathHandlers: rhttp.NewTrie(),
|
||||
}
|
||||
|
@ -85,8 +85,8 @@ func Middleware(mw ...func(http.Handler) http.Handler) server.Option {
|
||||
|
||||
type serverKey struct{}
|
||||
|
||||
// Server provide ability to pass *http.Server
|
||||
func Server(hs *http.Server) server.Option {
|
||||
// HTTPServer provide ability to pass *http.Server
|
||||
func HTTPServer(hs *http.Server) server.Option {
|
||||
return server.SetOption(serverKey{}, hs)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user