Compare commits

...

7 Commits

Author SHA1 Message Date
Asim Aslam
b8f20924cc proxy publish 2019-02-23 17:06:17 +00:00
Asim Aslam
f1df0f6dfe update go modules 2019-02-23 16:29:15 +00:00
Asim Aslam
58adaef339 Add Exchange option 2019-02-23 10:50:53 +00:00
Asim Aslam
7db2912d90 add more verbose output 2019-02-15 17:20:09 +00:00
Asim Aslam
6819989195 change default name/version 2019-02-15 16:14:41 +00:00
Asim Aslam
b63213a225 Merge pull request #420 from unistack-org/race_transport
fix race in http transport
2019-02-15 15:58:49 +00:00
0a8f9b0a62 fix race in http transport
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-15 17:20:00 +03:00
7 changed files with 55 additions and 16 deletions

View File

@@ -65,6 +65,8 @@ type CallOptions struct {
}
type PublishOptions struct {
// Exchange is the routing exchange for the message
Exchange string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
@@ -236,6 +238,13 @@ func DialTimeout(d time.Duration) Option {
// Call Options
// WithExchange sets the exchange to route a message through
func WithExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
return func(o *CallOptions) {

View File

@@ -498,6 +498,13 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
@@ -508,6 +515,19 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// set the topic
topic := msg.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
// encode message body
cf, err := r.newCodec(msg.ContentType())
if err != nil {
@@ -515,7 +535,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{
Target: msg.Topic(),
Target: topic,
Type: codec.Publication,
Header: map[string]string{
"Micro-Id": id,
@@ -528,7 +548,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),
})

8
go.mod
View File

@@ -1,18 +1,22 @@
module github.com/micro/go-micro
require (
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect
github.com/go-log/log v0.1.0
github.com/golang/protobuf v1.2.0
github.com/google/uuid v1.1.0
github.com/hashicorp/consul v1.4.2
github.com/hashicorp/memberlist v0.1.3
github.com/mattn/go-colorable v0.1.1 // indirect
github.com/micro/cli v0.1.0
github.com/micro/go-log v0.1.0
github.com/micro/go-rcache v0.2.0
github.com/micro/go-rcache v0.2.1
github.com/micro/h2c v1.0.0
github.com/micro/mdns v0.1.0
github.com/micro/util v0.1.0
github.com/micro/util v0.2.0
github.com/mitchellh/hashstructure v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f // indirect
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e // indirect
)

13
go.sum
View File

@@ -1,4 +1,5 @@
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
@@ -46,8 +47,10 @@ github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
@@ -55,15 +58,20 @@ github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
github.com/micro/go-micro v0.26.0/go.mod h1:CweCFO/pq8dCSIOdzVZ4ooIpUrKlyJ0AcFB269M7PgU=
github.com/micro/go-micro v0.26.1/go.mod h1:Jgc5gPEmDiG1TWE5Qnzzx5qyXnU9VTXKT1FkXkfvt8g=
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
github.com/micro/go-rcache v0.2.0/go.mod h1:EoiTwbY2ubQ6lc3ScV+SnmKbelDzeFezDxPDvF8XDxw=
github.com/micro/go-rcache v0.2.1 h1:hx24BZuhW4UVCyLE8p6fDrUetXrYDlOYSn5DSmqcjms=
github.com/micro/go-rcache v0.2.1/go.mod h1:aPCNY3RbjBdyd6ShLENl4MDSgpAiWIU4LyNLE9+TOEo=
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
github.com/micro/util v0.2.0 h1:6u0cPj1TeixEk5cAR9jbcVRUWDQsmCaZvDBiM3zFZuA=
github.com/micro/util v0.2.0/go.mod h1:SgRDkxJJluC2ZNiPfINY42ObEaCAFjL3jP5a+u+qRLU=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.4 h1:rCMZsU2ScVSYcAsOXgmC6+AKOK+6pmQTOcw03nfwYV0=
@@ -99,6 +107,8 @@ golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXk
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f h1:qWFY9ZxP3tfI37wYIs/MnIAqK0vlXp1xnYEa5HxFSSY=
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -116,6 +126,9 @@ golang.org/x/sys v0.0.0-20190209173611-3b5209105503 h1:5SvYFrOM3W8Mexn9/oA44Ji7v
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3 h1:+KlxhGbYkFs8lMfwKn+2ojry1ID5eBSMXprS2u/wqCE=
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e h1:oF7qaQxUH6KzFdKN4ww7NpPdo53SZi4UlcksLrb2y/o=
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@@ -339,7 +339,7 @@ func (s *rpcServer) Register() error {
s.Unlock()
if !registered {
log.Logf("Registering node: %s", node.Id)
log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
// create registry options
@@ -417,7 +417,7 @@ func (s *rpcServer) Deregister() error {
Nodes: []*registry.Node{node},
}
log.Logf("Deregistering node: %s", node.Id)
log.Logf("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
if err := config.Registry.Deregister(service); err != nil {
return err
}
@@ -466,7 +466,7 @@ func (s *rpcServer) Start() error {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {

View File

@@ -116,8 +116,8 @@ type Option func(*Options)
var (
DefaultAddress = ":0"
DefaultName = "go-server"
DefaultVersion = "1.0.0"
DefaultName = "server"
DefaultVersion = "latest"
DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter()

View File

@@ -133,12 +133,6 @@ func (h *httpTransportClient) Recv(m *Message) error {
r = rc
}
h.RLock()
if h.buff == nil {
return io.EOF
}
h.RUnlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
@@ -181,7 +175,6 @@ func (h *httpTransportClient) Close() error {
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
close(h.r)
})