Compare commits

..

173 Commits

Author SHA1 Message Date
Asim Aslam
98bb4a69c2 Merge pull request #325 from micro/accept_loop
make accept loop
2018-11-15 21:23:16 +00:00
Asim Aslam
e69413b763 add continue 2018-11-15 21:13:33 +00:00
Asim Aslam
45f18042b7 make accept loop 2018-11-15 19:55:13 +00:00
Asim Aslam
0672b051cc Add Local/Remote ip to metadata 2018-11-14 20:27:58 +00:00
Asim Aslam
3c496720cc Merge pull request #324 from micro/ip
Ip
2018-11-14 20:15:56 +00:00
Asim Aslam
881cb570d5 reorder 2018-11-14 19:49:04 +00:00
Asim Aslam
c6a2c8de6c add local/remote to testsocket 2018-11-14 19:45:46 +00:00
Asim Aslam
71bacf6991 add local/remote ip to socket 2018-11-14 19:41:13 +00:00
Asim Aslam
a0b257b572 remove line 2018-11-14 15:19:23 +00:00
Asim Aslam
a082c151f0 remove example section 2018-11-14 15:18:47 +00:00
Asim Aslam
302ab42a97 strip down readme 2018-11-14 15:18:13 +00:00
Asim Aslam
531d4dd24a Merge pull request #322 from mgrachev/fix-linter-issues
Fix some linter issues
2018-11-13 09:38:19 +00:00
Asim Aslam
1c401a852e Merge pull request #321 from mgrachev/errors-check
Add errors check
2018-11-13 09:36:54 +00:00
Mikhail Grachev
25e6dcc9b6 Fix some linter issues 2018-11-13 11:57:42 +03:00
Mikhail Grachev
4006d9f102 Add errors check 2018-11-13 11:56:21 +03:00
Asim Aslam
4c821baab4 go fmt 2018-11-03 12:17:11 +00:00
Asim Aslam
c8a35afc92 Merge pull request #313 from lovelly/master
Fix tcp check no ttl error
2018-11-03 12:16:56 +00:00
Asim Aslam
54f67db275 Merge pull request #289 from micro/http2
http2 support
2018-11-03 12:07:23 +00:00
lovelly
fd04722706 Fix tcp check no ttl error 2018-10-09 10:40:24 +08:00
Asim Aslam
4cee1f19f6 Merge pull request #309 from fireyang/master
fix rpc client call WARNING: DATA RACE
2018-09-20 07:39:45 +01:00
fireyang
ef8b5e28b0 fix rpc client call WARNING: DATA RACE 2018-09-20 10:08:00 +08:00
Asim Aslam
818f150b25 Merge pull request #308 from fireyang/master
fix bug: loop variable i captured by func literal
2018-09-19 15:13:19 +01:00
fireyang
446d3fc72e fix bug: loop variable i captured by func literal 2018-09-19 21:58:20 +08:00
Asim Aslam
240052246f Merge pull request #306 from DexterHD/fix-go-config-panic
fix bug with go-config panic
2018-09-13 19:26:57 +01:00
Anton Kucherov
156a51ab10 fix bug with go-config panic
See: https://github.com/micro/go-config/issues/49
2018-09-13 19:02:08 +03:00
Asim Aslam
52a4beb072 Merge pull request #305 from ahmadnurus/add_method_not_allowed_error
errors: Added 405 Method Not Allowed helper function
2018-09-13 07:39:42 +01:00
ahmadnurus
395d70cf01 errors: Added 405 Method Not Allowed helper function 2018-09-12 21:42:34 +07:00
Asim Aslam
3732dc2f42 bump travis 2018-08-28 16:00:04 +01:00
Asim Aslam
9d3cb65daa set broker address on Init 2018-08-18 17:28:58 +01:00
Asim Aslam
a0d3917832 Merge pull request #292 from micro/init
Add Init to all things, use init in cmd package to initialise
2018-08-09 18:30:36 +01:00
Asim Aslam
9968c7d007 Add Init to all things, use init in cmd package to initialise 2018-08-08 18:57:29 +01:00
Asim Aslam
68f5e71153 Merge pull request #290 from micro/connect
Support connect native registration
2018-08-06 17:20:37 +01:00
Asim Aslam
af328ee7b4 Support connect native registration 2018-08-06 17:12:34 +01:00
Asim Aslam
eebaa64d8c phase 1 2018-07-29 10:55:46 +01:00
Asim Aslam
88505388c1 Add verbosity to errors 2018-07-26 09:33:50 +01:00
Asim Aslam
8a778644cf Merge pull request #281 from micro/retry
retry only on timeout or internal server error
2018-07-22 17:52:12 +01:00
Asim Aslam
d3a76e646a retry only on timeout or internal server error 2018-07-22 17:41:58 +01:00
Asim Aslam
cfa824bc5f Merge pull request #280 from jiyeyuran/master
Allow client_retries to be 0
2018-07-19 23:24:57 -07:00
武新飞
39be61685c client_retries can be 0 2018-07-20 14:20:23 +08:00
Asim Aslam
ac2106ced7 strip deadline from stream 2018-07-17 16:39:07 -07:00
Asim Aslam
1b4f7d8a68 a stream should not timeout 2018-07-17 16:32:35 -07:00
Asim Aslam
5eb2e79b86 go fmt 2018-07-17 16:31:09 -07:00
Asim Aslam
a2eff9918e Merge pull request #269 from Ak-Army/master
handle function in mock response
2018-06-13 16:54:10 +01:00
Hunyadvári Péter
52a470532d handle function in mock response 2018-06-13 17:46:30 +02:00
Asim Aslam
cd9441fafb Merge pull request #268 from jasimmk/fix-connect-lock
Fixing httpBroker dead lock; If publish is called from a subscription #267
2018-06-13 16:21:24 +01:00
Jasim Muhammed
356cf82af5 Fixing httpBroker dead lock; If publish is called from a subscription 2018-06-13 10:28:39 +04:00
Asim Aslam
5372707d0e Strip flag setting 2018-05-30 11:49:50 +01:00
Asim Aslam
a1deb5c44e Merge pull request #264 from micro/register
set register ttl and interval by default
2018-05-30 11:31:13 +01:00
Asim Aslam
956b1c6867 set register ttl and interval by default 2018-05-29 12:28:55 +01:00
Asim Aslam
55aca8b0bf Merge pull request #262 from micro/retries
Retry requests
2018-05-29 11:52:47 +01:00
Asim Aslam
ba8582a47a change retries to actually mean retries 2018-05-28 16:01:04 +01:00
Asim Aslam
f409468ccd Merge branch 'master' of github.com:micro/go-micro 2018-05-28 15:51:52 +01:00
Asim Aslam
d982225a54 restructure test 2018-05-28 15:40:28 +01:00
Asim Aslam
217190c4d6 Merge pull request #261 from micro/pool
Set the default pool size to 1
2018-05-26 09:58:16 +01:00
Asim Aslam
a56e97b47d Change waitgroup processing 2018-05-26 09:41:41 +01:00
Asim Aslam
b4f47b1cc9 Set the default pool size to 1 2018-05-26 09:10:29 +01:00
Asim Aslam
070cebd605 Merge pull request #260 from elebore/master
just update the pool configuration of rpcClient  if the options changed
2018-05-26 09:03:16 +01:00
bogle
541e894507 just update the pool configuration if the options changed, because recreating the pool,existed idleconnection, if any, will be dropped without closing 2018-05-26 15:38:41 +08:00
Asim Aslam
c666558f8c make the broker/transport listen on new addr when stop/started with addr :0 2018-05-25 15:19:25 +01:00
Asim Aslam
6444b7e24c context cancellation is not required 2018-05-25 15:03:15 +01:00
Asim Aslam
023245a7ba shutdown broker once done 2018-05-25 14:43:32 +01:00
Asim Aslam
2a2ad553a1 reorder testing functions 2018-05-25 14:39:50 +01:00
Asim Aslam
909e13a24a Merge pull request #254 from micro/message
add message options
2018-05-10 17:42:46 +01:00
Asim Aslam
b17a802675 update mock 2018-05-10 17:39:13 +01:00
Asim Aslam
c3c0543733 add message options 2018-05-10 17:33:54 +01:00
Asim Aslam
b39ec4472c Return subscriber errors 2018-04-26 10:47:13 +01:00
Asim Aslam
b33489e481 update readme 2018-04-25 16:03:22 +01:00
Asim Aslam
8fb5e20a22 Merge pull request #248 from micro/rework
Rework Interfaces
2018-04-17 11:25:25 +01:00
Asim Aslam
0315b4480f revert some changes 2018-04-17 11:00:22 +01:00
Asim Aslam
ccbc1b9cf3 Fix broker registry issue 2018-04-17 08:30:36 +01:00
Asim Aslam
19fdfba0bf move wrapper files 2018-04-14 19:24:17 +01:00
Asim Aslam
d00ac200dd remove registry and transport default funcs 2018-04-14 18:43:54 +01:00
Asim Aslam
173f7107e2 remove broker default funcs 2018-04-14 18:26:54 +01:00
Asim Aslam
d00d76bf7c Move publication to message 2018-04-14 18:21:02 +01:00
Asim Aslam
65068e8b82 rename Streamer to Stream 2018-04-14 18:15:09 +01:00
Asim Aslam
c2cfe5310c Rework client interface 2018-04-14 18:06:52 +01:00
Asim Aslam
07068379c6 remove remote func methods 2018-04-14 16:16:58 +01:00
Asim Aslam
528b5f58de update sponsor area 2018-04-12 12:09:36 +01:00
Asim Aslam
378af01f77 update readme 2018-04-08 15:20:10 +01:00
Asim Aslam
c317547e4d bump travis 2018-04-08 12:53:57 +01:00
Asim Aslam
e55437698b misc moved to util 2018-04-08 12:37:45 +01:00
Asim Aslam
e365cad930 Merge pull request #245 from micro/register
add flags for register ttl and interval
2018-04-06 14:14:11 +01:00
Asim Aslam
56735b4427 add flags for register ttl and interval 2018-04-06 14:03:39 +01:00
Asim Aslam
73e22eb5b1 gofmt 2018-04-06 14:03:00 +01:00
Asim Aslam
c04b974311 nitpick the readme 2018-04-05 13:50:10 +01:00
Asim Aslam
75be57d6e4 syntax highlight code 2018-03-22 17:32:16 +00:00
Asim Aslam
270e9118c4 nitpick 2018-03-22 16:46:34 +00:00
Asim Aslam
5d3d61855c nitpick 2018-03-22 16:44:40 +00:00
Asim Aslam
7e0ee9ec08 include pubsub in the readme 2018-03-22 16:43:57 +00:00
Asim Aslam
2ae4214215 Merge pull request #238 from myabuyllc/registry-tcp-check
Add option to enable TCP check with Consul registry
2018-03-21 19:07:56 +00:00
Asim Aslam
edaa0a0719 Merge pull request #240 from Leon2012/master
fix bug #239
2018-03-21 18:54:52 +00:00
Shulhan
44b934d458 registry: rename context key "consul_register_tcp_check" to "consul_tcp_check" 2018-03-21 21:57:04 +07:00
Shulhan
65a90f5a21 registry.Register: use local variable to get context value 2018-03-21 18:18:48 +07:00
Shulhan
1eb4398b6c registry/consul: rename "RegisterTCPCheck" to "TCPCheck" 2018-03-21 18:17:56 +07:00
leon.peng
9b99d50396 fix bug #239 2018-03-21 03:17:38 +00:00
Shulhan
68ab671bd0 Use registry.options.Context to set Consul TCP check option 2018-03-19 20:34:56 +07:00
Shulhan
f4cdfaf27f Fix TCP address and port on service check registration 2018-03-19 20:34:12 +07:00
Asim Aslam
d486125d07 update readme 2018-03-19 10:21:46 +00:00
Shulhan
1599d717af Add option to enable TCP check with Consul registry
One disadvantage of using TTL based health check is the high network
traffic between Consul agent (either between servers, or between server
and client).

In order for the services considered alive by Consul, microservices must
send an update TTL to Consul every n seconds (currently 30 seconds).

Here is the explanation about TTL check from Consul documentation [1]

    Time to Live (TTL) - These checks retain their last known state for a
    given TTL. The state of the check must be updated periodically over
    the HTTP interface. If an external system fails to update the status
    within a given TTL, the check is set to the failed state. This
    mechanism, conceptually similar to a dead man's switch, relies on the
    application to directly report its health. For example, a healthy app
    can periodically PUT a status update to the HTTP endpoint; if the app
    fails, the TTL will expire and the health check enters a critical
    state. The endpoints used to update health information for a given
    check are the pass endpoint and the fail endpoint. TTL checks also
    persist their last known status to disk. This allows the Consul agent
    to restore the last known status of the check across restarts.
    Persisted check status is valid through the end of the TTL from the
    time of the last check.


Hint:

    TTL checks also persist their last known status to disk. This allows
    the Consul agent to restore the last known status of the check
    across restarts.

When microservices update the TTL, Consul will write to disk. Writing to
disk means all other slaves need to replicate it, which means master need
to inform other standby Consul to pull the new catalog. Hence, the
increased traffic.

More information about this issue can be viewed at Consul mailing list [2].

[1] https://www.consul.io/docs/agent/checks.html
[2] https://groups.google.com/forum/#!topic/consul-tool/84h7qmCCpjg
2018-03-14 19:40:59 +07:00
Asim Aslam
a941a4772b parallel test causes deadlock 2018-03-13 18:50:58 +00:00
Asim Aslam
dca078f30b Merge pull request #235 from shuLhan/dev-shulhan
Fix warnings from linter output
2018-03-13 18:25:37 +00:00
Shulhan
cbbf9f7e3b [test] service.TestService: run subtest in parallel
Reason: the t.Fatalf and t.Fatal must be invoked by test routine, not by
other routine, or the the test will not stopped [1].

