commit
be33d9204a
@ -120,7 +120,7 @@ func (c *conn) writeLoop() {
|
|||||||
opts = append(opts, broker.Queue(c.queue))
|
opts = append(opts, broker.Queue(c.queue))
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Publication) error {
|
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
|
||||||
b, err := json.Marshal(p.Message())
|
b, err := json.Marshal(p.Message())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -15,15 +15,15 @@ type Broker interface {
|
|||||||
// Handler is used to process messages via a subscription of a topic.
|
// Handler is used to process messages via a subscription of a topic.
|
||||||
// The handler is passed a publication interface which contains the
|
// The handler is passed a publication interface which contains the
|
||||||
// message and optional Ack method to acknowledge receipt of the message.
|
// message and optional Ack method to acknowledge receipt of the message.
|
||||||
type Handler func(Publication) error
|
type Handler func(Event) error
|
||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
Header map[string]string
|
Header map[string]string
|
||||||
Body []byte
|
Body []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publication is given to a subscription handler for processing
|
// Event is given to a subscription handler for processing
|
||||||
type Publication interface {
|
type Event interface {
|
||||||
Topic() string
|
Topic() string
|
||||||
Message() *Message
|
Message() *Message
|
||||||
Ack() error
|
Ack() error
|
||||||
|
@ -59,7 +59,7 @@ type httpSubscriber struct {
|
|||||||
hb *httpBroker
|
hb *httpBroker
|
||||||
}
|
}
|
||||||
|
|
||||||
type httpPublication struct {
|
type httpEvent struct {
|
||||||
m *Message
|
m *Message
|
||||||
t string
|
t string
|
||||||
}
|
}
|
||||||
@ -155,15 +155,15 @@ func newHttpBroker(opts ...Option) Broker {
|
|||||||
return h
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpPublication) Ack() error {
|
func (h *httpEvent) Ack() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpPublication) Message() *Message {
|
func (h *httpEvent) Message() *Message {
|
||||||
return h.m
|
return h.m
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpPublication) Topic() string {
|
func (h *httpEvent) Topic() string {
|
||||||
return h.t
|
return h.t
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -323,7 +323,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &httpPublication{m: m, t: topic}
|
p := &httpEvent{m: m, t: topic}
|
||||||
id := req.Form.Get("id")
|
id := req.Form.Get("id")
|
||||||
|
|
||||||
h.RLock()
|
h.RLock()
|
||||||
|
@ -47,7 +47,7 @@ func sub(be *testing.B, c int) {
|
|||||||
done := make(chan bool, c)
|
done := make(chan bool, c)
|
||||||
|
|
||||||
for i := 0; i < c; i++ {
|
for i := 0; i < c; i++ {
|
||||||
sub, err := b.Subscribe(topic, func(p Publication) error {
|
sub, err := b.Subscribe(topic, func(p Event) error {
|
||||||
done <- true
|
done <- true
|
||||||
m := p.Message()
|
m := p.Message()
|
||||||
|
|
||||||
@ -107,7 +107,7 @@ func pub(be *testing.B, c int) {
|
|||||||
|
|
||||||
done := make(chan bool, c*4)
|
done := make(chan bool, c*4)
|
||||||
|
|
||||||
sub, err := b.Subscribe(topic, func(p Publication) error {
|
sub, err := b.Subscribe(topic, func(p Event) error {
|
||||||
done <- true
|
done <- true
|
||||||
m := p.Message()
|
m := p.Message()
|
||||||
if string(m.Body) != string(msg.Body) {
|
if string(m.Body) != string(msg.Body) {
|
||||||
@ -175,7 +175,7 @@ func TestBroker(t *testing.T) {
|
|||||||
|
|
||||||
done := make(chan bool)
|
done := make(chan bool)
|
||||||
|
|
||||||
sub, err := b.Subscribe("test", func(p Publication) error {
|
sub, err := b.Subscribe("test", func(p Event) error {
|
||||||
m := p.Message()
|
m := p.Message()
|
||||||
|
|
||||||
if string(m.Body) != string(msg.Body) {
|
if string(m.Body) != string(msg.Body) {
|
||||||
@ -224,7 +224,7 @@ func TestConcurrentSubBroker(t *testing.T) {
|
|||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
sub, err := b.Subscribe("test", func(p Publication) error {
|
sub, err := b.Subscribe("test", func(p Event) error {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
m := p.Message()
|
m := p.Message()
|
||||||
@ -279,7 +279,7 @@ func TestConcurrentPubBroker(t *testing.T) {
|
|||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
sub, err := b.Subscribe("test", func(p Publication) error {
|
sub, err := b.Subscribe("test", func(p Event) error {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
m := p.Message()
|
m := p.Message()
|
||||||
|
@ -17,7 +17,7 @@ type memoryBroker struct {
|
|||||||
Subscribers map[string][]*memorySubscriber
|
Subscribers map[string][]*memorySubscriber
|
||||||
}
|
}
|
||||||
|
|
||||||
type memoryPublication struct {
|
type memoryEvent struct {
|
||||||
topic string
|
topic string
|
||||||
message *broker.Message
|
message *broker.Message
|
||||||
}
|
}
|
||||||
@ -84,7 +84,7 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &memoryPublication{
|
p := &memoryEvent{
|
||||||
topic: topic,
|
topic: topic,
|
||||||
message: message,
|
message: message,
|
||||||
}
|
}
|
||||||
@ -142,15 +142,15 @@ func (m *memoryBroker) String() string {
|
|||||||
return "memory"
|
return "memory"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPublication) Topic() string {
|
func (m *memoryEvent) Topic() string {
|
||||||
return m.topic
|
return m.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPublication) Message() *broker.Message {
|
func (m *memoryEvent) Message() *broker.Message {
|
||||||
return m.message
|
return m.message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryPublication) Ack() error {
|
func (m *memoryEvent) Ack() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@ func TestMemoryBroker(t *testing.T) {
|
|||||||
topic := "test"
|
topic := "test"
|
||||||
count := 10
|
count := 10
|
||||||
|
|
||||||
fn := func(p broker.Publication) error {
|
fn := func(p broker.Event) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -295,7 +295,7 @@ func (g *grpcClient) Options() client.Options {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
func (g *grpcClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
|
||||||
return newGRPCPublication(topic, msg, g.opts.ContentType, opts...)
|
return newGRPCEvent(topic, msg, g.opts.ContentType, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
func (g *grpcClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
||||||
@ -498,7 +498,7 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
|||||||
}
|
}
|
||||||
|
|
||||||
b := &buffer{bytes.NewBuffer(nil)}
|
b := &buffer{bytes.NewBuffer(nil)}
|
||||||
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Payload()); err != nil {
|
if err := cf(b).Write(&codec.Message{Type: codec.Event}, p.Payload()); err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,13 +4,13 @@ import (
|
|||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
type grpcPublication struct {
|
type grpcEvent struct {
|
||||||
topic string
|
topic string
|
||||||
contentType string
|
contentType string
|
||||||
payload interface{}
|
payload interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGRPCPublication(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
|
func newGRPCEvent(topic string, payload interface{}, contentType string, opts ...client.MessageOption) client.Message {
|
||||||
var options client.MessageOptions
|
var options client.MessageOptions
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
@ -20,21 +20,21 @@ func newGRPCPublication(topic string, payload interface{}, contentType string, o
|
|||||||
contentType = options.ContentType
|
contentType = options.ContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
return &grpcPublication{
|
return &grpcEvent{
|
||||||
payload: payload,
|
payload: payload,
|
||||||
topic: topic,
|
topic: topic,
|
||||||
contentType: contentType,
|
contentType: contentType,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcPublication) ContentType() string {
|
func (g *grpcEvent) ContentType() string {
|
||||||
return g.contentType
|
return g.contentType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcPublication) Topic() string {
|
func (g *grpcEvent) Topic() string {
|
||||||
return g.topic
|
return g.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcPublication) Payload() interface{} {
|
func (g *grpcEvent) Payload() interface{} {
|
||||||
return g.payload
|
return g.payload
|
||||||
}
|
}
|
||||||
|
@ -547,7 +547,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
|||||||
b := &buffer{bytes.NewBuffer(nil)}
|
b := &buffer{bytes.NewBuffer(nil)}
|
||||||
if err := cf(b).Write(&codec.Message{
|
if err := cf(b).Write(&codec.Message{
|
||||||
Target: topic,
|
Target: topic,
|
||||||
Type: codec.Publication,
|
Type: codec.Event,
|
||||||
Header: map[string]string{
|
Header: map[string]string{
|
||||||
"Micro-Id": id,
|
"Micro-Id": id,
|
||||||
"Micro-Topic": msg.Topic(),
|
"Micro-Topic": msg.Topic(),
|
||||||
|
@ -9,7 +9,7 @@ const (
|
|||||||
Error MessageType = iota
|
Error MessageType = iota
|
||||||
Request
|
Request
|
||||||
Response
|
Response
|
||||||
Publication
|
Event
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageType int
|
type MessageType int
|
||||||
|
@ -33,7 +33,7 @@ func (j *jsonCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
return j.c.Write(m, b)
|
return j.c.Write(m, b)
|
||||||
case codec.Response, codec.Error:
|
case codec.Response, codec.Error:
|
||||||
return j.s.Write(m, b)
|
return j.s.Write(m, b)
|
||||||
case codec.Publication:
|
case codec.Event:
|
||||||
data, err := json.Marshal(b)
|
data, err := json.Marshal(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -54,7 +54,7 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
return j.s.ReadHeader(m)
|
return j.s.ReadHeader(m)
|
||||||
case codec.Response:
|
case codec.Response:
|
||||||
return j.c.ReadHeader(m)
|
return j.c.ReadHeader(m)
|
||||||
case codec.Publication:
|
case codec.Event:
|
||||||
_, err := io.Copy(j.buf, j.rwc)
|
_, err := io.Copy(j.buf, j.rwc)
|
||||||
return err
|
return err
|
||||||
default:
|
default:
|
||||||
@ -69,7 +69,7 @@ func (j *jsonCodec) ReadBody(b interface{}) error {
|
|||||||
return j.s.ReadBody(b)
|
return j.s.ReadBody(b)
|
||||||
case codec.Response:
|
case codec.Response:
|
||||||
return j.c.ReadBody(b)
|
return j.c.ReadBody(b)
|
||||||
case codec.Publication:
|
case codec.Event:
|
||||||
if b != nil {
|
if b != nil {
|
||||||
return json.Unmarshal(j.buf.Bytes(), b)
|
return json.Unmarshal(j.buf.Bytes(), b)
|
||||||
}
|
}
|
||||||
|
@ -99,7 +99,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case codec.Publication:
|
case codec.Event:
|
||||||
data, err := proto.Marshal(b.(proto.Message))
|
data, err := proto.Marshal(b.(proto.Message))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -141,7 +141,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
|
|||||||
m.Method = rtmp.GetServiceMethod()
|
m.Method = rtmp.GetServiceMethod()
|
||||||
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
|
||||||
m.Error = rtmp.GetError()
|
m.Error = rtmp.GetError()
|
||||||
case codec.Publication:
|
case codec.Event:
|
||||||
_, err := io.Copy(c.buf, c.rwc)
|
_, err := io.Copy(c.buf, c.rwc)
|
||||||
return err
|
return err
|
||||||
default:
|
default:
|
||||||
@ -159,7 +159,7 @@ func (c *protoCodec) ReadBody(b interface{}) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case codec.Publication:
|
case codec.Event:
|
||||||
data = c.buf.Bytes()
|
data = c.buf.Bytes()
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Unrecognised message type: %v", c.mt)
|
return fmt.Errorf("Unrecognised message type: %v", c.mt)
|
||||||
|
@ -167,7 +167,7 @@ func validateSubscriber(sub server.Subscriber) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||||
return func(p broker.Publication) error {
|
return func(p broker.Event) error {
|
||||||
msg := p.Message()
|
msg := p.Message()
|
||||||
ct := msg.Header["Content-Type"]
|
ct := msg.Header["Content-Type"]
|
||||||
if len(ct) == 0 {
|
if len(ct) == 0 {
|
||||||
@ -208,7 +208,7 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
|
|||||||
co := cf(b)
|
co := cf(b)
|
||||||
defer co.Close()
|
defer co.Close()
|
||||||
|
|
||||||
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
|
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,7 +165,7 @@ func validateSubscriber(sub Subscriber) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
|
||||||
return func(p broker.Publication) error {
|
return func(p broker.Event) error {
|
||||||
msg := p.Message()
|
msg := p.Message()
|
||||||
|
|
||||||
// get codec
|
// get codec
|
||||||
@ -214,7 +214,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
|
|||||||
co := cf(b)
|
co := cf(b)
|
||||||
defer co.Close()
|
defer co.Close()
|
||||||
|
|
||||||
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
|
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,7 +47,7 @@ func (t *Task) Run(c task.Command) error {
|
|||||||
errCh := make(chan error, t.Options.Pool)
|
errCh := make(chan error, t.Options.Pool)
|
||||||
|
|
||||||
// subscribe for distributed work
|
// subscribe for distributed work
|
||||||
workFn := func(p broker.Publication) error {
|
workFn := func(p broker.Event) error {
|
||||||
msg := p.Message()
|
msg := p.Message()
|
||||||
|
|
||||||
// get command name
|
// get command name
|
||||||
@ -110,7 +110,7 @@ func (t *Task) Run(c task.Command) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// subscribe to all status messages
|
// subscribe to all status messages
|
||||||
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Publication) error {
|
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error {
|
||||||
msg := p.Message()
|
msg := p.Message()
|
||||||
|
|
||||||
// get command name
|
// get command name
|
||||||
|
Loading…
Reference in New Issue
Block a user