Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
b8f20924cc | ||
|
f1df0f6dfe | ||
|
58adaef339 | ||
|
7db2912d90 | ||
|
6819989195 | ||
|
b63213a225 | ||
0a8f9b0a62 |
@@ -65,6 +65,8 @@ type CallOptions struct {
|
||||
}
|
||||
|
||||
type PublishOptions struct {
|
||||
// Exchange is the routing exchange for the message
|
||||
Exchange string
|
||||
// Other options for implementations of the interface
|
||||
// can be stored in a context
|
||||
Context context.Context
|
||||
@@ -236,6 +238,13 @@ func DialTimeout(d time.Duration) Option {
|
||||
|
||||
// Call Options
|
||||
|
||||
// WithExchange sets the exchange to route a message through
|
||||
func WithExchange(e string) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Exchange = e
|
||||
}
|
||||
}
|
||||
|
||||
// WithAddress sets the remote address to use rather than using service discovery
|
||||
func WithAddress(a string) CallOption {
|
||||
return func(o *CallOptions) {
|
||||
|
@@ -498,6 +498,13 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
|
||||
}
|
||||
|
||||
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
|
||||
options := PublishOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
md, ok := metadata.FromContext(ctx)
|
||||
if !ok {
|
||||
md = make(map[string]string)
|
||||
@@ -508,6 +515,19 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
||||
md["Micro-Topic"] = msg.Topic()
|
||||
md["Micro-Id"] = id
|
||||
|
||||
// set the topic
|
||||
topic := msg.Topic()
|
||||
|
||||
// get proxy
|
||||
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
|
||||
options.Exchange = prx
|
||||
}
|
||||
|
||||
// get the exchange
|
||||
if len(options.Exchange) > 0 {
|
||||
topic = options.Exchange
|
||||
}
|
||||
|
||||
// encode message body
|
||||
cf, err := r.newCodec(msg.ContentType())
|
||||
if err != nil {
|
||||
@@ -515,7 +535,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
||||
}
|
||||
b := &buffer{bytes.NewBuffer(nil)}
|
||||
if err := cf(b).Write(&codec.Message{
|
||||
Target: msg.Topic(),
|
||||
Target: topic,
|
||||
Type: codec.Publication,
|
||||
Header: map[string]string{
|
||||
"Micro-Id": id,
|
||||
@@ -528,7 +548,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
||||
r.opts.Broker.Connect()
|
||||
})
|
||||
|
||||
return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
|
||||
return r.opts.Broker.Publish(topic, &broker.Message{
|
||||
Header: md,
|
||||
Body: b.Bytes(),
|
||||
})
|
||||
|
8
go.mod
8
go.mod
@@ -1,18 +1,22 @@
|
||||
module github.com/micro/go-micro
|
||||
|
||||
require (
|
||||
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect
|
||||
github.com/go-log/log v0.1.0
|
||||
github.com/golang/protobuf v1.2.0
|
||||
github.com/google/uuid v1.1.0
|
||||
github.com/hashicorp/consul v1.4.2
|
||||
github.com/hashicorp/memberlist v0.1.3
|
||||
github.com/mattn/go-colorable v0.1.1 // indirect
|
||||
github.com/micro/cli v0.1.0
|
||||
github.com/micro/go-log v0.1.0
|
||||
github.com/micro/go-rcache v0.2.0
|
||||
github.com/micro/go-rcache v0.2.1
|
||||
github.com/micro/h2c v1.0.0
|
||||
github.com/micro/mdns v0.1.0
|
||||
github.com/micro/util v0.1.0
|
||||
github.com/micro/util v0.2.0
|
||||
github.com/mitchellh/hashstructure v1.0.0
|
||||
github.com/pkg/errors v0.8.1
|
||||
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f // indirect
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
|
||||
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e // indirect
|
||||
)
|
||||
|
13
go.sum
13
go.sum
@@ -1,4 +1,5 @@
|
||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||
@@ -46,8 +47,10 @@ github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
|
||||
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
|
||||
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
|
||||
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
|
||||
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
|
||||
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
|
||||
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
|
||||
@@ -55,15 +58,20 @@ github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
|
||||
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
|
||||
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
|
||||
github.com/micro/go-micro v0.26.0/go.mod h1:CweCFO/pq8dCSIOdzVZ4ooIpUrKlyJ0AcFB269M7PgU=
|
||||
github.com/micro/go-micro v0.26.1/go.mod h1:Jgc5gPEmDiG1TWE5Qnzzx5qyXnU9VTXKT1FkXkfvt8g=
|
||||
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
|
||||
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
|
||||
github.com/micro/go-rcache v0.2.0/go.mod h1:EoiTwbY2ubQ6lc3ScV+SnmKbelDzeFezDxPDvF8XDxw=
|
||||
github.com/micro/go-rcache v0.2.1 h1:hx24BZuhW4UVCyLE8p6fDrUetXrYDlOYSn5DSmqcjms=
|
||||
github.com/micro/go-rcache v0.2.1/go.mod h1:aPCNY3RbjBdyd6ShLENl4MDSgpAiWIU4LyNLE9+TOEo=
|
||||
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
|
||||
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
|
||||
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
|
||||
github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
|
||||
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
|
||||
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
|
||||
github.com/micro/util v0.2.0 h1:6u0cPj1TeixEk5cAR9jbcVRUWDQsmCaZvDBiM3zFZuA=
|
||||
github.com/micro/util v0.2.0/go.mod h1:SgRDkxJJluC2ZNiPfINY42ObEaCAFjL3jP5a+u+qRLU=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/miekg/dns v1.1.4 h1:rCMZsU2ScVSYcAsOXgmC6+AKOK+6pmQTOcw03nfwYV0=
|
||||
@@ -99,6 +107,8 @@ golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXk
|
||||
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
|
||||
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f h1:qWFY9ZxP3tfI37wYIs/MnIAqK0vlXp1xnYEa5HxFSSY=
|
||||
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
@@ -116,6 +126,9 @@ golang.org/x/sys v0.0.0-20190209173611-3b5209105503 h1:5SvYFrOM3W8Mexn9/oA44Ji7v
|
||||
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3 h1:+KlxhGbYkFs8lMfwKn+2ojry1ID5eBSMXprS2u/wqCE=
|
||||
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e h1:oF7qaQxUH6KzFdKN4ww7NpPdo53SZi4UlcksLrb2y/o=
|
||||
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
@@ -339,7 +339,7 @@ func (s *rpcServer) Register() error {
|
||||
s.Unlock()
|
||||
|
||||
if !registered {
|
||||
log.Logf("Registering node: %s", node.Id)
|
||||
log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
|
||||
}
|
||||
|
||||
// create registry options
|
||||
@@ -417,7 +417,7 @@ func (s *rpcServer) Deregister() error {
|
||||
Nodes: []*registry.Node{node},
|
||||
}
|
||||
|
||||
log.Logf("Deregistering node: %s", node.Id)
|
||||
log.Logf("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
|
||||
if err := config.Registry.Deregister(service); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -466,7 +466,7 @@ func (s *rpcServer) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
|
||||
log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
|
||||
|
||||
// announce self to the world
|
||||
if err := s.Register(); err != nil {
|
||||
|
@@ -116,8 +116,8 @@ type Option func(*Options)
|
||||
|
||||
var (
|
||||
DefaultAddress = ":0"
|
||||
DefaultName = "go-server"
|
||||
DefaultVersion = "1.0.0"
|
||||
DefaultName = "server"
|
||||
DefaultVersion = "latest"
|
||||
DefaultId = uuid.New().String()
|
||||
DefaultServer Server = newRpcServer()
|
||||
DefaultRouter = newRpcRouter()
|
||||
|
@@ -133,12 +133,6 @@ func (h *httpTransportClient) Recv(m *Message) error {
|
||||
r = rc
|
||||
}
|
||||
|
||||
h.RLock()
|
||||
if h.buff == nil {
|
||||
return io.EOF
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
@@ -181,7 +175,6 @@ func (h *httpTransportClient) Close() error {
|
||||
h.once.Do(func() {
|
||||
h.Lock()
|
||||
h.buff.Reset(nil)
|
||||
h.buff = nil
|
||||
h.Unlock()
|
||||
close(h.r)
|
||||
})
|
||||
|
Reference in New Issue
Block a user