Decruft the broker by removing Event interface (#1940)
This commit is contained in:
parent
03af0bddc6
commit
242c0acf1e
4
grpc.go
4
grpc.go
@ -753,10 +753,6 @@ func (g *grpcServer) Register() error {
|
||||
opts = append(opts, broker.SubscribeContext(cx))
|
||||
}
|
||||
|
||||
if !sb.Options().AutoAck {
|
||||
opts = append(opts, broker.DisableAutoAck())
|
||||
}
|
||||
|
||||
if logger.V(logger.InfoLevel, logger.DefaultLogger) {
|
||||
logger.Infof("Subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
|
@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error {
|
||||
}
|
||||
|
||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
return func(msg *broker.Message) (err error) {
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@ -179,7 +179,6 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
|
||||
}
|
||||
}()
|
||||
|
||||
msg := p.Message()
|
||||
// if we don't have headers, create empty map
|
||||
if msg.Header == nil {
|
||||
msg.Header = make(map[string]string)
|
||||
|
Loading…
x
Reference in New Issue
Block a user