Decruft the broker by removing Event interface (#1940)

This commit is contained in:
Asim Aslam 2020-08-18 14:00:51 +01:00 committed by GitHub
parent a2a808f2d6
commit 4413372a3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 45 additions and 226 deletions

View File

@ -16,21 +16,15 @@ type Broker interface {
// Handler is used to process messages via a subscription of a topic. // Handler is used to process messages via a subscription of a topic.
// The handler is passed a publication interface which contains the // The handler is passed a publication interface which contains the
// message and optional Ack method to acknowledge receipt of the message. // message and optional Ack method to acknowledge receipt of the message.
type Handler func(Event) error type Handler func(*Message) error
type ErrorHandler func(*Message, error)
type Message struct { type Message struct {
Header map[string]string Header map[string]string
Body []byte Body []byte
} }
// Event is given to a subscription handler for processing
type Event interface {
Topic() string
Message() *Message
Ack() error
Error() error
}
// Subscriber is a convenience return type for the Subscribe method // Subscriber is a convenience return type for the Subscribe method
type Subscriber interface { type Subscriber interface {
Options() SubscribeOptions Options() SubscribeOptions

View File

@ -60,12 +60,6 @@ type httpSubscriber struct {
hb *httpBroker hb *httpBroker
} }
type httpEvent struct {
m *broker.Message
t string
err error
}
var ( var (
DefaultPath = "/" DefaultPath = "/"
DefaultAddress = "127.0.0.1:0" DefaultAddress = "127.0.0.1:0"
@ -155,22 +149,6 @@ func newHttpBroker(opts ...broker.Option) broker.Broker {
return h return h
} }
func (h *httpEvent) Ack() error {
return nil
}
func (h *httpEvent) Error() error {
return h.err
}
func (h *httpEvent) Message() *broker.Message {
return h.m
}
func (h *httpEvent) Topic() string {
return h.t
}
func (h *httpSubscriber) Options() broker.SubscribeOptions { func (h *httpSubscriber) Options() broker.SubscribeOptions {
return h.opts return h.opts
} }
@ -310,16 +288,15 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
var m *broker.Message var msg *broker.Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil { if err = h.opts.Codec.Unmarshal(b, &msg); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500) w.WriteHeader(500)
w.Write([]byte(errr.Error())) w.Write([]byte(errr.Error()))
return return
} }
topic := m.Header["Micro-Topic"] topic := msg.Header["Micro-Topic"]
//delete(m.Header, ":topic")
if len(topic) == 0 { if len(topic) == 0 {
errr := merr.InternalServerError("go.micro.broker", "Topic not found") errr := merr.InternalServerError("go.micro.broker", "Topic not found")
@ -328,7 +305,6 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
p := &httpEvent{m: m, t: topic}
id := req.Form.Get("id") id := req.Form.Get("id")
//nolint:prealloc //nolint:prealloc
@ -345,7 +321,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// execute the handler // execute the handler
for _, fn := range subs { for _, fn := range subs {
p.err = fn(p) fn(msg)
} }
} }

View File

@ -83,9 +83,8 @@ func sub(be *testing.B, c int) {
done := make(chan bool, c) done := make(chan bool, c)
for i := 0; i < c; i++ { for i := 0; i < c; i++ {
sub, err := b.Subscribe(topic, func(p broker.Event) error { sub, err := b.Subscribe(topic, func(m *broker.Message) error {
done <- true done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
@ -140,9 +139,8 @@ func pub(be *testing.B, c int) {
done := make(chan bool, c*4) done := make(chan bool, c*4)
sub, err := b.Subscribe(topic, func(p broker.Event) error { sub, err := b.Subscribe(topic, func(m *broker.Message) error {
done <- true done <- true
m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
} }
@ -208,8 +206,7 @@ func TestBroker(t *testing.T) {
done := make(chan bool) done := make(chan bool)
sub, err := b.Subscribe("test", func(p broker.Event) error { sub, err := b.Subscribe("test", func(m *broker.Message) error {
m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
@ -257,11 +254,9 @@ func TestConcurrentSubBroker(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
sub, err := b.Subscribe("test", func(p broker.Event) error { sub, err := b.Subscribe("test", func(m *broker.Message) error {
defer wg.Done() defer wg.Done()
m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
} }
@ -312,11 +307,9 @@ func TestConcurrentPubBroker(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
sub, err := b.Subscribe("test", func(p broker.Event) error { sub, err := b.Subscribe("test", func(m *broker.Message) error {
defer wg.Done() defer wg.Done()
m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
} }

View File

@ -10,7 +10,6 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/v3/broker" "github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/logger"
maddr "github.com/micro/go-micro/v3/util/addr" maddr "github.com/micro/go-micro/v3/util/addr"
mnet "github.com/micro/go-micro/v3/util/net" mnet "github.com/micro/go-micro/v3/util/net"
) )
@ -24,13 +23,6 @@ type memoryBroker struct {
Subscribers map[string][]*memorySubscriber Subscribers map[string][]*memorySubscriber
} }
type memoryEvent struct {
opts broker.Options
topic string
err error
message interface{}
}
type memorySubscriber struct { type memorySubscriber struct {
id string id string
topic string topic string
@ -103,31 +95,9 @@ func (m *memoryBroker) Publish(topic string, msg *broker.Message, opts ...broker
return nil return nil
} }
var v interface{}
if m.opts.Codec != nil {
buf, err := m.opts.Codec.Marshal(msg)
if err != nil {
return err
}
v = buf
} else {
v = msg
}
p := &memoryEvent{
topic: topic,
message: v,
opts: m.opts,
}
for _, sub := range subs { for _, sub := range subs {
if err := sub.handler(p); err != nil { if err := sub.handler(msg); err != nil {
p.err = err continue
if eh := m.opts.ErrorHandler; eh != nil {
eh(p)
continue
}
return err
} }
} }
@ -180,36 +150,6 @@ func (m *memoryBroker) String() string {
return "memory" return "memory"
} }
func (m *memoryEvent) Topic() string {
return m.topic
}
func (m *memoryEvent) Message() *broker.Message {
switch v := m.message.(type) {
case *broker.Message:
return v
case []byte:
msg := &broker.Message{}
if err := m.opts.Codec.Unmarshal(v, msg); err != nil {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("[memory]: failed to unmarshal: %v\n", err)
}
return nil
}
return msg
}
return nil
}
func (m *memoryEvent) Ack() error {
return nil
}
func (m *memoryEvent) Error() error {
return m.err
}
func (m *memorySubscriber) Options() broker.SubscribeOptions { func (m *memorySubscriber) Options() broker.SubscribeOptions {
return m.opts return m.opts
} }

View File

@ -17,7 +17,7 @@ func TestMemoryBroker(t *testing.T) {
topic := "test" topic := "test"
count := 10 count := 10
fn := func(p broker.Event) error { fn := func(m *broker.Message) error {
return nil return nil
} }

View File

@ -36,29 +36,6 @@ type subscriber struct {
opts broker.SubscribeOptions opts broker.SubscribeOptions
} }
type publication struct {
t string
err error
m *broker.Message
}
func (p *publication) Topic() string {
return p.t
}
func (p *publication) Message() *broker.Message {
return p.m
}
func (p *publication) Ack() error {
// nats does not support acking
return nil
}
func (p *publication) Error() error {
return p.err
}
func (s *subscriber) Options() broker.SubscribeOptions { func (s *subscriber) Options() broker.SubscribeOptions {
return s.opts return s.opts
} }
@ -195,7 +172,6 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
n.RUnlock() n.RUnlock()
opt := broker.SubscribeOptions{ opt := broker.SubscribeOptions{
AutoAck: true,
Context: context.Background(), Context: context.Background(),
} }
@ -204,29 +180,25 @@ func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...bro
} }
fn := func(msg *nats.Msg) { fn := func(msg *nats.Msg) {
var m broker.Message var m *broker.Message
pub := &publication{t: msg.Subject} eh := opt.ErrorHandler
eh := n.opts.ErrorHandler
err := n.opts.Codec.Unmarshal(msg.Data, &m) err := n.opts.Codec.Unmarshal(msg.Data, &m)
pub.err = err
pub.m = &m
if err != nil { if err != nil {
m.Body = msg.Data m.Body = msg.Data
if logger.V(logger.ErrorLevel, logger.DefaultLogger) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err) logger.Error(err)
} }
if eh != nil { if eh != nil {
eh(pub) eh(m, err)
} }
return return
} }
if err := handler(pub); err != nil { if err := handler(m); err != nil {
pub.err = err
if logger.V(logger.ErrorLevel, logger.DefaultLogger) { if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err) logger.Error(err)
} }
if eh != nil { if eh != nil {
eh(pub) eh(m, err)
} }
} }
} }

View File

@ -13,10 +13,6 @@ type Options struct {
Secure bool Secure bool
Codec codec.Marshaler Codec codec.Marshaler
// Handler executed when error happens in broker message
// processing
ErrorHandler Handler
TLSConfig *tls.Config TLSConfig *tls.Config
// Registry used for clustering // Registry used for clustering
Registry registry.Registry Registry registry.Registry
@ -32,9 +28,9 @@ type PublishOptions struct {
} }
type SubscribeOptions struct { type SubscribeOptions struct {
// AutoAck defaults to true. When a handler returns // Handler executed when errors occur processing messages
// with a nil error the message is acked. ErrorHandler ErrorHandler
AutoAck bool
// Subscribers with the same queue name // Subscribers with the same queue name
// will create a shared subscription where each // will create a shared subscription where each
// receives a subset of messages. // receives a subset of messages.
@ -59,9 +55,7 @@ func PublishContext(ctx context.Context) PublishOption {
type SubscribeOption func(*SubscribeOptions) type SubscribeOption func(*SubscribeOptions)
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
opt := SubscribeOptions{ opt := SubscribeOptions{}
AutoAck: true,
}
for _, o := range opts { for _, o := range opts {
o(&opt) o(&opt)
@ -85,18 +79,10 @@ func Codec(c codec.Marshaler) Option {
} }
} }
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = false
}
}
// ErrorHandler will catch all broker errors that cant be handled // ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func ErrorHandler(h Handler) Option { func HandleError(h ErrorHandler) SubscribeOption {
return func(o *Options) { return func(o *SubscribeOptions) {
o.ErrorHandler = h o.ErrorHandler = h
} }
} }

View File

@ -300,9 +300,12 @@ func (r *rtr) watchRegistry(w registry.Watcher) error {
// don't process nil entries // don't process nil entries
if res.Service == nil { if res.Service == nil {
logger.Trace("Received a nil service")
continue continue
} }
logger.Tracef("Router dealing with next route %s %+v\n", res.Action, res.Service)
// get the services domain from metadata. Fallback to wildcard. // get the services domain from metadata. Fallback to wildcard.
domain := getDomain(res.Service) domain := getDomain(res.Service)
@ -376,6 +379,7 @@ func (r *rtr) start() error {
case <-r.exit: case <-r.exit:
return return
default: default:
logger.Tracef("Router starting registry watch")
w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain)) w, err := r.options.Registry.Watch(registry.WatchDomain(registry.WildcardDomain))
if err != nil { if err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) { if logger.V(logger.DebugLevel, logger.DefaultLogger) {

View File

@ -753,10 +753,6 @@ func (g *grpcServer) Register() error {
opts = append(opts, broker.SubscribeContext(cx)) opts = append(opts, broker.SubscribeContext(cx))
} }
if !sb.Options().AutoAck {
opts = append(opts, broker.DisableAutoAck())
}
if logger.V(logger.InfoLevel, logger.DefaultLogger) { if logger.V(logger.InfoLevel, logger.DefaultLogger) {
logger.Infof("Subscribing to topic: %s", sb.Topic()) logger.Infof("Subscribing to topic: %s", sb.Topic())
} }

View File

@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error {
} }
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Event) (err error) { return func(msg *broker.Message) (err error) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -179,7 +179,6 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
} }
}() }()
msg := p.Message()
// if we don't have headers, create empty map // if we don't have headers, create empty map
if msg.Header == nil { if msg.Header == nil {
msg.Header = make(map[string]string) msg.Header = make(map[string]string)

View File

@ -1,38 +0,0 @@
package mucp
import (
"github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/transport"
)
// event is a broker event we handle on the server transport
type event struct {
err error
message *broker.Message
}
func (e *event) Ack() error {
// there is no ack support
return nil
}
func (e *event) Message() *broker.Message {
return e.message
}
func (e *event) Error() error {
return e.err
}
func (e *event) Topic() string {
return e.message.Header["Micro-Topic"]
}
func newEvent(msg transport.Message) *event {
return &event{
message: &broker.Message{
Header: msg.Header,
Body: msg.Body,
},
}
}

View File

@ -79,10 +79,7 @@ func newServer(opts ...server.Option) server.Server {
// HandleEvent handles inbound messages to the service directly // HandleEvent handles inbound messages to the service directly
// TODO: handle requests from an event. We won't send a response. // TODO: handle requests from an event. We won't send a response.
func (s *rpcServer) HandleEvent(e broker.Event) error { func (s *rpcServer) HandleEvent(msg *broker.Message) error {
// formatting horrible cruft
msg := e.Message()
if msg.Header == nil { if msg.Header == nil {
// create empty map in case of headers empty to avoid panic later // create empty map in case of headers empty to avoid panic later
msg.Header = make(map[string]string) msg.Header = make(map[string]string)
@ -190,10 +187,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// Micro-Service is a request // Micro-Service is a request
// Micro-Topic is a message // Micro-Topic is a message
if t := msg.Header["Micro-Topic"]; len(t) > 0 { if t := msg.Header["Micro-Topic"]; len(t) > 0 {
// process the event
ev := newEvent(msg)
// TODO: handle the error event // TODO: handle the error event
if err := s.HandleEvent(ev); err != nil { if err := s.HandleEvent(newMessage(msg)); err != nil {
msg.Header["Micro-Error"] = err.Error() msg.Header["Micro-Error"] = err.Error()
} }
// write back some 200 // write back some 200
@ -706,10 +701,6 @@ func (s *rpcServer) Register() error {
opts = append(opts, broker.SubscribeContext(cx)) opts = append(opts, broker.SubscribeContext(cx))
} }
if !sb.Options().AutoAck {
opts = append(opts, broker.DisableAutoAck())
}
sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...)
if err != nil { if err != nil {
return err return err

View File

@ -6,6 +6,8 @@ import (
"github.com/micro/go-micro/v3/registry" "github.com/micro/go-micro/v3/registry"
"github.com/micro/go-micro/v3/server" "github.com/micro/go-micro/v3/server"
"github.com/micro/go-micro/v3/broker"
"github.com/micro/go-micro/v3/transport"
) )
const ( const (
@ -28,6 +30,13 @@ type subscriber struct {
opts server.SubscriberOptions opts server.SubscriberOptions
} }
func newMessage(msg transport.Message) *broker.Message {
return &broker.Message{
Header: msg.Header,
Body: msg.Body,
}
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.SubscriberOptions{ options := server.SubscriberOptions{
AutoAck: true, AutoAck: true,

View File

@ -124,12 +124,9 @@ func (t *tunSubscriber) run() {
c.Close() c.Close()
// handle the message // handle the message
go t.handler(&tunEvent{ go t.handler(&broker.Message{
topic: t.topic, Header: m.Header,
message: &broker.Message{ Body: m.Body,
Header: m.Header,
Body: m.Body,
},
}) })
} }
} }