Compare commits

...

14 Commits

Author SHA1 Message Date
Asim Aslam
e29ca94a93 Update go modules 2019-02-13 14:41:01 +00:00
Asim Aslam
f4be7d018d delete context file 2019-02-13 14:39:38 +00:00
Asim Aslam
7cb466359f rework gossip registry 2019-02-13 14:39:20 +00:00
Asim Aslam
c3722877c1 Merge pull request #417 from unistack-org/gossip
registry: [gossip] fix panic
2019-02-13 13:41:28 +00:00
f961c571bd registry: [gossip] fix panic
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x88 pc=0xd1b215]

goroutine 8 [running]:
sync.(*RWMutex).RLock(...)
        /var/home/vtolstov/sdk/go1.12beta2/src/sync/rwmutex.go:48
github.com/hashicorp/memberlist.(*Memberlist).LocalNode(0x0, 0x0)
        /home/vtolstov/devel/projects/centralv2/vendor/github.com/hashicorp/memberlist/memberlist.go:417 +0x35
github.com/micro/go-micro/registry/gossip.(*gossipRegistry).run.func3(0xc000155880)
        /home/vtolstov/devel/projects/centralv2/vendor/github.com/micro/go-micro/registry/gossip/gossip.go:565 +0xf5
created by github.com/micro/go-micro/registry/gossip.(*gossipRegistry).run
        /home/vtolstov/devel/projects/centralv2/vendor/github.com/micro/go-micro/registry/gossip/gossip.go:553 +0xa25

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-13 16:36:38 +03:00
Asim Aslam
d2fdbcc742 Update go modules 2019-02-13 13:32:55 +00:00
Asim Aslam
0cdae40f04 Merge pull request #416 from jiyeyuran/patch-4
reuse rcache
2019-02-13 09:58:49 +00:00
xinfei.wu
a56929d1b8 reuse rcache 2019-02-13 17:47:31 +08:00
Asim Aslam
c9bcdc8438 Merge pull request #415 from unistack-org/rejoin
registry: gossip add Reconnect and Timeout
2019-02-12 14:37:45 +00:00
36532c94b2 registry: [gossip] add ConnectRetry and ConnectTimeout
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-12 17:16:35 +03:00
Asim Aslam
3580cd1b1e no sponsors 2019-02-11 18:37:40 +00:00
Asim Aslam
a3ecd36763 add ability to set address 2019-02-11 18:37:25 +00:00
Asim Aslam
78b7ee9078 update readme 2019-02-09 12:25:34 +00:00
Asim Aslam
82bcb8748e update go modules 2019-02-07 12:42:45 +00:00
10 changed files with 502 additions and 627 deletions

View File

