diff --git a/broker/broker.go b/broker/broker.go index 25f8732b..d511cccf 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -112,6 +112,13 @@ type Message struct { Body RawMessage } +// NewMessage create broker message with topic filled +func NewMessage(topic string) *Message { + m := &Message{Header: metadata.New(2)} + m.Header.Set(metadata.HeaderTopic, topic) + return m +} + // Subscriber is a convenience return type for the Subscribe method type Subscriber interface { // Options returns subscriber options diff --git a/broker/memory.go b/broker/memory.go index 93b66772..10420cc8 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -6,6 +6,7 @@ import ( "github.com/google/uuid" "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/metadata" maddr "github.com/unistack-org/micro/v3/util/addr" mnet "github.com/unistack-org/micro/v3/util/net" "github.com/unistack-org/micro/v3/util/rand" @@ -113,14 +114,14 @@ func (m *memoryBroker) BatchPublish(ctx context.Context, msgs []*Message, opts . if m.opts.Codec == nil { m.RLock() for _, msg := range msgs { - topic, _ := msg.Header.Get("Micro-Topic") + topic, _ := msg.Header.Get(metadata.HeaderTopic) vs = append(vs, msgWrapper{topic: topic, body: m}) } m.RUnlock() } else { m.RLock() for _, msg := range msgs { - topic, _ := msg.Header.Get("Micro-Topic") + topic, _ := msg.Header.Get(metadata.HeaderTopic) buf, err := m.opts.Codec.Marshal(msg) if err != nil { m.RUnlock() diff --git a/broker/memory_test.go b/broker/memory_test.go index 27bba388..8c2da4fb 100644 --- a/broker/memory_test.go +++ b/broker/memory_test.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "testing" + + "github.com/unistack-org/micro/v3/metadata" ) func TestMemoryBatchBroker(t *testing.T) { @@ -30,9 +32,9 @@ func TestMemoryBatchBroker(t *testing.T) { for i := 0; i < count; i++ { message := &Message{ Header: map[string]string{ - "Micro-Topic": topic, - "foo": "bar", - "id": fmt.Sprintf("%d", i), + metadata.HeaderTopic: topic, + "foo": "bar", + "id": fmt.Sprintf("%d", i), }, Body: []byte(`"hello world"`), } @@ -75,9 +77,9 @@ func TestMemoryBroker(t *testing.T) { for i := 0; i < count; i++ { message := &Message{ Header: map[string]string{ - "Micro-Topic": topic, - "foo": "bar", - "id": fmt.Sprintf("%d", i), + metadata.HeaderTopic: topic, + "foo": "bar", + "id": fmt.Sprintf("%d", i), }, Body: []byte(`"hello world"`), } diff --git a/client/noop.go b/client/noop.go index c68a69f5..b6ae8fbb 100644 --- a/client/noop.go +++ b/client/noop.go @@ -190,8 +190,8 @@ func (n *noopClient) Publish(ctx context.Context, p Message, opts ...PublishOpti if !ok { md = metadata.New(0) } - md["Content-Type"] = p.ContentType() - md["Micro-Topic"] = p.Topic() + md[metadata.HeaderContentType] = p.ContentType() + md[metadata.HeaderTopic] = p.Topic() // passed in raw data if d, ok := p.Payload().(*codec.Frame); ok { diff --git a/metadata/metadata.go b/metadata/metadata.go index 4e23c368..846d1720 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -6,8 +6,18 @@ import ( "sort" ) -// HeaderPrefix for all headers passed -var HeaderPrefix = "Micro-" +var ( + // HeaderTopic is the header name that contains topic name + HeaderTopic = "Micro-Topic" + // HeaderContentType specifies content type of message + HeaderContentType = "Content-Type" + // HeaderEndpoint specifies endpoint in service + HeaderEndpoint = "Micro-Endpoint" + // HeaderService specifies service + HeaderService = "Micro-Service" + // HeaderTimeout specifies timeout of operation + HeaderTimeout = "Micro-Timeout" +) // Metadata is our way of representing request headers internally. // They're used at the RPC level and translate back and forth diff --git a/network/tunnel/broker/broker.go b/network/tunnel/broker/broker.go index 1e9732e2..42d4fac5 100644 --- a/network/tunnel/broker/broker.go +++ b/network/tunnel/broker/broker.go @@ -78,7 +78,7 @@ func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, op var err error for _, msg := range msgs { - topic, _ := msg.Header.Get("Micro-Topic") + topic, _ := msg.Header.Get(metadata.HeaderTopic) c, ok := topicMap[topic] if !ok { c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))