Compare commits
42 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
45420d8413 | ||
|
0dcea05fb8 | ||
|
b0b0338128 | ||
|
11d75dae1b | ||
|
fc0bbcd339 | ||
|
c82dadfa55 | ||
|
7c8d6087de | ||
|
1f03681d82 | ||
|
1c1d46e1ac | ||
|
a545091c36 | ||
|
ada9ef48cf | ||
|
a7c4afac54 | ||
|
043e4aa979 | ||
|
78da1fde94 | ||
|
e7104d609a | ||
|
1890ec7044 | ||
|
d48735793d | ||
|
6fb652f78a | ||
|
bd46e60c13 | ||
|
42235bc973 | ||
|
c07b3636c0 | ||
|
2f09d5830c | ||
|
48513c78b6 | ||
|
bd34d39401 | ||
|
59685a4ff9 | ||
|
6385bf743c | ||
|
8fd8d9bd35 | ||
|
53554d98cd | ||
|
f6165f35c0 | ||
|
ae3f59a2f5 | ||
|
0703c514a9 | ||
|
b92130eeee | ||
|
e0e4596be0 | ||
|
5f60f7518d | ||
|
f2d4226817 | ||
|
236cfd6a3b | ||
|
d29b5e2fab | ||
|
7f173dfc63 | ||
|
8be72b676d | ||
|
0e696f4907 | ||
|
1748328f14 | ||
|
eff39083ca |
@@ -1,7 +1,7 @@
|
||||
language: go
|
||||
go:
|
||||
- 1.8.3
|
||||
- 1.9rc2
|
||||
- 1.8.5
|
||||
- 1.9.2
|
||||
notifications:
|
||||
slack:
|
||||
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=
|
||||
|
234
README.md
234
README.md
@@ -1,6 +1,6 @@
|
||||
# Go Micro [](https://opensource.org/licenses/Apache-2.0) [](https://godoc.org/github.com/micro/go-micro) [](https://travis-ci.org/micro/go-micro) [](https://goreportcard.com/report/github.com/micro/go-micro)
|
||||
|
||||
Go Micro is a pluggable RPC framework for **microservices**. It is part of the [Micro](https://github.com/micro/micro) toolkit.
|
||||
Go Micro is a pluggable RPC framework for distributed systems development.
|
||||
|
||||
The **Micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
|
||||
|
||||
@@ -12,12 +12,12 @@ Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://s
|
||||
|
||||
Go Micro abstracts away the details of distributed systems. Here are the main features.
|
||||
|
||||
- **Service Discovery** - Applications are automatically registered with service discovery so they can find each other.
|
||||
- **Load Balancing** - Smart client side load balancing is used to balance requests between instances of a service.
|
||||
- **Synchronous Communication** - Request-response is provided as a bidirectional streaming transport layer.
|
||||
- **Asynchronous Communication** - Microservices should promote an event driven architecture. Publish and Subscribe semantics are built in.
|
||||
- **Message Encoding** - Micro services can encode requests in a number of encoding formats and seamlessly decode based on the Content-Type header.
|
||||
- **RPC Client/Server** - The client and server leverage the above features and provide a clean simple interface for building microservices.
|
||||
- **Service Discovery** - Automatic registration and name resolution with service discovery
|
||||
- **Load Balancing** - Smart client side load balancing of services built on discovery
|
||||
- **Synchronous Comms** - RPC based communication with support for bidirectional streaming
|
||||
- **Asynchronous Comms** - PubSub interface built in for event driven architectures
|
||||
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json out of the box
|
||||
- **Service Interface** - All features are packaged in a simple high level interface for developing microservices
|
||||
|
||||
Go Micro supports both the Service and Function programming models. Read on to learn more.
|
||||
|
||||
@@ -27,70 +27,53 @@ For more detailed information on the architecture, installation and use of go-mi
|
||||
|
||||
## Learn By Example
|
||||
|
||||
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function). The [**examples**](https://github.com/micro/examples) directory contains many more examples for using things such as middleware/wrappers, selector filters, pub/sub and code generation.
|
||||
For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
|
||||
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function).
|
||||
|
||||
Check out the blog post to learn how to write go-micro services [https://micro.mu/blog/2016/03/28/go-micro.html](https://micro.mu/blog/2016/03/28/go-micro.html) or watch the talk from the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34).
|
||||
The [**examples**](https://github.com/micro/examples) directory contains examples for using things such as middleware/wrappers, selector filters, pub/sub, grpc, plugins and much more. For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
|
||||
|
||||
## Getting Started
|
||||
Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) video for a high level overview.
|
||||
|
||||
This is a quick getting started guide with the greeter service example.
|
||||
## Getting started
|
||||
|
||||
### Prerequisites: Service Discovery
|
||||
- [Service Discovery](#service-discovery)
|
||||
- [Writing a Service](#writing-a-service)
|
||||
- [Writing a Function](#writing-a-function)
|
||||
- [Plugins](#plugins)
|
||||
- [Wrappers](#wrappers)
|
||||
|
||||
There's just one prerequisite. We need a service discovery system to resolve service names to their address.
|
||||
The default discovery mechanism used in go-micro is Consul. Discovery is however pluggable so you can used
|
||||
etcd, kubernetes, zookeeper, etc. Plugins can be found in [micro/go-plugins](https://github.com/micro/go-plugins).
|
||||
## Service Discovery
|
||||
|
||||
### Multicast DNS
|
||||
|
||||
We can use multicast DNS with the built in MDNS registry for a zero dependency configuration.
|
||||
|
||||
Just pass `--registry=mdns` to any command
|
||||
```
|
||||
$ go run main.go --registry=mdns
|
||||
```
|
||||
Service discovery is used to resolve service names to addresses. It's the only dependency of go-micro.
|
||||
|
||||
### Consul
|
||||
|
||||
Alternatively we can use the default discovery system which is Consul.
|
||||
[Consul](https://www.consul.io/) is used as the default service discovery system.
|
||||
|
||||
**Mac OS**
|
||||
```
|
||||
brew install consul
|
||||
consul agent -dev
|
||||
```
|
||||
Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in the [micro/go-plugins](https://github.com/micro/go-plugins) repo.
|
||||
|
||||
**Docker**
|
||||
```
|
||||
docker run consul
|
||||
```
|
||||
[Install guide](https://www.consul.io/intro/getting-started/install.html)
|
||||
|
||||
[Further installation instructions](https://www.consul.io/intro/getting-started/install.html)
|
||||
### Multicast DNS
|
||||
|
||||
### Run Service
|
||||
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
|
||||
|
||||
Pass `--registry=mdns` to any command or the enviroment variable MICRO_REGISTRY=mdns
|
||||
|
||||
```
|
||||
$ go get github.com/micro/examples/service && service
|
||||
2016/03/14 10:59:14 Listening on [::]:50137
|
||||
2016/03/14 10:59:14 Broker Listening on [::]:50138
|
||||
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
|
||||
```
|
||||
|
||||
### Call Service
|
||||
```
|
||||
$ service --run_client
|
||||
Hello John
|
||||
go run main.go --registry=mdns
|
||||
```
|
||||
|
||||
## Writing a service
|
||||
|
||||
This is a simple greeter RPC service example
|
||||
|
||||
Find this example at [examples/service](https://github.com/micro/examples/tree/master/service).
|
||||
|
||||
### Create service proto
|
||||
|
||||
One of the key requirements of microservices is strongly defined interfaces so we utilised protobuf to define the handler and request/response.
|
||||
Here's a definition for the Greeter handler with the method Hello which takes a HelloRequest and HelloResponse both with one string arguments.
|
||||
One of the key requirements of microservices is strongly defined interfaces. Micro uses protobuf to achieve this.
|
||||
|
||||
`go-micro/examples/service/proto/greeter.proto`:
|
||||
Here we define the Greeter handler with the method Hello. It takes a HelloRequest and HelloResponse both with one string arguments.
|
||||
|
||||
```proto
|
||||
syntax = "proto3";
|
||||
@@ -110,28 +93,33 @@ message HelloResponse {
|
||||
|
||||
### Install protobuf
|
||||
|
||||
We use a protobuf plugin for code generation. This is completely optional. Look at [examples/server](https://github.com/micro/examples/blob/master/server/main.go)
|
||||
and [examples/client](https://github.com/micro/examples/blob/master/client/main.go) for examples without code generation.
|
||||
Install [protobuf](https://developers.google.com/protocol-buffers/)
|
||||
|
||||
Now install the micro fork of protoc-gen-go. The protobuf compiler for Go.
|
||||
|
||||
```shell
|
||||
go get github.com/micro/protobuf/{proto,protoc-gen-go}
|
||||
```
|
||||
|
||||
There's still a need for proto compiler to generate Go stub code from our proto file. You can either use the micro fork above or the official repo `github.com/golang/protobuf`.
|
||||
### Generate the proto
|
||||
|
||||
### Compile the proto
|
||||
After writing the proto definition we must compile it using protoc with the micro plugin.
|
||||
|
||||
```shell
|
||||
protoc -I$GOPATH/src --go_out=plugins=micro:$GOPATH/src \
|
||||
$GOPATH/src/github.com/micro/examples/service/proto/greeter.proto
|
||||
```
|
||||
|
||||
### Define the service
|
||||
### Write the service
|
||||
|
||||
Below is the code sample for the Greeter service. It basically implements the interface defined above for the Greeter handler,
|
||||
initialises the service, registers the handler and then runs itself. Simple as that.
|
||||
Below is the code for the greeter service.
|
||||
|
||||
`go-micro/examples/service/main.go`:
|
||||
It does the following:
|
||||
|
||||
1. Implements the interface defined for the Greeter handler
|
||||
2. Initialises a micro.Service
|
||||
3. Registers the Greeter handler
|
||||
4. Runs the service
|
||||
|
||||
```go
|
||||
package main
|
||||
@@ -155,7 +143,6 @@ func main() {
|
||||
// Create a new service. Optionally include some options here.
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
micro.Version("latest"),
|
||||
)
|
||||
|
||||
// Init will parse the command line flags.
|
||||
@@ -174,6 +161,10 @@ func main() {
|
||||
### Run service
|
||||
```
|
||||
go run examples/service/main.go
|
||||
```
|
||||
|
||||
Output
|
||||
```
|
||||
2016/03/14 10:59:14 Listening on [::]:50137
|
||||
2016/03/14 10:59:14 Broker Listening on [::]:50138
|
||||
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
|
||||
@@ -181,10 +172,9 @@ go run examples/service/main.go
|
||||
|
||||
### Define a client
|
||||
|
||||
Below is the client code to query the greeter service. Notice we're using the code generated client interface `proto.NewGreeterClient`.
|
||||
This reduces the amount of boiler plate code we need to write. The greeter client can be reused throughout the code if need be.
|
||||
Below is the client code to query the greeter service.
|
||||
|
||||
`client.go`
|
||||
The generated proto includes a greeter client to reduce boilerplate code.
|
||||
|
||||
```go
|
||||
package main
|
||||
@@ -220,13 +210,18 @@ func main() {
|
||||
|
||||
```shell
|
||||
go run client.go
|
||||
```
|
||||
|
||||
Output
|
||||
```
|
||||
Hello John
|
||||
```
|
||||
|
||||
## Writing a Function
|
||||
|
||||
Go Micro includes the Function programming model. This is the notion of a one time executing Service which operates much like a service except exiting
|
||||
after completing a request. A function is defined much like a service and called in exactly the same way.
|
||||
Go Micro includes the Function programming model.
|
||||
|
||||
A Function is a one time executing Service which exits after completing a request.
|
||||
|
||||
### Defining a Function
|
||||
|
||||
@@ -265,61 +260,6 @@ func main() {
|
||||
|
||||
It's that simple.
|
||||
|
||||
## How does it work?
|
||||
|
||||
<p align="center">
|
||||
<img src="go-micro.png" />
|
||||
</p>
|
||||
|
||||
Go Micro is a framework that addresses the fundamental requirements to write microservices.
|
||||
|
||||
Let's dig into the core components.
|
||||
|
||||
### Registry
|
||||
|
||||
The registry provides a service discovery mechanism to resolve names to addresses. It can be backed by consul, etcd, zookeeper, dns, gossip, etc.
|
||||
Services should register using the registry on startup and deregister on shutdown. Services can optionally provide an expiry TTL and reregister
|
||||
on an interval to ensure liveness and that the service is cleaned up if it dies.
|
||||
|
||||
### Selector
|
||||
|
||||
The selector is a load balancing abstraction which builds on the registry. It allows services to be "filtered" using filter functions and "selected"
|
||||
using a choice of algorithms such as random, roundrobin, leastconn, etc. The selector is leveraged by the Client when making requests. The client
|
||||
will use the selector rather than the registry as it provides that built in mechanism of load balancing.
|
||||
|
||||
### Transport
|
||||
|
||||
The transport is the interface for synchronous request/response communication between services. It's akin to the golang net package but provides
|
||||
a higher level abstraction which allows us to switch out communication mechanisms e.g http, rabbitmq, websockets, NATS. The transport also
|
||||
supports bidirectional streaming. This is powerful for client side push to the server.
|
||||
|
||||
### Broker
|
||||
|
||||
The broker provides an interface to a message broker for asynchronous pub/sub communication. This is one of the fundamental requirements of an event
|
||||
driven architecture and microservices. By default we use an inbox style point to point HTTP system to minimise the number of dependencies required
|
||||
to get started. However there are many message broker implementations available in go-plugins e.g RabbitMQ, NATS, NSQ, Google Cloud Pub Sub.
|
||||
|
||||
### Codec
|
||||
|
||||
The codec is used for encoding and decoding messages before transporting them across the wire. This could be json, protobuf, bson, msgpack, etc.
|
||||
Where this differs from most other codecs is that we actually support the RPC format here as well. So we have JSON-RPC, PROTO-RPC, BSON-RPC, etc.
|
||||
It separates encoding from the client/server and provides a powerful method for integrating other systems such as gRPC, Vanadium, etc.
|
||||
|
||||
### Server
|
||||
|
||||
The server is the building block for writing a service. Here you can name your service, register request handlers, add middeware, etc. The service
|
||||
builds on the above packages to provide a unified interface for serving requests. The built in server is an RPC system. In the future there maybe
|
||||
other implementations. The server also allows you to define multiple codecs to serve different encoded messages.
|
||||
|
||||
### Client
|
||||
|
||||
The client provides an interface to make requests to services. Again like the server, it builds on the other packages to provide a unified interface
|
||||
for finding services by name using the registry, load balancing using the selector, making synchronous requests with the transport and asynchronous
|
||||
messaging using the broker.
|
||||
|
||||
|
||||
The above components are combined at the top-level of micro as a **Service**.
|
||||
|
||||
## Plugins
|
||||
|
||||
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
|
||||
@@ -353,6 +293,64 @@ Flag usage of plugins
|
||||
service --registry=etcdv3 --transport=nats --broker=kafka
|
||||
```
|
||||
|
||||
## Wrappers
|
||||
|
||||
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.
|
||||
|
||||
### Handler
|
||||
|
||||
Here's an example service handler wrapper which logs the incoming request
|
||||
|
||||
```go
|
||||
// implements the server.HandlerWrapper
|
||||
func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
|
||||
return func(ctx context.Context, req server.Request, rsp interface{}) error {
|
||||
fmt.Printf("[%v] server request: %s", time.Now(), req.Method())
|
||||
return fn(ctx, req, rsp)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
It can be initialised when creating the service
|
||||
|
||||
```go
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
// wrap the handler
|
||||
micro.WrapHandler(logWrapper),
|
||||
)
|
||||
```
|
||||
|
||||
### Client
|
||||
|
||||
Here's an example of a client wrapper which logs requests made
|
||||
|
||||
```go
|
||||
type logWrapper struct {
|
||||
client.Client
|
||||
}
|
||||
|
||||
func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||
fmt.Printf("[wrapper] client request to service: %s method: %s\n", req.Service(), req.Method())
|
||||
return l.Client.Call(ctx, req, rsp)
|
||||
}
|
||||
|
||||
// implements client.Wrapper as logWrapper
|
||||
func logWrap(c client.Client) client.Client {
|
||||
return &logWrapper{c}
|
||||
}
|
||||
```
|
||||
|
||||
It can be initialised when creating the service
|
||||
|
||||
```go
|
||||
service := micro.NewService(
|
||||
micro.Name("greeter"),
|
||||
// wrap the client
|
||||
micro.WrapClient(logWrap),
|
||||
)
|
||||
```
|
||||
|
||||
## Other Languages
|
||||
|
||||
Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java
|
||||
|
@@ -2,8 +2,6 @@
|
||||
package broker
|
||||
|
||||
// Broker is an interface used for asynchronous messaging.
|
||||
// Its an abstraction over various message brokers
|
||||
// {NATS, RabbitMQ, Kafka, ...}
|
||||
type Broker interface {
|
||||
Options() Options
|
||||
Address() string
|
||||
|
@@ -3,6 +3,7 @@ package broker
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@@ -18,8 +19,9 @@ import (
|
||||
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/broker/codec/json"
|
||||
"github.com/micro/go-micro/errors"
|
||||
merr "github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-rcache"
|
||||
maddr "github.com/micro/misc/lib/addr"
|
||||
mnet "github.com/micro/misc/lib/net"
|
||||
mls "github.com/micro/misc/lib/tls"
|
||||
@@ -28,15 +30,11 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// HTTP Broker is a placeholder for actual message brokers.
|
||||
// This should not really be used in production but useful
|
||||
// in developer where you want zero dependencies.
|
||||
|
||||
// HTTP Broker is a point to point async broker
|
||||
type httpBroker struct {
|
||||
id string
|
||||
address string
|
||||
unsubscribe chan *httpSubscriber
|
||||
opts Options
|
||||
id string
|
||||
address string
|
||||
opts Options
|
||||
|
||||
mux *http.ServeMux
|
||||
|
||||
@@ -53,9 +51,9 @@ type httpSubscriber struct {
|
||||
opts SubscribeOptions
|
||||
id string
|
||||
topic string
|
||||
ch chan *httpSubscriber
|
||||
fn Handler
|
||||
svc *registry.Service
|
||||
hb *httpBroker
|
||||
}
|
||||
|
||||
type httpPublication struct {
|
||||
@@ -106,11 +104,13 @@ func newHttpBroker(opts ...Option) Broker {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// set address
|
||||
addr := ":0"
|
||||
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
|
||||
addr = options.Addrs[0]
|
||||
}
|
||||
|
||||
// get registry
|
||||
reg, ok := options.Context.Value(registryKey).(registry.Registry)
|
||||
if !ok {
|
||||
reg = registry.DefaultRegistry
|
||||
@@ -123,7 +123,6 @@ func newHttpBroker(opts ...Option) Broker {
|
||||
r: reg,
|
||||
c: &http.Client{Transport: newTransport(options.TLSConfig)},
|
||||
subscribers: make(map[string][]*httpSubscriber),
|
||||
unsubscribe: make(chan *httpSubscriber),
|
||||
exit: make(chan chan error),
|
||||
mux: http.NewServeMux(),
|
||||
}
|
||||
@@ -153,9 +152,41 @@ func (h *httpSubscriber) Topic() string {
|
||||
}
|
||||
|
||||
func (h *httpSubscriber) Unsubscribe() error {
|
||||
h.ch <- h
|
||||
// artificial delay
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
return h.hb.unsubscribe(h)
|
||||
}
|
||||
|
||||
func (h *httpBroker) subscribe(s *httpSubscriber) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
var subscribers []*httpSubscriber
|
||||
|
||||
// look for subscriber
|
||||
for _, sub := range h.subscribers[s.topic] {
|
||||
// deregister and skip forward
|
||||
if sub.id == s.id {
|
||||
h.r.Deregister(sub.svc)
|
||||
continue
|
||||
}
|
||||
// keep subscriber
|
||||
subscribers = append(subscribers, sub)
|
||||
}
|
||||
|
||||
// set subscribers
|
||||
h.subscribers[s.topic] = subscribers
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -177,29 +208,75 @@ func (h *httpBroker) run(l net.Listener) {
|
||||
// received exit signal
|
||||
case ch := <-h.exit:
|
||||
ch <- l.Close()
|
||||
h.Lock()
|
||||
h.running = false
|
||||
h.Unlock()
|
||||
return
|
||||
// unsubscribe subscriber
|
||||
case subscriber := <-h.unsubscribe:
|
||||
h.Lock()
|
||||
var subscribers []*httpSubscriber
|
||||
for _, sub := range h.subscribers[subscriber.topic] {
|
||||
// deregister and skip forward
|
||||
if sub.id == subscriber.id {
|
||||
h.RLock()
|
||||
for _, subs := range h.subscribers {
|
||||
for _, sub := range subs {
|
||||
h.r.Deregister(sub.svc)
|
||||
continue
|
||||
}
|
||||
subscribers = append(subscribers, sub)
|
||||
}
|
||||
h.subscribers[subscriber.topic] = subscribers
|
||||
h.Unlock()
|
||||
h.RUnlock()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpBroker) start() error {
|
||||
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != "POST" {
|
||||
err := merr.BadRequest("go.micro.broker", "Method not allowed")
|
||||
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
req.ParseForm()
|
||||
|
||||
b, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
var m *Message
|
||||
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
|
||||
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
topic := m.Header[":topic"]
|
||||
delete(m.Header, ":topic")
|
||||
|
||||
if len(topic) == 0 {
|
||||
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
p := &httpPublication{m: m, t: topic}
|
||||
id := req.Form.Get("id")
|
||||
|
||||
h.RLock()
|
||||
for _, subscriber := range h.subscribers[topic] {
|
||||
if id == subscriber.id {
|
||||
// sub is sync; crufty rate limiting
|
||||
// so we don't hose the cpu
|
||||
subscriber.fn(p)
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Address() string {
|
||||
h.RLock()
|
||||
defer h.RUnlock()
|
||||
return h.address
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
@@ -255,11 +332,20 @@ func (h *httpBroker) start() error {
|
||||
go http.Serve(l, h.mux)
|
||||
go h.run(l)
|
||||
|
||||
// get registry
|
||||
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
|
||||
if !ok {
|
||||
reg = registry.DefaultRegistry
|
||||
}
|
||||
// set rcache
|
||||
h.r = rcache.New(reg)
|
||||
|
||||
// set running
|
||||
h.running = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpBroker) stop() error {
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
@@ -267,76 +353,30 @@ func (h *httpBroker) stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// stop rcache
|
||||
rc, ok := h.r.(rcache.Cache)
|
||||
if ok {
|
||||
rc.Stop()
|
||||
}
|
||||
|
||||
// exit and return err
|
||||
ch := make(chan error)
|
||||
h.exit <- ch
|
||||
err := <-ch
|
||||
|
||||
// set not running
|
||||
h.running = false
|
||||
return err
|
||||
}
|
||||
|
||||
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != "POST" {
|
||||
err := errors.BadRequest("go.micro.broker", "Method not allowed")
|
||||
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
defer req.Body.Close()
|
||||
|
||||
req.ParseForm()
|
||||
|
||||
b, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
var m *Message
|
||||
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
|
||||
errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
topic := m.Header[":topic"]
|
||||
delete(m.Header, ":topic")
|
||||
|
||||
if len(topic) == 0 {
|
||||
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
|
||||
w.WriteHeader(500)
|
||||
w.Write([]byte(errr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
p := &httpPublication{m: m, t: topic}
|
||||
id := req.Form.Get("id")
|
||||
|
||||
h.RLock()
|
||||
for _, subscriber := range h.subscribers[topic] {
|
||||
if id == subscriber.id {
|
||||
// sub is sync; crufty rate limiting
|
||||
// so we don't hose the cpu
|
||||
subscriber.fn(p)
|
||||
}
|
||||
}
|
||||
h.RUnlock()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Address() string {
|
||||
return h.address
|
||||
}
|
||||
|
||||
func (h *httpBroker) Connect() error {
|
||||
return h.start()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Disconnect() error {
|
||||
return h.stop()
|
||||
}
|
||||
|
||||
func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.running {
|
||||
return errors.New("cannot init while connected")
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
@@ -345,12 +385,19 @@ func (h *httpBroker) Init(opts ...Option) error {
|
||||
h.id = "broker-" + uuid.NewUUID().String()
|
||||
}
|
||||
|
||||
// get registry
|
||||
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
|
||||
if !ok {
|
||||
reg = registry.DefaultRegistry
|
||||
}
|
||||
|
||||
h.r = reg
|
||||
// get rcache
|
||||
if rc, ok := h.r.(rcache.Cache); ok {
|
||||
rc.Stop()
|
||||
}
|
||||
|
||||
// set registry
|
||||
h.r = rcache.New(reg)
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -360,10 +407,13 @@ func (h *httpBroker) Options() Options {
|
||||
}
|
||||
|
||||
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
|
||||
h.RLock()
|
||||
s, err := h.r.GetService("topic:" + topic)
|
||||
if err != nil {
|
||||
h.RUnlock()
|
||||
return err
|
||||
}
|
||||
h.RUnlock()
|
||||
|
||||
m := &Message{
|
||||
Header: make(map[string]string),
|
||||
@@ -381,7 +431,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
return err
|
||||
}
|
||||
|
||||
fn := func(node *registry.Node, b []byte) {
|
||||
pub := func(node *registry.Node, b []byte) {
|
||||
scheme := "http"
|
||||
|
||||
// check if secure is added in metadata
|
||||
@@ -411,15 +461,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
case broadcastVersion:
|
||||
for _, node := range service.Nodes {
|
||||
// publish async
|
||||
go fn(node, b)
|
||||
go pub(node, b)
|
||||
}
|
||||
|
||||
default:
|
||||
// select node to publish to
|
||||
node := service.Nodes[rand.Int()%len(service.Nodes)]
|
||||
|
||||
// publish async
|
||||
go fn(node, b)
|
||||
go pub(node, b)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,7 +476,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
|
||||
}
|
||||
|
||||
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
|
||||
opt := newSubscribeOptions(opts...)
|
||||
options := newSubscribeOptions(opts...)
|
||||
|
||||
// parse address for host, port
|
||||
parts := strings.Split(h.Address(), ":")
|
||||
@@ -439,7 +488,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
return nil, err
|
||||
}
|
||||
|
||||
id := uuid.NewUUID().String()
|
||||
// create unique id
|
||||
id := h.id + "." + uuid.NewUUID().String()
|
||||
|
||||
var secure bool
|
||||
|
||||
@@ -449,7 +499,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
|
||||
// register service
|
||||
node := ®istry.Node{
|
||||
Id: h.id + "." + id,
|
||||
Id: id,
|
||||
Address: addr,
|
||||
Port: port,
|
||||
Metadata: map[string]string{
|
||||
@@ -457,7 +507,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
},
|
||||
}
|
||||
|
||||
version := opt.Queue
|
||||
// check for queue group or broadcast queue
|
||||
version := options.Queue
|
||||
if len(version) == 0 {
|
||||
version = broadcastVersion
|
||||
}
|
||||
@@ -468,22 +519,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
|
||||
Nodes: []*registry.Node{node},
|
||||
}
|
||||
|
||||
// generate subscriber
|
||||
subscriber := &httpSubscriber{
|
||||
opts: opt,
|
||||
id: h.id + "." + id,
|
||||
opts: options,
|
||||
hb: h,
|
||||
id: id,
|
||||
topic: topic,
|
||||
ch: h.unsubscribe,
|
||||
fn: handler,
|
||||
svc: service,
|
||||
}
|
||||
|
||||
if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil {
|
||||
// subscribe now
|
||||
if err := h.subscribe(subscriber); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
|
||||
h.Unlock()
|
||||
// return the subscriber
|
||||
return subscriber, nil
|
||||
}
|
||||
|
||||
|
@@ -70,7 +70,8 @@ var (
|
||||
cli.IntFlag{
|
||||
Name: "client_pool_size",
|
||||
EnvVar: "MICRO_CLIENT_POOL_SIZE",
|
||||
Usage: "Sets the client connection pool size. Default: 0",
|
||||
Usage: "Sets the client connection pool size. Default: 1",
|
||||
Value: 1,
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "client_pool_ttl",
|
||||
@@ -132,6 +133,7 @@ var (
|
||||
Name: "selector",
|
||||
EnvVar: "MICRO_SELECTOR",
|
||||
Usage: "Selector used to pick nodes for querying",
|
||||
Value: "cache",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "server",
|
||||
@@ -181,7 +183,7 @@ var (
|
||||
defaultServer = "rpc"
|
||||
defaultBroker = "http"
|
||||
defaultRegistry = "consul"
|
||||
defaultSelector = "default"
|
||||
defaultSelector = "cache"
|
||||
defaultTransport = "http"
|
||||
)
|
||||
|
||||
|
@@ -45,7 +45,7 @@ var (
|
||||
HeaderPrefix = "X-Micro-"
|
||||
)
|
||||
|
||||
// NewService creates an returns a new Service based on the packages within.
|
||||
// NewService creates and returns a new Service based on the packages within.
|
||||
func NewService(opts ...Option) Service {
|
||||
return newService(opts...)
|
||||
}
|
||||
@@ -81,5 +81,5 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio
|
||||
|
||||
// RegisterSubscriber is syntactic sugar for registering a subscriber
|
||||
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
|
||||
return s.Subscribe(s.NewSubscriber(topic, h))
|
||||
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
|
||||
}
|
||||
|
@@ -104,6 +104,13 @@ func Registry(r registry.Registry) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Selector sets the selector for the service client
|
||||
func Selector(s selector.Selector) Option {
|
||||
return func(o *Options) {
|
||||
o.Client.Init(client.Selector(s))
|
||||
}
|
||||
}
|
||||
|
||||
// Transport sets the transport for the service
|
||||
// and the underlying components
|
||||
func Transport(t transport.Transport) Option {
|
||||
|
@@ -59,14 +59,6 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
config = c
|
||||
}
|
||||
}
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = new(http.Client)
|
||||
}
|
||||
|
||||
// set timeout
|
||||
if options.Timeout > 0 {
|
||||
config.HttpClient.Timeout = options.Timeout
|
||||
}
|
||||
|
||||
// check if there are any addrs
|
||||
if len(options.Addrs) > 0 {
|
||||
@@ -82,6 +74,10 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
|
||||
// requires secure connection?
|
||||
if options.Secure || options.TLSConfig != nil {
|
||||
if config.HttpClient == nil {
|
||||
config.HttpClient = new(http.Client)
|
||||
}
|
||||
|
||||
config.Scheme = "https"
|
||||
// We're going to support InsecureSkipVerify
|
||||
config.HttpClient.Transport = newTransport(options.TLSConfig)
|
||||
@@ -90,6 +86,11 @@ func newConsulRegistry(opts ...Option) Registry {
|
||||
// create the client
|
||||
client, _ := consul.NewClient(config)
|
||||
|
||||
// set timeout
|
||||
if options.Timeout > 0 {
|
||||
config.HttpClient.Timeout = options.Timeout
|
||||
}
|
||||
|
||||
cr := &consulRegistry{
|
||||
Address: config.Address,
|
||||
Client: client,
|
||||
|
@@ -41,7 +41,7 @@ func newMockServer(rg *mockRegistry, l net.Listener) error {
|
||||
}
|
||||
|
||||
func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
|
||||
l, err := net.Listen("tcp", ":0")
|
||||
l, err := net.Listen("tcp", "localhost:0")
|
||||
if err != nil {
|
||||
// blurgh?!!
|
||||
panic(err.Error())
|
||||
@@ -104,7 +104,11 @@ func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) {
|
||||
})
|
||||
defer cl()
|
||||
|
||||
svc, _ := cr.GetService("service-name")
|
||||
svc, err := cr.GetService("service-name")
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
@@ -140,7 +144,11 @@ func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) {
|
||||
})
|
||||
defer cl()
|
||||
|
||||
svc, _ := cr.GetService("service-name")
|
||||
svc, err := cr.GetService("service-name")
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
@@ -176,7 +184,11 @@ func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) {
|
||||
})
|
||||
defer cl()
|
||||
|
||||
svc, _ := cr.GetService("service-name")
|
||||
svc, err := cr.GetService("service-name")
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error", err)
|
||||
}
|
||||
|
||||
if exp, act := 1, len(svc); exp != act {
|
||||
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
|
||||
}
|
||||
|
93
selector/cache/cache.go
vendored
93
selector/cache/cache.go
vendored
@@ -23,6 +23,8 @@ type cacheSelector struct {
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
|
||||
once sync.Once
|
||||
|
||||
// used to close or reload watcher
|
||||
reload chan bool
|
||||
exit chan bool
|
||||
@@ -86,30 +88,53 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
// get does the actual request for a service
|
||||
// it also caches it
|
||||
get := func(service string) ([]*registry.Service, error) {
|
||||
// ask the registry
|
||||
services, err := c.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// cache results
|
||||
c.set(service, c.cp(services))
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// check the cache first
|
||||
services, ok := c.cache[service]
|
||||
|
||||
// cache miss or no services
|
||||
if !ok || len(services) == 0 {
|
||||
return get(service)
|
||||
}
|
||||
|
||||
// got cache but lets check ttl
|
||||
ttl, kk := c.ttls[service]
|
||||
|
||||
// got results, copy and return
|
||||
if ok && len(services) > 0 {
|
||||
// only return if its less than the ttl
|
||||
if kk && time.Since(ttl) < c.ttl {
|
||||
return c.cp(services), nil
|
||||
}
|
||||
// within ttl so return cache
|
||||
if kk && time.Since(ttl) < c.ttl {
|
||||
return c.cp(services), nil
|
||||
}
|
||||
|
||||
// cache miss or ttl expired
|
||||
// expired entry so get service
|
||||
services, err := get(service)
|
||||
|
||||
// now ask the registry
|
||||
services, err := c.so.Registry.GetService(service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// no error then return error
|
||||
if err == nil {
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// we didn't have any results so cache
|
||||
c.cache[service] = c.cp(services)
|
||||
c.ttls[service] = time.Now().Add(c.ttl)
|
||||
return services, nil
|
||||
// not found error then return
|
||||
if err == registry.ErrNotFound {
|
||||
return nil, selector.ErrNotFound
|
||||
}
|
||||
|
||||
// other error
|
||||
|
||||
// return expired cache as last resort
|
||||
return c.cp(services), nil
|
||||
}
|
||||
|
||||
func (c *cacheSelector) set(service string, services []*registry.Service) {
|
||||
@@ -230,8 +255,6 @@ func (c *cacheSelector) update(res *registry.Result) {
|
||||
// reloads the watcher if Init is called
|
||||
// and returns when Close is called
|
||||
func (c *cacheSelector) run() {
|
||||
go c.tick()
|
||||
|
||||
for {
|
||||
// exit early if already dead
|
||||
if c.quit() {
|
||||
@@ -241,6 +264,9 @@ func (c *cacheSelector) run() {
|
||||
// create new watcher
|
||||
w, err := c.so.Registry.Watch()
|
||||
if err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
@@ -248,33 +274,15 @@ func (c *cacheSelector) run() {
|
||||
|
||||
// watch for events
|
||||
if err := c.watch(w); err != nil {
|
||||
if c.quit() {
|
||||
return
|
||||
}
|
||||
log.Log(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check cache and expire on each tick
|
||||
func (c *cacheSelector) tick() {
|
||||
t := time.NewTicker(time.Minute)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
c.Lock()
|
||||
for service, expiry := range c.ttls {
|
||||
if d := time.Since(expiry); d > c.ttl {
|
||||
// TODO: maybe refresh the cache rather than blowing it away
|
||||
c.del(service)
|
||||
}
|
||||
}
|
||||
c.Unlock()
|
||||
case <-c.exit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// watch loops the next event and calls update
|
||||
// it returns if there's an error
|
||||
func (c *cacheSelector) watch(w registry.Watcher) error {
|
||||
@@ -324,6 +332,10 @@ func (c *cacheSelector) Options() selector.Options {
|
||||
}
|
||||
|
||||
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
|
||||
c.once.Do(func() {
|
||||
go c.run()
|
||||
})
|
||||
|
||||
sopts := selector.SelectOptions{
|
||||
Strategy: c.so.Strategy,
|
||||
}
|
||||
@@ -401,7 +413,7 @@ func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
}
|
||||
}
|
||||
|
||||
c := &cacheSelector{
|
||||
return &cacheSelector{
|
||||
so: sopts,
|
||||
ttl: ttl,
|
||||
cache: make(map[string][]*registry.Service),
|
||||
@@ -409,7 +421,4 @@ func NewSelector(opts ...selector.Option) selector.Selector {
|
||||
reload: make(chan bool, 1),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
|
||||
go c.run()
|
||||
return c
|
||||
}
|
||||
|
@@ -1,9 +1,6 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
@@ -11,10 +8,6 @@ type defaultSelector struct {
|
||||
so Options
|
||||
}
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().Unix())
|
||||
}
|
||||
|
||||
func (r *defaultSelector) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&r.so)
|
||||
|
@@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"fmt"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -181,12 +182,12 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
_, ok = s.subscribers[sub]
|
||||
if ok {
|
||||
return fmt.Errorf("subscriber %v already exists", s)
|
||||
}
|
||||
s.subscribers[sub] = nil
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -232,19 +233,34 @@ func (s *rpcServer) Register() error {
|
||||
node.Metadata["registry"] = config.Registry.String()
|
||||
|
||||
s.RLock()
|
||||
var endpoints []*registry.Endpoint
|
||||
for _, e := range s.handlers {
|
||||
// Maps are ordered randomly, sort the keys for consistency
|
||||
var handlerList []string
|
||||
for n, e := range s.handlers {
|
||||
// Only advertise non internal handlers
|
||||
if !e.Options().Internal {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
handlerList = append(handlerList, n)
|
||||
}
|
||||
}
|
||||
for e, _ := range s.subscribers {
|
||||
sort.Strings(handlerList)
|
||||
|
||||
var subscriberList []*subscriber
|
||||
for e := range s.subscribers {
|
||||
// Only advertise non internal subscribers
|
||||
if !e.Options().Internal {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
subscriberList = append(subscriberList, e)
|
||||
}
|
||||
}
|
||||
sort.Slice(subscriberList, func(i, j int) bool {
|
||||
return subscriberList[i].topic > subscriberList[j].topic
|
||||
})
|
||||
|
||||
var endpoints []*registry.Endpoint
|
||||
for _, n := range handlerList {
|
||||
endpoints = append(endpoints, s.handlers[n].Endpoints()...)
|
||||
}
|
||||
for _, e := range subscriberList {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
}
|
||||
s.RUnlock()
|
||||
|
||||
service := ®istry.Service{
|
||||
|
33
service.go
33
service.go
@@ -3,9 +3,11 @@ package micro
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
log "github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
"github.com/micro/go-micro/metadata"
|
||||
@@ -15,7 +17,7 @@ import (
|
||||
type service struct {
|
||||
opts Options
|
||||
|
||||
init chan bool
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func newService(opts ...Option) Service {
|
||||
@@ -30,7 +32,6 @@ func newService(opts ...Option) Service {
|
||||
|
||||
return &service{
|
||||
opts: options,
|
||||
init: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +45,10 @@ func (s *service) run(exit chan bool) {
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
s.opts.Server.Register()
|
||||
err := s.opts.Server.Register()
|
||||
if err != nil {
|
||||
log.Log("service run Server.Register error: ", err)
|
||||
}
|
||||
case <-exit:
|
||||
t.Stop()
|
||||
return
|
||||
@@ -56,23 +60,12 @@ func (s *service) run(exit chan bool) {
|
||||
// which parses command line flags. cmd.Init is only called
|
||||
// on first Init.
|
||||
func (s *service) Init(opts ...Option) {
|
||||
// If <-s.init blocks, Init has not been called yet
|
||||
// so we can call cmd.Init once.
|
||||
select {
|
||||
case <-s.init:
|
||||
// only process options
|
||||
for _, o := range opts {
|
||||
o(&s.opts)
|
||||
}
|
||||
default:
|
||||
// close init
|
||||
close(s.init)
|
||||
|
||||
// process options
|
||||
for _, o := range opts {
|
||||
o(&s.opts)
|
||||
}
|
||||
// process options
|
||||
for _, o := range opts {
|
||||
o(&s.opts)
|
||||
}
|
||||
|
||||
s.once.Do(func() {
|
||||
// Initialise the command flags, overriding new service
|
||||
s.opts.Cmd.Init(
|
||||
cmd.Broker(&s.opts.Broker),
|
||||
@@ -81,7 +74,7 @@ func (s *service) Init(opts ...Option) {
|
||||
cmd.Client(&s.opts.Client),
|
||||
cmd.Server(&s.opts.Server),
|
||||
)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *service) Options() Options {
|
||||
|
@@ -32,7 +32,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
|
||||
}
|
||||
expectedPort(t, "44445", lsn2)
|
||||
|
||||
lsn, err := tp.Listen(":0")
|
||||
lsn, err := tp.Listen("127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Errorf("Did not expect an error, got %s", err)
|
||||
}
|
||||
@@ -45,7 +45,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
|
||||
func TestHTTPTransportCommunication(t *testing.T) {
|
||||
tr := NewTransport()
|
||||
|
||||
l, err := tr.Listen(":0")
|
||||
l, err := tr.Listen("127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected listen err: %v", err)
|
||||
}
|
||||
@@ -111,7 +111,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
|
||||
func TestHTTPTransportError(t *testing.T) {
|
||||
tr := NewTransport()
|
||||
|
||||
l, err := tr.Listen(":0")
|
||||
l, err := tr.Listen("127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected listen err: %v", err)
|
||||
}
|
||||
@@ -181,7 +181,7 @@ func TestHTTPTransportError(t *testing.T) {
|
||||
func TestHTTPTransportTimeout(t *testing.T) {
|
||||
tr := NewTransport(Timeout(time.Millisecond * 100))
|
||||
|
||||
l, err := tr.Listen(":0")
|
||||
l, err := tr.Listen("127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected listen err: %v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user