@@ -7,12 +7,12 @@ import (
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
"github.com/micro/go-micro/v3/broker"
|
||||
"github.com/micro/go-micro/v3/errors"
|
||||
"github.com/micro/go-micro/v3/logger"
|
||||
"github.com/micro/go-micro/v3/metadata"
|
||||
"github.com/micro/go-micro/v3/registry"
|
||||
"github.com/micro/go-micro/v3/server"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/errors"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/metadata"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -36,10 +36,7 @@ type subscriber struct {
|
||||
}
|
||||
|
||||
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
options := server.SubscriberOptions{
|
||||
AutoAck: true,
|
||||
}
|
||||
|
||||
options := server.NewSubscriberOptions()
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
@@ -167,11 +164,10 @@ func validateSubscriber(sub server.Subscriber) error {
|
||||
}
|
||||
|
||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
return func(msg *broker.Message) (err error) {
|
||||
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error("panic recovered: ", r)
|
||||
logger.Error(string(debug.Stack()))
|
||||
}
|
||||
@@ -179,6 +175,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
|
||||
}
|
||||
}()
|
||||
|
||||
msg := p.Message()
|
||||
// if we don't have headers, create empty map
|
||||
if msg.Header == nil {
|
||||
msg.Header = make(map[string]string)
|
||||
|
Reference in New Issue
Block a user