diff --git a/grpc.go b/grpc.go index d9146f0..1de8a83 100644 --- a/grpc.go +++ b/grpc.go @@ -753,10 +753,6 @@ func (g *grpcServer) Register() error { opts = append(opts, broker.SubscribeContext(cx)) } - if !sb.Options().AutoAck { - opts = append(opts, broker.DisableAutoAck()) - } - if logger.V(logger.InfoLevel, logger.DefaultLogger) { logger.Infof("Subscribing to topic: %s", sb.Topic()) } diff --git a/subscriber.go b/subscriber.go index 28f86ab..d69379a 100644 --- a/subscriber.go +++ b/subscriber.go @@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error { } func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { - return func(p broker.Event) (err error) { + return func(msg *broker.Message) (err error) { defer func() { if r := recover(); r != nil { @@ -179,7 +179,6 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke } }() - msg := p.Message() // if we don't have headers, create empty map if msg.Header == nil { msg.Header = make(map[string]string)