From 4413372a3fcf9e98e5535315e16a86dd86ce2728 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 18 Aug 2020 14:00:51 +0100 Subject: [PATCH] Decruft the broker by removing Event interface (#1940) --- broker/broker.go | 12 ++----- broker/http/http.go | 32 +++--------------- broker/http/http_test.go | 17 +++------- broker/memory/memory.go | 64 ++---------------------------------- broker/memory/memory_test.go | 2 +- broker/nats/nats.go | 38 +++------------------ broker/options.go | 26 ++++----------- router/registry/registry.go | 4 +++ server/grpc/grpc.go | 4 --- server/grpc/subscriber.go | 3 +- server/mucp/rpc_event.go | 38 --------------------- server/mucp/rpc_server.go | 13 ++------ server/mucp/subscriber.go | 9 +++++ tunnel/broker/broker.go | 9 ++--- 14 files changed, 45 insertions(+), 226 deletions(-) delete mode 100644 server/mucp/rpc_event.go diff --git a/broker/broker.go b/broker/broker.go index 4095a3a9..b205b903 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -16,21 +16,15 @@ type Broker interface { // Handler is used to process messages via a subscription of a topic. // The handler is passed a publication interface which contains the // message and optional Ack method to acknowledge receipt of the message. -type Handler func(Event) error +type Handler func(*Message) error + +type ErrorHandler func(*Message, error) type Message struct { Header map[string]string Body []byte } -// Event is given to a subscription handler for processing -type Event interface { - Topic() string - Message() *Message - Ack() error - Error() error -} - // Subscriber is a convenience return type for the Subscribe method type Subscriber interface { Options() SubscribeOptions diff --git a/broker/http/http.go b/broker/http/http.go index 4b0c26c4..2b59de5b 100644 --- a/broker/http/http.go +++ b/broker/http/http.go @@ -60,12 +60,6 @@ type httpSubscriber struct { hb *httpBroker } -type httpEvent struct { - m *broker.Message - t string - err error -} - var ( DefaultPath = "/" DefaultAddress = "127.0.0.1:0" @@ -155,22 +149,6 @@ func newHttpBroker(opts ...broker.Option) broker.Broker { return h } -func (h *httpEvent) Ack() error { - return nil -} - -func (h *httpEvent) Error() error { - return h.err -} - -func (h *httpEvent) Message() *broker.Message { - return h.m -} - -func (h *httpEvent) Topic() string { - return h.t -} - func (h *httpSubscriber) Options() broker.SubscribeOptions { return h.opts } @@ -310,16 +288,15 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - var m *broker.Message - if err = h.opts.Codec.Unmarshal(b, &m); err != nil { + var msg *broker.Message + if err = h.opts.Codec.Unmarshal(b, &msg); err != nil { errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) w.WriteHeader(500) w.Write([]byte(errr.Error())) return } - topic := m.Header["Micro-Topic"] - //delete(m.Header, ":topic") + topic := msg.Header["Micro-Topic"] if len(topic) == 0 { errr := merr.InternalServerError("go.micro.broker", "Topic not found") @@ -328,7 +305,6 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - p := &httpEvent{m: m, t: topic} id := req.Form.Get("id") //nolint:prealloc @@ -345,7 +321,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { // execute the handler for _, fn := range subs { - p.err = fn(p) + fn(msg) } } diff --git a/broker/http/http_test.go b/broker/http/http_test.go index 335e820c..ea43e1f5 100644 --- a/broker/http/http_test.go +++ b/broker/http/http_test.go @@ -83,9 +83,8 @@ func sub(be *testing.B, c int) { done := make(chan bool, c) for i := 0; i < c; i++ { - sub, err := b.Subscribe(topic, func(p broker.Event) error { + sub, err := b.Subscribe(topic, func(m *broker.Message) error { done <- true - m := p.Message() if string(m.Body) != string(msg.Body) { be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) @@ -140,9 +139,8 @@ func pub(be *testing.B, c int) { done := make(chan bool, c*4) - sub, err := b.Subscribe(topic, func(p broker.Event) error { + sub, err := b.Subscribe(topic, func(m *broker.Message) error { done <- true - m := p.Message() if string(m.Body) != string(msg.Body) { be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) } @@ -208,8 +206,7 @@ func TestBroker(t *testing.T) { done := make(chan bool) - sub, err := b.Subscribe("test", func(p broker.Event) error { - m := p.Message() + sub, err := b.Subscribe("test", func(m *broker.Message) error { if string(m.Body) != string(msg.Body) { t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) @@ -257,11 +254,9 @@ func TestConcurrentSubBroker(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { - sub, err := b.Subscribe("test", func(p broker.Event) error { + sub, err := b.Subscribe("test", func(m *broker.Message) error { defer wg.Done() - m := p.Message() - if string(m.Body) != string(msg.Body) { t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) } @@ -312,11 +307,9 @@ func TestConcurrentPubBroker(t *testing.T) { var wg sync.WaitGroup - sub, err := b.Subscribe("test", func(p broker.Event) error { + sub, err := b.Subscribe("test", func(m *broker.Message) error { defer wg.Done() - m := p.Message() - if string(m.Body) != string(msg.Body) { t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) } diff --git a/broker/memory/memory.go b/broker/memory/memory.go index a093c4ec..a934daf5 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -10,7 +10,6 @@ import ( "github.com/google/uuid" "github.com/micro/go-micro/v3/broker" - "github.com/micro/go-micro/v3/logger" maddr "github.com/micro/go-micro/v3/util/addr" mnet "github.com/micro/go-micro/v3/util/net" ) @@ -24,13 +23,6 @@ type memoryBroker struct { Subscribers map[string][]*memorySubscriber } -type memoryEvent struct { - opts broker.Options - topic string - err error - message interface{} -} - type memorySubscriber struct { id string topic string @@ -103,31 +95,9 @@ func (m *memoryBroker) Publish(topic string, msg *broker.Message, opts ...broker return nil } - var v interface{} - if m.opts.Codec != nil { - buf, err := m.opts.Codec.Marshal(msg) - if err != nil { - return err - } - v = buf - } else { - v = msg - } - - p := &memoryEvent{ - topic: topic, - message: v, - opts: m.opts, - } - for _, sub := range subs { - if err := sub.handler(p); err != nil { - p.err = err - if eh := m.opts.ErrorHandler; eh != nil { - eh(p) - continue - } - return err + if err := sub.handler(msg); err != nil { + continue } } @@ -180,36 +150,6 @@ func (m *memoryBroker) String() string { return "memory" } -func (m *memoryEvent) Topic() string { - return m.topic -} - -func (m *memoryEvent) Message() *broker.Message { - switch v := m.message.(type) { - case *broker.Message: - return v - case []byte: - msg := &broker.Message{} - if err := m.opts.Codec.Unmarshal(v, msg); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Errorf("[memory]: failed to unmarshal: %v\n", err) - } - return nil - } - return msg - } - - return nil -} - -func (m *memoryEvent) Ack() error { - return nil -} - -func (m *memoryEvent) Error() error { - return m.err -} - func (m *memorySubscriber) Options() broker.SubscribeOptions { return m.opts } diff --git a/broker/memory/memory_test.go b/broker/memory/memory_test.go index 1abe782d..d9c22c2c 100644 --- a/broker/memory/memory_test.go +++ b/broker/memory/memory_test.go @@ -17,7 +17,7 @@ func TestMemoryBroker(t *testing.T) { topic := "test" count := 10 - fn := func(p broker.Event) error { + fn := func(m *broker.Message) error { return nil } diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 17c12411..d9907c10 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -36,29 +36,6 @@ type subscriber struct { opts broker.SubscribeOptions } -type publication struct { - t string - err error - m *broker.Message -} - -func (p *publication) Topic() string { - return p.t -} - -func (p *publication) Message() *broker.Message { - return p.m -} - -func (p *publication) Ack() error { - // nats does not support acking - return nil -} - -func (p *publication) Error() error { - return p.err -} - func (s *subscriber) Options() broker.SubscribeOptions { return s.opts } @@ -195,7 +172,6 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro n.RUnlock() opt := broker.SubscribeOptions{ - AutoAck: true, Context: context.Background(), } @@ -204,29 +180,25 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro } fn := func(msg *nats.Msg) { - var m broker.Message - pub := &publication{t: msg.Subject} - eh := n.opts.ErrorHandler + var m *broker.Message + eh := opt.ErrorHandler err := n.opts.Codec.Unmarshal(msg.Data, &m) - pub.err = err - pub.m = &m if err != nil { m.Body = msg.Data if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Error(err) } if eh != nil { - eh(pub) + eh(m, err) } return } - if err := handler(pub); err != nil { - pub.err = err + if err := handler(m); err != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Error(err) } if eh != nil { - eh(pub) + eh(m, err) } } } diff --git a/broker/options.go b/broker/options.go index 8d1f2c22..514ec5ab 100644 --- a/broker/options.go +++ b/broker/options.go @@ -13,10 +13,6 @@ type Options struct { Secure bool Codec codec.Marshaler - // Handler executed when error happens in broker message - // processing - ErrorHandler Handler - TLSConfig *tls.Config // Registry used for clustering Registry registry.Registry @@ -32,9 +28,9 @@ type PublishOptions struct { } type SubscribeOptions struct { - // AutoAck defaults to true. When a handler returns - // with a nil error the message is acked. - AutoAck bool + // Handler executed when errors occur processing messages + ErrorHandler ErrorHandler + // Subscribers with the same queue name // will create a shared subscription where each // receives a subset of messages. @@ -59,9 +55,7 @@ func PublishContext(ctx context.Context) PublishOption { type SubscribeOption func(*SubscribeOptions) func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { - opt := SubscribeOptions{ - AutoAck: true, - } + opt := SubscribeOptions{} for _, o := range opts { o(&opt) @@ -85,18 +79,10 @@ func Codec(c codec.Marshaler) Option { } } -// DisableAutoAck will disable auto acking of messages -// after they have been handled. -func DisableAutoAck() SubscribeOption { - return func(o *SubscribeOptions) { - o.AutoAck = false - } -} - // ErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors -func ErrorHandler(h Handler) Option { - return func(o *Options) { +func HandleError(h ErrorHandler) SubscribeOption { + return func(o *SubscribeOptions) { o.ErrorHandler = h } } diff --git a/router/registry/registry.go b/router/registry/registry.go index c762d7bc..e02bc418 100644 --- a/router/registry/registry.go +++ b/router/registry/registry.go @@ -300,9 +300,12 @@ func (r *rtr) watchRegistry(w registry.Watcher) error { // don't process nil entries if res.Service == nil { + logger.Trace("Received a nil service") continue } + logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service) + // get the services domain from metadata. Fallback to wildcard. domain := getDomain(res.Service) @@ -376,6 +379,7 @@ func (r *rtr) start() error { case <-r.exit: return default: + logger.Tracef("Router starting registry watch") w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain)) if err != nil { if logger.V(logger.DebugLevel, logger.DefaultLogger) { diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index d9146f02..1de8a83e 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -753,10 +753,6 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.SubscribeContext(cx)) } - if !sb.Options().AutoAck { - opts = append(opts, broker.DisableAutoAck()) - } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Subscribing to topic: %s", sb.Topic()) } diff --git a/server/grpc/subscriber.go b/server/grpc/subscriber.go index 28f86abf..d69379ac 100644 --- a/server/grpc/subscriber.go +++ b/server/grpc/subscriber.go @@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error { } func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { - return func(p broker.Event) (err error) { + return func(msg *broker.Message) (err error) { defer func() { if r := recover(); r != nil { @@ -179,7 +179,6 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke } }() - msg := p.Message() // if we don't have headers, create empty map if msg.Header == nil { msg.Header = make(map[string]string) diff --git a/server/mucp/rpc_event.go b/server/mucp/rpc_event.go deleted file mode 100644 index 7d886afe..00000000 --- a/server/mucp/rpc_event.go +++ /dev/null @@ -1,38 +0,0 @@ -package mucp - -import ( - "github.com/micro/go-micro/v3/broker" - "github.com/micro/go-micro/v3/transport" -) - -// event is a broker event we handle on the server transport -type event struct { - err error - message *broker.Message -} - -func (e *event) Ack() error { - // there is no ack support - return nil -} - -func (e *event) Message() *broker.Message { - return e.message -} - -func (e *event) Error() error { - return e.err -} - -func (e *event) Topic() string { - return e.message.Header["Micro-Topic"] -} - -func newEvent(msg transport.Message) *event { - return &event{ - message: &broker.Message{ - Header: msg.Header, - Body: msg.Body, - }, - } -} diff --git a/server/mucp/rpc_server.go b/server/mucp/rpc_server.go index b885ce02..d650b8d1 100644 --- a/server/mucp/rpc_server.go +++ b/server/mucp/rpc_server.go @@ -79,10 +79,7 @@ func newServer(opts ...server.Option) server.Server { // HandleEvent handles inbound messages to the service directly // TODO: handle requests from an event. We won't send a response. -func (s *rpcServer) HandleEvent(e broker.Event) error { - // formatting horrible cruft - msg := e.Message() - +func (s *rpcServer) HandleEvent(msg *broker.Message) error { if msg.Header == nil { // create empty map in case of headers empty to avoid panic later msg.Header = make(map[string]string) @@ -190,10 +187,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { // Micro-Service is a request // Micro-Topic is a message if t := msg.Header["Micro-Topic"]; len(t) > 0 { - // process the event - ev := newEvent(msg) // TODO: handle the error event - if err := s.HandleEvent(ev); err != nil { + if err := s.HandleEvent(newMessage(msg)); err != nil { msg.Header["Micro-Error"] = err.Error() } // write back some 200 @@ -706,10 +701,6 @@ func (s *rpcServer) Register() error { opts = append(opts, broker.SubscribeContext(cx)) } - if !sb.Options().AutoAck { - opts = append(opts, broker.DisableAutoAck()) - } - sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) if err != nil { return err diff --git a/server/mucp/subscriber.go b/server/mucp/subscriber.go index a2142f34..149226be 100644 --- a/server/mucp/subscriber.go +++ b/server/mucp/subscriber.go @@ -6,6 +6,8 @@ import ( "github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/server" + "github.com/micro/go-micro/v3/broker" + "github.com/micro/go-micro/v3/transport" ) const ( @@ -28,6 +30,13 @@ type subscriber struct { opts server.SubscriberOptions } +func newMessage(msg transport.Message) *broker.Message { + return &broker.Message{ + Header: msg.Header, + Body: msg.Body, + } +} + func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { options := server.SubscriberOptions{ AutoAck: true, diff --git a/tunnel/broker/broker.go b/tunnel/broker/broker.go index 7bfc0757..caa1eb27 100644 --- a/tunnel/broker/broker.go +++ b/tunnel/broker/broker.go @@ -124,12 +124,9 @@ func (t *tunSubscriber) run() { c.Close() // handle the message - go t.handler(&tunEvent{ - topic: t.topic, - message: &broker.Message{ - Header: m.Header, - Body: m.Body, - }, + go t.handler(&broker.Message{ + Header: m.Header, + Body: m.Body, }) } }