Compare commits

..

26 Commits

Author SHA1 Message Date
Asim Aslam
2a70aef658 Merge pull request #429 from printfcoder/master
add Chinese version guide
2019-03-03 13:46:12 +00:00
shuxian
598de823ba add Chinese version guide 2019-03-03 21:36:44 +08:00
shuxian
6802de63ff readme zh-cn version 2019-02-28 16:57:31 +08:00
Asim Aslam
99c80d0878 Merge pull request #426 from printfcoder/master
solve consul.NewRegistry httpclient 'nil pointer dereference' bug
2019-02-28 07:10:23 +00:00
shuxian
d3f447a732 solve NewRegistry httpclient 'nil pointer dereference' bug 2019-02-28 09:56:57 +08:00
Asim Aslam
b8f20924cc proxy publish 2019-02-23 17:06:17 +00:00
Asim Aslam
f1df0f6dfe update go modules 2019-02-23 16:29:15 +00:00
Asim Aslam
58adaef339 Add Exchange option 2019-02-23 10:50:53 +00:00
Asim Aslam
7db2912d90 add more verbose output 2019-02-15 17:20:09 +00:00
Asim Aslam
6819989195 change default name/version 2019-02-15 16:14:41 +00:00
Asim Aslam
b63213a225 Merge pull request #420 from unistack-org/race_transport
fix race in http transport
2019-02-15 15:58:49 +00:00
0a8f9b0a62 fix race in http transport
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-15 17:20:00 +03:00
Asim Aslam
e29ca94a93 Update go modules 2019-02-13 14:41:01 +00:00
Asim Aslam
f4be7d018d delete context file 2019-02-13 14:39:38 +00:00
Asim Aslam
7cb466359f rework gossip registry 2019-02-13 14:39:20 +00:00
Asim Aslam
c3722877c1 Merge pull request #417 from unistack-org/gossip
registry: [gossip] fix panic
2019-02-13 13:41:28 +00:00
f961c571bd registry: [gossip] fix panic
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x88 pc=0xd1b215]

goroutine 8 [running]:
sync.(*RWMutex).RLock(...)
        /var/home/vtolstov/sdk/go1.12beta2/src/sync/rwmutex.go:48
github.com/hashicorp/memberlist.(*Memberlist).LocalNode(0x0, 0x0)
        /home/vtolstov/devel/projects/centralv2/vendor/github.com/hashicorp/memberlist/memberlist.go:417 +0x35
github.com/micro/go-micro/registry/gossip.(*gossipRegistry).run.func3(0xc000155880)
        /home/vtolstov/devel/projects/centralv2/vendor/github.com/micro/go-micro/registry/gossip/gossip.go:565 +0xf5
created by github.com/micro/go-micro/registry/gossip.(*gossipRegistry).run
        /home/vtolstov/devel/projects/centralv2/vendor/github.com/micro/go-micro/registry/gossip/gossip.go:553 +0xa25

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-13 16:36:38 +03:00
Asim Aslam
d2fdbcc742 Update go modules 2019-02-13 13:32:55 +00:00
Asim Aslam
0cdae40f04 Merge pull request #416 from jiyeyuran/patch-4
reuse rcache
2019-02-13 09:58:49 +00:00
xinfei.wu
a56929d1b8 reuse rcache 2019-02-13 17:47:31 +08:00
Asim Aslam
c9bcdc8438 Merge pull request #415 from unistack-org/rejoin
registry: gossip add Reconnect and Timeout
2019-02-12 14:37:45 +00:00
36532c94b2 registry: [gossip] add ConnectRetry and ConnectTimeout
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-12 17:16:35 +03:00
Asim Aslam
3580cd1b1e no sponsors 2019-02-11 18:37:40 +00:00
Asim Aslam
a3ecd36763 add ability to set address 2019-02-11 18:37:25 +00:00
Asim Aslam
78b7ee9078 update readme 2019-02-09 12:25:34 +00:00
Asim Aslam
82bcb8748e update go modules 2019-02-07 12:42:45 +00:00
17 changed files with 604 additions and 646 deletions

View File

