Change QueueName to Queue

This commit is contained in:
Asim 2016-05-10 10:55:18 +01:00
parent 0d21b08928
commit aea81315d0
3 changed files with 7 additions and 7 deletions

View File

@ -76,8 +76,8 @@ func DisableAutoAck() SubscribeOption {
} }
} }
// QueueName sets the name of the queue to share messages on // Queue sets the name of the queue to share messages on
func QueueName(name string) SubscribeOption { func Queue(name string) SubscribeOption {
return func(o *SubscribeOptions) { return func(o *SubscribeOptions) {
o.Queue = name o.Queue = name
} }

View File

@ -19,7 +19,7 @@ func sharedSub() {
_, err := broker.Subscribe(topic, func(p broker.Publication) error { _, err := broker.Subscribe(topic, func(p broker.Publication) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil return nil
}, broker.QueueName("consumer")) }, broker.Queue("consumer"))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }

View File

@ -46,9 +46,11 @@ func newRpcServer(opts ...Option) Server {
func (s *rpcServer) accept(sock transport.Socket) { func (s *rpcServer) accept(sock transport.Socket) {
defer func() { defer func() {
// close socket
sock.Close()
if r := recover(); r != nil { if r := recover(); r != nil {
log.Print(r, string(debug.Stack())) log.Print(r, string(debug.Stack()))
sock.Close()
} }
}() }()
@ -67,7 +69,6 @@ func (s *rpcServer) accept(sock transport.Socket) {
}, },
Body: []byte(err.Error()), Body: []byte(err.Error()),
}) })
sock.Close()
return return
} }
@ -85,7 +86,6 @@ func (s *rpcServer) accept(sock transport.Socket) {
// TODO: needs better error handling // TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
log.Printf("Unexpected error serving request, closing socket: %v", err) log.Printf("Unexpected error serving request, closing socket: %v", err)
sock.Close()
} }
} }
@ -258,7 +258,7 @@ func (s *rpcServer) Register() error {
handler := s.createSubHandler(sb, s.opts) handler := s.createSubHandler(sb, s.opts)
var opts []broker.SubscribeOption var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 { if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.QueueName(queue)) opts = append(opts, broker.Queue(queue))
} }
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {