diff --git a/broker/options.go b/broker/options.go index 8b44f6aa..6256cac5 100644 --- a/broker/options.go +++ b/broker/options.go @@ -76,8 +76,8 @@ func DisableAutoAck() SubscribeOption { } } -// QueueName sets the name of the queue to share messages on -func QueueName(name string) SubscribeOption { +// Queue sets the name of the queue to share messages on +func Queue(name string) SubscribeOption { return func(o *SubscribeOptions) { o.Queue = name } diff --git a/examples/pubsub/consumer/consumer.go b/examples/pubsub/consumer/consumer.go index c9033f8a..465a4736 100644 --- a/examples/pubsub/consumer/consumer.go +++ b/examples/pubsub/consumer/consumer.go @@ -19,7 +19,7 @@ func sharedSub() { _, err := broker.Subscribe(topic, func(p broker.Publication) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil - }, broker.QueueName("consumer")) + }, broker.Queue("consumer")) if err != nil { fmt.Println(err) } diff --git a/server/rpc_server.go b/server/rpc_server.go index b9874a92..ea607fec 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -46,9 +46,11 @@ func newRpcServer(opts ...Option) Server { func (s *rpcServer) accept(sock transport.Socket) { defer func() { + // close socket + sock.Close() + if r := recover(); r != nil { log.Print(r, string(debug.Stack())) - sock.Close() } }() @@ -67,7 +69,6 @@ func (s *rpcServer) accept(sock transport.Socket) { }, Body: []byte(err.Error()), }) - sock.Close() return } @@ -85,7 +86,6 @@ func (s *rpcServer) accept(sock transport.Socket) { // TODO: needs better error handling if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { log.Printf("Unexpected error serving request, closing socket: %v", err) - sock.Close() } } @@ -258,7 +258,7 @@ func (s *rpcServer) Register() error { handler := s.createSubHandler(sb, s.opts) var opts []broker.SubscribeOption if queue := sb.Options().Queue; len(queue) > 0 { - opts = append(opts, broker.QueueName(queue)) + opts = append(opts, broker.Queue(queue)) } sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) if err != nil {