@@ -2,6 +2,8 @@
Go Micro is a framework for micro service development.
Read this in other languages: [Simplified Chinese](./README.zh-cn.md)
## Overview
Go Micro provides the core requirements for distributed systems development including RPC and Event driven communication.
@@ -31,7 +33,7 @@ across the services and retry a different node if there's a problem.
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default. This includes protobuf and json by default.
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
- **Request/Response** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed. The default
transport is http/1.1 or http2 when tls is enabled.
@@ -52,5 +54,3 @@ See the [docs](https://micro.mu/docs/go-micro.html) for detailed information on
Sixt is an Enterprise Sponsor of Micro
<a href="https://micro.mu/blog/2016/04/25/announcing-sixt-sponsorship.html"><img src="https://micro.mu/sixt_logo.png" width=150px height="auto" /></a>
Become a sponsor by backing micro on [Patreon](https://www.patreon.com/microhq)

41
README.zh-cn.md Normal file
View File

@@ -0,0 +1,41 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro是基于Golang的微服务开发框架。
## 概览
Go Micro提供分布式系统开发的核心库包含RPC与事件驱动的通信机制。
**micro**的设计哲学是可插拔的架构理念,她提供可快速构建系统的组件,并且可以根据自身的需求剥离默认实现并自行定制。
<img src="https://micro.mu/docs/images/go-micro.svg" />
所有插件可在仓库[github.com/micro/go-plugins](https://github.com/micro/go-plugins)中找到。
可以订阅我们的[Twitter](https://twitter.com/microhq)或者加入[Slack](http://slack.micro.mu/)论坛。
## 特性
Go Micro把分布式系统的各种细节抽象出来。下面是它的主要特性。
- **服务发现Service Discovery** - 自动服务注册与名称解析。服务发现是微服务开发中的核心。当服务A要与服务B协作时它得知道B在哪里。默认的服务发现系统是Consul而multicast DNS (mdns组播)机制作为本地解决方案或者零依赖的P2P网络中的SWIM协议gossip
- **负载均衡Load Balancing** - 在服务发现之上构建了负载均衡机制。当我们得到一个服务的任意多个的实例节点时,我们要一个机制去决定要路由到哪一个节点。我们使用随机处理过的哈希负载均衡机制来保证对服务请求颁发的均匀分布,并且在发生问题时进行重试。
- **消息编码Message Encoding** - 支持基于内容类型content-type动态编码消息。客户端和服务端会一起使用content-type的格式来对Go进行无缝编/解码。各种各样的消息被编码会发送到不同的客户端客户端服服务端默认会处理这些消息。content-type默认包含proto-rpc和json-rpc。
- **Request/Response** - RPC通信基于支持双向流的请求/响应方式我们提供有抽象的同步通信机制。请求发送到服务时会自动解析、负载均衡、拨号、转成字节流。默认的传输协议是http/1.1而tls下使用http2协议。
- **异步消息Async Messaging** - 发布订阅PubSub头等功能内置在异步通信与事件驱动架构中。事件通知在微服务开发中处于核心位置。默认的消息传送使用点到点http/1.1激活tls时则使用http2。
- **可插拔接口Pluggable Interfaces** - Go Micro为每个分布式系统抽象出接口。因此Go Micro的接口都是可插拔的允许其在运行时不可知的情况下仍可支持。所以只要实现接口可以在内部使用任何的技术。更多插件请参考[github.com/micro/go-plugins](https://github.com/micro/go-plugins)。
## 快速上手
更多关于架构、安装的资料可以查看[文档](https://micro.mu/docs/go-micro_cn.html)。
## 赞助商
Sixt是我们的赞助商。
<a href="https://micro.mu/blog/2016/04/25/announcing-sixt-sponsorship.html"><img src="https://micro.mu/sixt_logo.png" width=150px height="auto" /></a>

View File

@@ -65,6 +65,8 @@ type CallOptions struct {
}
type PublishOptions struct {
// Exchange is the routing exchange for the message
Exchange string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
@@ -236,6 +238,13 @@ func DialTimeout(d time.Duration) Option {
// Call Options
// WithExchange sets the exchange to route a message through
func WithExchange(e string) PublishOption {
return func(o *PublishOptions) {
o.Exchange = e
}
}
// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
return func(o *CallOptions) {

View File

@@ -498,6 +498,13 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
options := PublishOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
@@ -508,6 +515,19 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// set the topic
topic := msg.Topic()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
// encode message body
cf, err := r.newCodec(msg.ContentType())
if err != nil {
@@ -515,7 +535,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{
Target: msg.Topic(),
Target: topic,
Type: codec.Publication,
Header: map[string]string{
"Micro-Id": id,
@@ -528,7 +548,7 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),
})

11
go.mod
View File

@@ -1,17 +1,22 @@
module github.com/micro/go-micro
require (
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2 // indirect
github.com/go-log/log v0.1.0
github.com/golang/protobuf v1.2.0
github.com/google/uuid v1.1.0
github.com/hashicorp/consul v1.4.2
github.com/hashicorp/memberlist v0.1.3
github.com/mattn/go-colorable v0.1.1 // indirect
github.com/micro/cli v0.1.0
github.com/micro/go-log v0.1.0
github.com/micro/go-rcache v0.1.0
github.com/micro/go-rcache v0.2.1
github.com/micro/h2c v1.0.0
github.com/micro/mdns v0.1.0
github.com/micro/util v0.1.0
github.com/micro/util v0.2.0
github.com/mitchellh/hashstructure v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f // indirect
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e // indirect
)

51
go.sum
View File

@@ -1,7 +1,9 @@
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/circbuf v0.0.0-20190214190532-5111143e8da2/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -26,10 +28,11 @@ github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqk
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0 h1:ueI78wUjYExhCvMLow4icJnayNNFRgy0d9EGs/a1T44=
github.com/hashicorp/go-rootcerts v1.0.0 h1:Rqb66Oo1X/eSV1x66xbDccZjhJigjg0+e82kpwzSwCI=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-sockaddr v1.0.1 h1:eCkkJ5KOOktDvwbsE9KPyiBWaOfp1ZNy2gLHgL8PSBM=
github.com/hashicorp/go-sockaddr v1.0.1/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
@@ -43,28 +46,40 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
github.com/micro/go-micro v0.26.0/go.mod h1:CweCFO/pq8dCSIOdzVZ4ooIpUrKlyJ0AcFB269M7PgU=
github.com/micro/go-micro v0.26.1/go.mod h1:Jgc5gPEmDiG1TWE5Qnzzx5qyXnU9VTXKT1FkXkfvt8g=
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
github.com/micro/h2c v1.0.0 h1:ejw6MS5+WaUoMHRtqkVCCrrVzLMzOFEH52rEyd8Fl2I=
github.com/micro/go-rcache v0.2.0/go.mod h1:EoiTwbY2ubQ6lc3ScV+SnmKbelDzeFezDxPDvF8XDxw=
github.com/micro/go-rcache v0.2.1 h1:hx24BZuhW4UVCyLE8p6fDrUetXrYDlOYSn5DSmqcjms=
github.com/micro/go-rcache v0.2.1/go.mod h1:aPCNY3RbjBdyd6ShLENl4MDSgpAiWIU4LyNLE9+TOEo=
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
github.com/micro/util v0.2.0 h1:6u0cPj1TeixEk5cAR9jbcVRUWDQsmCaZvDBiM3zFZuA=
github.com/micro/util v0.2.0/go.mod h1:SgRDkxJJluC2ZNiPfINY42ObEaCAFjL3jP5a+u+qRLU=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.3 h1:1g0r1IvskvgL8rR+AcHzUA+oFmGcQlaIm4IqakufeMM=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.4 h1:rCMZsU2ScVSYcAsOXgmC6+AKOK+6pmQTOcw03nfwYV0=
github.com/miekg/dns v1.1.4/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9dGS02Q3Y=
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
@@ -77,25 +92,45 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.1/go.mod h1:6gapUrK/U1TAN7ciCoNRIdVC5sbdBTUh1DKN0g6uH7E=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 h1:YbZJ76lQ1BqNhVe7dKTSB67wDrc2VPRR75IyGyyPDX8=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f h1:qWFY9ZxP3tfI37wYIs/MnIAqK0vlXp1xnYEa5HxFSSY=
golang.org/x/crypto v0.0.0-20190222235706-ffb98f73852f/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd h1:HuTn7WObtcDo9uEEU7rEqL0jYthdXAmZ6PP+meazmaU=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc h1:WiYx1rIFmx8c0mXAFtv5D/mHyKe1+jmuP7PViuwqwuQ=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952 h1:FDfvYgoVsA7TTZSbgiqjAbfPbK47CNHdWl3h/PJtii0=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503 h1:5SvYFrOM3W8Mexn9/oA44Ji7vhXAZQ9hiP+1Q/DMrWg=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3 h1:+KlxhGbYkFs8lMfwKn+2ojry1ID5eBSMXprS2u/wqCE=
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e h1:oF7qaQxUH6KzFdKN4ww7NpPdo53SZi4UlcksLrb2y/o=
golang.org/x/sys v0.0.0-20190222171317-cd391775e71e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -122,6 +122,13 @@ func Transport(t transport.Transport) Option {
// Convenience options
// Address sets the address of the server
func Address(addr string) Option {
return func(o *Options) {
o.Server.Init(server.Address(addr))
}
}
// Name of the service
func Name(n string) Option {
return func(o *Options) {

View File

@@ -105,12 +105,13 @@ func configure(c *consulRegistry, opts ...registry.Option) {
}
}
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
// requires secure connection?
if c.opts.Secure || c.opts.TLSConfig != nil {
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
config.Scheme = "https"
// We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)

View File

@@ -1,17 +0,0 @@
package gossip
import (
"context"
"github.com/micro/go-micro/registry"
)
// setRegistryOption returns a function to setup a context with given value
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@@ -1,17 +1,16 @@
// Package Gossip provides a gossip registry based on hashicorp/memberlist
// Package gossip provides a gossip registry based on hashicorp/memberlist
package gossip
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/protobuf/proto"
@@ -35,6 +34,13 @@ const (
actionTypeSync
)
const (
nodeActionUnknown int32 = iota
nodeActionJoin
nodeActionLeave
nodeActionUpdate
)
func actionTypeString(t int32) string {
switch t {
case actionTypeCreate:
@@ -64,18 +70,45 @@ type delegate struct {
updates chan *update
}
type gossipRegistry struct {
queue *memberlist.TransmitLimitedQueue
updates chan *update
options registry.Options
member *memberlist.Memberlist
interval time.Duration
type event struct {
action int32
node string
}
type eventDelegate struct {
events chan *event
}
func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) {
ed.events <- &event{action: nodeActionJoin, node: n.Address()}
}
func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) {
ed.events <- &event{action: nodeActionLeave, node: n.Address()}
}
func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) {
ed.events <- &event{action: nodeActionUpdate, node: n.Address()}
}
type gossipRegistry struct {
queue *memberlist.TransmitLimitedQueue
updates chan *update
events chan *event
options registry.Options
member *memberlist.Memberlist
interval time.Duration
tcpInterval time.Duration
connectRetry bool
connectTimeout time.Duration
sync.RWMutex
services map[string][]*registry.Service
s sync.RWMutex
watchers map[string]chan *registry.Result
mtu int
addrs []string
members map[string]int32
done chan bool
}
type update struct {
@@ -84,10 +117,16 @@ type update struct {
sync chan *registry.Service
}
type updates struct {
sync.RWMutex
services map[uint64]*update
}
var (
// You should change this if using secure
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
ExpiryTick = time.Second * 5
ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL
MaxPacketSize = 512
)
func configure(g *gossipRegistry, opts ...registry.Option) error {
@@ -119,9 +158,9 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
}
// shutdown old member
if g.member != nil {
g.Stop()
}
g.Stop()
// new done chan
g.done = make(chan bool)
// replace addresses
curAddrs = newAddrs
@@ -129,19 +168,23 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// create a new default config
c := memberlist.DefaultLocalConfig()
// log to dev null
c.LogOutput = ioutil.Discard
// sane good default options
c.LogOutput = ioutil.Discard // log to /dev/null
c.PushPullInterval = 0 // disable expensive tcp push/pull
c.ProtocolVersion = 4 // suport latest stable features
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig
// set config from options
if config, ok := g.options.Context.Value(configKey{}).(*memberlist.Config); ok && config != nil {
c = config
}
if hostport, ok := g.options.Context.Value(contextAddress{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
// set address
if address, ok := g.options.Context.Value(addressKey{}).(string); ok {
host, port, err := net.SplitHostPort(address)
if err == nil {
pn, err := strconv.Atoi(port)
p, err := strconv.Atoi(port)
if err == nil {
c.BindPort = pn
c.BindPort = p
}
c.BindAddr = host
}
@@ -150,12 +193,13 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.BindPort = 0
}
if hostport, ok := g.options.Context.Value(contextAdvertise{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
// set the advertise address
if advertise, ok := g.options.Context.Value(advertiseKey{}).(string); ok {
host, port, err := net.SplitHostPort(advertise)
if err == nil {
pn, err := strconv.Atoi(port)
p, err := strconv.Atoi(port)
if err == nil {
c.AdvertisePort = pn
c.AdvertisePort = p
}
c.AdvertiseAddr = host
}
@@ -169,7 +213,7 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// set a secret key if secure
if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
k, ok := g.options.Context.Value(secretKey{}).([]byte)
if !ok {
// use the default secret
k = DefaultSecret
@@ -177,6 +221,16 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k
}
// set connect retry
if v, ok := g.options.Context.Value(connectRetryKey{}).(bool); ok && v {
g.connectRetry = true
}
// set connect timeout
if td, ok := g.options.Context.Value(connectTimeoutKey{}).(time.Duration); ok {
g.connectTimeout = td
}
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
@@ -191,27 +245,39 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
queue: queue,
}
if g.connectRetry {
c.Events = &eventDelegate{
events: g.events,
}
}
// create the memberlist
m, err := memberlist.Create(c)
if err != nil {
return err
}
// join the memberlist
// set internals
g.Lock()
if len(curAddrs) > 0 {
_, err := m.Join(curAddrs)
if err != nil {
return err
for _, addr := range curAddrs {
g.members[addr] = nodeActionUnknown
}
}
// set internals
g.tcpInterval = c.PushPullInterval
g.addrs = curAddrs
g.queue = queue
g.member = m
g.interval = c.GossipInterval
log.Logf("Registry Listening on %s", m.LocalNode().Address())
return nil
g.Unlock()
log.Logf("[gossip] Registry Listening on %s", m.LocalNode().Address())
// try connect
return g.connect(curAddrs)
}
func (*broadcast) UniqueBroadcast() {}
@@ -225,6 +291,9 @@ func (b *broadcast) Message() []byte {
if err != nil {
return nil
}
if l := len(up); l > MaxPacketSize {
log.Logf("[gossip] broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize)
}
return up
}
@@ -313,7 +382,6 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
if err := json.Unmarshal(buf, &services); err != nil {
return
}
for _, service := range services {
for _, srv := range service {
d.updates <- &update{
@@ -325,8 +393,57 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
}
}
func (g *gossipRegistry) connect(addrs []string) error {
if len(addrs) == 0 {
return nil
}
timeout := make(<-chan time.Time)
if g.connectTimeout > 0 {
timeout = time.After(g.connectTimeout)
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
fn := func() (int, error) {
return g.member.Join(addrs)
}
// don't wait for first try
if _, err := fn(); err == nil {
return nil
}
// wait loop
for {
select {
// context closed
case <-g.options.Context.Done():
return nil
// call close, don't wait anymore
case <-g.done:
return nil
// in case of timeout fail with a timeout error
case <-timeout:
return fmt.Errorf("[gossip] connect timeout %v", g.addrs)
// got a tick, try to connect
case <-ticker.C:
if _, err := fn(); err == nil {
log.Logf("[gossip] connect success for %v", g.addrs)
return nil
} else {
log.Logf("[gossip] connect failed for %v", g.addrs)
}
}
}
return nil
}
func (g *gossipRegistry) publish(action string, services []*registry.Service) {
g.s.RLock()
g.RLock()
for _, sub := range g.watchers {
go func(sub chan *registry.Result) {
for _, service := range services {
@@ -334,7 +451,7 @@ func (g *gossipRegistry) publish(action string, services []*registry.Service) {
}
}(sub)
}
g.s.RUnlock()
g.RUnlock()
}
func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
@@ -343,66 +460,108 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
id := uuid.New().String()
g.s.Lock()
g.Lock()
g.watchers[id] = next
g.s.Unlock()
g.Unlock()
go func() {
<-exit
g.s.Lock()
g.Lock()
delete(g.watchers, id)
close(next)
g.s.Unlock()
g.Unlock()
}()
return next, exit
}
func (g *gossipRegistry) wait() {
ctx := g.options.Context
if c, ok := ctx.Value(contextContext{}).(context.Context); ok && c != nil {
ctx = c
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
func (g *gossipRegistry) Stop() error {
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-ctx.Done():
case <-g.done:
return nil
default:
close(g.done)
g.Lock()
if g.member != nil {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
g.member = nil
}
g.Unlock()
}
g.Stop()
return nil
}
func (g *gossipRegistry) Stop() {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
// connectLoop attempts to reconnect to the memberlist
func (g *gossipRegistry) connectLoop() {
// try every second
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-g.done:
return
case <-g.options.Context.Done():
g.Stop()
return
case <-ticker.C:
var addrs []string
g.RLock()
// only process if we have a memberlist
if g.member == nil {
g.RUnlock()
continue
}
// self
local := g.member.LocalNode().Address()
// operate on each member
for node, action := range g.members {
switch action {
// process leave event
case nodeActionLeave:
// don't process self
if node == local {
continue
}
addrs = append(addrs, node)
}
}
g.RUnlock()
// connect to all the members
// TODO: only connect to new members
if len(addrs) > 0 {
g.connect(addrs)
}
}
}
}
func (g *gossipRegistry) run() {
var mtx sync.Mutex
updates := map[uint64]*update{}
func (g *gossipRegistry) expiryLoop(updates *updates) {
ticker := time.NewTicker(ExpiryTick)
defer ticker.Stop()
// expiry loop
go func() {
t := time.NewTicker(ExpiryTick)
defer t.Stop()
for _ = range t.C {
for {
select {
case <-g.done:
return
case <-ticker.C:
now := uint64(time.Now().UnixNano())
mtx.Lock()
updates.Lock()
// process all the updates
for k, v := range updates {
for k, v := range updates.services {
// check if expiry time has passed
if d := (v.Update.Expires); d < now {
// delete from records
delete(updates, k)
delete(updates.services, k)
// set to delete
v.Update.Action = actionTypeDelete
// fire a new update
@@ -410,9 +569,44 @@ func (g *gossipRegistry) run() {
}
}
mtx.Unlock()
updates.Unlock()
}
}()
}
}
// process member events
func (g *gossipRegistry) eventLoop() {
for {
select {
// return when done
case <-g.done:
return
case ev := <-g.events:
// TODO: nonblocking update
g.Lock()
if _, ok := g.members[ev.node]; ok {
g.members[ev.node] = ev.action
}
g.Unlock()
}
}
}
func (g *gossipRegistry) run() {
updates := &updates{
services: make(map[uint64]*update),
}
// expiry loop
go g.expiryLoop(updates)
// event loop
go g.eventLoop()
// connect loop
if g.connectRetry {
go g.connectLoop()
}
// process the updates
for u := range g.updates {
@@ -434,9 +628,9 @@ func (g *gossipRegistry) run() {
if u.Update.Expires > 0 {
// create a hash of this service
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
mtx.Lock()
updates[hash] = u
mtx.Unlock()
updates.Lock()
updates.services[hash] = u
updates.Unlock()
}
}
case actionTypeDelete:
@@ -455,9 +649,9 @@ func (g *gossipRegistry) run() {
// delete from expiry checks
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
mtx.Lock()
delete(updates, hash)
mtx.Unlock()
updates.Lock()
delete(updates.services, hash)
updates.Unlock()
}
case actionTypeSync:
// no sync channel provided
@@ -512,6 +706,10 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
o(&options)
}
if options.TTL == 0 && g.tcpInterval == 0 {
return fmt.Errorf("Require register TTL or interval for memberlist.Config")
}
up := &pb.Update{
Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
Action: actionTypeCreate,
@@ -599,27 +797,28 @@ func (g *gossipRegistry) String() string {
}
func NewRegistry(opts ...registry.Option) registry.Registry {
gossip := &gossipRegistry{
g := &gossipRegistry{
options: registry.Options{
Context: context.Background(),
},
done: make(chan bool),
events: make(chan *event, 100),
updates: make(chan *update, 100),
services: make(map[string][]*registry.Service),
watchers: make(map[string]chan *registry.Result),
members: make(map[string]int32),
}
// run the updater
go gossip.run()
go g.run()
// configure the gossiper
if err := configure(gossip, opts...); err != nil {
log.Fatalf("Error configuring registry: %v", err)
if err := configure(g, opts...); err != nil {
log.Fatalf("[gossip] Error configuring registry: %v", err)
}
// wait for setup
<-time.After(gossip.interval * 2)
<-time.After(g.interval * 2)
go gossip.wait()
return gossip
return g
}

View File

@@ -1,7 +1,6 @@
package gossip_test
package gossip
import (
"context"
"os"
"sync"
"testing"
@@ -9,95 +8,83 @@ import (
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
micro "github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/gossip"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
)
var (
r1 registry.Registry
r2 registry.Registry
mu sync.Mutex
)
func newConfig() *memberlist.Config {
wc := memberlist.DefaultLANConfig()
wc.DisableTcpPings = false
wc.GossipVerifyIncoming = false
wc.GossipVerifyOutgoing = false
wc.EnableCompression = false
wc.LogOutput = os.Stderr
wc.ProtocolVersion = 4
wc.Name = uuid.New().String()
return wc
func newMemberlistConfig() *memberlist.Config {
mc := memberlist.DefaultLANConfig()
mc.DisableTcpPings = false
mc.GossipVerifyIncoming = false
mc.GossipVerifyOutgoing = false
mc.EnableCompression = false
mc.PushPullInterval = 3 * time.Second
mc.LogOutput = os.Stderr
mc.ProtocolVersion = 4
mc.Name = uuid.New().String()
return mc
}
func newRegistries() {
mu.Lock()
defer mu.Unlock()
if r1 != nil && r2 != nil {
return
func newRegistry(opts ...registry.Option) registry.Registry {
options := []registry.Option{
ConnectRetry(true),
ConnectTimeout(60 * time.Second),
}
wc1 := newConfig()
wc2 := newConfig()
rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")}
rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")}
r1 = gossip.NewRegistry(rops1...) // first started without members
r2 = gossip.NewRegistry(rops2...) // second started joining
options = append(options, opts...)
r := NewRegistry(options...)
return r
}
func TestRegistryBroadcast(t *testing.T) {
newRegistries()
func TestGossipRegistryBroadcast(t *testing.T) {
mc1 := newMemberlistConfig()
r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321"))
svc1 := &registry.Service{Name: "r1-svc", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "r2-svc", Version: "0.0.0.2"}
mc2 := newMemberlistConfig()
r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321"))
<-time.After(1 * time.Second)
if err := r1.Register(svc1); err != nil {
defer r1.(*gossipRegistry).Stop()
defer r2.(*gossipRegistry).Stop()
svc1 := &registry.Service{Name: "service.1", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "service.2", Version: "0.0.0.2"}
if err := r1.Register(svc1, registry.RegisterTTL(10*time.Second)); err != nil {
t.Fatal(err)
}
<-time.After(1 * time.Second)
if err := r2.Register(svc2); err != nil {
if err := r2.Register(svc2, registry.RegisterTTL(10*time.Second)); err != nil {
t.Fatal(err)
}
var found bool
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r2-svc" {
if svc.Name == "service.2" {
found = true
}
}
if !found {
t.Fatalf("r2-svc not found in r1, broadcast not work")
t.Fatalf("[gossip registry] service.2 not found in r1, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r1-svc" {
if svc.Name == "service.1" {
found = true
}
}
if !found {
t.Fatalf("r1-svc not found in r2, broadcast not work")
t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2")
}
if err := r1.Deregister(svc1); err != nil {
@@ -108,38 +95,72 @@ func TestRegistryBroadcast(t *testing.T) {
}
}
func TestGossipRegistryRetry(t *testing.T) {
mc1 := newMemberlistConfig()
r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321"))
func TestServerRegistry(t *testing.T) {
newRegistries()
mc2 := newMemberlistConfig()
r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321"))
_, err := newServer("s1", r1, t)
defer r1.(*gossipRegistry).Stop()
defer r2.(*gossipRegistry).Stop()
svc1 := &registry.Service{Name: "service.1", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "service.2", Version: "0.0.0.2"}
var mu sync.Mutex
ch := make(chan struct{})
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
go func() {
for {
select {
case <-ticker.C:
mu.Lock()
if r1 != nil {
r1.Register(svc1, registry.RegisterTTL(2*time.Second))
}
if r2 != nil {
r2.Register(svc2, registry.RegisterTTL(2*time.Second))
}
if ch != nil {
close(ch)
ch = nil
}
mu.Unlock()
}
}
}()
<-ch
var found bool
svcs, err := r2.ListServices()
if err != nil {
t.Fatal(err)
}
_, err = newServer("s2", r2, t)
if err != nil {
t.Fatal(err)
}
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
if len(svcs) < 1 {
t.Fatalf("r1 svcs unknown %#+v\n", svcs)
}
found := false
for _, svc := range svcs {
if svc.Name == "s2" {
if svc.Name == "service.1" {
found = true
}
}
if !found {
t.Fatalf("r1 does not have s2, broadcast not work")
t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2")
}
if err = r1.(*gossipRegistry).Stop(); err != nil {
t.Fatalf("[gossip registry] failed to stop registry: %v", err)
}
mu.Lock()
r1 = nil
mu.Unlock()
<-time.After(3 * time.Second)
found = false
svcs, err = r2.ListServices()
if err != nil {
@@ -147,53 +168,47 @@ func TestServerRegistry(t *testing.T) {
}
for _, svc := range svcs {
if svc.Name == "s1" {
if svc.Name == "service.1" {
found = true
}
}
if found {
t.Fatalf("[gossip registry] service.1 found in r2")
}
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Logf("[gossip registry] skip test on travis")
t.Skip()
return
}
r1 = newRegistry(Config(mc1), Address("127.0.0.1:54321"))
<-time.After(2 * time.Second)
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "service.1" {
found = true
}
}
if !found {
t.Fatalf("r2 does not have s1, broadcast not work")
t.Fatalf("[gossip registry] connect retry failed: service.1 not found in r2")
}
}
type testServer struct{}
func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error {
return nil
}
func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) {
h := &testServer{}
var wg sync.WaitGroup
wg.Add(1)
sopts := []server.Option{
server.Name(n),
server.Registry(r),
if err := r1.Deregister(svc1); err != nil {
t.Fatal(err)
}
if err := r2.Deregister(svc2); err != nil {
t.Fatal(err)
}
copts := []client.Option{
client.Selector(selector.NewSelector(selector.Registry(r))),
client.Registry(r),
}
srv := micro.NewService(
micro.Server(server.NewServer(sopts...)),
micro.Client(client.NewClient(copts...)),
micro.AfterStart(func() error {
wg.Done()
return nil
}),
)
srv.Server().NewHandler(h)
go func() {
t.Fatal(srv.Run())
}()
wg.Wait()
return srv, nil
r1.(*gossipRegistry).Stop()
r2.(*gossipRegistry).Stop()
}

View File

@@ -2,45 +2,57 @@ package gossip
import (
"context"
"time"
"github.com/hashicorp/memberlist"
"github.com/micro/go-micro/registry"
)
type contextSecretKey struct{}
type secretKey struct{}
type addressKey struct{}
type configKey struct{}
type advertiseKey struct{}
type connectTimeoutKey struct{}
type connectRetryKey struct{}
// helper for setting registry options
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// Secret specifies an encryption key. The value should be either
// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
func Secret(k []byte) registry.Option {
return setRegistryOption(contextSecretKey{}, k)
return setRegistryOption(secretKey{}, k)
}
type contextAddress struct{}
// Address to bind to - host:port
func Address(a string) registry.Option {
return setRegistryOption(contextAddress{}, a)
return setRegistryOption(addressKey{}, a)
}
type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip
// Config sets *memberlist.Config for configuring gossip
func Config(c *memberlist.Config) registry.Option {
return setRegistryOption(contextConfig{}, c)
return setRegistryOption(configKey{}, c)
}
type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port
// The address to advertise for other gossip members to connect to - host:port
func Advertise(a string) registry.Option {
return setRegistryOption(contextAdvertise{}, a)
return setRegistryOption(advertiseKey{}, a)
}
type contextContext struct{}
// Context specifies a context for the registry.
// Can be used to signal shutdown of the registry.
// Can be used for extra option values.
func Context(ctx context.Context) registry.Option {
return setRegistryOption(contextContext{}, ctx)
// ConnectTimeout sets the registry connect timeout. Use -1 to specify infinite timeout
func ConnectTimeout(td time.Duration) registry.Option {
return setRegistryOption(connectTimeoutKey{}, td)
}
// ConnectRetry enables reconnect to registry then connection closed,
// use with ConnectTimeout to specify how long retry
func ConnectRetry(v bool) registry.Option {
return setRegistryOption(connectRetryKey{}, v)
}

View File

@@ -11,7 +11,6 @@ type Options struct {
Timeout time.Duration
Secure bool
TLSConfig *tls.Config
// Other options for implementations of the interface
// can be stored in a context
Context context.Context

View File

@@ -1,359 +1,25 @@
package selector
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
"github.com/micro/go-rcache"
)
type registrySelector struct {
so Options
ttl time.Duration
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
so Options
rc rcache.Cache
}
var (
DefaultTTL = time.Minute
)
// isValid checks if the service is valid
func (c *registrySelector) isValid(services []*registry.Service, ttl time.Time) bool {
// no services exist
if len(services) == 0 {
return false
}
// ttl is invalid
if ttl.IsZero() {
return false
}
// time since ttl is longer than timeout
if time.Since(ttl) > c.ttl {
return false
}
// ok
return true
}
func (c *registrySelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *registrySelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *registrySelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *registrySelector) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services := c.cache[service]
// get cache ttl
ttl := c.ttls[service]
// got services && within ttl so return cache
if c.isValid(services, ttl) {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
func (c *registrySelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *registrySelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
func (c *registrySelector) newRCache() rcache.Cache {
ropts := []rcache.Option{}
if c.so.Context != nil {
if t, ok := c.so.Context.Value("selector_ttl").(time.Duration); ok {
ropts = append(ropts, rcache.WithTTL(t))
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *registrySelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
// error counter
var cerr int
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
cerr++
if cerr > 3 {
log.Log(err)
cerr = 0
}
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
cerr++
if cerr > 3 {
cerr = 0
log.Log(err)
}
continue
}
// reset err counter
cerr = 0
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// reload chan
reload := make(chan bool, 1)
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
reload <- true
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
select {
case <-reload:
return nil
default:
return err
}
}
c.update(res)
}
return rcache.New(c.so.Registry, ropts...)
}
func (c *registrySelector) Init(opts ...Option) error {
@@ -361,15 +27,8 @@ func (c *registrySelector) Init(opts ...Option) error {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
c.rc.Stop()
c.rc = c.newRCache()
return nil
}
@@ -390,7 +49,7 @@ func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, e
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
services, err := c.rc.GetService(service)
if err != nil {
return nil, err
}
@@ -416,17 +75,8 @@ func (c *registrySelector) Reset(service string) {
// Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
c.rc.Stop()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
@@ -447,21 +97,10 @@ func NewSelector(opts ...Option) Selector {
sopts.Registry = registry.DefaultRegistry
}
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok {
ttl = t
}
s := &registrySelector{
so: sopts,
}
s.rc = s.newRCache()
return &registrySelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
return s
}

View File

@@ -339,7 +339,7 @@ func (s *rpcServer) Register() error {
s.Unlock()
if !registered {
log.Logf("Registering node: %s", node.Id)
log.Logf("Registry [%s] Registering node: %s", config.Registry.String(), node.Id)
}
// create registry options
@@ -417,7 +417,7 @@ func (s *rpcServer) Deregister() error {
Nodes: []*registry.Node{node},
}
log.Logf("Deregistering node: %s", node.Id)
log.Logf("Registry [%s] Deregistering node: %s", config.Registry.String(), node.Id)
if err := config.Registry.Deregister(service); err != nil {
return err
}
@@ -466,7 +466,7 @@ func (s *rpcServer) Start() error {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
log.Logf("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {

View File

@@ -116,8 +116,8 @@ type Option func(*Options)
var (
DefaultAddress = ":0"
DefaultName = "go-server"
DefaultVersion = "1.0.0"
DefaultName = "server"
DefaultVersion = "latest"
DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter()

View File

@@ -133,12 +133,6 @@ func (h *httpTransportClient) Recv(m *Message) error {
r = rc
}
h.RLock()
if h.buff == nil {
return io.EOF
}
h.RUnlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
@@ -181,7 +175,6 @@ func (h *httpTransportClient) Close() error {
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
close(h.r)
})