This commit is contained in:
武新飞 2018-12-19 17:47:03 +08:00 committed by Vasiliy Tolstov
parent 3c3b81d9cd
commit 921f832670
3 changed files with 16 additions and 16 deletions

View File

@ -34,7 +34,7 @@ type httpServer struct {
hd server.Handler
exit chan chan error
registerOnce sync.Once
subscribers map[*subscriber][]broker.Subscriber
subscribers map[*httpSubscriber][]broker.Subscriber
}
func init() {
@ -109,9 +109,9 @@ func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...se
}
func (h *httpServer) Subscribe(sb server.Subscriber) error {
sub, ok := sb.(*subscriber)
sub, ok := sb.(*httpSubscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
@ -141,7 +141,7 @@ func (h *httpServer) Register() error {
service.Endpoints = eps
h.Lock()
var subscriberList []*subscriber
var subscriberList []*httpSubscriber
for e := range h.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {

View File

@ -1,19 +1,19 @@
package http
type rpcMessage struct {
type httpMessage struct {
topic string
contentType string
payload interface{}
}
func (r *rpcMessage) ContentType() string {
func (r *httpMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
func (r *httpMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Payload() interface{} {
func (r *httpMessage) Payload() interface{} {
return r.payload
}

View File

@ -28,7 +28,7 @@ type handler struct {
ctxType reflect.Type
}
type subscriber struct {
type httpSubscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
@ -117,7 +117,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
}
}
return &subscriber{
return &httpSubscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
@ -182,7 +182,7 @@ func validateSubscriber(sub server.Subscriber) error {
return nil
}
func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
func (s *httpServer) createSubHandler(sb *httpSubscriber, opts server.Options) broker.Handler {
return func(p broker.Publication) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
@ -251,7 +251,7 @@ func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broke
}
go func() {
results <- fn(ctx, &rpcMessage{
results <- fn(ctx, &httpMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
@ -275,18 +275,18 @@ func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broke
}
}
func (s *subscriber) Topic() string {
func (s *httpSubscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
func (s *httpSubscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*registry.Endpoint {
func (s *httpSubscriber) Endpoints() []*registry.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() server.SubscriberOptions {
func (s *httpSubscriber) Options() server.SubscriberOptions {
return s.opts
}