diff --git a/api/handler/broker/broker.go b/api/handler/broker/broker.go index a9f0800d..2d2905d9 100644 --- a/api/handler/broker/broker.go +++ b/api/handler/broker/broker.go @@ -120,7 +120,7 @@ func (c *conn) writeLoop() { opts = append(opts, broker.Queue(c.queue)) } - subscriber, err := c.b.Subscribe(c.topic, func(p broker.Publication) error { + subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error { b, err := json.Marshal(p.Message()) if err != nil { return nil diff --git a/broker/broker.go b/broker/broker.go index 28fe953a..b7b9dcb0 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -15,15 +15,15 @@ type Broker interface { // Handler is used to process messages via a subscription of a topic. // The handler is passed a publication interface which contains the // message and optional Ack method to acknowledge receipt of the message. -type Handler func(Publication) error +type Handler func(Event) error type Message struct { Header map[string]string Body []byte } -// Publication is given to a subscription handler for processing -type Publication interface { +// Event is given to a subscription handler for processing +type Event interface { Topic() string Message() *Message Ack() error diff --git a/broker/http_broker.go b/broker/http_broker.go index 01e39495..d148da5d 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -59,7 +59,7 @@ type httpSubscriber struct { hb *httpBroker } -type httpPublication struct { +type httpEvent struct { m *Message t string } @@ -155,15 +155,15 @@ func newHttpBroker(opts ...Option) Broker { return h } -func (h *httpPublication) Ack() error { +func (h *httpEvent) Ack() error { return nil } -func (h *httpPublication) Message() *Message { +func (h *httpEvent) Message() *Message { return h.m } -func (h *httpPublication) Topic() string { +func (h *httpEvent) Topic() string { return h.t } @@ -323,7 +323,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - p := &httpPublication{m: m, t: topic} + p := &httpEvent{m: m, t: topic} id := req.Form.Get("id") h.RLock() diff --git a/broker/http_broker_test.go b/broker/http_broker_test.go index 83db509b..4695033e 100644 --- a/broker/http_broker_test.go +++ b/broker/http_broker_test.go @@ -47,7 +47,7 @@ func sub(be *testing.B, c int) { done := make(chan bool, c) for i := 0; i < c; i++ { - sub, err := b.Subscribe(topic, func(p Publication) error { + sub, err := b.Subscribe(topic, func(p Event) error { done <- true m := p.Message() @@ -107,7 +107,7 @@ func pub(be *testing.B, c int) { done := make(chan bool, c*4) - sub, err := b.Subscribe(topic, func(p Publication) error { + sub, err := b.Subscribe(topic, func(p Event) error { done <- true m := p.Message() if string(m.Body) != string(msg.Body) { @@ -175,7 +175,7 @@ func TestBroker(t *testing.T) { done := make(chan bool) - sub, err := b.Subscribe("test", func(p Publication) error { + sub, err := b.Subscribe("test", func(p Event) error { m := p.Message() if string(m.Body) != string(msg.Body) { @@ -224,7 +224,7 @@ func TestConcurrentSubBroker(t *testing.T) { var wg sync.WaitGroup for i := 0; i < 10; i++ { - sub, err := b.Subscribe("test", func(p Publication) error { + sub, err := b.Subscribe("test", func(p Event) error { defer wg.Done() m := p.Message() @@ -279,7 +279,7 @@ func TestConcurrentPubBroker(t *testing.T) { var wg sync.WaitGroup - sub, err := b.Subscribe("test", func(p Publication) error { + sub, err := b.Subscribe("test", func(p Event) error { defer wg.Done() m := p.Message() diff --git a/broker/memory/memory.go b/broker/memory/memory.go index 7446bb21..fb077faa 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -17,7 +17,7 @@ type memoryBroker struct { Subscribers map[string][]*memorySubscriber } -type memoryPublication struct { +type memoryEvent struct { topic string message *broker.Message } @@ -84,7 +84,7 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br return nil } - p := &memoryPublication{ + p := &memoryEvent{ topic: topic, message: message, } @@ -142,15 +142,15 @@ func (m *memoryBroker) String() string { return "memory" } -func (m *memoryPublication) Topic() string { +func (m *memoryEvent) Topic() string { return m.topic } -func (m *memoryPublication) Message() *broker.Message { +func (m *memoryEvent) Message() *broker.Message { return m.message } -func (m *memoryPublication) Ack() error { +func (m *memoryEvent) Ack() error { return nil } diff --git a/broker/memory/memory_test.go b/broker/memory/memory_test.go index 5c0021b1..625c7ee7 100644 --- a/broker/memory/memory_test.go +++ b/broker/memory/memory_test.go @@ -17,7 +17,7 @@ func TestMemoryBroker(t *testing.T) { topic := "test" count := 10 - fn := func(p broker.Publication) error { + fn := func(p broker.Event) error { return nil } diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 939019f5..809eec2a 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -295,7 +295,7 @@ func (g *grpcClient) Options() client.Options { } func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { - return newGRPCPublication(topic, msg, g.opts.ContentType, opts...) + return newGRPCEvent(topic, msg, g.opts.ContentType, opts...) } func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { @@ -498,7 +498,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie } b := &buffer{bytes.NewBuffer(nil)} - if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Payload()); err != nil { + if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } diff --git a/client/grpc/message.go b/client/grpc/message.go index 6938064e..5691868e 100644 --- a/client/grpc/message.go +++ b/client/grpc/message.go @@ -4,13 +4,13 @@ import ( "github.com/micro/go-micro/client" ) -type grpcPublication struct { +type grpcEvent struct { topic string contentType string payload interface{} } -func newGRPCPublication(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { +func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { var options client.MessageOptions for _, o := range opts { o(&options) @@ -20,21 +20,21 @@ func newGRPCPublication(topic string, payload interface{}, contentType string, o contentType = options.ContentType } - return &grpcPublication{ + return &grpcEvent{ payload: payload, topic: topic, contentType: contentType, } } -func (g *grpcPublication) ContentType() string { +func (g *grpcEvent) ContentType() string { return g.contentType } -func (g *grpcPublication) Topic() string { +func (g *grpcEvent) Topic() string { return g.topic } -func (g *grpcPublication) Payload() interface{} { +func (g *grpcEvent) Payload() interface{} { return g.payload } diff --git a/client/rpc_client.go b/client/rpc_client.go index 52b2a092..d2ebe74a 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -547,7 +547,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt b := &buffer{bytes.NewBuffer(nil)} if err := cf(b).Write(&codec.Message{ Target: topic, - Type: codec.Publication, + Type: codec.Event, Header: map[string]string{ "Micro-Id": id, "Micro-Topic": msg.Topic(), diff --git a/codec/codec.go b/codec/codec.go index 092caa05..b4feb0a4 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -9,7 +9,7 @@ const ( Error MessageType = iota Request Response - Publication + Event ) type MessageType int diff --git a/codec/jsonrpc/jsonrpc.go b/codec/jsonrpc/jsonrpc.go index 8f0c2c4f..f98b2842 100644 --- a/codec/jsonrpc/jsonrpc.go +++ b/codec/jsonrpc/jsonrpc.go @@ -33,7 +33,7 @@ func (j *jsonCodec) Write(m *codec.Message, b interface{}) error { return j.c.Write(m, b) case codec.Response, codec.Error: return j.s.Write(m, b) - case codec.Publication: + case codec.Event: data, err := json.Marshal(b) if err != nil { return err @@ -54,7 +54,7 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { return j.s.ReadHeader(m) case codec.Response: return j.c.ReadHeader(m) - case codec.Publication: + case codec.Event: _, err := io.Copy(j.buf, j.rwc) return err default: @@ -69,7 +69,7 @@ func (j *jsonCodec) ReadBody(b interface{}) error { return j.s.ReadBody(b) case codec.Response: return j.c.ReadBody(b) - case codec.Publication: + case codec.Event: if b != nil { return json.Unmarshal(j.buf.Bytes(), b) } diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index 4732b98e..255b93a4 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -99,7 +99,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { return err } } - case codec.Publication: + case codec.Event: data, err := proto.Marshal(b.(proto.Message)) if err != nil { return err @@ -141,7 +141,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { m.Method = rtmp.GetServiceMethod() m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) m.Error = rtmp.GetError() - case codec.Publication: + case codec.Event: _, err := io.Copy(c.buf, c.rwc) return err default: @@ -159,7 +159,7 @@ func (c *protoCodec) ReadBody(b interface{}) error { if err != nil { return err } - case codec.Publication: + case codec.Event: data = c.buf.Bytes() default: return fmt.Errorf("Unrecognised message type: %v", c.mt) diff --git a/server/grpc/subscriber.go b/server/grpc/subscriber.go index 7a9f19af..97b9d75e 100644 --- a/server/grpc/subscriber.go +++ b/server/grpc/subscriber.go @@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error { } func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { - return func(p broker.Publication) error { + return func(p broker.Event) error { msg := p.Message() ct := msg.Header["Content-Type"] if len(ct) == 0 { @@ -208,7 +208,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke co := cf(b) defer co.Close() - if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil { return err } diff --git a/server/subscriber.go b/server/subscriber.go index 77971565..3d87fea1 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -165,7 +165,7 @@ func validateSubscriber(sub Subscriber) error { } func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { - return func(p broker.Publication) error { + return func(p broker.Event) error { msg := p.Message() // get codec @@ -214,7 +214,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle co := cf(b) defer co.Close() - if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil { return err } diff --git a/sync/task/broker/broker.go b/sync/task/broker/broker.go index af0ee673..f643fb2d 100644 --- a/sync/task/broker/broker.go +++ b/sync/task/broker/broker.go @@ -47,7 +47,7 @@ func (t *Task) Run(c task.Command) error { errCh := make(chan error, t.Options.Pool) // subscribe for distributed work - workFn := func(p broker.Publication) error { + workFn := func(p broker.Event) error { msg := p.Message() // get command name @@ -110,7 +110,7 @@ func (t *Task) Run(c task.Command) error { } // subscribe to all status messages - subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error { + subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error { msg := p.Message() // get command name