diff --git a/http.go b/http.go index 57b5b19..4d6fb00 100644 --- a/http.go +++ b/http.go @@ -34,7 +34,7 @@ type httpServer struct { hd server.Handler exit chan chan error registerOnce sync.Once - subscribers map[*subscriber][]broker.Subscriber + subscribers map[*httpSubscriber][]broker.Subscriber } func init() { @@ -109,9 +109,9 @@ func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...se } func (h *httpServer) Subscribe(sb server.Subscriber) error { - sub, ok := sb.(*subscriber) + sub, ok := sb.(*httpSubscriber) if !ok { - return fmt.Errorf("invalid subscriber: expected *subscriber") + return fmt.Errorf("invalid subscriber: expected *httpSubscriber") } if len(sub.handlers) == 0 { return fmt.Errorf("invalid subscriber: no handler functions") @@ -141,7 +141,7 @@ func (h *httpServer) Register() error { service.Endpoints = eps h.Lock() - var subscriberList []*subscriber + var subscriberList []*httpSubscriber for e := range h.subscribers { // Only advertise non internal subscribers if !e.Options().Internal { diff --git a/message.go b/message.go index d916a5d..821bb76 100644 --- a/message.go +++ b/message.go @@ -1,19 +1,19 @@ package http -type rpcMessage struct { +type httpMessage struct { topic string contentType string payload interface{} } -func (r *rpcMessage) ContentType() string { +func (r *httpMessage) ContentType() string { return r.contentType } -func (r *rpcMessage) Topic() string { +func (r *httpMessage) Topic() string { return r.topic } -func (r *rpcMessage) Payload() interface{} { +func (r *httpMessage) Payload() interface{} { return r.payload } diff --git a/subscriber.go b/subscriber.go index 622471f..1191c86 100644 --- a/subscriber.go +++ b/subscriber.go @@ -28,7 +28,7 @@ type handler struct { ctxType reflect.Type } -type subscriber struct { +type httpSubscriber struct { topic string rcvr reflect.Value typ reflect.Type @@ -117,7 +117,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio } } - return &subscriber{ + return &httpSubscriber{ rcvr: reflect.ValueOf(sub), typ: reflect.TypeOf(sub), topic: topic, @@ -182,7 +182,7 @@ func validateSubscriber(sub server.Subscriber) error { return nil } -func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { +func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler { return func(p broker.Publication) error { msg := p.Message() ct := msg.Header["Content-Type"] @@ -251,7 +251,7 @@ func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broke } go func() { - results <- fn(ctx, &rpcMessage{ + results <- fn(ctx, &httpMessage{ topic: sb.topic, contentType: ct, payload: req.Interface(), @@ -275,18 +275,18 @@ func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broke } } -func (s *subscriber) Topic() string { +func (s *httpSubscriber) Topic() string { return s.topic } -func (s *subscriber) Subscriber() interface{} { +func (s *httpSubscriber) Subscriber() interface{} { return s.subscriber } -func (s *subscriber) Endpoints() []*registry.Endpoint { +func (s *httpSubscriber) Endpoints() []*registry.Endpoint { return s.endpoints } -func (s *subscriber) Options() server.SubscriberOptions { +func (s *httpSubscriber) Options() server.SubscriberOptions { return s.opts }