[1] megacheck SA2002
2018-03-13 18:12:42 +07:00
Shulhan
a54dee31de [lint] service.Init: ignore error by assigning it to blank identifier 2018-03-13 17:51:33 +07:00
Shulhan
1bd541b69e service.Run: replace signal SIGKILL with SIGQUIT
According to "os/signal" documentation [1] and libc manual [2], SIGKILL
may not be caught by a program.

[1] https://godoc.org/os/signal
[2] https://www.gnu.org/software/libc/manual/html_node/Termination-Signals.html
2018-03-13 17:45:34 +07:00
Shulhan
e769802939 service.Run: simplify return statement 2018-03-13 17:40:13 +07:00
Asim Aslam
a3741f8a11 strip namespace from readme 2018-03-09 19:11:42 +00:00
Asim Aslam
6246fa2bcb Merge pull request #233 from micro/context
switch to stdlib context
2018-03-04 09:15:25 +00:00
Asim Aslam
c9b40cb33b switch to stdlib context 2018-03-03 11:53:52 +00:00
Asim Aslam
982e6068cf support services without version 2018-03-01 17:35:13 +00:00
Asim Aslam
e8b050ffd5 update travis 2018-03-01 10:07:48 +00:00
Asim Aslam
13f8e4fef7 nitpick 2018-02-28 15:40:41 +00:00
Asim Aslam
1fe528c411 update readme 2018-02-28 15:38:50 +00:00
Asim Aslam
d0d9582b81 Merge pull request #206 from darren-west/master
Added Options() to registry interface
2018-02-19 20:52:28 +00:00
Asim Aslam
42bdca63da Merge pull request #230 from micro/watch
Add watch options
2018-02-19 20:29:17 +00:00
Asim Aslam
02260dcaa3 Add watch options 2018-02-19 17:12:37 +00:00
Asim Aslam
eb7788ce25 Merge pull request #229 from tudurom/add_conflict_error
errors: Added 409 Conflict helper function
2018-02-13 08:33:22 +00:00
Tudor Roman
3b6f38a45c errors: Added 409 Conflict helper function 2018-02-11 15:51:33 +02:00
Asim Aslam
94ea766c9e syntax highlight 2018-02-01 13:54:37 +00:00
Asim Aslam
ff9ad875af update readme 2018-01-30 16:18:11 +00:00
Asim Aslam
45420d8413 Merge pull request #224 from dh1tw/rpc-server-subscribe-deadlock
fix possible deadlock since code can return without unlocking the Mutex
2018-01-02 10:22:13 +00:00
Tobias Wellnitz, DH1TW
0dcea05fb8 fix possible deadlock since code can return without unlocking the Mutex 2018-01-01 19:57:13 +01:00
Asim Aslam
b0b0338128 add option to set selector 2017-12-20 21:43:24 +00:00
Asim Aslam
11d75dae1b remove version from example 2017-11-30 12:28:20 +00:00
Asim Aslam
fc0bbcd339 change the blurb 2017-11-30 09:16:54 +00:00
Asim Aslam
c82dadfa55 Merge pull request #221 from gaxxx/master
add https support for consul
2017-11-28 07:11:48 +00:00
Siyun Wu
7c8d6087de add https support for consul
using enviroment variables

for example:
export CONSUL_HTTP_SSL=1
export CONSUL_HTTP_ADDR="https://example.com"
export CONSUL_CLIENT_CERT="/Users/foo/.ssh/consul/consul.cert"
export CONSUL_CLIENT_KEY="/Users/foo/.ssh/consul/consul.key"
export CONSUL_CACERT="/Users/foo/.ssh/consul/ca.cert"
2017-11-20 15:34:52 +08:00
Asim Aslam
1f03681d82 set test to use localhost 2017-11-09 14:21:26 +00:00
Asim Aslam
1c1d46e1ac Add some test logging 2017-11-09 14:16:35 +00:00
Asim Aslam
a545091c36 update go versions for travis build 2017-11-09 13:52:51 +00:00
Asim Aslam
ada9ef48cf Remove whitespace 2017-11-09 13:51:40 +00:00
Asim Aslam
a7c4afac54 Merge pull request #213 from weisd/master
add log when register err
2017-11-09 13:50:38 +00:00
Asim Aslam
043e4aa979 please stack overflow 2017-11-03 17:30:16 +00:00
Asim Aslam
78da1fde94 Merge pull request #217 from micro/perf
Performance upgrades
2017-10-29 14:48:23 +00:00
Asim Aslam
e7104d609a return the not found error 2017-10-28 16:21:32 +01:00
Asim Aslam
1890ec7044 rc is not used 2017-10-28 13:55:59 +01:00
Asim Aslam
d48735793d remove ticker 2017-10-26 21:12:48 +01:00
Asim Aslam
6fb652f78a lazily start watcher 2017-10-26 20:55:52 +01:00
Asim Aslam
bd46e60c13 optimise http broker with rcache 2017-10-26 20:48:11 +01:00
Asim Aslam
42235bc973 Merge branch 'master' into perf 2017-10-26 13:47:09 +01:00
Asim Aslam
c07b3636c0 update readme 2017-10-26 12:19:42 +01:00
Asim Aslam
2f09d5830c update readme 2017-10-26 12:18:14 +01:00
Asim Aslam
48513c78b6 further readme culling 2017-10-26 12:16:17 +01:00
Asim Aslam
bd34d39401 update readme 2017-10-26 12:02:52 +01:00
Asim Aslam
59685a4ff9 update readme 2017-10-26 12:02:10 +01:00
weisd
6385bf743c add log when register err 2017-10-25 14:23:58 +08:00
Asim Aslam
8fd8d9bd35 Enable connection pooling and selector caching by default 2017-10-24 15:35:25 +01:00
Asim Aslam
53554d98cd Merge pull request #209 from uffy/master
use sync.Once instead of chan
2017-10-09 14:09:56 +01:00
Uffy
f6165f35c0 import sorting 2017-10-09 20:55:03 +08:00
Uffy
ae3f59a2f5 use sync.Once instead of chan
sync.Once is more clear and faster than chan.
2017-10-09 15:47:28 +08:00
Asim Aslam
0703c514a9 Merge pull request #208 from uffy/master
remove redundant rand.Seed
2017-10-09 07:32:18 +01:00
Uffy
b92130eeee remove redundant rand.Seed 2017-10-09 14:22:15 +08:00
Asim Aslam
e0e4596be0 update readme 2017-10-03 11:19:03 +01:00
Asim Aslam
5f60f7518d update readme 2017-10-03 11:14:39 +01:00
Asim Aslam
f2d4226817 update readme 2017-10-03 11:05:54 +01:00
Asim Aslam
236cfd6a3b update readme 2017-10-03 11:03:46 +01:00
darren-west
d970586a29 Added Options() to registry interface 2017-09-28 11:16:56 +01:00
Asim Aslam
d29b5e2fab Merge pull request #203 from wzhliang/master
fixing typo
2017-08-25 11:00:18 +01:00
Wenzhi Liang
7f173dfc63 fixing typo 2017-08-25 17:56:57 +08:00
Asim Aslam
8be72b676d Merge pull request #202 from freman/mapsorting
Fix hashing of the service definition
2017-08-24 12:16:32 +01:00
Shannon Wynter
0e696f4907 Fix hashing of the service definition
Maps are sorted randomly, order the keys as a slice
2017-08-24 18:25:05 +10:00
Asim Aslam
1748328f14 Merge pull request #201 from vans9/register-subscriber-pass-opts
Pass options to the s.NewSubscriber
2017-08-24 08:53:50 +01:00
Pavel Arefiev
eff39083ca Pass options to the s.NewSubscriber 2017-08-24 10:47:32 +03:00
Asim Aslam
e3f818d18e Merge pull request #197 from sarbash/fix-tls-consul-registry
Check for uninitialized *config.HttpClient
2017-08-15 08:51:50 +01:00
Sergey Sarbash
db0df07fd6 Check for uninitialized *config.HttpClient 2017-08-15 10:49:24 +03:00
Asim Aslam
87c39542b0 Merge pull request #196 from sarbash/fix-tls-consul-registry
Fixes nil pointer dereferencing for Consul TLS transport
2017-08-15 08:08:03 +01:00
Sergey Sarbash
fd01ead575 Fixes nil pointer dereferencing for Consul TLS transport 2017-08-15 09:27:39 +03:00
Asim Aslam
382abbf28b update travis go versions 2017-08-09 13:01:41 +01:00
Asim Aslam
8264e90bce Merge pull request #189 from hlian/master
server/rpc_codec: friendlier behavior if c.codec.Write fails
2017-07-18 14:18:12 +01:00
Hao Lian
d4b149046f server/rpc_codec: if c.codec.Write fails, reset write buffer and encode an error message about the encoding failure
When developing go-micro services, it is frequently possible to set invalid results in the response pointer. When this happens (as I and @trushton personally experienced), `sendResponse()` returns an error correctly explaining what happened (e.g. protobuf refused to encode a bad struct) but the `call()` function one above it in the stack ignores the returned error object.

Thus, invalid structs go un-encoded and the _client side times out_. @trushton and I first caught this in our CI builds when we left a protobuf.Empty field uninitialized (nil) instead of setting it to `&ptypes.Empty{}`. This resulted in an `proto: oneof field has nil value` error, but it was dropped and became a terribly confusing client timeout instead.

This patch is two independent changes:

* In rpc_codec, when a serialization failure occurs serialize an error message, which will correctly become a 500 for HTTP services, about the encoding failure. This means rpc_codec only returns an `error` when a socket failure occurs, which I believe is the behavior that rpc_service is expecting anyway.

* In rpc_service, log any errors returned by sendResponse instead of dropping the error object. This will make debugging client timeouts less of a hassle.
2017-07-17 14:21:43 -04:00
Asim Aslam
e0b3a48323 brevity is key 2017-07-16 14:58:31 +01:00
Asim Aslam
f4ea9787a9 Strip readme comments 2017-07-14 08:24:46 +01:00
Asim Aslam
9438fae607 long standing typo 2017-06-15 19:57:27 +01:00
Asim Aslam
3812cbbcb6 defer wg.Done so it's called even if there's a panic 2017-06-12 14:18:59 +01:00
72 changed files with 1536 additions and 1203 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.7.4
- 1.8rc3
- 1.10.x
- 1.11.x
notifications:
slack:
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=

367
README.md
View File

