Change Publication to Event

This commit is contained in:
Asim Aslam 2019-07-07 12:44:09 +01:00
parent 79b03a6825
commit 4b4ad68eb9
15 changed files with 42 additions and 42 deletions

View File

@ -120,7 +120,7 @@ func (c *conn) writeLoop() {
opts = append(opts, broker.Queue(c.queue)) opts = append(opts, broker.Queue(c.queue))
} }
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Publication) error { subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
b, err := json.Marshal(p.Message()) b, err := json.Marshal(p.Message())
if err != nil { if err != nil {
return nil return nil

View File

@ -15,15 +15,15 @@ type Broker interface {
// Handler is used to process messages via a subscription of a topic. // Handler is used to process messages via a subscription of a topic.
// The handler is passed a publication interface which contains the // The handler is passed a publication interface which contains the
// message and optional Ack method to acknowledge receipt of the message. // message and optional Ack method to acknowledge receipt of the message.
type Handler func(Publication) error type Handler func(Event) error
type Message struct { type Message struct {
Header map[string]string Header map[string]string
Body []byte Body []byte
} }
// Publication is given to a subscription handler for processing // Event is given to a subscription handler for processing
type Publication interface { type Event interface {
Topic() string Topic() string
Message() *Message Message() *Message
Ack() error Ack() error

View File

@ -59,7 +59,7 @@ type httpSubscriber struct {
hb *httpBroker hb *httpBroker
} }
type httpPublication struct { type httpEvent struct {
m *Message m *Message
t string t string
} }
@ -155,15 +155,15 @@ func newHttpBroker(opts ...Option) Broker {
return h return h
} }
func (h *httpPublication) Ack() error { func (h *httpEvent) Ack() error {
return nil return nil
} }
func (h *httpPublication) Message() *Message { func (h *httpEvent) Message() *Message {
return h.m return h.m
} }
func (h *httpPublication) Topic() string { func (h *httpEvent) Topic() string {
return h.t return h.t
} }
@ -323,7 +323,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
p := &httpPublication{m: m, t: topic} p := &httpEvent{m: m, t: topic}
id := req.Form.Get("id") id := req.Form.Get("id")
h.RLock() h.RLock()

View File

@ -47,7 +47,7 @@ func sub(be *testing.B, c int) {
done := make(chan bool, c) done := make(chan bool, c)
for i := 0; i < c; i++ { for i := 0; i < c; i++ {
sub, err := b.Subscribe(topic, func(p Publication) error { sub, err := b.Subscribe(topic, func(p Event) error {
done <- true done <- true
m := p.Message() m := p.Message()
@ -107,7 +107,7 @@ func pub(be *testing.B, c int) {
done := make(chan bool, c*4) done := make(chan bool, c*4)
sub, err := b.Subscribe(topic, func(p Publication) error { sub, err := b.Subscribe(topic, func(p Event) error {
done <- true done <- true
m := p.Message() m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
@ -175,7 +175,7 @@ func TestBroker(t *testing.T) {
done := make(chan bool) done := make(chan bool)
sub, err := b.Subscribe("test", func(p Publication) error { sub, err := b.Subscribe("test", func(p Event) error {
m := p.Message() m := p.Message()
if string(m.Body) != string(msg.Body) { if string(m.Body) != string(msg.Body) {
@ -224,7 +224,7 @@ func TestConcurrentSubBroker(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
sub, err := b.Subscribe("test", func(p Publication) error { sub, err := b.Subscribe("test", func(p Event) error {
defer wg.Done() defer wg.Done()
m := p.Message() m := p.Message()
@ -279,7 +279,7 @@ func TestConcurrentPubBroker(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
sub, err := b.Subscribe("test", func(p Publication) error { sub, err := b.Subscribe("test", func(p Event) error {
defer wg.Done() defer wg.Done()
m := p.Message() m := p.Message()

View File

@ -17,7 +17,7 @@ type memoryBroker struct {
Subscribers map[string][]*memorySubscriber Subscribers map[string][]*memorySubscriber
} }
type memoryPublication struct { type memoryEvent struct {
topic string topic string
message *broker.Message message *broker.Message
} }
@ -84,7 +84,7 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br
return nil return nil
} }
p := &memoryPublication{ p := &memoryEvent{
topic: topic, topic: topic,
message: message, message: message,
} }
@ -142,15 +142,15 @@ func (m *memoryBroker) String() string {
return "memory" return "memory"
} }
func (m *memoryPublication) Topic() string { func (m *memoryEvent) Topic() string {
return m.topic return m.topic
} }
func (m *memoryPublication) Message() *broker.Message { func (m *memoryEvent) Message() *broker.Message {
return m.message return m.message
} }
func (m *memoryPublication) Ack() error { func (m *memoryEvent) Ack() error {
return nil return nil
} }

View File

@ -17,7 +17,7 @@ func TestMemoryBroker(t *testing.T) {
topic := "test" topic := "test"
count := 10 count := 10
fn := func(p broker.Publication) error { fn := func(p broker.Event) error {
return nil return nil
} }

View File

@ -295,7 +295,7 @@ func (g *grpcClient) Options() client.Options {
} }
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return newGRPCPublication(topic, msg, g.opts.ContentType, opts...) return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
} }
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
@ -498,7 +498,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
} }
b := &buffer{bytes.NewBuffer(nil)} b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Payload()); err != nil { if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }

View File

@ -4,13 +4,13 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
) )
type grpcPublication struct { type grpcEvent struct {
topic string topic string
contentType string contentType string
payload interface{} payload interface{}
} }
func newGRPCPublication(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message { func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
var options client.MessageOptions var options client.MessageOptions
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
@ -20,21 +20,21 @@ func newGRPCPublication(topic string, payload interface{}, contentType string, o
contentType = options.ContentType contentType = options.ContentType
} }
return &grpcPublication{ return &grpcEvent{
payload: payload, payload: payload,
topic: topic, topic: topic,
contentType: contentType, contentType: contentType,
} }
} }
func (g *grpcPublication) ContentType() string { func (g *grpcEvent) ContentType() string {
return g.contentType return g.contentType
} }
func (g *grpcPublication) Topic() string { func (g *grpcEvent) Topic() string {
return g.topic return g.topic
} }
func (g *grpcPublication) Payload() interface{} { func (g *grpcEvent) Payload() interface{} {
return g.payload return g.payload
} }

View File

@ -547,7 +547,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
b := &buffer{bytes.NewBuffer(nil)} b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{ if err := cf(b).Write(&codec.Message{
Target: topic, Target: topic,
Type: codec.Publication, Type: codec.Event,
Header: map[string]string{ Header: map[string]string{
"Micro-Id": id, "Micro-Id": id,
"Micro-Topic": msg.Topic(), "Micro-Topic": msg.Topic(),

View File

@ -9,7 +9,7 @@ const (
Error MessageType = iota Error MessageType = iota
Request Request
Response Response
Publication Event
) )
type MessageType int type MessageType int

View File

@ -33,7 +33,7 @@ func (j *jsonCodec) Write(m *codec.Message, b interface{}) error {
return j.c.Write(m, b) return j.c.Write(m, b)
case codec.Response, codec.Error: case codec.Response, codec.Error:
return j.s.Write(m, b) return j.s.Write(m, b)
case codec.Publication: case codec.Event:
data, err := json.Marshal(b) data, err := json.Marshal(b)
if err != nil { if err != nil {
return err return err
@ -54,7 +54,7 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
return j.s.ReadHeader(m) return j.s.ReadHeader(m)
case codec.Response: case codec.Response:
return j.c.ReadHeader(m) return j.c.ReadHeader(m)
case codec.Publication: case codec.Event:
_, err := io.Copy(j.buf, j.rwc) _, err := io.Copy(j.buf, j.rwc)
return err return err
default: default:
@ -69,7 +69,7 @@ func (j *jsonCodec) ReadBody(b interface{}) error {
return j.s.ReadBody(b) return j.s.ReadBody(b)
case codec.Response: case codec.Response:
return j.c.ReadBody(b) return j.c.ReadBody(b)
case codec.Publication: case codec.Event:
if b != nil { if b != nil {
return json.Unmarshal(j.buf.Bytes(), b) return json.Unmarshal(j.buf.Bytes(), b)
} }

View File

@ -99,7 +99,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err return err
} }
} }
case codec.Publication: case codec.Event:
data, err := proto.Marshal(b.(proto.Message)) data, err := proto.Marshal(b.(proto.Message))
if err != nil { if err != nil {
return err return err
@ -141,7 +141,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
m.Method = rtmp.GetServiceMethod() m.Method = rtmp.GetServiceMethod()
m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
m.Error = rtmp.GetError() m.Error = rtmp.GetError()
case codec.Publication: case codec.Event:
_, err := io.Copy(c.buf, c.rwc) _, err := io.Copy(c.buf, c.rwc)
return err return err
default: default:
@ -159,7 +159,7 @@ func (c *protoCodec) ReadBody(b interface{}) error {
if err != nil { if err != nil {
return err return err
} }
case codec.Publication: case codec.Event:
data = c.buf.Bytes() data = c.buf.Bytes()
default: default:
return fmt.Errorf("Unrecognised message type: %v", c.mt) return fmt.Errorf("Unrecognised message type: %v", c.mt)

View File

@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error {
} }
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Publication) error { return func(p broker.Event) error {
msg := p.Message() msg := p.Message()
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
if len(ct) == 0 { if len(ct) == 0 {
@ -208,7 +208,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
co := cf(b) co := cf(b)
defer co.Close() defer co.Close()
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err return err
} }

View File

@ -165,7 +165,7 @@ func validateSubscriber(sub Subscriber) error {
} }
func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Publication) error { return func(p broker.Event) error {
msg := p.Message() msg := p.Message()
// get codec // get codec
@ -214,7 +214,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
co := cf(b) co := cf(b)
defer co.Close() defer co.Close()
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err return err
} }

View File

@ -47,7 +47,7 @@ func (t *Task) Run(c task.Command) error {
errCh := make(chan error, t.Options.Pool) errCh := make(chan error, t.Options.Pool)
// subscribe for distributed work // subscribe for distributed work
workFn := func(p broker.Publication) error { workFn := func(p broker.Event) error {
msg := p.Message() msg := p.Message()
// get command name // get command name
@ -110,7 +110,7 @@ func (t *Task) Run(c task.Command) error {
} }
// subscribe to all status messages // subscribe to all status messages
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error { subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error {
msg := p.Message() msg := p.Message()
// get command name // get command name