From d00d76bf7cfb622830da458d1fd4fa85010aab75 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 14 Apr 2018 18:21:02 +0100 Subject: [PATCH] Move publication to message --- function.go | 2 +- server/rpc_request.go | 12 ++++++------ server/server.go | 4 ++-- server/server_wrapper.go | 2 +- server/subscriber.go | 8 ++++---- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/function.go b/function.go index 97cf17aa..4379a4b3 100644 --- a/function.go +++ b/function.go @@ -23,7 +23,7 @@ func fnHandlerWrapper(f Function) server.HandlerWrapper { func fnSubWrapper(f Function) server.SubscriberWrapper { return func(s server.SubscriberFunc) server.SubscriberFunc { - return func(ctx context.Context, msg server.Publication) error { + return func(ctx context.Context, msg server.Message) error { defer f.Done() return s(ctx, msg) } diff --git a/server/rpc_request.go b/server/rpc_request.go index 5dcac361..b96286c2 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -8,10 +8,10 @@ type rpcRequest struct { stream bool } -type rpcPublication struct { +type rpcMessage struct { topic string contentType string - message interface{} + payload interface{} } func (r *rpcRequest) ContentType() string { @@ -34,14 +34,14 @@ func (r *rpcRequest) Stream() bool { return r.stream } -func (r *rpcPublication) ContentType() string { +func (r *rpcMessage) ContentType() string { return r.contentType } -func (r *rpcPublication) Topic() string { +func (r *rpcMessage) Topic() string { return r.topic } -func (r *rpcPublication) Message() interface{} { - return r.message +func (r *rpcMessage) Payload() interface{} { + return r.payload } diff --git a/server/server.go b/server/server.go index f2f76f3c..9b9f2ba4 100644 --- a/server/server.go +++ b/server/server.go @@ -21,9 +21,9 @@ type Server interface { String() string } -type Publication interface { +type Message interface { Topic() string - Message() interface{} + Payload() interface{} ContentType() string } diff --git a/server/server_wrapper.go b/server/server_wrapper.go index e90ae338..3e4d3ecd 100644 --- a/server/server_wrapper.go +++ b/server/server_wrapper.go @@ -12,7 +12,7 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error // SubscriberFunc represents a single method of a subscriber. It's used primarily // for the wrappers. What's handed to the actual method is the concrete // publication message. -type SubscriberFunc func(ctx context.Context, msg Publication) error +type SubscriberFunc func(ctx context.Context, msg Message) error // HandlerWrapper wraps the HandlerFunc and returns the equivalent type HandlerWrapper func(HandlerFunc) HandlerFunc diff --git a/server/subscriber.go b/server/subscriber.go index d69d3620..0af2457b 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -204,7 +204,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle return err } - fn := func(ctx context.Context, msg Publication) error { + fn := func(ctx context.Context, msg Message) error { var vals []reflect.Value if sb.typ.Kind() != reflect.Func { vals = append(vals, sb.rcvr) @@ -213,7 +213,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle vals = append(vals, reflect.ValueOf(ctx)) } - vals = append(vals, reflect.ValueOf(msg.Message())) + vals = append(vals, reflect.ValueOf(msg.Payload())) returnValues := handler.method.Call(vals) if err := returnValues[0].Interface(); err != nil { @@ -229,10 +229,10 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle s.wg.Add(1) go func() { defer s.wg.Done() - fn(ctx, &rpcPublication{ + fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, - message: req.Interface(), + payload: req.Interface(), }) }() }