From 2ed04e3e2457cb39c76f0e57d06ba3e697df11e9 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 9 May 2023 18:47:56 +0300 Subject: [PATCH] fix build Signed-off-by: Vasiliy Tolstov --- http.go | 100 ----------------------------------------------------- options.go | 4 +-- server.go | 2 ++ 3 files changed, 4 insertions(+), 102 deletions(-) diff --git a/http.go b/http.go index f2a1f47..e0152a7 100644 --- a/http.go +++ b/http.go @@ -9,12 +9,10 @@ import ( "net" "net/http" "reflect" - "sort" "strings" "sync" "time" - "go.unistack.org/micro/v4/broker" "go.unistack.org/micro/v4/codec" "go.unistack.org/micro/v4/logger" "go.unistack.org/micro/v4/register" @@ -30,7 +28,6 @@ type Server struct { rsvc *register.Service handlers map[string]server.Handler exit chan chan error - subscribers map[*httpSubscriber][]broker.Subscriber errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int) pathHandlers *rhttp.Trie opts server.Options @@ -101,10 +98,6 @@ func (h *Server) Init(opts ...server.Option) error { h.RUnlock() return err } - if err := h.opts.Broker.Init(); err != nil { - h.RUnlock() - return err - } if err := h.opts.Tracer.Init(); err != nil { h.RUnlock() return err @@ -288,35 +281,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s return hdlr } -func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { - return newSubscriber(topic, handler, opts...) -} - -func (h *Server) Subscribe(sb server.Subscriber) error { - sub, ok := sb.(*httpSubscriber) - if !ok { - return fmt.Errorf("invalid subscriber: expected *httpSubscriber") - } - if len(sub.handlers) == 0 { - return fmt.Errorf("invalid subscriber: no handler functions") - } - - if err := server.ValidateSubscriber(sb); err != nil { - return err - } - - h.RLock() - _, ok = h.subscribers[sub] - h.RUnlock() - if ok { - return fmt.Errorf("subscriber %v already exists", h) - } - h.Lock() - h.subscribers[sub] = nil - h.Unlock() - return nil -} - func (h *Server) Register() error { var eps []*register.Endpoint h.RLock() @@ -342,21 +306,6 @@ func (h *Server) Register() error { service.Nodes[0].Metadata["protocol"] = "http" service.Endpoints = eps - h.Lock() - subscriberList := make([]*httpSubscriber, 0, len(h.subscribers)) - for e := range h.subscribers { - // Only advertise non internal subscribers - subscriberList = append(subscriberList, e) - } - sort.Slice(subscriberList, func(i, j int) bool { - return subscriberList[i].topic > subscriberList[j].topic - }) - - for _, e := range subscriberList { - service.Endpoints = append(service.Endpoints, e.Endpoints()...) - } - h.Unlock() - h.RLock() registered := h.registered h.RUnlock() @@ -378,29 +327,6 @@ func (h *Server) Register() error { } h.Lock() - for sb := range h.subscribers { - handler := h.createSubHandler(sb, config) - var opts []broker.SubscribeOption - if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.SubscribeGroup(queue)) - } - - subCtx := config.Context - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - opts = append(opts, broker.SubscribeContext(subCtx)) - opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) - opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly)) - - sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) - if err != nil { - h.Unlock() - return err - } - h.subscribers[sb] = []broker.Subscriber{sub} - } - h.registered = true h.rsvc = service h.Unlock() @@ -435,23 +361,6 @@ func (h *Server) Deregister() error { } h.registered = false - - subCtx := h.opts.Context - for sb, subs := range h.subscribers { - if cx := sb.Options().Context; cx != nil { - subCtx = cx - } - - for _, sub := range subs { - config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic()) - if err := sub.Unsubscribe(subCtx); err != nil { - h.Unlock() - config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err) - return err - } - } - h.subscribers[sb] = nil - } h.Unlock() return nil } @@ -525,10 +434,6 @@ func (h *Server) Start() error { return fmt.Errorf("cant process with nil handler") } - if err := config.Broker.Connect(h.opts.Context); err != nil { - return err - } - if err := config.RegisterCheck(h.opts.Context); err != nil { if config.Logger.V(logger.ErrorLevel) { config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) @@ -626,10 +531,6 @@ func (h *Server) Start() error { config.Logger.Errorf(config.Context, "Server deregister error: %s", err) } - if err := config.Broker.Disconnect(config.Context); err != nil { - config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err) - } - ch <- ts.Close() }() @@ -659,7 +560,6 @@ func NewServer(opts ...server.Option) *Server { return &Server{ opts: options, exit: make(chan chan error), - subscribers: make(map[*httpSubscriber][]broker.Subscriber), errorHandler: eh, pathHandlers: rhttp.NewTrie(), } diff --git a/options.go b/options.go index 3e6fde1..fa3da2f 100644 --- a/options.go +++ b/options.go @@ -85,8 +85,8 @@ func Middleware(mw ...func(http.Handler) http.Handler) server.Option { type serverKey struct{} -// Server provide ability to pass *http.Server -func Server(hs *http.Server) server.Option { +// HTTPServer provide ability to pass *http.Server +func HTTPServer(hs *http.Server) server.Option { return server.SetOption(serverKey{}, hs) } diff --git a/server.go b/server.go index 8ea0da0..8669be3 100644 --- a/server.go +++ b/server.go @@ -10,6 +10,8 @@ import ( "go.unistack.org/micro/v4/server" ) +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + type methodType struct { ArgType reflect.Type ReplyType reflect.Type