@@ -31,7 +31,7 @@ across the services and retry a different node if there's a problem.
to seamlessly encode and decode Go types for you. Any variety of messages could be encoded and sent from different clients. The client
and server handle this by default. This includes protobuf and json by default.
- **Sync Streaming** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
- **Request/Response** - RPC based request/response with support for bidirectional streaming. We provide an abstraction for synchronous
communication. A request made to a service will be automatically resolved, load balanced, dialled and streamed. The default
transport is http/1.1 or http2 when tls is enabled.
@@ -52,5 +52,3 @@ See the [docs](https://micro.mu/docs/go-micro.html) for detailed information on
Sixt is an Enterprise Sponsor of Micro
<a href="https://micro.mu/blog/2016/04/25/announcing-sixt-sponsorship.html"><img src="https://micro.mu/sixt_logo.png" width=150px height="auto" /></a>
Become a sponsor by backing micro on [Patreon](https://www.patreon.com/microhq)

5
go.mod
View File

@@ -1,17 +1,18 @@
module github.com/micro/go-micro
require (
github.com/go-log/log v0.1.0
github.com/golang/protobuf v1.2.0
github.com/google/uuid v1.1.0
github.com/hashicorp/consul v1.4.2
github.com/hashicorp/memberlist v0.1.3
github.com/micro/cli v0.1.0
github.com/micro/go-log v0.1.0
github.com/micro/go-rcache v0.1.0
github.com/micro/go-rcache v0.2.0
github.com/micro/h2c v1.0.0
github.com/micro/mdns v0.1.0
github.com/micro/util v0.1.0
github.com/mitchellh/hashstructure v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
)

38
go.sum
View File

@@ -2,6 +2,7 @@ github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hC
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -26,10 +27,11 @@ github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqk
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0 h1:ueI78wUjYExhCvMLow4icJnayNNFRgy0d9EGs/a1T44=
github.com/hashicorp/go-rootcerts v1.0.0 h1:Rqb66Oo1X/eSV1x66xbDccZjhJigjg0+e82kpwzSwCI=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-sockaddr v1.0.1 h1:eCkkJ5KOOktDvwbsE9KPyiBWaOfp1ZNy2gLHgL8PSBM=
github.com/hashicorp/go-sockaddr v1.0.1/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
@@ -43,16 +45,19 @@ github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2p
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
github.com/micro/go-micro v0.26.0/go.mod h1:CweCFO/pq8dCSIOdzVZ4ooIpUrKlyJ0AcFB269M7PgU=
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
github.com/micro/h2c v1.0.0 h1:ejw6MS5+WaUoMHRtqkVCCrrVzLMzOFEH52rEyd8Fl2I=
github.com/micro/go-rcache v0.2.0/go.mod h1:EoiTwbY2ubQ6lc3ScV+SnmKbelDzeFezDxPDvF8XDxw=
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
@@ -60,11 +65,13 @@ github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9A
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.3 h1:1g0r1IvskvgL8rR+AcHzUA+oFmGcQlaIm4IqakufeMM=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.4 h1:rCMZsU2ScVSYcAsOXgmC6+AKOK+6pmQTOcw03nfwYV0=
github.com/miekg/dns v1.1.4/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9dGS02Q3Y=
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
@@ -77,25 +84,40 @@ github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.1/go.mod h1:6gapUrK/U1TAN7ciCoNRIdVC5sbdBTUh1DKN0g6uH7E=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 h1:YbZJ76lQ1BqNhVe7dKTSB67wDrc2VPRR75IyGyyPDX8=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67 h1:ng3VDlRp5/DHpSWl02R4rM9I+8M2rhmsuLwAMmkLQWE=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006 h1:bfLnR+k0tq5Lqt6dflRLcZiz6UaXCMt3vhYJ1l4FQ80=
golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd h1:HuTn7WObtcDo9uEEU7rEqL0jYthdXAmZ6PP+meazmaU=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc h1:WiYx1rIFmx8c0mXAFtv5D/mHyKe1+jmuP7PViuwqwuQ=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952 h1:FDfvYgoVsA7TTZSbgiqjAbfPbK47CNHdWl3h/PJtii0=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503 h1:5SvYFrOM3W8Mexn9/oA44Ji7vhXAZQ9hiP+1Q/DMrWg=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3 h1:+KlxhGbYkFs8lMfwKn+2ojry1ID5eBSMXprS2u/wqCE=
golang.org/x/sys v0.0.0-20190213121743-983097b1a8a3/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -122,6 +122,13 @@ func Transport(t transport.Transport) Option {
// Convenience options
// Address sets the address of the server
func Address(addr string) Option {
return func(o *Options) {
o.Server.Init(server.Address(addr))
}
}
// Name of the service
func Name(n string) Option {
return func(o *Options) {

View File

@@ -1,17 +0,0 @@
package gossip
import (
"context"
"github.com/micro/go-micro/registry"
)
// setRegistryOption returns a function to setup a context with given value
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@@ -1,17 +1,16 @@
// Package Gossip provides a gossip registry based on hashicorp/memberlist
// Package gossip provides a gossip registry based on hashicorp/memberlist
package gossip
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/protobuf/proto"
@@ -35,6 +34,13 @@ const (
actionTypeSync
)
const (
nodeActionUnknown int32 = iota
nodeActionJoin
nodeActionLeave
nodeActionUpdate
)
func actionTypeString(t int32) string {
switch t {
case actionTypeCreate:
@@ -64,18 +70,45 @@ type delegate struct {
updates chan *update
}
type gossipRegistry struct {
queue *memberlist.TransmitLimitedQueue
updates chan *update
options registry.Options
member *memberlist.Memberlist
interval time.Duration
type event struct {
action int32
node string
}
type eventDelegate struct {
events chan *event
}
func (ed *eventDelegate) NotifyJoin(n *memberlist.Node) {
ed.events <- &event{action: nodeActionJoin, node: n.Address()}
}
func (ed *eventDelegate) NotifyLeave(n *memberlist.Node) {
ed.events <- &event{action: nodeActionLeave, node: n.Address()}
}
func (ed *eventDelegate) NotifyUpdate(n *memberlist.Node) {
ed.events <- &event{action: nodeActionUpdate, node: n.Address()}
}
type gossipRegistry struct {
queue *memberlist.TransmitLimitedQueue
updates chan *update
events chan *event
options registry.Options
member *memberlist.Memberlist
interval time.Duration
tcpInterval time.Duration
connectRetry bool
connectTimeout time.Duration
sync.RWMutex
services map[string][]*registry.Service
s sync.RWMutex
watchers map[string]chan *registry.Result
mtu int
addrs []string
members map[string]int32
done chan bool
}
type update struct {
@@ -84,10 +117,16 @@ type update struct {
sync chan *registry.Service
}
type updates struct {
sync.RWMutex
services map[uint64]*update
}
var (
// You should change this if using secure
DefaultSecret = []byte("micro-gossip-key") // exactly 16 bytes
ExpiryTick = time.Second * 5
ExpiryTick = time.Second * 1 // needs to be smaller than registry.RegisterTTL
MaxPacketSize = 512
)
func configure(g *gossipRegistry, opts ...registry.Option) error {
@@ -119,9 +158,9 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
}
// shutdown old member
if g.member != nil {
g.Stop()
}
g.Stop()
// new done chan
g.done = make(chan bool)
// replace addresses
curAddrs = newAddrs
@@ -129,19 +168,23 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// create a new default config
c := memberlist.DefaultLocalConfig()
// log to dev null
c.LogOutput = ioutil.Discard
// sane good default options
c.LogOutput = ioutil.Discard // log to /dev/null
c.PushPullInterval = 0 // disable expensive tcp push/pull
c.ProtocolVersion = 4 // suport latest stable features
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig
// set config from options
if config, ok := g.options.Context.Value(configKey{}).(*memberlist.Config); ok && config != nil {
c = config
}
if hostport, ok := g.options.Context.Value(contextAddress{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
// set address
if address, ok := g.options.Context.Value(addressKey{}).(string); ok {
host, port, err := net.SplitHostPort(address)
if err == nil {
pn, err := strconv.Atoi(port)
p, err := strconv.Atoi(port)
if err == nil {
c.BindPort = pn
c.BindPort = p
}
c.BindAddr = host
}
@@ -150,12 +193,13 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.BindPort = 0
}
if hostport, ok := g.options.Context.Value(contextAdvertise{}).(string); ok {
host, port, err := net.SplitHostPort(hostport)
// set the advertise address
if advertise, ok := g.options.Context.Value(advertiseKey{}).(string); ok {
host, port, err := net.SplitHostPort(advertise)
if err == nil {
pn, err := strconv.Atoi(port)
p, err := strconv.Atoi(port)
if err == nil {
c.AdvertisePort = pn
c.AdvertisePort = p
}
c.AdvertiseAddr = host
}
@@ -169,7 +213,7 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// set a secret key if secure
if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
k, ok := g.options.Context.Value(secretKey{}).([]byte)
if !ok {
// use the default secret
k = DefaultSecret
@@ -177,6 +221,16 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k
}
// set connect retry
if v, ok := g.options.Context.Value(connectRetryKey{}).(bool); ok && v {
g.connectRetry = true
}
// set connect timeout
if td, ok := g.options.Context.Value(connectTimeoutKey{}).(time.Duration); ok {
g.connectTimeout = td
}
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
@@ -191,27 +245,39 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
queue: queue,
}
if g.connectRetry {
c.Events = &eventDelegate{
events: g.events,
}
}
// create the memberlist
m, err := memberlist.Create(c)
if err != nil {
return err
}
// join the memberlist
// set internals
g.Lock()
if len(curAddrs) > 0 {
_, err := m.Join(curAddrs)
if err != nil {
return err
for _, addr := range curAddrs {
g.members[addr] = nodeActionUnknown
}
}
// set internals
g.tcpInterval = c.PushPullInterval
g.addrs = curAddrs
g.queue = queue
g.member = m
g.interval = c.GossipInterval
log.Logf("Registry Listening on %s", m.LocalNode().Address())
return nil
g.Unlock()
log.Logf("[gossip] Registry Listening on %s", m.LocalNode().Address())
// try connect
return g.connect(curAddrs)
}
func (*broadcast) UniqueBroadcast() {}
@@ -225,6 +291,9 @@ func (b *broadcast) Message() []byte {
if err != nil {
return nil
}
if l := len(up); l > MaxPacketSize {
log.Logf("[gossip] broadcast message size %d bigger then MaxPacketSize %d", l, MaxPacketSize)
}
return up
}
@@ -313,7 +382,6 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
if err := json.Unmarshal(buf, &services); err != nil {
return
}
for _, service := range services {
for _, srv := range service {
d.updates <- &update{
@@ -325,8 +393,57 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
}
}
func (g *gossipRegistry) connect(addrs []string) error {
if len(addrs) == 0 {
return nil
}
timeout := make(<-chan time.Time)
if g.connectTimeout > 0 {
timeout = time.After(g.connectTimeout)
}
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
fn := func() (int, error) {
return g.member.Join(addrs)
}
// don't wait for first try
if _, err := fn(); err == nil {
return nil
}
// wait loop
for {
select {
// context closed
case <-g.options.Context.Done():
return nil
// call close, don't wait anymore
case <-g.done:
return nil
// in case of timeout fail with a timeout error
case <-timeout:
return fmt.Errorf("[gossip] connect timeout %v", g.addrs)
// got a tick, try to connect
case <-ticker.C:
if _, err := fn(); err == nil {
log.Logf("[gossip] connect success for %v", g.addrs)
return nil
} else {
log.Logf("[gossip] connect failed for %v", g.addrs)
}
}
}
return nil
}
func (g *gossipRegistry) publish(action string, services []*registry.Service) {
g.s.RLock()
g.RLock()
for _, sub := range g.watchers {
go func(sub chan *registry.Result) {
for _, service := range services {
@@ -334,7 +451,7 @@ func (g *gossipRegistry) publish(action string, services []*registry.Service) {
}
}(sub)
}
g.s.RUnlock()
g.RUnlock()
}
func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
@@ -343,66 +460,108 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
id := uuid.New().String()
g.s.Lock()
g.Lock()
g.watchers[id] = next
g.s.Unlock()
g.Unlock()
go func() {
<-exit
g.s.Lock()
g.Lock()
delete(g.watchers, id)
close(next)
g.s.Unlock()
g.Unlock()
}()
return next, exit
}
func (g *gossipRegistry) wait() {
ctx := g.options.Context
if c, ok := ctx.Value(contextContext{}).(context.Context); ok && c != nil {
ctx = c
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
func (g *gossipRegistry) Stop() error {
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-ctx.Done():
case <-g.done:
return nil
default:
close(g.done)
g.Lock()
if g.member != nil {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
g.member = nil
}
g.Unlock()
}
g.Stop()
return nil
}
func (g *gossipRegistry) Stop() {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
// connectLoop attempts to reconnect to the memberlist
func (g *gossipRegistry) connectLoop() {
// try every second
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-g.done:
return
case <-g.options.Context.Done():
g.Stop()
return
case <-ticker.C:
var addrs []string
g.RLock()
// only process if we have a memberlist
if g.member == nil {
g.RUnlock()
continue
}
// self
local := g.member.LocalNode().Address()
// operate on each member
for node, action := range g.members {
switch action {
// process leave event
case nodeActionLeave:
// don't process self
if node == local {
continue
}
addrs = append(addrs, node)
}
}
g.RUnlock()
// connect to all the members
// TODO: only connect to new members
if len(addrs) > 0 {
g.connect(addrs)
}
}
}
}
func (g *gossipRegistry) run() {
var mtx sync.Mutex
updates := map[uint64]*update{}
func (g *gossipRegistry) expiryLoop(updates *updates) {
ticker := time.NewTicker(ExpiryTick)
defer ticker.Stop()
// expiry loop
go func() {
t := time.NewTicker(ExpiryTick)
defer t.Stop()
for _ = range t.C {
for {
select {
case <-g.done:
return
case <-ticker.C:
now := uint64(time.Now().UnixNano())
mtx.Lock()
updates.Lock()
// process all the updates
for k, v := range updates {
for k, v := range updates.services {
// check if expiry time has passed
if d := (v.Update.Expires); d < now {
// delete from records
delete(updates, k)
delete(updates.services, k)
// set to delete
v.Update.Action = actionTypeDelete
// fire a new update
@@ -410,9 +569,44 @@ func (g *gossipRegistry) run() {
}
}
mtx.Unlock()
updates.Unlock()
}
}()
}
}
// process member events
func (g *gossipRegistry) eventLoop() {
for {
select {
// return when done
case <-g.done:
return
case ev := <-g.events:
// TODO: nonblocking update
g.Lock()
if _, ok := g.members[ev.node]; ok {
g.members[ev.node] = ev.action
}
g.Unlock()
}
}
}
func (g *gossipRegistry) run() {
updates := &updates{
services: make(map[uint64]*update),
}
// expiry loop
go g.expiryLoop(updates)
// event loop
go g.eventLoop()
// connect loop
if g.connectRetry {
go g.connectLoop()
}
// process the updates
for u := range g.updates {
@@ -434,9 +628,9 @@ func (g *gossipRegistry) run() {
if u.Update.Expires > 0 {
// create a hash of this service
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
mtx.Lock()
updates[hash] = u
mtx.Unlock()
updates.Lock()
updates.services[hash] = u
updates.Unlock()
}
}
case actionTypeDelete:
@@ -455,9 +649,9 @@ func (g *gossipRegistry) run() {
// delete from expiry checks
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
mtx.Lock()
delete(updates, hash)
mtx.Unlock()
updates.Lock()
delete(updates.services, hash)
updates.Unlock()
}
case actionTypeSync:
// no sync channel provided
@@ -512,6 +706,10 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
o(&options)
}
if options.TTL == 0 && g.tcpInterval == 0 {
return fmt.Errorf("Require register TTL or interval for memberlist.Config")
}
up := &pb.Update{
Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
Action: actionTypeCreate,
@@ -599,27 +797,28 @@ func (g *gossipRegistry) String() string {
}
func NewRegistry(opts ...registry.Option) registry.Registry {
gossip := &gossipRegistry{
g := &gossipRegistry{
options: registry.Options{
Context: context.Background(),
},
done: make(chan bool),
events: make(chan *event, 100),
updates: make(chan *update, 100),
services: make(map[string][]*registry.Service),
watchers: make(map[string]chan *registry.Result),
members: make(map[string]int32),
}
// run the updater
go gossip.run()
go g.run()
// configure the gossiper
if err := configure(gossip, opts...); err != nil {
log.Fatalf("Error configuring registry: %v", err)
if err := configure(g, opts...); err != nil {
log.Fatalf("[gossip] Error configuring registry: %v", err)
}
// wait for setup
<-time.After(gossip.interval * 2)
<-time.After(g.interval * 2)
go gossip.wait()
return gossip
return g
}

View File

@@ -1,7 +1,6 @@
package gossip_test
package gossip
import (
"context"
"os"
"sync"
"testing"
@@ -9,95 +8,83 @@ import (
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
micro "github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/gossip"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
)
var (
r1 registry.Registry
r2 registry.Registry
mu sync.Mutex
)
func newConfig() *memberlist.Config {
wc := memberlist.DefaultLANConfig()
wc.DisableTcpPings = false
wc.GossipVerifyIncoming = false
wc.GossipVerifyOutgoing = false
wc.EnableCompression = false
wc.LogOutput = os.Stderr
wc.ProtocolVersion = 4
wc.Name = uuid.New().String()
return wc
func newMemberlistConfig() *memberlist.Config {
mc := memberlist.DefaultLANConfig()
mc.DisableTcpPings = false
mc.GossipVerifyIncoming = false
mc.GossipVerifyOutgoing = false
mc.EnableCompression = false
mc.PushPullInterval = 3 * time.Second
mc.LogOutput = os.Stderr
mc.ProtocolVersion = 4
mc.Name = uuid.New().String()
return mc
}
func newRegistries() {
mu.Lock()
defer mu.Unlock()
if r1 != nil && r2 != nil {
return
func newRegistry(opts ...registry.Option) registry.Registry {
options := []registry.Option{
ConnectRetry(true),
ConnectTimeout(60 * time.Second),
}
wc1 := newConfig()
wc2 := newConfig()
rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")}
rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")}
r1 = gossip.NewRegistry(rops1...) // first started without members
r2 = gossip.NewRegistry(rops2...) // second started joining
options = append(options, opts...)
r := NewRegistry(options...)
return r
}
func TestRegistryBroadcast(t *testing.T) {
newRegistries()
func TestGossipRegistryBroadcast(t *testing.T) {
mc1 := newMemberlistConfig()
r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321"))
svc1 := &registry.Service{Name: "r1-svc", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "r2-svc", Version: "0.0.0.2"}
mc2 := newMemberlistConfig()
r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321"))
<-time.After(1 * time.Second)
if err := r1.Register(svc1); err != nil {
defer r1.(*gossipRegistry).Stop()
defer r2.(*gossipRegistry).Stop()
svc1 := &registry.Service{Name: "service.1", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "service.2", Version: "0.0.0.2"}
if err := r1.Register(svc1, registry.RegisterTTL(10*time.Second)); err != nil {
t.Fatal(err)
}
<-time.After(1 * time.Second)
if err := r2.Register(svc2); err != nil {
if err := r2.Register(svc2, registry.RegisterTTL(10*time.Second)); err != nil {
t.Fatal(err)
}
var found bool
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r2-svc" {
if svc.Name == "service.2" {
found = true
}
}
if !found {
t.Fatalf("r2-svc not found in r1, broadcast not work")
t.Fatalf("[gossip registry] service.2 not found in r1, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r1-svc" {
if svc.Name == "service.1" {
found = true
}
}
if !found {
t.Fatalf("r1-svc not found in r2, broadcast not work")
t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2")
}
if err := r1.Deregister(svc1); err != nil {
@@ -108,38 +95,72 @@ func TestRegistryBroadcast(t *testing.T) {
}
}
func TestGossipRegistryRetry(t *testing.T) {
mc1 := newMemberlistConfig()
r1 := newRegistry(Config(mc1), Address("127.0.0.1:54321"))
func TestServerRegistry(t *testing.T) {
newRegistries()
mc2 := newMemberlistConfig()
r2 := newRegistry(Config(mc2), Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321"))
_, err := newServer("s1", r1, t)
defer r1.(*gossipRegistry).Stop()
defer r2.(*gossipRegistry).Stop()
svc1 := &registry.Service{Name: "service.1", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "service.2", Version: "0.0.0.2"}
var mu sync.Mutex
ch := make(chan struct{})
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
go func() {
for {
select {
case <-ticker.C:
mu.Lock()
if r1 != nil {
r1.Register(svc1, registry.RegisterTTL(2*time.Second))
}
if r2 != nil {
r2.Register(svc2, registry.RegisterTTL(2*time.Second))
}
if ch != nil {
close(ch)
ch = nil
}
mu.Unlock()
}
}
}()
<-ch
var found bool
svcs, err := r2.ListServices()
if err != nil {
t.Fatal(err)
}
_, err = newServer("s2", r2, t)
if err != nil {
t.Fatal(err)
}
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
if len(svcs) < 1 {
t.Fatalf("r1 svcs unknown %#+v\n", svcs)
}
found := false
for _, svc := range svcs {
if svc.Name == "s2" {
if svc.Name == "service.1" {
found = true
}
}
if !found {
t.Fatalf("r1 does not have s2, broadcast not work")
t.Fatalf("[gossip registry] broadcast failed: service.1 not found in r2")
}
if err = r1.(*gossipRegistry).Stop(); err != nil {
t.Fatalf("[gossip registry] failed to stop registry: %v", err)
}
mu.Lock()
r1 = nil
mu.Unlock()
<-time.After(3 * time.Second)
found = false
svcs, err = r2.ListServices()
if err != nil {
@@ -147,53 +168,47 @@ func TestServerRegistry(t *testing.T) {
}
for _, svc := range svcs {
if svc.Name == "s1" {
if svc.Name == "service.1" {
found = true
}
}
if found {
t.Fatalf("[gossip registry] service.1 found in r2")
}
if tr := os.Getenv("TRAVIS"); len(tr) > 0 {
t.Logf("[gossip registry] skip test on travis")
t.Skip()
return
}
r1 = newRegistry(Config(mc1), Address("127.0.0.1:54321"))
<-time.After(2 * time.Second)
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "service.1" {
found = true
}
}
if !found {
t.Fatalf("r2 does not have s1, broadcast not work")
t.Fatalf("[gossip registry] connect retry failed: service.1 not found in r2")
}
}
type testServer struct{}
func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error {
return nil
}
func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) {
h := &testServer{}
var wg sync.WaitGroup
wg.Add(1)
sopts := []server.Option{
server.Name(n),
server.Registry(r),
if err := r1.Deregister(svc1); err != nil {
t.Fatal(err)
}
if err := r2.Deregister(svc2); err != nil {
t.Fatal(err)
}
copts := []client.Option{
client.Selector(selector.NewSelector(selector.Registry(r))),
client.Registry(r),
}
srv := micro.NewService(
micro.Server(server.NewServer(sopts...)),
micro.Client(client.NewClient(copts...)),
micro.AfterStart(func() error {
wg.Done()
return nil
}),
)
srv.Server().NewHandler(h)
go func() {
t.Fatal(srv.Run())
}()
wg.Wait()
return srv, nil
r1.(*gossipRegistry).Stop()
r2.(*gossipRegistry).Stop()
}

View File

@@ -2,45 +2,57 @@ package gossip
import (
"context"
"time"
"github.com/hashicorp/memberlist"
"github.com/micro/go-micro/registry"
)
type contextSecretKey struct{}
type secretKey struct{}
type addressKey struct{}
type configKey struct{}
type advertiseKey struct{}
type connectTimeoutKey struct{}
type connectRetryKey struct{}
// helper for setting registry options
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
// Secret specifies an encryption key. The value should be either
// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
func Secret(k []byte) registry.Option {
return setRegistryOption(contextSecretKey{}, k)
return setRegistryOption(secretKey{}, k)
}
type contextAddress struct{}
// Address to bind to - host:port
func Address(a string) registry.Option {
return setRegistryOption(contextAddress{}, a)
return setRegistryOption(addressKey{}, a)
}
type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip
// Config sets *memberlist.Config for configuring gossip
func Config(c *memberlist.Config) registry.Option {
return setRegistryOption(contextConfig{}, c)
return setRegistryOption(configKey{}, c)
}
type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port
// The address to advertise for other gossip members to connect to - host:port
func Advertise(a string) registry.Option {
return setRegistryOption(contextAdvertise{}, a)
return setRegistryOption(advertiseKey{}, a)
}
type contextContext struct{}
// Context specifies a context for the registry.
// Can be used to signal shutdown of the registry.
// Can be used for extra option values.
func Context(ctx context.Context) registry.Option {
return setRegistryOption(contextContext{}, ctx)
// ConnectTimeout sets the registry connect timeout. Use -1 to specify infinite timeout
func ConnectTimeout(td time.Duration) registry.Option {
return setRegistryOption(connectTimeoutKey{}, td)
}
// ConnectRetry enables reconnect to registry then connection closed,
// use with ConnectTimeout to specify how long retry
func ConnectRetry(v bool) registry.Option {
return setRegistryOption(connectRetryKey{}, v)
}

View File

@@ -11,7 +11,6 @@ type Options struct {
Timeout time.Duration
Secure bool
TLSConfig *tls.Config
// Other options for implementations of the interface
// can be stored in a context
Context context.Context

View File

@@ -1,359 +1,25 @@
package selector
import (
"sync"
"time"
"github.com/micro/go-log"
"github.com/micro/go-micro/registry"
"github.com/micro/go-rcache"
)
type registrySelector struct {
so Options
ttl time.Duration
// registry cache
sync.RWMutex
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
so Options
rc rcache.Cache
}
var (
DefaultTTL = time.Minute
)
// isValid checks if the service is valid
func (c *registrySelector) isValid(services []*registry.Service, ttl time.Time) bool {
// no services exist
if len(services) == 0 {
return false
}
// ttl is invalid
if ttl.IsZero() {
return false
}
// time since ttl is longer than timeout
if time.Since(ttl) > c.ttl {
return false
}
// ok
return true
}
func (c *registrySelector) quit() bool {
select {
case <-c.exit:
return true
default:
return false
}
}
// cp copies a service. Because we're caching handing back pointers would
// create a race condition, so we do this instead
// its fast enough
func (c *registrySelector) cp(current []*registry.Service) []*registry.Service {
var services []*registry.Service
for _, service := range current {
// copy service
s := new(registry.Service)
*s = *service
// copy nodes
var nodes []*registry.Node
for _, node := range service.Nodes {
n := new(registry.Node)
*n = *node
nodes = append(nodes, n)
}
s.Nodes = nodes
// copy endpoints
var eps []*registry.Endpoint
for _, ep := range service.Endpoints {
e := new(registry.Endpoint)
*e = *ep
eps = append(eps, e)
}
s.Endpoints = eps
// append service
services = append(services, s)
}
return services
}
func (c *registrySelector) del(service string) {
delete(c.cache, service)
delete(c.ttls, service)
}
func (c *registrySelector) get(service string) ([]*registry.Service, error) {
// read lock
c.RLock()
// check the cache first
services := c.cache[service]
// get cache ttl
ttl := c.ttls[service]
// got services && within ttl so return cache
if c.isValid(services, ttl) {
// make a copy
cp := c.cp(services)
// unlock the read
c.RUnlock()
// return servics
return cp, nil
}
// get does the actual request for a service and cache it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.Lock()
c.set(service, c.cp(services))
c.Unlock()
return services, nil
}
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
}
// unlock the read lock
c.RUnlock()
// get and return services
return get(service)
}
func (c *registrySelector) set(service string, services []*registry.Service) {
c.cache[service] = services
c.ttls[service] = time.Now().Add(c.ttl)
}
func (c *registrySelector) update(res *registry.Result) {
if res == nil || res.Service == nil {
return
}
c.Lock()
defer c.Unlock()
services, ok := c.cache[res.Service.Name]
if !ok {
// we're not going to cache anything
// unless there was already a lookup
return
}
if len(res.Service.Nodes) == 0 {
switch res.Action {
case "delete":
c.del(res.Service.Name)
}
return
}
// existing service found
var service *registry.Service
var index int
for i, s := range services {
if s.Version == res.Service.Version {
service = s
index = i
func (c *registrySelector) newRCache() rcache.Cache {
ropts := []rcache.Option{}
if c.so.Context != nil {
if t, ok := c.so.Context.Value("selector_ttl").(time.Duration); ok {
ropts = append(ropts, rcache.WithTTL(t))
}
}
switch res.Action {
case "create", "update":
if service == nil {
c.set(res.Service.Name, append(services, res.Service))
return
}
// append old nodes to new service
for _, cur := range service.Nodes {
var seen bool
for _, node := range res.Service.Nodes {
if cur.Id == node.Id {
seen = true
break
}
}
if !seen {
res.Service.Nodes = append(res.Service.Nodes, cur)
}
}
services[index] = res.Service
c.set(res.Service.Name, services)
case "delete":
if service == nil {
return
}
var nodes []*registry.Node
// filter cur nodes to remove the dead one
for _, cur := range service.Nodes {
var seen bool
for _, del := range res.Service.Nodes {
if del.Id == cur.Id {
seen = true
break
}
}
if !seen {
nodes = append(nodes, cur)
}
}
// still got nodes, save and return
if len(nodes) > 0 {
service.Nodes = nodes
services[index] = service
c.set(service.Name, services)
return
}
// zero nodes left
// only have one thing to delete
// nuke the thing
if len(services) == 1 {
c.del(service.Name)
return
}
// still have more than 1 service
// check the version and keep what we know
var srvs []*registry.Service
for _, s := range services {
if s.Version != service.Version {
srvs = append(srvs, s)
}
}
// save
c.set(service.Name, srvs)
}
}
// run starts the cache watcher loop
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *registrySelector) run(name string) {
// set watcher
c.Lock()
c.watched[name] = true
c.Unlock()
// delete watcher on exit
defer func() {
c.Lock()
delete(c.watched, name)
c.Unlock()
}()
// error counter
var cerr int
for {
// exit early if already dead
if c.quit() {
return
}
// create new watcher
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
cerr++
if cerr > 3 {
log.Log(err)
cerr = 0
}
time.Sleep(time.Second)
continue
}
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
cerr++
if cerr > 3 {
cerr = 0
log.Log(err)
}
continue
}
// reset err counter
cerr = 0
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// reload chan
reload := make(chan bool, 1)
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
reload <- true
}
// stop the watcher
w.Stop()
}()
for {
res, err := w.Next()
if err != nil {
select {
case <-reload:
return nil
default:
return err
}
}
c.update(res)
}
return rcache.New(c.so.Registry, ropts...)
}
func (c *registrySelector) Init(opts ...Option) error {
@@ -361,15 +27,8 @@ func (c *registrySelector) Init(opts ...Option) error {
o(&c.so)
}
// reload the watcher
go func() {
select {
case <-c.exit:
return
default:
c.reload <- true
}
}()
c.rc.Stop()
c.rc = c.newRCache()
return nil
}
@@ -390,7 +49,7 @@ func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, e
// get the service
// try the cache first
// if that fails go directly to the registry
services, err := c.get(service)
services, err := c.rc.GetService(service)
if err != nil {
return nil, err
}
@@ -416,17 +75,8 @@ func (c *registrySelector) Reset(service string) {
// Close stops the watcher and destroys the cache
func (c *registrySelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
c.rc.Stop()
select {
case <-c.exit:
return nil
default:
close(c.exit)
}
return nil
}
@@ -447,21 +97,10 @@ func NewSelector(opts ...Option) Selector {
sopts.Registry = registry.DefaultRegistry
}
ttl := DefaultTTL
if sopts.Context != nil {
if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok {
ttl = t
}
s := &registrySelector{
so: sopts,
}
s.rc = s.newRCache()
return &registrySelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
return s
}