@@ -1,370 +1,31 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable RPC framework for **microservices**. It is part of the [Micro](https://github.com/micro/micro) toolkit.
Go Micro is a pluggable RPC framework for distributed systems development.
The **Micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
Everything in go-micro is **pluggable**. You can find and contribute to plugins at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
Follow us on Twitter at [@MicroHQ](https://twitter.com/microhq), join the [Slack](https://micro-services.slack.com) community [here](http://slack.micro.mu/) or
check out the [Mailing List](https://groups.google.com/forum/#!forum/microhq).
Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://slack.micro.mu/) community.
## Features
Go Micro abstracts way the details of distributed systems. Here are the main features.
Go Micro abstracts away the details of distributed systems. Here are the main features.
- **Service Discovery** - Applications are automatically registered with service discovery so they can find each other.
- **Load Balancing** - Smart client side load balancing is used to balance requests between instances of a service.
- **Synchronous Communication** - Request-response is provided as a bidirectional streaming transport layer.
- **Asynchronous Communication** - Microservices should promote an event driven architecture. Publish and Subscribe semantics are built in.
- **Message Encoding** - Micro services can encode requests in a number of encoding formats and seamlessly decode based on the Content-Type header.
- **RPC Client/Server** - The client and server leverage the above features and provide a clean simple interface for building microservices.
Go Micro supports both the Service and Function programming models. Read on to learn more.
## Docs
For more detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
## Learn By Example
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function). The [**examples**](https://github.com/micro/examples) directory contains many more examples for using things such as middleware/wrappers, selector filters, pub/sub and code generation.
For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
Check out the blog post to learn how to write go-micro services [https://micro.mu/blog/2016/03/28/go-micro.html](https://micro.mu/blog/2016/03/28/go-micro.html) or watch the talk from the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34).
- **Service Discovery** - Automatic service registration and name resolution
- **Load Balancing** - Client side load balancing built on discovery
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json support
- **Sync Streaming** - RPC based communication with support for bidirectional streaming
- **Async Messaging** - Native PubSub messaging built in for event driven architectures
## Getting Started
This is a quick getting started guide with the greeter service example.
### Prerequisites: Service Discovery
There's just one prerequisite. We need a service discovery system to resolve service names to their address.
The default discovery mechanism used in go-micro is Consul. Discovery is however pluggable so you can used
etcd, kubernetes, zookeeper, etc. Plugins can be found in [micro/go-plugins](https://github.com/micro/go-plugins).
### Multicast DNS
We can use multicast DNS with the built in MDNS registry for a zero dependency configuration.
Just pass `--registry=mdns` to any command
```
$ go run main.go --registry=mdns
```
### Consul
Alternatively we can use the default discovery system which is Consul.
**Mac OS**
```
brew install consul
consul agent -dev
```
**Docker**
```
docker run consul
```
[Further installation instructions](https://www.consul.io/intro/getting-started/install.html)
### Run Service
```
$ go get github.com/micro/examples/service && service
2016/03/14 10:59:14 Listening on [::]:50137
2016/03/14 10:59:14 Broker Listening on [::]:50138
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
```
### Call Service
```
$ service --run_client
Hello John
```
## Writing a service
### Create service proto
One of the key requirements of microservices is strongly defined interfaces so we utilised protobuf to define the handler and request/response.
Here's a definition for the Greeter handler with the method Hello which takes a HelloRequest and HelloResponse both with one string arguments.
`go-micro/examples/service/proto/greeter.proto`:
```proto
syntax = "proto3";
service Greeter {
rpc Hello(HelloRequest) returns (HelloResponse) {}
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string greeting = 2;
}
```
### Install protobuf
We use a protobuf plugin for code generation. This is completely optional. Look at [examples/server](https://github.com/micro/examples/blob/master/server/main.go)
and [examples/client](https://github.com/micro/examples/blob/master/client/main.go) for examples without code generation.
```shell
go get github.com/micro/protobuf/{proto,protoc-gen-go}
```
There's still a need for proto compiler to generate Go stub code from our proto file. You can either use the micro fork above or the official repo `github.com/golang/protobuf`.
### Compile the proto
```shell
protoc -I$GOPATH/src --go_out=plugins=micro:$GOPATH/src \
$GOPATH/src/github.com/micro/examples/service/proto/greeter.proto
```
### Define the service
Below is the code sample for the Greeter service. It basically implements the interface defined above for the Greeter handler,
initialises the service, registers the handler and then runs itself. Simple as that.
`go-micro/examples/service/main.go`:
```go
package main
import (
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
"golang.org/x/net/context"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(
micro.Name("greeter"),
micro.Version("latest"),
micro.Metadata(map[string]string{
"type": "helloworld",
}),
)
// Init will parse the command line flags. Any flags set will
// override the above settings. Options defined here will
// override anything set on the command line.
service.Init()
// Register handler
proto.RegisterGreeterHandler(service.Server(), new(Greeter))
// Run the server
if err := service.Run(); err != nil {
fmt.Println(err)
}
}
```
### Run service
```
go run examples/service/main.go
2016/03/14 10:59:14 Listening on [::]:50137
2016/03/14 10:59:14 Broker Listening on [::]:50138
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
```
### Define a client
Below is the client code to query the greeter service. Notice we're using the code generated client interface `proto.NewGreeterClient`.
This reduces the amount of boiler plate code we need to write. The greeter client can be reused throughout the code if need be.
`client.go`
```go
package main
import (
"fmt"
micro "github.com/micro/go-micro"
proto "github.com/micro/examples/service/proto"
"golang.org/x/net/context"
)
func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(micro.Name("greeter.client"))
// Create new greeter client
greeter := proto.NewGreeterClient("greeter", service.Client())
// Call the greeter
rsp, err := greeter.Hello(context.TODO(), &proto.HelloRequest{Name: "John"})
if err != nil {
fmt.Println(err)
}
// Print response
fmt.Println(rsp.Greeting)
}
```
### Run the client
```shell
go run client.go
Hello John
```
## Writing a Function
Go Micro includes the Function programming model. This is the notion of a one time executing Service which operates much like a service except exiting
after completing a request. A function is defined much like a service and called in exactly the same way.
### Defining a Function
```go
package main
import (
proto "github.com/micro/examples/function/proto"
"github.com/micro/go-micro"
"golang.org/x/net/context"
)
type Greeter struct{}
func (g *Greeter) Hello(ctx context.Context, req *proto.HelloRequest, rsp *proto.HelloResponse) error {
rsp.Greeting = "Hello " + req.Name
return nil
}
func main() {
// create a new function
fnc := micro.NewFunction(
micro.Name("go.micro.fnc.greeter"),
)
// init the command line
fnc.Init()
// register a handler
fnc.Handle(new(Greeter))
// run the function
fnc.Run()
}
```
It's that simple.
## How does it work?
<p align="center">
<img src="go-micro.png" />
</p>
Go Micro is a framework that addresses the fundamental requirements to write microservices.
Let's dig into the core components.
### Registry
The registry provides a service discovery mechanism to resolve names to addresses. It can be backed by consul, etcd, zookeeper, dns, gossip, etc.
Services should register using the registry on startup and deregister on shutdown. Services can optionally provide an expiry TTL and reregister
on an interval to ensure liveness and that the service is cleaned up if it dies.
### Selector
The selector is a load balancing abstraction which builds on the registry. It allows services to be "filtered" using filter functions and "selected"
using a choice of algorithms such as random, roundrobin, leastconn, etc. The selector is leveraged by the Client when making requests. The client
will use the selector rather than the registry as it provides that built in mechanism of load balancing.
### Transport
The transport is the interface for synchronous request/response communication between services. It's akin to the golang net package but provides
a higher level abstraction which allows us to switch out communication mechanisms e.g http, rabbitmq, websockets, NATS. The transport also
supports bidirectional streaming. This is powerful for client side push to the server.
### Broker
The broker provides an interface to a message broker for asynchronous pub/sub communication. This is one of the fundamental requirements of an event
driven architecture and microservices. By default we use an inbox style point to point HTTP system to minimise the number of dependencies required
to get started. However there are many message broker implementations available in go-plugins e.g RabbitMQ, NATS, NSQ, Google Cloud Pub Sub.
### Codec
The codec is used for encoding and decoding messages before transporting them across the wire. This could be json, protobuf, bson, msgpack, etc.
Where this differs from most other codecs is that we actually support the RPC format here as well. So we have JSON-RPC, PROTO-RPC, BSON-RPC, etc.
It separates encoding from the client/server and provides a powerful method for integrating other systems such as gRPC, Vanadium, etc.
### Server
The server is the building block for writing a service. Here you can name your service, register request handlers, add middeware, etc. The service
builds on the above packages to provide a unified interface for serving requests. The built in server is an RPC system. In the future there maybe
other implementations. The server also allows you to define multiple codecs to serve different encoded messages.
### Client
The client provides an interface to make requests to services. Again like the server, it builds on the other packages to provide a unified interface
for finding services by name using the registry, load balancing using the selector, making synchronous requests with the transport and asynchronous
messaging using the broker.
The above components are combined at the top-level of micro as a **Service**.
## Plugins
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
### Build with plugins
If you want to integrate plugins simply link them in a separate file and rebuild
Create a plugins.go file
```go
import (
// etcd v3 registry
_ "github.com/micro/go-plugins/registry/etcdv3"
// nats transport
_ "github.com/micro/go-plugins/transport/nats"
// kafka broker
_ "github.com/micro/go-plugins/broker/kafka"
)
```
Build binary
```shell
// For local use
go build -i -o service ./main.go ./plugins.go
```
Flag usage of plugins
```shell
service --registry=etcdv3 --transport=nats --broker=kafka
```
## Other Languages
Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java
For detailed information on the architecture, installation and use of go-micro checkout the [docs](https://micro.mu/docs).
## Sponsors
Open source development of Micro is sponsored by Sixt
Sixt is an Enterprise Sponsor of Micro
<a href="https://micro.mu/blog/2016/04/25/announcing-sixt-sponsorship.html"><img src="https://micro.mu/sixt_logo.png" width=150px height="auto" /></a>
Become a sponsor by backing micro on [Patreon](https://www.patreon.com/microhq)

View File

@@ -2,8 +2,6 @@
package broker
// Broker is an interface used for asynchronous messaging.
// Its an abstraction over various message brokers
// {NATS, RabbitMQ, Kafka, ...}
type Broker interface {
Options() Options
Address() string

View File

@@ -2,7 +2,9 @@ package broker
import (
"bytes"
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -18,25 +20,20 @@ import (
"github.com/micro/go-log"
"github.com/micro/go-micro/broker/codec/json"
"github.com/micro/go-micro/errors"
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
maddr "github.com/micro/misc/lib/addr"
mnet "github.com/micro/misc/lib/net"
mls "github.com/micro/misc/lib/tls"
"github.com/micro/go-rcache"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"github.com/pborman/uuid"
"golang.org/x/net/context"
)
// HTTP Broker is a placeholder for actual message brokers.
// This should not really be used in production but useful
// in developer where you want zero dependencies.
// HTTP Broker is a point to point async broker
type httpBroker struct {
id string
address string
unsubscribe chan *httpSubscriber
opts Options
id string
address string
opts Options
mux *http.ServeMux
@@ -53,9 +50,9 @@ type httpSubscriber struct {
opts SubscribeOptions
id string
topic string
ch chan *httpSubscriber
fn Handler
svc *registry.Service
hb *httpBroker
}
type httpPublication struct {
@@ -106,11 +103,13 @@ func newHttpBroker(opts ...Option) Broker {
o(&options)
}
// set address
addr := ":0"
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
// get registry
reg, ok := options.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
@@ -123,7 +122,6 @@ func newHttpBroker(opts ...Option) Broker {
r: reg,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
unsubscribe: make(chan *httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
}
@@ -153,9 +151,41 @@ func (h *httpSubscriber) Topic() string {
}
func (h *httpSubscriber) Unsubscribe() error {
h.ch <- h
// artificial delay
time.Sleep(time.Millisecond * 10)
return h.hb.unsubscribe(h)
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
var subscribers []*httpSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
_ = h.r.Deregister(sub.svc)
continue
}
// keep subscriber
subscribers = append(subscribers, sub)
}
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
}
@@ -170,42 +200,91 @@ func (h *httpBroker) run(l net.Listener) {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
_ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL))
}
}
h.RUnlock()
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
// unsubscribe subscriber
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
// deregister and skip forward
if sub.id == subscriber.id {
h.r.Deregister(sub.svc)
continue
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
_ = h.r.Deregister(sub.svc)
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
h.RUnlock()
return
}
}
}
func (h *httpBroker) start() error {
h.Lock()
defer h.Unlock()
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := merr.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
h.RUnlock()
}
func (h *httpBroker) Address() string {
h.RLock()
defer h.RUnlock()
return h.address
}
func (h *httpBroker) Connect() error {
h.RLock()
if h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
var l net.Listener
var err error
@@ -250,107 +329,94 @@ func (h *httpBroker) start() error {
}
log.Logf("Broker Listening on %s", l.Addr().String())
addr := h.address
h.address = l.Addr().String()
go http.Serve(l, h.mux)
go h.run(l)
go func() {
h.run(l)
h.Lock()
h.address = addr
h.Unlock()
}()
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// set rcache
h.r = rcache.New(reg)
// set running
h.running = true
return nil
}
func (h *httpBroker) stop() error {
func (h *httpBroker) Disconnect() error {
h.RLock()
if !h.running {
h.RUnlock()
return nil
}
h.RUnlock()
h.Lock()
defer h.Unlock()
if !h.running {
return nil
// stop rcache
rc, ok := h.r.(rcache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := errors.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
func (h *httpBroker) Init(opts ...Option) error {
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
if h.running {
h.RUnlock()
return errors.New("cannot init while connected")
}
h.RUnlock()
}
func (h *httpBroker) Address() string {
return h.address
}
h.Lock()
defer h.Unlock()
func (h *httpBroker) Connect() error {
return h.start()
}
func (h *httpBroker) Disconnect() error {
return h.stop()
}
func (h *httpBroker) Init(opts ...Option) error {
for _, o := range opts {
o(&h.opts)
}
if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 {
h.address = h.opts.Addrs[0]
}
if len(h.id) == 0 {
h.id = "broker-" + uuid.NewUUID().String()
}
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
h.r = reg
// get rcache
if rc, ok := h.r.(rcache.Cache); ok {
rc.Stop()
}
// set registry
h.r = rcache.New(reg)
return nil
}
@@ -360,10 +426,13 @@ func (h *httpBroker) Options() Options {
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
m := &Message{
Header: make(map[string]string),
@@ -381,7 +450,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
return err
}
fn := func(node *registry.Node, b []byte) {
pub := func(node *registry.Node, b []byte) {
scheme := "http"
// check if secure is added in metadata
@@ -411,15 +480,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
case broadcastVersion:
for _, node := range service.Nodes {
// publish async
go fn(node, b)
go pub(node, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go fn(node, b)
go pub(node, b)
}
}
@@ -427,7 +495,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
}
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
opt := newSubscribeOptions(opts...)
options := newSubscribeOptions(opts...)
// parse address for host, port
parts := strings.Split(h.Address(), ":")
@@ -439,7 +507,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
return nil, err
}
id := uuid.NewUUID().String()
// create unique id
id := h.id + "." + uuid.NewUUID().String()
var secure bool
@@ -449,7 +518,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
// register service
node := &registry.Node{
Id: h.id + "." + id,
Id: id,
Address: addr,
Port: port,
Metadata: map[string]string{
@@ -457,7 +526,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
},
}
version := opt.Queue
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
@@ -468,22 +538,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: opt,
id: h.id + "." + id,
opts: options,
hb: h,
id: id,
topic: topic,
ch: h.unsubscribe,
fn: handler,
svc: service,
}
if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil {
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
h.Lock()
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
h.Unlock()
// return the subscriber
return subscriber, nil
}

View File

@@ -1,11 +1,11 @@
package broker
import (
"context"
"crypto/tls"
"github.com/micro/go-micro/broker/codec"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -1,10 +1,9 @@
package client
import (
"context"
"math"
"time"
"golang.org/x/net/context"
)
type BackoffFunc func(ctx context.Context, req Request, attempts int) (time.Duration, error)

View File

@@ -1,18 +1,19 @@
package client
import (
"context"
"math"
"testing"
"time"
"golang.org/x/net/context"
)
func TestBackoff(t *testing.T) {
delta := time.Duration(0)
c := NewClient()
for i := 0; i < 5; i++ {
d, err := exponentialBackoff(context.TODO(), NewJsonRequest("test", "test", nil), i)
d, err := exponentialBackoff(context.TODO(), c.NewRequest("test", "test", nil), i)
if err != nil {
t.Fatal(err)
}

View File

@@ -2,9 +2,8 @@
package client
import (
"context"
"time"
"golang.org/x/net/context"
)
// Client is the interface used to make requests to services.
@@ -13,22 +12,18 @@ import (
type Client interface {
Init(...Option) error
Options() Options
NewPublication(topic string, msg interface{}) Publication
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error)
Publish(ctx context.Context, p Publication, opts ...PublishOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
}
// Publication is the interface for a message published asynchronously
type Publication interface {
// Message is the interface for publishing asynchronously
type Message interface {
Topic() string
Message() interface{}
Payload() interface{}
ContentType() string
}
@@ -42,8 +37,8 @@ type Request interface {
Stream() bool
}
// Streamer is the inteface for a bidirectional synchronous stream
type Streamer interface {
// Stream is the inteface for a bidirectional synchronous stream
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error
@@ -61,6 +56,9 @@ type CallOption func(*CallOptions)
// PublishOption used by Publish
type PublishOption func(*PublishOptions)
// MessageOption used by NewMessage
type MessageOption func(*MessageOptions)
// RequestOption used by NewRequest
type RequestOption func(*RequestOptions)
@@ -70,13 +68,13 @@ var (
// DefaultBackoff is the default backoff function for retries
DefaultBackoff = exponentialBackoff
// DefaultRetry is the default check-for-retry function for retries
DefaultRetry = alwaysRetry
DefaultRetry = RetryOnError
// DefaultRetries is the default number of times a request is tried
DefaultRetries = 1
// DefaultRequestTimeout is the default request timeout
DefaultRequestTimeout = time.Second * 5
// DefaultPoolSize sets the connection pool size
DefaultPoolSize = 0
DefaultPoolSize = 1
// DefaultPoolTTL sets the connection pool ttl
DefaultPoolTTL = time.Minute
)
@@ -86,26 +84,15 @@ func Call(ctx context.Context, request Request, response interface{}, opts ...Ca
return DefaultClient.Call(ctx, request, response, opts...)
}
// Makes a synchronous call to the specified address using the default client
func CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
return DefaultClient.CallRemote(ctx, address, request, response, opts...)
}
// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.Stream(ctx, request, opts...)
}
// Creates a streaming connection to the address specified.
func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
return DefaultClient.StreamRemote(ctx, address, request, opts...)
}
// Publishes a publication using the default client. Using the underlying broker
// set within the options.
func Publish(ctx context.Context, p Publication) error {
return DefaultClient.Publish(ctx, p)
func Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
return DefaultClient.Publish(ctx, msg, opts...)
}
// Creates a new message using the default client
func NewMessage(topic string, payload interface{}, opts ...MessageOption) Message {
return DefaultClient.NewMessage(topic, payload, opts...)
}
// Creates a new client with the options passed in
@@ -113,25 +100,16 @@ func NewClient(opt ...Option) Client {
return newRpcClient(opt...)
}
// Creates a new publication using the default client
func NewPublication(topic string, message interface{}) Publication {
return DefaultClient.NewPublication(topic, message)
}
// Creates a new request using the default client. Content Type will
// be set to the default within options and use the appropriate codec
func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewRequest(service, method, request, reqOpts...)
}
// Creates a new protobuf request using the default client
func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewProtoRequest(service, method, request, reqOpts...)
}
// Creates a new json request using the default client
func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return DefaultClient.NewJsonRequest(service, method, request, reqOpts...)
// Creates a streaming connection with a service and returns responses on the
// channel passed in. It's up to the user to close the streamer.
func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
return DefaultClient.Stream(ctx, request, opts...)
}
func String() string {

View File

@@ -1,51 +0,0 @@
package client
/*
Wrapper is a type of middleware for the go-micro client. It allows
the client to be "wrapped" so that requests and responses can be intercepted
to perform extra requirements such as auth, tracing, monitoring, logging, etc.
Example usage:
import (
"log"
"github.com/micro/go-micro/client"
)
type LogWrapper struct {
client.Client
}
func (l *LogWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
log.Println("Making request to service " + req.Service() + " method " + req.Method())
return w.Client.Call(ctx, req, rsp)
}
func Wrapper(c client.Client) client.Client {
return &LogWrapper{c}
}
func main() {
c := client.NewClient(client.Wrap(Wrapper))
}
*/
import (
"golang.org/x/net/context"
)
// CallFunc represents the individual call func
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
// CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc
// Wrapper wraps a client and returns a client
type Wrapper func(Client) Client
// StreamWrapper wraps a Stream and returns the equivalent
type StreamWrapper func(Streamer) Streamer

View File

@@ -1,7 +1,7 @@
package client
import (
"golang.org/x/net/context"
"context"
)
type clientKey struct{}

View File

@@ -1,7 +1,7 @@
package mock
import (
"golang.org/x/net/context"
"context"
)
type responseKey struct{}

View File

@@ -1,14 +1,13 @@
package mock
import (
"context"
"fmt"
"reflect"
"sync"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/errors"
"golang.org/x/net/context"
)
var (
@@ -50,22 +49,14 @@ func (m *MockClient) Options() client.Options {
return m.Opts
}
func (m *MockClient) NewPublication(topic string, msg interface{}) client.Publication {
return m.Client.NewPublication(topic, msg)
func (m *MockClient) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message {
return m.Client.NewMessage(topic, msg, opts...)
}
func (m *MockClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewRequest(service, method, req, reqOpts...)
}
func (m *MockClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewProtoRequest(service, method, req, reqOpts...)
}
func (m *MockClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return m.Client.NewJsonRequest(service, method, req, reqOpts...)
}
func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock()
defer m.Unlock()
@@ -89,8 +80,12 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
response := r.Response
if t := reflect.TypeOf(r.Response); t.Kind() == reflect.Func {
response = reflect.ValueOf(r.Response).Call([]reflect.Value{})[0].Interface()
}
v.Set(reflect.ValueOf(r.Response))
v.Set(reflect.ValueOf(response))
return nil
}
@@ -98,39 +93,7 @@ func (m *MockClient) Call(ctx context.Context, req client.Request, rsp interface
return fmt.Errorf("rpc: can't find service %s", req.Method())
}
func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error {
m.Lock()
defer m.Unlock()
response, ok := m.Response[req.Service()]
if !ok {
return errors.NotFound("go.micro.client.mock", "service not found")
}
for _, r := range response {
if r.Method != req.Method() {
continue
}
if r.Error != nil {
return r.Error
}
v := reflect.ValueOf(rsp)
if t := reflect.TypeOf(rsp); t.Kind() == reflect.Ptr {
v = reflect.Indirect(v)
}
v.Set(reflect.ValueOf(r.Response))
return nil
}
return fmt.Errorf("rpc: can't find service %s", req.Method())
}
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
m.Lock()
defer m.Unlock()
@@ -138,15 +101,7 @@ func (m *MockClient) Stream(ctx context.Context, req client.Request, opts ...cli
return nil, nil
}
func (m *MockClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
m.Lock()
defer m.Unlock()
// TODO: mock stream
return nil, nil
}
func (m *MockClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
func (m *MockClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
return nil
}

View File

@@ -1,11 +1,10 @@
package mock
import (
"context"
"testing"
"github.com/micro/go-micro/errors"
"golang.org/x/net/context"
)
func TestClient(t *testing.T) {
@@ -17,12 +16,14 @@ func TestClient(t *testing.T) {
{Method: "Foo.Bar", Response: map[string]interface{}{"foo": "bar"}},
{Method: "Foo.Struct", Response: &TestResponse{Param: "aparam"}},
{Method: "Foo.Fail", Error: errors.InternalServerError("go.mock", "failed")},
{Method: "Foo.Func", Response: func() string { return "string" }},
{Method: "Foo.FuncStruct", Response: func() *TestResponse { return &TestResponse{Param: "aparam"} }},
}
c := NewClient(Response("go.mock", response))
for _, r := range response {
req := c.NewJsonRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
req := c.NewRequest("go.mock", r.Method, map[string]interface{}{"foo": "bar"})
var rsp interface{}
err := c.Call(context.TODO(), req, &rsp)

View File

@@ -1,6 +1,7 @@
package client
import (
"context"
"time"
"github.com/micro/go-micro/broker"
@@ -8,8 +9,6 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {
@@ -41,6 +40,8 @@ type Options struct {
type CallOptions struct {
SelectOptions []selector.SelectOption
// Address of remote host
Address string
// Backoff func
Backoff BackoffFunc
// Check if retriable func
@@ -66,8 +67,13 @@ type PublishOptions struct {
Context context.Context
}
type MessageOptions struct {
ContentType string
}
type RequestOptions struct {
Stream bool
ContentType string
Stream bool
// Other options for implementations of the interface
// can be stored in a context
@@ -227,6 +233,13 @@ func DialTimeout(d time.Duration) Option {
// Call Options
// WithAddress sets the remote address to use rather than using service discovery
func WithAddress(a string) CallOption {
return func(o *CallOptions) {
o.Address = a
}
}
func WithSelectOption(so ...selector.SelectOption) CallOption {
return func(o *CallOptions) {
o.SelectOptions = append(o.SelectOptions, so...)
@@ -282,6 +295,12 @@ func WithDialTimeout(d time.Duration) CallOption {
// Request Options
func WithContentType(ct string) RequestOption {
return func(o *RequestOptions) {
o.ContentType = ct
}
}
func StreamingRequest() RequestOption {
return func(o *RequestOptions) {
o.Stream = true

View File

@@ -1,11 +1,35 @@
package client
import "golang.org/x/net/context"
import (
"context"
"github.com/micro/go-micro/errors"
)
// note that returning either false or a non-nil error will result in the call not being retried
type RetryFunc func(ctx context.Context, req Request, retryCount int, err error) (bool, error)
// always retry on error
func alwaysRetry(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
// RetryAlways always retry on error
func RetryAlways(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
return true, nil
}
// RetryOnError retries a request on a 500 or timeout error
func RetryOnError(ctx context.Context, req Request, retryCount int, err error) (bool, error) {
if err == nil {
return false, nil
}
e := errors.Parse(err.Error())
if e == nil {
return false, nil
}
switch e.Code {
// retry on timeout or internal server error
case 408, 500:
return true, nil
default:
return false, nil
}
}

View File

@@ -2,24 +2,27 @@ package client
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"sync/atomic"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type rpcClient struct {
once sync.Once
opts Options
pool *pool
seq uint64
}
func newRpcClient(opt ...Option) Client {
@@ -29,6 +32,7 @@ func newRpcClient(opt ...Option) Client {
once: sync.Once{},
opts: opts,
pool: newPool(opts.PoolSize, opts.PoolTTL),
seq: 0,
}
c := Client(rc)
@@ -85,11 +89,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
r.pool.release(address, c, grr)
}()
seq := atomic.LoadUint64(&r.seq)
atomic.AddUint64(&r.seq, 1)
stream := &rpcStream{
context: ctx,
request: req,
closed: make(chan bool),
codec: newRpcPlusCodec(msg, c, cf),
seq: seq,
}
defer stream.Close()
@@ -128,7 +136,7 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
}
}
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Streamer, error) {
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
msg := &transport.Message{
Header: make(map[string]string),
}
@@ -152,7 +160,15 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
c, err := r.opts.Transport.Dial(address, transport.WithStream(), transport.WithTimeout(opts.DialTimeout))
dOpts := []transport.DialOption{
transport.WithStream(),
}
if opts.DialTimeout >= 0 {
dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
}
c, err := r.opts.Transport.Dial(address, dOpts...)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", "connection error: %v", err)
}
@@ -195,9 +211,12 @@ func (r *rpcClient) Init(opts ...Option) error {
o(&r.opts)
}
// recreate the pool if the options changed
// update pool configuration if the options changed
if size != r.opts.PoolSize || ttl != r.opts.PoolTTL {
r.pool = newPool(r.opts.PoolSize, r.opts.PoolTTL)
r.pool.Lock()
r.pool.size = r.opts.PoolSize
r.pool.ttl = int64(r.opts.PoolTTL.Seconds())
r.pool.Unlock()
}
return nil
@@ -207,13 +226,25 @@ func (r *rpcClient) Options() Options {
return r.opts
}
func (r *rpcClient) CallRemote(ctx context.Context, address string, request Request, response interface{}, opts ...CallOption) error {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// return remote address
if len(opts.Address) > 0 {
return func() (*registry.Node, error) {
return &registry.Node{
Address: opts.Address,
}, nil
}, nil
}
return r.call(ctx, address, request, response, callOpts)
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
}
return next, nil
}
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
@@ -223,12 +254,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
opt(&callOpts)
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
next, err := r.next(request, callOpts)
if err != nil {
return err
}
// check if we already have a deadline
@@ -263,7 +291,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -274,9 +302,9 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
return errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
// set the address
@@ -294,10 +322,10 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
for i := 0; i <= callOpts.Retries; i++ {
go func(i int) {
ch <- call(i)
}()
}(i)
select {
case <-ctx.Done():
@@ -324,40 +352,16 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return gerr
}
func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return r.stream(ctx, address, request, callOpts)
}
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
// make a copy of call opts
callOpts := r.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
next, err := r.next(request, callOpts)
if err != nil {
return nil, err
}
// should we noop right here?
@@ -367,11 +371,11 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
default:
}
call := func(i int) (Streamer, error) {
call := func(i int) (Stream, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, request, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
}
// only sleep if greater than 0
@@ -381,9 +385,9 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
address := node.Address
@@ -397,14 +401,14 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
}
type response struct {
stream Streamer
stream Stream
err error
}
ch := make(chan response, callOpts.Retries)
var grr error
for i := 0; i < callOpts.Retries; i++ {
for i := 0; i <= callOpts.Retries; i++ {
go func() {
s, err := call(i)
ch <- response{s, err}
@@ -435,49 +439,38 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, grr
}
func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishOption) error {
func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOption) error {
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
md["Content-Type"] = msg.ContentType()
// encode message body
cf, err := r.newCodec(p.ContentType())
cf, err := r.newCodec(msg.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil {
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
r.once.Do(func() {
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(p.Topic(), &broker.Message{
return r.opts.Broker.Publish(msg.Topic(), &broker.Message{
Header: md,
Body: b.Bytes(),
})
}
func (r *rpcClient) NewPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, r.opts.ContentType)
func (r *rpcClient) NewMessage(topic string, message interface{}, opts ...MessageOption) Message {
return newMessage(topic, message, r.opts.ContentType, opts...)
}
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream")
}
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...)
}
func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, "application/json", reqOpts...)
return newRequest(service, method, request, r.opts.ContentType, reqOpts...)
}
func (r *rpcClient) String() string {

View File

@@ -1,16 +1,102 @@
package client
import (
"context"
"fmt"
"testing"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/mock"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
return fmt.Errorf("expected service: %s got %s", service, req.Service())
}
if req.Method() != method {
return fmt.Errorf("expected service: %s got %s", method, req.Method())
}
if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}
if !called {
t.Fatal("wrapper not called")
}
}
func TestCallRetry(t *testing.T) {
service := "test.service"
method := "Test.Method"
address := "10.1.10.1:8080"
var called int
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.InternalServerError("test.error", "retry request")
}
// don't do the call
return nil
}
}
r := mock.NewRegistry()
c := NewClient(
Registry(r),
WrapCall(wrap),
)
c.Options().Selector.Init(selector.Registry(r))
req := c.NewRequest(service, method, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
t.Fatal("call with address error", err)
}
// num calls
if called < c.Options().CallOptions.Retries+1 {
t.Fatal("request not retried")
}
}
func TestCallWrapper(t *testing.T) {
var called bool
id := "test.1"

36
client/rpc_message.go Normal file
View File

@@ -0,0 +1,36 @@
package client
type message struct {
topic string
contentType string
payload interface{}
}
func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message {
var options MessageOptions
for _, o := range opts {
o(&options)
}
if len(options.ContentType) > 0 {
contentType = options.ContentType
}
return &message{
payload: payload,
topic: topic,
contentType: contentType,
}
}
func (m *message) ContentType() string {
return m.contentType
}
func (m *message) Topic() string {
return m.topic
}
func (m *message) Payload() interface{} {
return m.payload
}

View File

@@ -1,27 +0,0 @@
package client
type rpcPublication struct {
topic string
contentType string
message interface{}
}
func newRpcPublication(topic string, message interface{}, contentType string) Publication {
return &rpcPublication{
message: message,
topic: topic,
contentType: contentType,
}
}
func (r *rpcPublication) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
}

View File

@@ -8,13 +8,18 @@ type rpcRequest struct {
opts RequestOptions
}
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
func newRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts RequestOptions
for _, o := range reqOpts {
o(&opts)
}
// set the content-type specified
if len(opts.ContentType) > 0 {
contentType = opts.ContentType
}
return &rpcRequest{
service: service,
method: method,

View File

@@ -0,0 +1,23 @@
package client
import (
"testing"
)
func TestRequestOptions(t *testing.T) {
r := newRequest("service", "method", nil, "application/json")
if r.Service() != "service" {
t.Fatalf("expected 'service' got %s", r.Service())
}
if r.Method() != "method" {
t.Fatalf("expected 'method' got %s", r.Method())
}
if r.ContentType() != "application/json" {
t.Fatalf("expected 'method' got %s", r.ContentType())
}
r2 := newRequest("service", "method", nil, "application/json", WithContentType("application/protobuf"))
if r2.ContentType() != "application/protobuf" {
t.Fatalf("expected 'method' got %s", r2.ContentType())
}
}

View File

@@ -1,10 +1,9 @@
package client
import (
"context"
"io"
"sync"
"golang.org/x/net/context"
)
// Implements the streamer interface
@@ -45,7 +44,6 @@ func (r *rpcStream) Send(msg interface{}) error {
}
seq := r.seq
r.seq++
req := request{
Service: r.request.Service(),

17
client/wrapper.go Normal file
View File

@@ -0,0 +1,17 @@
package client
import (
"context"
)
// CallFunc represents the individual call func
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
// CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc
// Wrapper wraps a client and returns a client
type Wrapper func(Client) Client
// StreamWrapper wraps a Stream and returns the equivalent
type StreamWrapper func(Stream) Stream

View File

@@ -70,13 +70,28 @@ var (
cli.IntFlag{
Name: "client_pool_size",
EnvVar: "MICRO_CLIENT_POOL_SIZE",
Usage: "Sets the client connection pool size. Default: 0",
Usage: "Sets the client connection pool size. Default: 1",
},
cli.StringFlag{
Name: "client_pool_ttl",
EnvVar: "MICRO_CLIENT_POOL_TTL",
Usage: "Sets the client connection pool ttl. e.g 500ms, 5s, 1m. Default: 1m",
},
cli.IntFlag{
Name: "register_ttl",
EnvVar: "MICRO_REGISTER_TTL",
Usage: "Register TTL in seconds",
},
cli.IntFlag{
Name: "register_interval",
EnvVar: "MICRO_REGISTER_INTERVAL",
Usage: "Register interval in seconds",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
},
cli.StringFlag{
Name: "server_name",
EnvVar: "MICRO_SERVER_NAME",
@@ -132,11 +147,7 @@ var (
Name: "selector",
EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying",
},
cli.StringFlag{
Name: "server",
EnvVar: "MICRO_SERVER",
Usage: "Server for go-micro; rpc",
Value: "cache",
},
cli.StringFlag{
Name: "transport",
@@ -181,7 +192,7 @@ var (
defaultServer = "rpc"
defaultBroker = "http"
defaultRegistry = "consul"
defaultSelector = "default"
defaultSelector = "cache"
defaultTransport = "http"
)
@@ -251,48 +262,40 @@ func (c *cmd) Before(ctx *cli.Context) error {
// Set the client
if name := ctx.String("client"); len(name) > 0 {
if cl, ok := c.opts.Clients[name]; ok {
// only change if we have the client and type differs
if cl, ok := c.opts.Clients[name]; ok && (*c.opts.Client).String() != name {
*c.opts.Client = cl()
}
}
// Set the server
if name := ctx.String("server"); len(name) > 0 {
if s, ok := c.opts.Servers[name]; ok {
// only change if we have the server and type differs
if s, ok := c.opts.Servers[name]; ok && (*c.opts.Server).String() != name {
*c.opts.Server = s()
}
}
// Set the broker
if name := ctx.String("broker"); len(name) > 0 || len(ctx.String("broker_address")) > 0 {
if len(name) == 0 {
name = defaultBroker
}
if b, ok := c.opts.Brokers[name]; ok {
n := b(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
*c.opts.Broker = n
} else {
if name := ctx.String("broker"); len(name) > 0 && (*c.opts.Broker).String() != name {
b, ok := c.opts.Brokers[name]
if !ok {
return fmt.Errorf("Broker %s not found", name)
}
*c.opts.Broker = b()
serverOpts = append(serverOpts, server.Broker(*c.opts.Broker))
clientOpts = append(clientOpts, client.Broker(*c.opts.Broker))
}
// Set the registry
if name := ctx.String("registry"); len(name) > 0 || len(ctx.String("registry_address")) > 0 {
if len(name) == 0 {
name = defaultRegistry
}
if r, ok := c.opts.Registries[name]; ok {
n := r(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
*c.opts.Registry = n
} else {
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
r, ok := c.opts.Registries[name]
if !ok {
return fmt.Errorf("Registry %s not found", name)
}
*c.opts.Registry = r()
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
@@ -303,31 +306,26 @@ func (c *cmd) Before(ctx *cli.Context) error {
}
// Set the selector
if name := ctx.String("selector"); len(name) > 0 {
if s, ok := c.opts.Selectors[name]; ok {
n := s(selector.Registry(*c.opts.Registry))
*c.opts.Selector = n
} else {
if name := ctx.String("selector"); len(name) > 0 && (*c.opts.Selector).String() != name {
s, ok := c.opts.Selectors[name]
if !ok {
return fmt.Errorf("Selector %s not found", name)
}
*c.opts.Selector = s(selector.Registry(*c.opts.Registry))
// No server option here. Should there be?
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
}
// Set the transport
if name := ctx.String("transport"); len(name) > 0 || len(ctx.String("transport_address")) > 0 {
if len(name) == 0 {
name = defaultTransport
}
if t, ok := c.opts.Transports[name]; ok {
n := t(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
*c.opts.Transport = n
} else {
if name := ctx.String("transport"); len(name) > 0 && (*c.opts.Transport).String() != name {
t, ok := c.opts.Transports[name]
if !ok {
return fmt.Errorf("Transport %s not found", name)
}
*c.opts.Transport = t()
serverOpts = append(serverOpts, server.Transport(*c.opts.Transport))
clientOpts = append(clientOpts, client.Transport(*c.opts.Transport))
}
@@ -348,6 +346,18 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.Metadata(metadata))
}
if len(ctx.String("broker_address")) > 0 {
(*c.opts.Broker).Init(broker.Addrs(strings.Split(ctx.String("broker_address"), ",")...))
}
if len(ctx.String("registry_address")) > 0 {
(*c.opts.Registry).Init(registry.Addrs(strings.Split(ctx.String("registry_address"), ",")...))
}
if len(ctx.String("transport_address")) > 0 {
(*c.opts.Transport).Init(transport.Addrs(strings.Split(ctx.String("transport_address"), ",")...))
}
if len(ctx.String("server_name")) > 0 {
serverOpts = append(serverOpts, server.Name(ctx.String("server_name")))
}
@@ -368,8 +378,12 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.Advertise(ctx.String("server_advertise")))
}
if ttl := time.Duration(ctx.GlobalInt("register_ttl")); ttl > 0 {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
}
// client opts
if r := ctx.Int("client_retries"); r > 0 {
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))
}

View File

@@ -1,14 +1,14 @@
package cmd
import (
"context"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -54,7 +54,8 @@ func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
case codec.Response:
return j.c.ReadHeader(m)
case codec.Publication:
io.Copy(j.buf, j.rwc)
_, err := io.Copy(j.buf, j.rwc)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}

View File

@@ -55,7 +55,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
if err = flusher.Flush(); err != nil {
return err
}
}
case codec.Response:
c.Lock()
@@ -82,7 +84,9 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
return err
}
if flusher, ok := c.rwc.(flusher); ok {
err = flusher.Flush()
if err = flusher.Flush(); err != nil {
return err
}
}
case codec.Publication:
data, err := proto.Marshal(b.(proto.Message))
@@ -127,7 +131,8 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
m.Id = rtmp.GetSeq()
m.Error = rtmp.GetError()
case codec.Publication:
io.Copy(c.buf, c.rwc)
_, err := io.Copy(c.buf, c.rwc)
return err
default:
return fmt.Errorf("Unrecognised message type: %v", mt)
}

View File

@@ -82,6 +82,16 @@ func NotFound(id, format string, a ...interface{}) error {
}
}
// MethodNotAllowed generates a 405 error.
func MethodNotAllowed(id, format string, a ...interface{}) error {
return &Error{
Id: id,
Code: 405,
Detail: fmt.Sprintf(format, a...),
Status: http.StatusText(405),
}
}
// InternalServerError generates a 500 error.
func InternalServerError(id, format string, a ...interface{}) error {
return &Error{
@@ -91,3 +101,13 @@ func InternalServerError(id, format string, a ...interface{}) error {
Status: http.StatusText(500),
}
}
// Conflict generates a 409 error.
func Conflict(id, format string, a ...interface{}) error {
return &Error{
Id: id,
Code: 409,
Detail: fmt.Sprintf(format, a...),
Status: http.StatusText(409),
}
}

View File

@@ -1,10 +1,10 @@
package micro
import (
"context"
"time"
"github.com/micro/go-micro/server"
"golang.org/x/net/context"
)
type function struct {
@@ -23,7 +23,7 @@ func fnHandlerWrapper(f Function) server.HandlerWrapper {
func fnSubWrapper(f Function) server.SubscriberWrapper {
return func(s server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Publication) error {
return func(ctx context.Context, msg server.Message) error {
defer f.Done()
return s(ctx, msg)
}

View File

@@ -1,13 +1,12 @@
package micro
import (
"context"
"sync"
"testing"
"github.com/micro/go-micro/registry/mock"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
func TestFunction(t *testing.T) {
@@ -16,8 +15,8 @@ func TestFunction(t *testing.T) {
// create service
fn := NewFunction(
Name("test.function"),
Registry(mock.NewRegistry()),
Name("test.function"),
AfterStart(func() error {
wg.Done()
return nil
@@ -27,29 +26,35 @@ func TestFunction(t *testing.T) {
// we can't test fn.Init as it parses the command line
// fn.Init()
ch := make(chan error, 2)
go func() {
// wait for start
wg.Wait()
// test call debug
req := fn.Client().NewRequest(
"test.function",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := fn.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("function response: %s", rsp.Status)
}
// run service
ch <- fn.Run()
}()
// run service
fn.Run()
// wait for start
wg.Wait()
// test call debug
req := fn.Client().NewRequest(
"test.function",
"Debug.Health",
new(proto.HealthRequest),
)
rsp := new(proto.HealthResponse)
err := fn.Client().Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Status != "ok" {
t.Fatalf("function response: %s", rsp.Status)
}
if err := <-ch; err != nil {
t.Fatal(err)
}
}

View File

@@ -2,10 +2,10 @@
package micro
import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/server"
"golang.org/x/net/context"
)
type serviceKey struct{}
@@ -45,7 +45,7 @@ var (
HeaderPrefix = "X-Micro-"
)
// NewService creates an returns a new Service based on the packages within.
// NewService creates and returns a new Service based on the packages within.
func NewService(opts ...Option) Service {
return newService(opts...)
}
@@ -81,5 +81,5 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h))
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}

View File

@@ -2,7 +2,7 @@
package metadata
import (
"golang.org/x/net/context"
"context"
)
type metaKey struct{}

View File

@@ -1,9 +1,8 @@
package metadata
import (
"context"
"testing"
"golang.org/x/net/context"
)
func TestMetadataContext(t *testing.T) {

View File

@@ -1,6 +1,7 @@
package micro
import (
"context"
"time"
"github.com/micro/cli"
@@ -11,8 +12,6 @@ import (
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {
@@ -101,6 +100,15 @@ func Registry(r registry.Registry) Option {
o.Server.Init(server.Registry(r))
// Update Selector
o.Client.Options().Selector.Init(selector.Registry(r))
// Update Broker
o.Broker.Init(broker.Registry(r))
}
}
// Selector sets the selector for the service client
func Selector(s selector.Selector) Option {
return func(o *Options) {
o.Client.Init(client.Selector(s))
}
}

View File

@@ -1,8 +1,9 @@
package micro
import (
"context"
"github.com/micro/go-micro/client"
"golang.org/x/net/context"
)
type publisher struct {
@@ -11,5 +12,5 @@ type publisher struct {
}
func (p *publisher) Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error {
return p.c.Publish(ctx, p.c.NewPublication(p.topic, msg))
return p.c.Publish(ctx, p.c.NewMessage(p.topic, msg))
}

View File

@@ -1,11 +1,23 @@
package consul
import (
"context"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
// Connect specifies services should be registered as Consul Connect services
func Connect() registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_connect", true)
}
}
func Config(c *consul.Config) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
@@ -14,3 +26,22 @@ func Config(c *consul.Config) registry.Option {
o.Context = context.WithValue(o.Context, "consul_config", c)
}
}
//
// TCPCheck will tell the service provider to check the service address
// and port every `t` interval. It will enabled only if `t` is greater than 0.
// See `TCP + Interval` for more information [1].
//
// [1] https://www.consul.io/docs/agent/checks.html
//
func TCPCheck(t time.Duration) registry.Option {
return func(o *registry.Options) {
if t <= time.Duration(0) {
return
}
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, "consul_tcp_check", t)
}
}

View File

@@ -17,12 +17,28 @@ import (
type consulRegistry struct {
Address string
Client *consul.Client
Options Options
opts Options
// connect enabled
connect bool
sync.Mutex
register map[string]uint64
}
func getDeregisterTTL(t time.Duration) time.Duration {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := t + splay
// consul has a minimum timeout on deregistration of 1 minute.
if t < time.Minute {
deregTTL = time.Minute + splay
}
return deregTTL
}
func newTransport(config *tls.Config) *http.Transport {
if config == nil {
config = &tls.Config{
@@ -45,32 +61,31 @@ func newTransport(config *tls.Config) *http.Transport {
return t
}
func newConsulRegistry(opts ...Option) Registry {
var options Options
func configure(c *consulRegistry, opts ...Option) {
// set opts
for _, o := range opts {
o(&options)
o(&c.opts)
}
// use default config
config := consul.DefaultConfig()
if options.Context != nil {
if c.opts.Context != nil {
// Use the consul config passed in the options, if available
if c, ok := options.Context.Value("consul_config").(*consul.Config); ok {
config = c
if co, ok := c.opts.Context.Value("consul_config").(*consul.Config); ok {
config = co
}
if cn, ok := c.opts.Context.Value("consul_connect").(bool); ok {
c.connect = cn
}
}
// set timeout
if options.Timeout > 0 {
config.HttpClient.Timeout = options.Timeout
}
// check if there are any addrs
if len(options.Addrs) > 0 {
addr, port, err := net.SplitHostPort(options.Addrs[0])
if len(c.opts.Addrs) > 0 {
addr, port, err := net.SplitHostPort(c.opts.Addrs[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
addr = options.Addrs[0]
addr = c.opts.Addrs[0]
config.Address = fmt.Sprintf("%s:%s", addr, port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
@@ -78,25 +93,43 @@ func newConsulRegistry(opts ...Option) Registry {
}
// requires secure connection?
if options.Secure || options.TLSConfig != nil {
if c.opts.Secure || c.opts.TLSConfig != nil {
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
config.Scheme = "https"
// We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(options.TLSConfig)
config.HttpClient.Transport = newTransport(c.opts.TLSConfig)
}
// set timeout
if c.opts.Timeout > 0 {
config.HttpClient.Timeout = c.opts.Timeout
}
// create the client
client, _ := consul.NewClient(config)
// set address/client
c.Address = config.Address
c.Client = client
}
func newConsulRegistry(opts ...Option) Registry {
cr := &consulRegistry{
Address: config.Address,
Client: client,
Options: options,
opts: Options{},
register: make(map[string]uint64),
}
configure(cr, opts...)
return cr
}
func (c *consulRegistry) Init(opts ...Option) error {
configure(c, opts...)
return nil
}
func (c *consulRegistry) Deregister(s *Service) error {
if len(s.Nodes) == 0 {
return errors.New("Require at least one node")
@@ -116,11 +149,21 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
return errors.New("Require at least one node")
}
var regTCPCheck bool
var regInterval time.Duration
var options RegisterOptions
for _, o := range opts {
o(&options)
}
if c.opts.Context != nil {
if tcpCheckInterval, ok := c.opts.Context.Value("consul_tcp_check").(time.Duration); ok {
regTCPCheck = true
regInterval = tcpCheckInterval
}
}
// create hash of service; uint64
h, err := hash.Hash(s, nil)
if err != nil {
@@ -137,10 +180,21 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
// if it's already registered and matches then just pass the check
if ok && v == h {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
if options.TTL == time.Duration(0) {
services, _, err := c.Client.Health().Checks(s.Name, nil)
if err == nil {
for _, v := range services {
if v.ServiceID == node.Id {
return nil
}
}
}
} else {
// if the err is nil we're all good, bail out
// if not, we don't know what the state is, so full re-register
if err := c.Client.Agent().PassTTL("service:"+node.Id, ""); err == nil {
return nil
}
}
}
@@ -151,31 +205,43 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
var check *consul.AgentServiceCheck
// if the TTL is greater than 0 create an associated check
if options.TTL > time.Duration(0) {
// splay slightly for the watcher?
splay := time.Second * 5
deregTTL := options.TTL + splay
// consul has a minimum timeout on deregistration of 1 minute.
if options.TTL < time.Minute {
deregTTL = time.Minute + splay
}
if regTCPCheck {
deregTTL := getDeregisterTTL(regInterval)
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
TCP: fmt.Sprintf("%s:%d", node.Address, node.Port),
Interval: fmt.Sprintf("%v", regInterval),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
// if the TTL is greater than 0 create an associated check
} else if options.TTL > time.Duration(0) {
deregTTL := getDeregisterTTL(options.TTL)
check = &consul.AgentServiceCheck{
TTL: fmt.Sprintf("%v", options.TTL),
DeregisterCriticalServiceAfter: fmt.Sprintf("%v", deregTTL),
}
}
// register the service
if err := c.Client.Agent().ServiceRegister(&consul.AgentServiceRegistration{
asr := &consul.AgentServiceRegistration{
ID: node.Id,
Name: s.Name,
Tags: tags,
Port: node.Port,
Address: node.Address,
Check: check,
}); err != nil {
}
// Specify consul connect
if c.connect {
asr.Connect = &consul.AgentServiceConnect{
Native: true,
}
}
if err := c.Client.Agent().ServiceRegister(asr); err != nil {
return err
}
@@ -194,7 +260,15 @@ func (c *consulRegistry) Register(s *Service, opts ...RegisterOption) error {
}
func (c *consulRegistry) GetService(name string) ([]*Service, error) {
rsp, _, err := c.Client.Health().Service(name, "", false, nil)
var rsp []*consul.ServiceEntry
var err error
// if we're connect enabled only get connect services
if c.connect {
rsp, _, err = c.Client.Health().Connect(name, "", false, nil)
} else {
rsp, _, err = c.Client.Health().Service(name, "", false, nil)
}
if err != nil {
return nil, err
}
@@ -207,18 +281,18 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
}
// version is now a tag
version, found := decodeVersion(s.Service.Tags)
version, _ := decodeVersion(s.Service.Tags)
// service ID is now the node id
id := s.Service.ID
// key is always the version
key := version
// address is service address
address := s.Service.Address
// if we can't get the version we bail
// use old the old ways
if !found {
continue
// use node address
if len(address) == 0 {
address = s.Node.Address
}
svc, ok := serviceMap[key]
@@ -232,6 +306,7 @@ func (c *consulRegistry) GetService(name string) ([]*Service, error) {
}
var del bool
for _, check := range s.Checks {
// delete the node if the status is critical
if check.Status == "critical" {
@@ -275,10 +350,14 @@ func (c *consulRegistry) ListServices() ([]*Service, error) {
return services, nil
}
func (c *consulRegistry) Watch() (Watcher, error) {
return newConsulWatcher(c)
func (c *consulRegistry) Watch(opts ...WatchOption) (Watcher, error) {
return newConsulWatcher(c, opts...)
}
func (c *consulRegistry) String() string {
return "consul"
}
func (c *consulRegistry) Options() Options {
return c.opts
}

View File

@@ -41,7 +41,7 @@ func newMockServer(rg *mockRegistry, l net.Listener) error {
}
func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
l, err := net.Listen("tcp", ":0")
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
// blurgh?!!
panic(err.Error())
@@ -104,7 +104,11 @@ func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) {
})
defer cl()
svc, _ := cr.GetService("service-name")
svc, err := cr.GetService("service-name")
if err != nil {
t.Fatal("Unexpected error", err)
}
if exp, act := 1, len(svc); exp != act {
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
}
@@ -140,7 +144,11 @@ func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) {
})
defer cl()
svc, _ := cr.GetService("service-name")
svc, err := cr.GetService("service-name")
if err != nil {
t.Fatal("Unexpected error", err)
}
if exp, act := 1, len(svc); exp != act {
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
}
@@ -176,7 +184,11 @@ func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) {
})
defer cl()
svc, _ := cr.GetService("service-name")
svc, err := cr.GetService("service-name")
if err != nil {
t.Fatal("Unexpected error", err)
}
if exp, act := 1, len(svc); exp != act {
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
}

View File

@@ -10,6 +10,7 @@ import (
type consulWatcher struct {
r *consulRegistry
wo WatchOptions
wp *watch.Plan
watchers map[string]*watch.Plan
@@ -20,9 +21,15 @@ type consulWatcher struct {
services map[string][]*Service
}
func newConsulWatcher(cr *consulRegistry) (Watcher, error) {
func newConsulWatcher(cr *consulRegistry, opts ...WatchOption) (Watcher, error) {
var wo WatchOptions
for _, o := range opts {
o(&wo)
}
cw := &consulWatcher{
r: cr,
wo: wo,
exit: make(chan bool),
next: make(chan *Result, 10),
watchers: make(map[string]*watch.Plan),
@@ -53,7 +60,7 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
for _, e := range entries {
serviceName = e.Service.Service
// version is now a tag
version, found := decodeVersion(e.Service.Tags)
version, _ := decodeVersion(e.Service.Tags)
// service ID is now the node id
id := e.Service.ID
// key is always the version
@@ -61,9 +68,9 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
// address is service address
address := e.Service.Address
// if we can't get the version we bail
if !found {
continue
// use node address
if len(address) == 0 {
address = e.Node.Address
}
svc, ok := serviceMap[key]
@@ -185,6 +192,12 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) {
// add new watchers
for service, _ := range services {
// Filter on watch options
// wo.Service: Only watch services we care about
if len(cw.wo.Service) > 0 && service != cw.wo.Service {
continue
}
if _, ok := cw.watchers[service]; ok {
continue
}

View File

@@ -49,6 +49,17 @@ func newRegistry(opts ...registry.Option) registry.Registry {
}
}
func (m *mdnsRegistry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *mdnsRegistry) Options() registry.Options {
return m.opts
}
func (m *mdnsRegistry) Register(service *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
@@ -297,8 +308,14 @@ func (m *mdnsRegistry) ListServices() ([]*registry.Service, error) {
return services, nil
}
func (m *mdnsRegistry) Watch() (registry.Watcher, error) {
func (m *mdnsRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wo registry.WatchOptions
for _, o := range opts {
o(&wo)
}
md := &mdnsWatcher{
wo: wo,
ch: make(chan *mdns.ServiceEntry, 32),
exit: make(chan struct{}),
}

View File

@@ -9,6 +9,7 @@ import (
)
type mdnsWatcher struct {
wo registry.WatchOptions
ch chan *mdns.ServiceEntry
exit chan struct{}
}
@@ -26,6 +27,12 @@ func (m *mdnsWatcher) Next() (*registry.Result, error) {
continue
}
// Filter watch options
// wo.Service: Only keep services we care about
if len(m.wo.Service) > 0 && txt.Service != m.wo.Service {
continue
}
var action string
if e.TTL == 0 {

View File

@@ -87,15 +87,27 @@ func (m *mockRegistry) Deregister(s *registry.Service) error {
return nil
}
func (m *mockRegistry) Watch() (registry.Watcher, error) {
return &mockWatcher{exit: make(chan bool)}, nil
func (m *mockRegistry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wopts registry.WatchOptions
for _, o := range opts {
o(&wopts)
}
return &mockWatcher{exit: make(chan bool), opts: wopts}, nil
}
func (m *mockRegistry) String() string {
return "mock"
}
func NewRegistry() registry.Registry {
func (m *mockRegistry) Init(opts ...registry.Option) error {
return nil
}
func (m *mockRegistry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry(opts ...registry.Options) registry.Registry {
m := &mockRegistry{Services: make(map[string][]*registry.Service)}
m.init()
return m

View File

@@ -8,6 +8,7 @@ import (
type mockWatcher struct {
exit chan bool
opts registry.WatchOptions
}
func (m *mockWatcher) Next() (*registry.Result, error) {

View File

@@ -1,10 +1,9 @@
package registry
import (
"context"
"crypto/tls"
"time"
"golang.org/x/net/context"
)
type Options struct {
@@ -25,6 +24,15 @@ type RegisterOptions struct {
Context context.Context
}
type WatchOptions struct {
// Specify a service to watch
// If blank, the watch is for all services
Service string
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
// Addrs is the registry addresses to use
func Addrs(addrs ...string) Option {
return func(o *Options) {
@@ -57,3 +65,10 @@ func RegisterTTL(t time.Duration) RegisterOption {
o.TTL = t
}
}
// Watch a service
func WatchService(name string) WatchOption {
return func(o *WatchOptions) {
o.Service = name
}
}

View File

@@ -9,11 +9,13 @@ import (
// and an abstraction over varying implementations
// {consul, etcd, zookeeper, ...}
type Registry interface {
Init(...Option) error
Options() Options
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch() (Watcher, error)
Watch(...WatchOption) (Watcher, error)
String() string
}
@@ -21,6 +23,8 @@ type Option func(*Options)
type RegisterOption func(*RegisterOptions)
type WatchOption func(*WatchOptions)
var (
DefaultRegistry = newConsulRegistry()
@@ -52,8 +56,8 @@ func ListServices() ([]*Service, error) {
}
// Watch returns a watcher which allows you to track updates to the registry.
func Watch() (Watcher, error) {
return DefaultRegistry.Watch()
func Watch(opts ...WatchOption) (Watcher, error) {
return DefaultRegistry.Watch(opts...)
}
func String() string {

View File

@@ -1,3 +1,4 @@
// Package cache is a caching selector. It uses the registry watcher.
package cache
import (
@@ -9,11 +10,6 @@ import (
"github.com/micro/go-micro/selector"
)
/*
Cache selector is a selector which uses the registry.Watcher to Cache service entries.
It defaults to a TTL for 1 minute and causes a cache miss on the next request.
*/
type cacheSelector struct {
so selector.Options
ttl time.Duration
@@ -23,6 +19,8 @@ type cacheSelector struct {
cache map[string][]*registry.Service
ttls map[string]time.Time
watched map[string]bool
// used to close or reload watcher
reload chan bool
exit chan bool
@@ -86,30 +84,59 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// watch service if not watched
if _, ok := c.watched[service]; !ok {
go c.run(service)
c.watched[service] = true
}
// get does the actual request for a service
// it also caches it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.set(service, c.cp(services))
return services, nil
}
// check the cache first
services, ok := c.cache[service]
// cache miss or no services
if !ok || len(services) == 0 {
return get(service)
}
// got cache but lets check ttl
ttl, kk := c.ttls[service]
// got results, copy and return
if ok && len(services) > 0 {
// only return if its less than the ttl
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// within ttl so return cache
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// cache miss or ttl expired
// expired entry so get service
services, err := get(service)
// now ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
// no error then return error
if err == nil {
return services, nil
}
// we didn't have any results so cache
c.cache[service] = c.cp(services)
c.ttls[service] = time.Now().Add(c.ttl)
return services, nil
// not found error then return
if err == registry.ErrNotFound {
return nil, selector.ErrNotFound
}
// other error
// return expired cache as last resort
return c.cp(services), nil
}
func (c *cacheSelector) set(service string, services []*registry.Service) {
@@ -229,9 +256,7 @@ func (c *cacheSelector) update(res *registry.Result) {
// it creates a new watcher if there's a problem
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run() {
go c.tick()
func (c *cacheSelector) run(name string) {
for {
// exit early if already dead
if c.quit() {
@@ -239,8 +264,13 @@ func (c *cacheSelector) run() {
}
// create new watcher
w, err := c.so.Registry.Watch()
w, err := c.so.Registry.Watch(
registry.WatchService(name),
)
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
@@ -248,33 +278,15 @@ func (c *cacheSelector) run() {
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// check cache and expire on each tick
func (c *cacheSelector) tick() {
t := time.NewTicker(time.Minute)
for {
select {
case <-t.C:
c.Lock()
for service, expiry := range c.ttls {
if d := time.Since(expiry); d > c.ttl {
// TODO: maybe refresh the cache rather than blowing it away
c.del(service)
}
}
c.Unlock()
case <-c.exit:
return
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *cacheSelector) watch(w registry.Watcher) error {
@@ -354,17 +366,16 @@ func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (s
}
func (c *cacheSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (c *cacheSelector) Reset(service string) {
return
}
// Close stops the watcher and destroys the cache
func (c *cacheSelector) Close() error {
c.Lock()
c.cache = make(map[string][]*registry.Service)
c.watched = make(map[string]bool)
c.Unlock()
select {
@@ -401,15 +412,13 @@ func NewSelector(opts ...selector.Option) selector.Selector {
}
}
c := &cacheSelector{
so: sopts,
ttl: ttl,
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
return &cacheSelector{
so: sopts,
ttl: ttl,
watched: make(map[string]bool),
cache: make(map[string][]*registry.Service),
ttls: make(map[string]time.Time),
reload: make(chan bool, 1),
exit: make(chan bool),
}
go c.run()
return c
}

View File

@@ -1,10 +1,10 @@
package cache
import (
"context"
"time"
"github.com/micro/go-micro/selector"
"golang.org/x/net/context"
)
type ttlKey struct{}

View File

@@ -1,9 +1,6 @@
package selector
import (
"math/rand"
"time"
"github.com/micro/go-micro/registry"
)
@@ -11,10 +8,6 @@ type defaultSelector struct {
so Options
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *defaultSelector) Init(opts ...Option) error {
for _, o := range opts {
o(&r.so)
@@ -55,11 +48,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er
}
func (r *defaultSelector) Mark(service string, node *registry.Node, err error) {
return
}
func (r *defaultSelector) Reset(service string) {
return
}
func (r *defaultSelector) Close() error {

View File

@@ -80,7 +80,7 @@ func TestFilterEndpoint(t *testing.T) {
}
}
if seen == false && data.count > 0 {
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}
@@ -232,7 +232,7 @@ func TestFilterVersion(t *testing.T) {
seen = true
}
if seen == false && data.count > 0 {
if !seen && data.count > 0 {
t.Fatalf("Expected %d services but seen is %t; result %+v", data.count, seen, services)
}
}

View File

@@ -1,9 +1,9 @@
package selector
import (
"github.com/micro/go-micro/registry"
"context"
"golang.org/x/net/context"
"github.com/micro/go-micro/registry"
)
type Options struct {

View File

@@ -1,7 +1,7 @@
package server
import (
"golang.org/x/net/context"
"context"
)
type serverKey struct{}

View File

@@ -1,12 +1,11 @@
package debug
import (
"context"
"runtime"
"time"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
// The debug handler represents an internal server handler

View File

@@ -1,11 +1,11 @@
package server
import (
"context"
"reflect"
"testing"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
type testHandler struct{}

View File

@@ -1,6 +1,7 @@
package server
import (
"context"
"time"
"github.com/micro/go-micro/broker"
@@ -8,8 +9,6 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server/debug"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -7,6 +7,7 @@ import (
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/transport"
"github.com/pkg/errors"
)
type rpcPlusCodec struct {
@@ -96,7 +97,11 @@ func (c *rpcPlusCodec) WriteResponse(r *response, body interface{}, last bool) e
Header: map[string]string{},
}
if err := c.codec.Write(m, body); err != nil {
return err
c.buf.wbuf.Reset()
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
if err := c.codec.Write(m, nil); err != nil {
return err
}
}
m.Header["Content-Type"] = c.req.Header["Content-Type"]

110
server/rpc_codec_test.go Normal file
View File

@@ -0,0 +1,110 @@
package server
import (
"bytes"
"errors"
"testing"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
)
// testCodec is a dummy codec that only knows how to encode nil bodies
type testCodec struct {
buf *bytes.Buffer
}
type testSocket struct {
local string
remote string
}
// TestCodecWriteError simulates what happens when a codec is unable
// to encode a message (e.g. a missing branch of an "oneof" message in
// protobufs)
//
// We expect an error to be sent to the socket. Previously the socket
// would remain open with no bytes sent, leading to client-side
// timeouts.
func TestCodecWriteError(t *testing.T) {
socket := testSocket{}
message := transport.Message{
Header: map[string]string{},
Body: []byte{},
}
rwc := readWriteCloser{
rbuf: new(bytes.Buffer),
wbuf: new(bytes.Buffer),
}
c := rpcPlusCodec{
buf: &rwc,
codec: &testCodec{
buf: rwc.wbuf,
},
req: &message,
socket: socket,
}
err := c.WriteResponse(&response{
ServiceMethod: "Service.Method",
Seq: 0,
Error: "",
next: nil,
}, "body", false)
if err != nil {
t.Fatalf(`Expected WriteResponse to fail; got "%+v" instead`, err)
}
const expectedError = "Unable to encode body: simulating a codec write failure"
actualError := rwc.wbuf.String()
if actualError != expectedError {
t.Fatalf(`Expected error "%+v" in the write buffer, got "%+v" instead`, expectedError, actualError)
}
}
func (c *testCodec) ReadHeader(message *codec.Message, typ codec.MessageType) error {
return nil
}
func (c *testCodec) ReadBody(dest interface{}) error {
return nil
}
func (c *testCodec) Write(message *codec.Message, dest interface{}) error {
if dest != nil {
return errors.New("simulating a codec write failure")
}
c.buf.Write([]byte(message.Error))
return nil
}
func (c *testCodec) Close() error {
return nil
}
func (c *testCodec) String() string {
return "string"
}
func (s testSocket) Local() string {
return s.local
}
func (s testSocket) Remote() string {
return s.remote
}
func (s testSocket) Recv(message *transport.Message) error {
return nil
}
func (s testSocket) Send(message *transport.Message) error {
return nil
}
func (s testSocket) Close() error {
return nil
}

View File

@@ -8,10 +8,10 @@ type rpcRequest struct {
stream bool
}
type rpcPublication struct {
type rpcMessage struct {
topic string
contentType string
message interface{}
payload interface{}
}
func (r *rpcRequest) ContentType() string {
@@ -34,14 +34,14 @@ func (r *rpcRequest) Stream() bool {
return r.stream
}
func (r *rpcPublication) ContentType() string {
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
func (r *rpcMessage) Payload() interface{} {
return r.payload
}

View File

@@ -1,8 +1,10 @@
package server
import (
"context"
"fmt"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
@@ -15,9 +17,7 @@ import (
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
"github.com/micro/misc/lib/addr"
"golang.org/x/net/context"
"github.com/micro/util/go/lib/addr"
)
type rpcServer struct {
@@ -66,6 +66,9 @@ func (s *rpcServer) accept(sock transport.Socket) {
return
}
// add to wait group
s.wg.Add(1)
// we use this Timeout header to set a server deadline
to := msg.Header["Timeout"]
// we use this Content-Type header to identify the codec needed
@@ -80,6 +83,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
}
@@ -93,6 +97,10 @@ func (s *rpcServer) accept(sock transport.Socket) {
delete(hdr, "Content-Type")
delete(hdr, "Timeout")
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
ctx := metadata.NewContext(context.Background(), hdr)
// set the timeout if we have it
@@ -102,17 +110,12 @@ func (s *rpcServer) accept(sock transport.Socket) {
}
}
// add to wait group
s.wg.Add(1)
// TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
log.Logf("Unexpected error serving request, closing socket: %v", err)
s.wg.Done()
log.Logf("Unexpected error serving request, closing socket: %v", err)
return
}
// finish request
s.wg.Done()
}
}
@@ -184,12 +187,12 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
}
s.Lock()
defer s.Unlock()
_, ok = s.subscribers[sub]
if ok {
return fmt.Errorf("subscriber %v already exists", s)
}
s.subscribers[sub] = nil
s.Unlock()
return nil
}
@@ -235,19 +238,34 @@ func (s *rpcServer) Register() error {
node.Metadata["registry"] = config.Registry.String()
s.RLock()
var endpoints []*registry.Endpoint
for _, e := range s.handlers {
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range s.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
endpoints = append(endpoints, e.Endpoints()...)
handlerList = append(handlerList, n)
}
}
for e, _ := range s.subscribers {
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range s.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
endpoints = append(endpoints, e.Endpoints()...)
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
var endpoints []*registry.Endpoint
for _, n := range handlerList {
endpoints = append(endpoints, s.handlers[n].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
s.RUnlock()
service := &registry.Service{
@@ -374,14 +392,40 @@ func (s *rpcServer) Start() error {
log.Logf("Listening on %s", ts.Addr())
s.Lock()
// swap address
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
go ts.Accept(s.accept)
exit := make(chan bool, 1)
go func() {
for {
err := ts.Accept(s.accept)
// check if we're supposed to exit
select {
case <-exit:
return
default:
}
// check the error and backoff
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
// no error just exit
return
}
}()
go func() {
// wait for exit
ch := <-s.exit
exit <- true
// wait for requests to finish
if wait(s.opts.Context) {
@@ -393,6 +437,11 @@ func (s *rpcServer) Start() error {
// disconnect the broker
config.Broker.Disconnect()
s.Lock()
// swap back address
s.opts.Address = addr
s.Unlock()
}()
// TODO: subscribe to cruft

View File

@@ -7,6 +7,7 @@ package server
// Meh, we need to get rid of this shit
import (
"context"
"errors"
"io"
"reflect"
@@ -16,7 +17,6 @@ import (
"unicode/utf8"
"github.com/micro/go-log"
"golang.org/x/net/context"
)
var (
@@ -119,9 +119,9 @@ func prepareMethod(method reflect.Method) *methodType {
if stream {
// check stream type
streamType := reflect.TypeOf((*Streamer)(nil)).Elem()
streamType := reflect.TypeOf((*Stream)(nil)).Elem()
if !argType.Implements(streamType) {
log.Log(mname, "argument does not implement Streamer interface:", argType)
log.Log(mname, "argument does not implement Stream interface:", argType)
return nil
}
} else {
@@ -199,7 +199,7 @@ func (server *server) register(rcvr interface{}) error {
return nil
}
func (server *server) sendResponse(sending *sync.Mutex, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
func (server *server) sendResponse(sending sync.Locker, req *request, reply interface{}, codec serverCodec, errmsg string, last bool) (err error) {
resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
@@ -249,7 +249,10 @@ func (s *service) call(ctx context.Context, server *server, sending *sync.Mutex,
errmsg = err.Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
err = server.sendResponse(sending, req, replyv.Interface(), codec, errmsg, true)
if err != nil {
log.Log("rpc call: unable to send response: ", err)
}
server.freeRequest(req)
return
}

View File

@@ -1,9 +1,8 @@
package server
import (
"context"
"sync"
"golang.org/x/net/context"
)
// Implements the Streamer interface

View File

@@ -2,13 +2,13 @@
package server
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/micro/go-log"
"github.com/pborman/uuid"
"golang.org/x/net/context"
)
type Server interface {
@@ -25,9 +25,9 @@ type Server interface {
String() string
}
type Publication interface {
type Message interface {
Topic() string
Message() interface{}
Payload() interface{}
ContentType() string
}
@@ -40,11 +40,11 @@ type Request interface {
Stream() bool
}
// Streamer represents a stream established with a client.
// Stream represents a stream established with a client.
// A stream can be bidirectional which is indicated by the request.
// The last error will be left in Error().
// EOF indicated end of the stream.
type Streamer interface {
type Stream interface {
Context() context.Context
Request() Request
Send(interface{}) error

View File

@@ -2,14 +2,15 @@ package server
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"golang.org/x/net/context"
)
const (
@@ -176,6 +177,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
delete(hdr, "Content-Type")
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
@@ -204,7 +207,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
return err
}
fn := func(ctx context.Context, msg Publication) error {
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
@@ -213,7 +216,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Message()))
vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
@@ -228,14 +231,27 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
s.wg.Add(1)
go func() {
fn(ctx, &rpcPublication{
defer s.wg.Done()
results <- fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
message: req.Interface(),
payload: req.Interface(),
})
s.wg.Done()
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}

View File

@@ -1,7 +1,7 @@
package server
import (
"golang.org/x/net/context"
"context"
)
// HandlerFunc represents a single method of a handler. It's used primarily
@@ -12,7 +12,7 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message.
type SubscriberFunc func(ctx context.Context, msg Publication) error
type SubscriberFunc func(ctx context.Context, msg Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
@@ -20,8 +20,8 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// StreamerWrapper wraps a Streamer interface and returns the equivalent.
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,
// metrics, etc.
type StreamerWrapper func(Streamer) Streamer
type StreamWrapper func(Stream) Stream

View File

@@ -3,9 +3,12 @@ package micro
import (
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata"
@@ -15,7 +18,7 @@ import (
type service struct {
opts Options
init chan bool
once sync.Once
}
func newService(opts ...Option) Service {
@@ -30,7 +33,6 @@ func newService(opts ...Option) Service {
return &service{
opts: options,
init: make(chan bool),
}
}
@@ -44,7 +46,10 @@ func (s *service) run(exit chan bool) {
for {
select {
case <-t.C:
s.opts.Server.Register()
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
@@ -56,32 +61,35 @@ func (s *service) run(exit chan bool) {
// which parses command line flags. cmd.Init is only called
// on first Init.
func (s *service) Init(opts ...Option) {
// If <-s.init blocks, Init has not been called yet
// so we can call cmd.Init once.
select {
case <-s.init:
// only process options
for _, o := range opts {
o(&s.opts)
}
default:
// close init
close(s.init)
// process options
for _, o := range opts {
o(&s.opts)
}
// process options
for _, o := range opts {
o(&s.opts)
s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service
s.opts.Cmd.Init(
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Transport(&s.opts.Transport),
cmd.Client(&s.opts.Client),
cmd.Server(&s.opts.Server),
)
}
})
}
func (s *service) Options() Options {
@@ -160,7 +168,7 @@ func (s *service) Run() error {
go s.run(ex)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
@@ -172,9 +180,5 @@ func (s *service) Run() error {
// exit reg loop
close(ex)
if err := s.Stop(); err != nil {
return err
}
return nil
return s.Stop()
}

View File

@@ -1,13 +1,12 @@
package micro
import (
"context"
"sync"
"testing"
"github.com/micro/go-micro/registry/mock"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
func TestService(t *testing.T) {
@@ -31,9 +30,7 @@ func TestService(t *testing.T) {
// we can't test service.Init as it parses the command line
// service.Init()
// register handler
// do that later
// run service
go func() {
// wait for start
wg.Wait()
@@ -60,6 +57,7 @@ func TestService(t *testing.T) {
cancel()
}()
// run service
service.Run()
if err := service.Run(); err != nil {
t.Fatal(err)
}
}

View File

@@ -1,6 +1,7 @@
package transport
import (
//"fmt"
"bufio"
"bytes"
"crypto/tls"
@@ -13,10 +14,11 @@ import (
"sync"
"time"
"github.com/micro/go-log"
maddr "github.com/micro/misc/lib/addr"
mnet "github.com/micro/misc/lib/net"
mls "github.com/micro/misc/lib/tls"
"github.com/micro/h2c"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"golang.org/x/net/http2"
)
type buffer struct {
@@ -38,16 +40,25 @@ type httpTransportClient struct {
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
// local/remote ip
local string
remote string
}
type httpTransportSocket struct {
ht *httpTransport
r chan *http.Request
conn net.Conn
once sync.Once
ht *httpTransport
w http.ResponseWriter
r *http.Request
rw *bufio.ReadWriter
sync.Mutex
buff *bufio.Reader
conn net.Conn
// for the first request
ch chan *http.Request
// local/remote ip
local string
remote string
}
type httpTransportListener struct {
@@ -59,6 +70,14 @@ func (b *buffer) Close() error {
return nil
}
func (h *httpTransportClient) Local() string {
return h.local
}
func (h *httpTransportClient) Remote() string {
return h.remote
}
func (h *httpTransportClient) Send(m *Message) error {
header := make(http.Header)
@@ -170,33 +189,87 @@ func (h *httpTransportClient) Close() error {
return err
}
func (h *httpTransportSocket) Local() string {
return h.local
}
func (h *httpTransportSocket) Remote() string {
return h.remote
}
func (h *httpTransportSocket) Recv(m *Message) error {
if m == nil {
return errors.New("message passed in is nil")
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
r, err := http.ReadRequest(h.buff)
if err != nil {
return err
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
r.Body.Close()
m.Body = b
if m.Header == nil {
m.Header = make(map[string]string)
}
for k, v := range r.Header {
// process http 1
if h.r.ProtoMajor == 1 {
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
var r *http.Request
select {
// get first request
case r = <-h.ch:
// read next request
default:
rr, err := http.ReadRequest(h.rw.Reader)
if err != nil {
return err
}
r = rr
}
// read body
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
// set body
r.Body.Close()
m.Body = b
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
m.Header[k] = ""
}
}
// return early early
return nil
}
// processing http2 request
// read streaming body
// set max buffer size
buf := make([]byte, 4*1024)
// read the request body
n, err := h.r.Body.Read(buf)
// not an eof error
if err != nil {
return err
}
// check if we have data
if n > 0 {
m.Body = buf[:n]
}
// set headers
for k, v := range h.r.Header {
if len(v) > 0 {
m.Header[k] = v[0]
} else {
@@ -204,78 +277,73 @@ func (h *httpTransportSocket) Recv(m *Message) error {
}
}
select {
case h.r <- r:
default:
}
return nil
}
func (h *httpTransportSocket) Send(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: h.r.Header,
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
r := <-h.r
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
rsp := &http.Response{
Header: r.Header,
Body: &buffer{b},
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
}
// http2 request
// set headers
for k, v := range m.Header {
rsp.Header.Set(k, v)
h.w.Header().Set(k, v)
}
select {
case h.r <- r:
default:
}
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
}
return rsp.Write(h.conn)
// write request
_, err := h.w.Write(m.Body)
return err
}
func (h *httpTransportSocket) error(m *Message) error {
b := bytes.NewBuffer(m.Body)
defer b.Reset()
rsp := &http.Response{
Header: make(http.Header),
Body: &buffer{b},
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
if h.r.ProtoMajor == 1 {
rsp := &http.Response{
Header: make(http.Header),
Body: ioutil.NopCloser(bytes.NewReader(m.Body)),
Status: "500 Internal Server Error",
StatusCode: 500,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
for k, v := range m.Header {
rsp.Header.Set(k, v)
}
return rsp.Write(h.conn)
return rsp.Write(h.conn)
}
return nil
}
func (h *httpTransportSocket) Close() error {
err := h.conn.Close()
h.once.Do(func() {
h.Lock()
h.buff.Reset(nil)
h.buff = nil
h.Unlock()
})
return err
if h.r.ProtoMajor == 1 {
return h.conn.Close()
}
return nil
}
func (h *httpTransportListener) Addr() string {
@@ -287,46 +355,69 @@ func (h *httpTransportListener) Close() error {
}
func (h *httpTransportListener) Accept(fn func(Socket)) error {
var tempDelay time.Duration
// create handler mux
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf *bufio.ReadWriter
var con net.Conn
for {
c, err := h.listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Logf("http: Accept error: %v; retrying in %v\n", err, tempDelay)
time.Sleep(tempDelay)
continue
// read a regular request
if r.ProtoMajor == 1 {
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
return err
r.Body = ioutil.NopCloser(bytes.NewReader(b))
// hijack the conn
hj, ok := w.(http.Hijacker)
if !ok {
// we're screwed
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
return
}
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
buf = bufrw
con = conn
}
sock := &httpTransportSocket{
ht: h.ht,
conn: c,
buff: bufio.NewReader(c),
r: make(chan *http.Request, 1),
}
// save the request
ch := make(chan *http.Request, 1)
ch <- r
go func() {
// TODO: think of a better error response strategy
defer func() {
if r := recover(); r != nil {
log.Log("panic recovered: ", r)
sock.Close()
}
}()
fn(&httpTransportSocket{
ht: h.ht,
w: w,
r: r,
rw: buf,
ch: ch,
conn: con,
local: h.Addr(),
remote: r.RemoteAddr,
})
})
fn(sock)
}()
// default http2 server
srv := &http.Server{
Handler: mux,
}
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = &h2c.HandlerH2C{
Handler: mux,
H2Server: &http2.Server{},
}
}
// begin serving
return srv.Serve(h.listener)
}
func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
@@ -365,6 +456,8 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
buff: bufio.NewReader(conn),
dialOpts: dopts,
r: make(chan *http.Request, 1),
local: conn.LocalAddr().String(),
remote: conn.RemoteAddr().String(),
}, nil
}
@@ -423,6 +516,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
}, nil
}
func (h *httpTransport) Init(opts ...Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpTransport) Options() Options {
return h.opts
}
func (h *httpTransport) String() string {
return "http"
}

View File

@@ -32,7 +32,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
}
expectedPort(t, "44445", lsn2)
lsn, err := tp.Listen(":0")
lsn, err := tp.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
@@ -45,7 +45,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
func TestHTTPTransportCommunication(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
@@ -111,7 +111,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
func TestHTTPTransportError(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
@@ -181,7 +181,7 @@ func TestHTTPTransportError(t *testing.T) {
func TestHTTPTransportTimeout(t *testing.T) {
tr := NewTransport(Timeout(time.Millisecond * 100))
l, err := tr.Listen(":0")
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}

View File

@@ -18,6 +18,9 @@ type mockSocket struct {
exit chan bool
// listener exit
lexit chan bool
local string
remote string
}
type mockClient struct {
@@ -51,6 +54,14 @@ func (ms *mockSocket) Recv(m *transport.Message) error {
return nil
}
func (ms *mockSocket) Local() string {
return ms.local
}
func (ms *mockSocket) Remote() string {
return ms.remote
}
func (ms *mockSocket) Send(m *transport.Message) error {
select {
case <-ms.exit:
@@ -93,10 +104,12 @@ func (m *mockListener) Accept(fn func(transport.Socket)) error {
return nil
case c := <-m.conn:
go fn(&mockSocket{
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
lexit: c.lexit,
exit: c.exit,
send: c.recv,
recv: c.send,
local: c.Remote(),
remote: c.Local(),
})
}
}
@@ -118,10 +131,12 @@ func (m *mockTransport) Dial(addr string, opts ...transport.DialOption) (transpo
client := &mockClient{
&mockSocket{
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
send: make(chan *transport.Message),
recv: make(chan *transport.Message),
exit: make(chan bool),
lexit: listener.exit,
local: addr,
remote: addr,
},
options,
}
@@ -171,6 +186,17 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
return listener, nil
}
func (m *mockTransport) Init(opts ...transport.Option) error {
for _, o := range opts {
o(&m.opts)
}
return nil
}
func (m *mockTransport) Options() transport.Options {
return m.opts
}
func (m *mockTransport) String() string {
return "mock"
}

View File

@@ -1,11 +1,11 @@
package transport
import (
"context"
"crypto/tls"
"time"
"github.com/micro/go-micro/transport/codec"
"golang.org/x/net/context"
)
type Options struct {

View File

@@ -14,6 +14,8 @@ type Socket interface {
Recv(*Message) error
Send(*Message) error
Close() error
Local() string
Remote() string
}
type Client interface {
@@ -30,6 +32,8 @@ type Listener interface {
// services. It uses socket send/recv semantics and had various
// implementations {HTTP, RabbitMQ, NATS, ...}
type Transport interface {
Init(...Option) error
Options() Options
Dial(addr string, opts ...DialOption) (Client, error)
Listen(addr string, opts ...ListenOption) (Listener, error)
String() string
@@ -50,15 +54,3 @@ var (
func NewTransport(opts ...Option) Transport {
return newHTTPTransport(opts...)
}
func Dial(addr string, opts ...DialOption) (Client, error) {
return DefaultTransport.Dial(addr, opts...)
}
func Listen(addr string, opts ...ListenOption) (Listener, error) {
return DefaultTransport.Listen(addr, opts...)
}
func String() string {
return DefaultTransport.String()
}

View File

@@ -1,10 +1,10 @@
package micro
import (
"context"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/metadata"
"golang.org/x/net/context"
)
type clientWrapper struct {
@@ -36,12 +36,12 @@ func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interf
return c.Client.Call(ctx, req, rsp, opts...)
}
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
ctx = c.setHeaders(ctx)
return c.Client.Stream(ctx, req, opts...)
}
func (c *clientWrapper) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
func (c *clientWrapper) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
ctx = c.setHeaders(ctx)
return c.Client.Publish(ctx, p, opts...)
}

View File

@@ -1,11 +1,10 @@
package micro
import (
"context"
"testing"
"github.com/micro/go-micro/metadata"
"golang.org/x/net/context"
)
func TestWrapper(t *testing.T) {