100
									
								
								http.go
									
									
									
									
									
								
							
							
						
						
									
										100
									
								
								http.go
									
									
									
									
									
								
							| @@ -9,12 +9,10 @@ import ( | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"reflect" | ||||
| 	"sort" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"go.unistack.org/micro/v4/broker" | ||||
| 	"go.unistack.org/micro/v4/codec" | ||||
| 	"go.unistack.org/micro/v4/logger" | ||||
| 	"go.unistack.org/micro/v4/register" | ||||
| @@ -30,7 +28,6 @@ type Server struct { | ||||
| 	rsvc         *register.Service | ||||
| 	handlers     map[string]server.Handler | ||||
| 	exit         chan chan error | ||||
| 	subscribers  map[*httpSubscriber][]broker.Subscriber | ||||
| 	errorHandler func(context.Context, server.Handler, http.ResponseWriter, *http.Request, error, int) | ||||
| 	pathHandlers *rhttp.Trie | ||||
| 	opts         server.Options | ||||
| @@ -101,10 +98,6 @@ func (h *Server) Init(opts ...server.Option) error { | ||||
| 		h.RUnlock() | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := h.opts.Broker.Init(); err != nil { | ||||
| 		h.RUnlock() | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := h.opts.Tracer.Init(); err != nil { | ||||
| 		h.RUnlock() | ||||
| 		return err | ||||
| @@ -288,35 +281,6 @@ func (h *Server) NewHandler(handler interface{}, opts ...server.HandlerOption) s | ||||
| 	return hdlr | ||||
| } | ||||
|  | ||||
| func (h *Server) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { | ||||
| 	return newSubscriber(topic, handler, opts...) | ||||
| } | ||||
|  | ||||
| func (h *Server) Subscribe(sb server.Subscriber) error { | ||||
| 	sub, ok := sb.(*httpSubscriber) | ||||
| 	if !ok { | ||||
| 		return fmt.Errorf("invalid subscriber: expected *httpSubscriber") | ||||
| 	} | ||||
| 	if len(sub.handlers) == 0 { | ||||
| 		return fmt.Errorf("invalid subscriber: no handler functions") | ||||
| 	} | ||||
|  | ||||
| 	if err := server.ValidateSubscriber(sb); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	h.RLock() | ||||
| 	_, ok = h.subscribers[sub] | ||||
| 	h.RUnlock() | ||||
| 	if ok { | ||||
| 		return fmt.Errorf("subscriber %v already exists", h) | ||||
| 	} | ||||
| 	h.Lock() | ||||
| 	h.subscribers[sub] = nil | ||||
| 	h.Unlock() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (h *Server) Register() error { | ||||
| 	var eps []*register.Endpoint | ||||
| 	h.RLock() | ||||
| @@ -342,21 +306,6 @@ func (h *Server) Register() error { | ||||
| 	service.Nodes[0].Metadata["protocol"] = "http" | ||||
| 	service.Endpoints = eps | ||||
|  | ||||
| 	h.Lock() | ||||
| 	subscriberList := make([]*httpSubscriber, 0, len(h.subscribers)) | ||||
| 	for e := range h.subscribers { | ||||
| 		// Only advertise non internal subscribers | ||||
| 		subscriberList = append(subscriberList, e) | ||||
| 	} | ||||
| 	sort.Slice(subscriberList, func(i, j int) bool { | ||||
| 		return subscriberList[i].topic > subscriberList[j].topic | ||||
| 	}) | ||||
|  | ||||
| 	for _, e := range subscriberList { | ||||
| 		service.Endpoints = append(service.Endpoints, e.Endpoints()...) | ||||
| 	} | ||||
| 	h.Unlock() | ||||
|  | ||||
| 	h.RLock() | ||||
| 	registered := h.registered | ||||
| 	h.RUnlock() | ||||
| @@ -378,29 +327,6 @@ func (h *Server) Register() error { | ||||
| 	} | ||||
|  | ||||
| 	h.Lock() | ||||
| 	for sb := range h.subscribers { | ||||
| 		handler := h.createSubHandler(sb, config) | ||||
| 		var opts []broker.SubscribeOption | ||||
| 		if queue := sb.Options().Queue; len(queue) > 0 { | ||||
| 			opts = append(opts, broker.SubscribeGroup(queue)) | ||||
| 		} | ||||
|  | ||||
| 		subCtx := config.Context | ||||
| 		if cx := sb.Options().Context; cx != nil { | ||||
| 			subCtx = cx | ||||
| 		} | ||||
| 		opts = append(opts, broker.SubscribeContext(subCtx)) | ||||
| 		opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck)) | ||||
| 		opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly)) | ||||
|  | ||||
| 		sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...) | ||||
| 		if err != nil { | ||||
| 			h.Unlock() | ||||
| 			return err | ||||
| 		} | ||||
| 		h.subscribers[sb] = []broker.Subscriber{sub} | ||||
| 	} | ||||
|  | ||||
| 	h.registered = true | ||||
| 	h.rsvc = service | ||||
| 	h.Unlock() | ||||
| @@ -435,23 +361,6 @@ func (h *Server) Deregister() error { | ||||
| 	} | ||||
|  | ||||
| 	h.registered = false | ||||
|  | ||||
| 	subCtx := h.opts.Context | ||||
| 	for sb, subs := range h.subscribers { | ||||
| 		if cx := sb.Options().Context; cx != nil { | ||||
| 			subCtx = cx | ||||
| 		} | ||||
|  | ||||
| 		for _, sub := range subs { | ||||
| 			config.Logger.Infof(config.Context, "Unsubscribing from topic: %s", sub.Topic()) | ||||
| 			if err := sub.Unsubscribe(subCtx); err != nil { | ||||
| 				h.Unlock() | ||||
| 				config.Logger.Errorf(config.Context, "failed to unsubscribe topic: %s, error: %v", sb.Topic(), err) | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
| 		h.subscribers[sb] = nil | ||||
| 	} | ||||
| 	h.Unlock() | ||||
| 	return nil | ||||
| } | ||||
| @@ -525,10 +434,6 @@ func (h *Server) Start() error { | ||||
| 		return fmt.Errorf("cant process with nil handler") | ||||
| 	} | ||||
|  | ||||
| 	if err := config.Broker.Connect(h.opts.Context); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if err := config.RegisterCheck(h.opts.Context); err != nil { | ||||
| 		if config.Logger.V(logger.ErrorLevel) { | ||||
| 			config.Logger.Errorf(config.Context, "Server %s-%s register check error: %s", config.Name, config.ID, err) | ||||
| @@ -626,10 +531,6 @@ func (h *Server) Start() error { | ||||
| 			config.Logger.Errorf(config.Context, "Server deregister error: %s", err) | ||||
| 		} | ||||
|  | ||||
| 		if err := config.Broker.Disconnect(config.Context); err != nil { | ||||
| 			config.Logger.Errorf(config.Context, "Broker disconnect error: %s", err) | ||||
| 		} | ||||
|  | ||||
| 		ch <- ts.Close() | ||||
| 	}() | ||||
|  | ||||
| @@ -659,7 +560,6 @@ func NewServer(opts ...server.Option) *Server { | ||||
| 	return &Server{ | ||||
| 		opts:         options, | ||||
| 		exit:         make(chan chan error), | ||||
| 		subscribers:  make(map[*httpSubscriber][]broker.Subscriber), | ||||
| 		errorHandler: eh, | ||||
| 		pathHandlers: rhttp.NewTrie(), | ||||
| 	} | ||||
|   | ||||
| @@ -85,8 +85,8 @@ func Middleware(mw ...func(http.Handler) http.Handler) server.Option { | ||||
|  | ||||
| type serverKey struct{} | ||||
|  | ||||
| // Server provide ability to pass *http.Server | ||||
| func Server(hs *http.Server) server.Option { | ||||
| // HTTPServer provide ability to pass *http.Server | ||||
| func HTTPServer(hs *http.Server) server.Option { | ||||
| 	return server.SetOption(serverKey{}, hs) | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user