Compare commits

..

42 Commits

Author SHA1 Message Date
Asim Aslam
45420d8413 Merge pull request #224 from dh1tw/rpc-server-subscribe-deadlock
fix possible deadlock since code can return without unlocking the Mutex
2018-01-02 10:22:13 +00:00
Tobias Wellnitz, DH1TW
0dcea05fb8 fix possible deadlock since code can return without unlocking the Mutex 2018-01-01 19:57:13 +01:00
Asim Aslam
b0b0338128 add option to set selector 2017-12-20 21:43:24 +00:00
Asim Aslam
11d75dae1b remove version from example 2017-11-30 12:28:20 +00:00
Asim Aslam
fc0bbcd339 change the blurb 2017-11-30 09:16:54 +00:00
Asim Aslam
c82dadfa55 Merge pull request #221 from gaxxx/master
add https support for consul
2017-11-28 07:11:48 +00:00
Siyun Wu
7c8d6087de add https support for consul
using enviroment variables

for example:
export CONSUL_HTTP_SSL=1
export CONSUL_HTTP_ADDR="https://example.com"
export CONSUL_CLIENT_CERT="/Users/foo/.ssh/consul/consul.cert"
export CONSUL_CLIENT_KEY="/Users/foo/.ssh/consul/consul.key"
export CONSUL_CACERT="/Users/foo/.ssh/consul/ca.cert"
2017-11-20 15:34:52 +08:00
Asim Aslam
1f03681d82 set test to use localhost 2017-11-09 14:21:26 +00:00
Asim Aslam
1c1d46e1ac Add some test logging 2017-11-09 14:16:35 +00:00
Asim Aslam
a545091c36 update go versions for travis build 2017-11-09 13:52:51 +00:00
Asim Aslam
ada9ef48cf Remove whitespace 2017-11-09 13:51:40 +00:00
Asim Aslam
a7c4afac54 Merge pull request #213 from weisd/master
add log when register err
2017-11-09 13:50:38 +00:00
Asim Aslam
043e4aa979 please stack overflow 2017-11-03 17:30:16 +00:00
Asim Aslam
78da1fde94 Merge pull request #217 from micro/perf
Performance upgrades
2017-10-29 14:48:23 +00:00
Asim Aslam
e7104d609a return the not found error 2017-10-28 16:21:32 +01:00
Asim Aslam
1890ec7044 rc is not used 2017-10-28 13:55:59 +01:00
Asim Aslam
d48735793d remove ticker 2017-10-26 21:12:48 +01:00
Asim Aslam
6fb652f78a lazily start watcher 2017-10-26 20:55:52 +01:00
Asim Aslam
bd46e60c13 optimise http broker with rcache 2017-10-26 20:48:11 +01:00
Asim Aslam
42235bc973 Merge branch 'master' into perf 2017-10-26 13:47:09 +01:00
Asim Aslam
c07b3636c0 update readme 2017-10-26 12:19:42 +01:00
Asim Aslam
2f09d5830c update readme 2017-10-26 12:18:14 +01:00
Asim Aslam
48513c78b6 further readme culling 2017-10-26 12:16:17 +01:00
Asim Aslam
bd34d39401 update readme 2017-10-26 12:02:52 +01:00
Asim Aslam
59685a4ff9 update readme 2017-10-26 12:02:10 +01:00
weisd
6385bf743c add log when register err 2017-10-25 14:23:58 +08:00
Asim Aslam
8fd8d9bd35 Enable connection pooling and selector caching by default 2017-10-24 15:35:25 +01:00
Asim Aslam
53554d98cd Merge pull request #209 from uffy/master
use sync.Once instead of chan
2017-10-09 14:09:56 +01:00
Uffy
f6165f35c0 import sorting 2017-10-09 20:55:03 +08:00
Uffy
ae3f59a2f5 use sync.Once instead of chan
sync.Once is more clear and faster than chan.
2017-10-09 15:47:28 +08:00
Asim Aslam
0703c514a9 Merge pull request #208 from uffy/master
remove redundant rand.Seed
2017-10-09 07:32:18 +01:00
Uffy
b92130eeee remove redundant rand.Seed 2017-10-09 14:22:15 +08:00
Asim Aslam
e0e4596be0 update readme 2017-10-03 11:19:03 +01:00
Asim Aslam
5f60f7518d update readme 2017-10-03 11:14:39 +01:00
Asim Aslam
f2d4226817 update readme 2017-10-03 11:05:54 +01:00
Asim Aslam
236cfd6a3b update readme 2017-10-03 11:03:46 +01:00
Asim Aslam
d29b5e2fab Merge pull request #203 from wzhliang/master
fixing typo
2017-08-25 11:00:18 +01:00
Wenzhi Liang
7f173dfc63 fixing typo 2017-08-25 17:56:57 +08:00
Asim Aslam
8be72b676d Merge pull request #202 from freman/mapsorting
Fix hashing of the service definition
2017-08-24 12:16:32 +01:00
Shannon Wynter
0e696f4907 Fix hashing of the service definition
Maps are sorted randomly, order the keys as a slice
2017-08-24 18:25:05 +10:00
Asim Aslam
1748328f14 Merge pull request #201 from vans9/register-subscriber-pass-opts
Pass options to the s.NewSubscriber
2017-08-24 08:53:50 +01:00
Pavel Arefiev
eff39083ca Pass options to the s.NewSubscriber 2017-08-24 10:47:32 +03:00
14 changed files with 406 additions and 326 deletions

View File

@@ -1,7 +1,7 @@
language: go
go:
- 1.8.3
- 1.9rc2
- 1.8.5
- 1.9.2
notifications:
slack:
secure: aEvhLbhujaGaKSrOokiG3//PaVHTIrc3fBpoRbCRqfZpyq6WREoapJJhF+tIpWWOwaC9GmChbD6aHo/jMUgwKXVyPSaNjiEL87YzUUpL8B2zslNp1rgfTg/LrzthOx3Q1TYwpaAl3to0fuHUVFX4yMeC2vuThq7WSXgMMxFCtbc=

234
README.md
View File

@@ -1,6 +1,6 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable RPC framework for **microservices**. It is part of the [Micro](https://github.com/micro/micro) toolkit.
Go Micro is a pluggable RPC framework for distributed systems development.
The **Micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly but everything can be easily swapped out. It comes with built in support for {json,proto}-rpc encoding, consul or multicast dns for service discovery, http for communication and random hashed client side load balancing.
@@ -12,12 +12,12 @@ Follow us on [Twitter](https://twitter.com/microhq) or join the [Slack](http://s
Go Micro abstracts away the details of distributed systems. Here are the main features.
- **Service Discovery** - Applications are automatically registered with service discovery so they can find each other.
- **Load Balancing** - Smart client side load balancing is used to balance requests between instances of a service.
- **Synchronous Communication** - Request-response is provided as a bidirectional streaming transport layer.
- **Asynchronous Communication** - Microservices should promote an event driven architecture. Publish and Subscribe semantics are built in.
- **Message Encoding** - Micro services can encode requests in a number of encoding formats and seamlessly decode based on the Content-Type header.
- **RPC Client/Server** - The client and server leverage the above features and provide a clean simple interface for building microservices.
- **Service Discovery** - Automatic registration and name resolution with service discovery
- **Load Balancing** - Smart client side load balancing of services built on discovery
- **Synchronous Comms** - RPC based communication with support for bidirectional streaming
- **Asynchronous Comms** - PubSub interface built in for event driven architectures
- **Message Encoding** - Dynamic encoding based on content-type with protobuf and json out of the box
- **Service Interface** - All features are packaged in a simple high level interface for developing microservices
Go Micro supports both the Service and Function programming models. Read on to learn more.
@@ -27,70 +27,53 @@ For more detailed information on the architecture, installation and use of go-mi
## Learn By Example
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function). The [**examples**](https://github.com/micro/examples) directory contains many more examples for using things such as middleware/wrappers, selector filters, pub/sub and code generation.
For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
An example service can be found in [**examples/service**](https://github.com/micro/examples/tree/master/service) and function in [**examples/function**](https://github.com/micro/examples/tree/master/function).
Check out the blog post to learn how to write go-micro services [https://micro.mu/blog/2016/03/28/go-micro.html](https://micro.mu/blog/2016/03/28/go-micro.html) or watch the talk from the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34).
The [**examples**](https://github.com/micro/examples) directory contains examples for using things such as middleware/wrappers, selector filters, pub/sub, grpc, plugins and much more. For the complete greeter example look at [**examples/greeter**](https://github.com/micro/examples/tree/master/greeter). Other examples can be found throughout the GitHub repository.
## Getting Started
Watch the [Golang UK Conf 2016](https://www.youtube.com/watch?v=xspaDovwk34) video for a high level overview.
This is a quick getting started guide with the greeter service example.
## Getting started
### Prerequisites: Service Discovery
- [Service Discovery](#service-discovery)
- [Writing a Service](#writing-a-service)
- [Writing a Function](#writing-a-function)
- [Plugins](#plugins)
- [Wrappers](#wrappers)
There's just one prerequisite. We need a service discovery system to resolve service names to their address.
The default discovery mechanism used in go-micro is Consul. Discovery is however pluggable so you can used
etcd, kubernetes, zookeeper, etc. Plugins can be found in [micro/go-plugins](https://github.com/micro/go-plugins).
## Service Discovery
### Multicast DNS
We can use multicast DNS with the built in MDNS registry for a zero dependency configuration.
Just pass `--registry=mdns` to any command
```
$ go run main.go --registry=mdns
```
Service discovery is used to resolve service names to addresses. It's the only dependency of go-micro.
### Consul
Alternatively we can use the default discovery system which is Consul.
[Consul](https://www.consul.io/) is used as the default service discovery system.
**Mac OS**
```
brew install consul
consul agent -dev
```
Discovery is pluggable. Find plugins for etcd, kubernetes, zookeeper and more in the [micro/go-plugins](https://github.com/micro/go-plugins) repo.
**Docker**
```
docker run consul
```
[Install guide](https://www.consul.io/intro/getting-started/install.html)
[Further installation instructions](https://www.consul.io/intro/getting-started/install.html)
### Multicast DNS
### Run Service
[Multicast DNS](https://en.wikipedia.org/wiki/Multicast_DNS) is a built in service discovery plugin for a zero dependency configuration.
Pass `--registry=mdns` to any command or the enviroment variable MICRO_REGISTRY=mdns
```
$ go get github.com/micro/examples/service && service
2016/03/14 10:59:14 Listening on [::]:50137
2016/03/14 10:59:14 Broker Listening on [::]:50138
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
```
### Call Service
```
$ service --run_client
Hello John
go run main.go --registry=mdns
```
## Writing a service
This is a simple greeter RPC service example
Find this example at [examples/service](https://github.com/micro/examples/tree/master/service).
### Create service proto
One of the key requirements of microservices is strongly defined interfaces so we utilised protobuf to define the handler and request/response.
Here's a definition for the Greeter handler with the method Hello which takes a HelloRequest and HelloResponse both with one string arguments.
One of the key requirements of microservices is strongly defined interfaces. Micro uses protobuf to achieve this.
`go-micro/examples/service/proto/greeter.proto`:
Here we define the Greeter handler with the method Hello. It takes a HelloRequest and HelloResponse both with one string arguments.
```proto
syntax = "proto3";
@@ -110,28 +93,33 @@ message HelloResponse {
### Install protobuf
We use a protobuf plugin for code generation. This is completely optional. Look at [examples/server](https://github.com/micro/examples/blob/master/server/main.go)
and [examples/client](https://github.com/micro/examples/blob/master/client/main.go) for examples without code generation.
Install [protobuf](https://developers.google.com/protocol-buffers/)
Now install the micro fork of protoc-gen-go. The protobuf compiler for Go.
```shell
go get github.com/micro/protobuf/{proto,protoc-gen-go}
```
There's still a need for proto compiler to generate Go stub code from our proto file. You can either use the micro fork above or the official repo `github.com/golang/protobuf`.
### Generate the proto
### Compile the proto
After writing the proto definition we must compile it using protoc with the micro plugin.
```shell
protoc -I$GOPATH/src --go_out=plugins=micro:$GOPATH/src \
$GOPATH/src/github.com/micro/examples/service/proto/greeter.proto
```
### Define the service
### Write the service
Below is the code sample for the Greeter service. It basically implements the interface defined above for the Greeter handler,
initialises the service, registers the handler and then runs itself. Simple as that.
Below is the code for the greeter service.
`go-micro/examples/service/main.go`:
It does the following:
1. Implements the interface defined for the Greeter handler
2. Initialises a micro.Service
3. Registers the Greeter handler
4. Runs the service
```go
package main
@@ -155,7 +143,6 @@ func main() {
// Create a new service. Optionally include some options here.
service := micro.NewService(
micro.Name("greeter"),
micro.Version("latest"),
)
// Init will parse the command line flags.
@@ -174,6 +161,10 @@ func main() {
### Run service
```
go run examples/service/main.go
```
Output
```
2016/03/14 10:59:14 Listening on [::]:50137
2016/03/14 10:59:14 Broker Listening on [::]:50138
2016/03/14 10:59:14 Registering node: greeter-ca62b017-e9d3-11e5-9bbb-68a86d0d36b6
@@ -181,10 +172,9 @@ go run examples/service/main.go
### Define a client
Below is the client code to query the greeter service. Notice we're using the code generated client interface `proto.NewGreeterClient`.
This reduces the amount of boiler plate code we need to write. The greeter client can be reused throughout the code if need be.
Below is the client code to query the greeter service.
`client.go`
The generated proto includes a greeter client to reduce boilerplate code.
```go
package main
@@ -220,13 +210,18 @@ func main() {
```shell
go run client.go
```
Output
```
Hello John
```
## Writing a Function
Go Micro includes the Function programming model. This is the notion of a one time executing Service which operates much like a service except exiting
after completing a request. A function is defined much like a service and called in exactly the same way.
Go Micro includes the Function programming model.
A Function is a one time executing Service which exits after completing a request.
### Defining a Function
@@ -265,61 +260,6 @@ func main() {
It's that simple.
## How does it work?
<p align="center">
<img src="go-micro.png" />
</p>
Go Micro is a framework that addresses the fundamental requirements to write microservices.
Let's dig into the core components.
### Registry
The registry provides a service discovery mechanism to resolve names to addresses. It can be backed by consul, etcd, zookeeper, dns, gossip, etc.
Services should register using the registry on startup and deregister on shutdown. Services can optionally provide an expiry TTL and reregister
on an interval to ensure liveness and that the service is cleaned up if it dies.
### Selector
The selector is a load balancing abstraction which builds on the registry. It allows services to be "filtered" using filter functions and "selected"
using a choice of algorithms such as random, roundrobin, leastconn, etc. The selector is leveraged by the Client when making requests. The client
will use the selector rather than the registry as it provides that built in mechanism of load balancing.
### Transport
The transport is the interface for synchronous request/response communication between services. It's akin to the golang net package but provides
a higher level abstraction which allows us to switch out communication mechanisms e.g http, rabbitmq, websockets, NATS. The transport also
supports bidirectional streaming. This is powerful for client side push to the server.
### Broker
The broker provides an interface to a message broker for asynchronous pub/sub communication. This is one of the fundamental requirements of an event
driven architecture and microservices. By default we use an inbox style point to point HTTP system to minimise the number of dependencies required
to get started. However there are many message broker implementations available in go-plugins e.g RabbitMQ, NATS, NSQ, Google Cloud Pub Sub.
### Codec
The codec is used for encoding and decoding messages before transporting them across the wire. This could be json, protobuf, bson, msgpack, etc.
Where this differs from most other codecs is that we actually support the RPC format here as well. So we have JSON-RPC, PROTO-RPC, BSON-RPC, etc.
It separates encoding from the client/server and provides a powerful method for integrating other systems such as gRPC, Vanadium, etc.
### Server
The server is the building block for writing a service. Here you can name your service, register request handlers, add middeware, etc. The service
builds on the above packages to provide a unified interface for serving requests. The built in server is an RPC system. In the future there maybe
other implementations. The server also allows you to define multiple codecs to serve different encoded messages.
### Client
The client provides an interface to make requests to services. Again like the server, it builds on the other packages to provide a unified interface
for finding services by name using the registry, load balancing using the selector, making synchronous requests with the transport and asynchronous
messaging using the broker.
The above components are combined at the top-level of micro as a **Service**.
## Plugins
By default go-micro only provides a few implementation of each interface at the core but it's completely pluggable. There's already dozens of plugins which are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Contributions are welcome!
@@ -353,6 +293,64 @@ Flag usage of plugins
service --registry=etcdv3 --transport=nats --broker=kafka
```
## Wrappers
Go-micro includes the notion of middleware as wrappers. The client or handlers can be wrapped using the decorator pattern.
### Handler
Here's an example service handler wrapper which logs the incoming request
```go
// implements the server.HandlerWrapper
func logWrapper(fn server.HandlerFunc) server.HandlerFunc {
return func(ctx context.Context, req server.Request, rsp interface{}) error {
fmt.Printf("[%v] server request: %s", time.Now(), req.Method())
return fn(ctx, req, rsp)
}
}
```
It can be initialised when creating the service
```go
service := micro.NewService(
micro.Name("greeter"),
// wrap the handler
micro.WrapHandler(logWrapper),
)
```
### Client
Here's an example of a client wrapper which logs requests made
```go
type logWrapper struct {
client.Client
}
func (l *logWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
fmt.Printf("[wrapper] client request to service: %s method: %s\n", req.Service(), req.Method())
return l.Client.Call(ctx, req, rsp)
}
// implements client.Wrapper as logWrapper
func logWrap(c client.Client) client.Client {
return &logWrapper{c}
}
```
It can be initialised when creating the service
```go
service := micro.NewService(
micro.Name("greeter"),
// wrap the client
micro.WrapClient(logWrap),
)
```
## Other Languages
Check out [ja-micro](https://github.com/Sixt/ja-micro) to write services in Java

View File

@@ -2,8 +2,6 @@
package broker
// Broker is an interface used for asynchronous messaging.
// Its an abstraction over various message brokers
// {NATS, RabbitMQ, Kafka, ...}
type Broker interface {
Options() Options
Address() string

View File

@@ -3,6 +3,7 @@ package broker
import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -18,8 +19,9 @@ import (
"github.com/micro/go-log"
"github.com/micro/go-micro/broker/codec/json"
"github.com/micro/go-micro/errors"
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-rcache"
maddr "github.com/micro/misc/lib/addr"
mnet "github.com/micro/misc/lib/net"
mls "github.com/micro/misc/lib/tls"
@@ -28,15 +30,11 @@ import (
"golang.org/x/net/context"
)
// HTTP Broker is a placeholder for actual message brokers.
// This should not really be used in production but useful
// in developer where you want zero dependencies.
// HTTP Broker is a point to point async broker
type httpBroker struct {
id string
address string
unsubscribe chan *httpSubscriber
opts Options
id string
address string
opts Options
mux *http.ServeMux
@@ -53,9 +51,9 @@ type httpSubscriber struct {
opts SubscribeOptions
id string
topic string
ch chan *httpSubscriber
fn Handler
svc *registry.Service
hb *httpBroker
}
type httpPublication struct {
@@ -106,11 +104,13 @@ func newHttpBroker(opts ...Option) Broker {
o(&options)
}
// set address
addr := ":0"
if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 {
addr = options.Addrs[0]
}
// get registry
reg, ok := options.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
@@ -123,7 +123,6 @@ func newHttpBroker(opts ...Option) Broker {
r: reg,
c: &http.Client{Transport: newTransport(options.TLSConfig)},
subscribers: make(map[string][]*httpSubscriber),
unsubscribe: make(chan *httpSubscriber),
exit: make(chan chan error),
mux: http.NewServeMux(),
}
@@ -153,9 +152,41 @@ func (h *httpSubscriber) Topic() string {
}
func (h *httpSubscriber) Unsubscribe() error {
h.ch <- h
// artificial delay
time.Sleep(time.Millisecond * 10)
return h.hb.unsubscribe(h)
}
func (h *httpBroker) subscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil {
return err
}
h.subscribers[s.topic] = append(h.subscribers[s.topic], s)
return nil
}
func (h *httpBroker) unsubscribe(s *httpSubscriber) error {
h.Lock()
defer h.Unlock()
var subscribers []*httpSubscriber
// look for subscriber
for _, sub := range h.subscribers[s.topic] {
// deregister and skip forward
if sub.id == s.id {
h.r.Deregister(sub.svc)
continue
}
// keep subscriber
subscribers = append(subscribers, sub)
}
// set subscribers
h.subscribers[s.topic] = subscribers
return nil
}
@@ -177,29 +208,75 @@ func (h *httpBroker) run(l net.Listener) {
// received exit signal
case ch := <-h.exit:
ch <- l.Close()
h.Lock()
h.running = false
h.Unlock()
return
// unsubscribe subscriber
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
for _, sub := range h.subscribers[subscriber.topic] {
// deregister and skip forward
if sub.id == subscriber.id {
h.RLock()
for _, subs := range h.subscribers {
for _, sub := range subs {
h.r.Deregister(sub.svc)
continue
}
subscribers = append(subscribers, sub)
}
h.subscribers[subscriber.topic] = subscribers
h.Unlock()
h.RUnlock()
return
}
}
}
func (h *httpBroker) start() error {
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := merr.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := merr.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
h.RUnlock()
}
func (h *httpBroker) Address() string {
h.RLock()
defer h.RUnlock()
return h.address
}
func (h *httpBroker) Connect() error {
h.Lock()
defer h.Unlock()
@@ -255,11 +332,20 @@ func (h *httpBroker) start() error {
go http.Serve(l, h.mux)
go h.run(l)
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
// set rcache
h.r = rcache.New(reg)
// set running
h.running = true
return nil
}
func (h *httpBroker) stop() error {
func (h *httpBroker) Disconnect() error {
h.Lock()
defer h.Unlock()
@@ -267,76 +353,30 @@ func (h *httpBroker) stop() error {
return nil
}
// stop rcache
rc, ok := h.r.(rcache.Cache)
if ok {
rc.Stop()
}
// exit and return err
ch := make(chan error)
h.exit <- ch
err := <-ch
// set not running
h.running = false
return err
}
func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
err := errors.BadRequest("go.micro.broker", "Method not allowed")
http.Error(w, err.Error(), http.StatusMethodNotAllowed)
return
}
defer req.Body.Close()
req.ParseForm()
b, err := ioutil.ReadAll(req.Body)
if err != nil {
errr := errors.InternalServerError("go.micro.broker", "Error reading request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
var m *Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
p := &httpPublication{m: m, t: topic}
id := req.Form.Get("id")
h.RLock()
for _, subscriber := range h.subscribers[topic] {
if id == subscriber.id {
// sub is sync; crufty rate limiting
// so we don't hose the cpu
subscriber.fn(p)
}
}
h.RUnlock()
}
func (h *httpBroker) Address() string {
return h.address
}
func (h *httpBroker) Connect() error {
return h.start()
}
func (h *httpBroker) Disconnect() error {
return h.stop()
}
func (h *httpBroker) Init(opts ...Option) error {
h.Lock()
defer h.Unlock()
if h.running {
return errors.New("cannot init while connected")
}
for _, o := range opts {
o(&h.opts)
}
@@ -345,12 +385,19 @@ func (h *httpBroker) Init(opts ...Option) error {
h.id = "broker-" + uuid.NewUUID().String()
}
// get registry
reg, ok := h.opts.Context.Value(registryKey).(registry.Registry)
if !ok {
reg = registry.DefaultRegistry
}
h.r = reg
// get rcache
if rc, ok := h.r.(rcache.Cache); ok {
rc.Stop()
}
// set registry
h.r = rcache.New(reg)
return nil
}
@@ -360,10 +407,13 @@ func (h *httpBroker) Options() Options {
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
h.RLock()
s, err := h.r.GetService("topic:" + topic)
if err != nil {
h.RUnlock()
return err
}
h.RUnlock()
m := &Message{
Header: make(map[string]string),
@@ -381,7 +431,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
return err
}
fn := func(node *registry.Node, b []byte) {
pub := func(node *registry.Node, b []byte) {
scheme := "http"
// check if secure is added in metadata
@@ -411,15 +461,14 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
case broadcastVersion:
for _, node := range service.Nodes {
// publish async
go fn(node, b)
go pub(node, b)
}
default:
// select node to publish to
node := service.Nodes[rand.Int()%len(service.Nodes)]
// publish async
go fn(node, b)
go pub(node, b)
}
}
@@ -427,7 +476,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
}
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
opt := newSubscribeOptions(opts...)
options := newSubscribeOptions(opts...)
// parse address for host, port
parts := strings.Split(h.Address(), ":")
@@ -439,7 +488,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
return nil, err
}
id := uuid.NewUUID().String()
// create unique id
id := h.id + "." + uuid.NewUUID().String()
var secure bool
@@ -449,7 +499,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
// register service
node := &registry.Node{
Id: h.id + "." + id,
Id: id,
Address: addr,
Port: port,
Metadata: map[string]string{
@@ -457,7 +507,8 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
},
}
version := opt.Queue
// check for queue group or broadcast queue
version := options.Queue
if len(version) == 0 {
version = broadcastVersion
}
@@ -468,22 +519,22 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
Nodes: []*registry.Node{node},
}
// generate subscriber
subscriber := &httpSubscriber{
opts: opt,
id: h.id + "." + id,
opts: options,
hb: h,
id: id,
topic: topic,
ch: h.unsubscribe,
fn: handler,
svc: service,
}
if err := h.r.Register(service, registry.RegisterTTL(registerTTL)); err != nil {
// subscribe now
if err := h.subscribe(subscriber); err != nil {
return nil, err
}
h.Lock()
h.subscribers[topic] = append(h.subscribers[topic], subscriber)
h.Unlock()
// return the subscriber
return subscriber, nil
}

View File

@@ -70,7 +70,8 @@ var (
cli.IntFlag{
Name: "client_pool_size",
EnvVar: "MICRO_CLIENT_POOL_SIZE",
Usage: "Sets the client connection pool size. Default: 0",
Usage: "Sets the client connection pool size. Default: 1",
Value: 1,
},
cli.StringFlag{
Name: "client_pool_ttl",
@@ -132,6 +133,7 @@ var (
Name: "selector",
EnvVar: "MICRO_SELECTOR",
Usage: "Selector used to pick nodes for querying",
Value: "cache",
},
cli.StringFlag{
Name: "server",
@@ -181,7 +183,7 @@ var (
defaultServer = "rpc"
defaultBroker = "http"
defaultRegistry = "consul"
defaultSelector = "default"
defaultSelector = "cache"
defaultTransport = "http"
)

View File

@@ -45,7 +45,7 @@ var (
HeaderPrefix = "X-Micro-"
)
// NewService creates an returns a new Service based on the packages within.
// NewService creates and returns a new Service based on the packages within.
func NewService(opts ...Option) Service {
return newService(opts...)
}
@@ -81,5 +81,5 @@ func RegisterHandler(s server.Server, h interface{}, opts ...server.HandlerOptio
// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h))
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
}

View File

@@ -104,6 +104,13 @@ func Registry(r registry.Registry) Option {
}
}
// Selector sets the selector for the service client
func Selector(s selector.Selector) Option {
return func(o *Options) {
o.Client.Init(client.Selector(s))
}
}
// Transport sets the transport for the service
// and the underlying components
func Transport(t transport.Transport) Option {

View File

@@ -59,14 +59,6 @@ func newConsulRegistry(opts ...Option) Registry {
config = c
}
}
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
// set timeout
if options.Timeout > 0 {
config.HttpClient.Timeout = options.Timeout
}
// check if there are any addrs
if len(options.Addrs) > 0 {
@@ -82,6 +74,10 @@ func newConsulRegistry(opts ...Option) Registry {
// requires secure connection?
if options.Secure || options.TLSConfig != nil {
if config.HttpClient == nil {
config.HttpClient = new(http.Client)
}
config.Scheme = "https"
// We're going to support InsecureSkipVerify
config.HttpClient.Transport = newTransport(options.TLSConfig)
@@ -90,6 +86,11 @@ func newConsulRegistry(opts ...Option) Registry {
// create the client
client, _ := consul.NewClient(config)
// set timeout
if options.Timeout > 0 {
config.HttpClient.Timeout = options.Timeout
}
cr := &consulRegistry{
Address: config.Address,
Client: client,

View File

@@ -41,7 +41,7 @@ func newMockServer(rg *mockRegistry, l net.Listener) error {
}
func newConsulTestRegistry(r *mockRegistry) (*consulRegistry, func()) {
l, err := net.Listen("tcp", ":0")
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
// blurgh?!!
panic(err.Error())
@@ -104,7 +104,11 @@ func TestConsul_GetService_WithHealthyServiceNodes(t *testing.T) {
})
defer cl()
svc, _ := cr.GetService("service-name")
svc, err := cr.GetService("service-name")
if err != nil {
t.Fatal("Unexpected error", err)
}
if exp, act := 1, len(svc); exp != act {
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
}
@@ -140,7 +144,11 @@ func TestConsul_GetService_WithUnhealthyServiceNode(t *testing.T) {
})
defer cl()
svc, _ := cr.GetService("service-name")
svc, err := cr.GetService("service-name")
if err != nil {
t.Fatal("Unexpected error", err)
}
if exp, act := 1, len(svc); exp != act {
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
}
@@ -176,7 +184,11 @@ func TestConsul_GetService_WithUnhealthyServiceNodes(t *testing.T) {
})
defer cl()
svc, _ := cr.GetService("service-name")
svc, err := cr.GetService("service-name")
if err != nil {
t.Fatal("Unexpected error", err)
}
if exp, act := 1, len(svc); exp != act {
t.Fatalf("Expected len of svc to be `%d`, got `%d`.", exp, act)
}

View File

@@ -23,6 +23,8 @@ type cacheSelector struct {
cache map[string][]*registry.Service
ttls map[string]time.Time
once sync.Once
// used to close or reload watcher
reload chan bool
exit chan bool
@@ -86,30 +88,53 @@ func (c *cacheSelector) get(service string) ([]*registry.Service, error) {
c.Lock()
defer c.Unlock()
// get does the actual request for a service
// it also caches it
get := func(service string) ([]*registry.Service, error) {
// ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
}
// cache results
c.set(service, c.cp(services))
return services, nil
}
// check the cache first
services, ok := c.cache[service]
// cache miss or no services
if !ok || len(services) == 0 {
return get(service)
}
// got cache but lets check ttl
ttl, kk := c.ttls[service]
// got results, copy and return
if ok && len(services) > 0 {
// only return if its less than the ttl
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// within ttl so return cache
if kk && time.Since(ttl) < c.ttl {
return c.cp(services), nil
}
// cache miss or ttl expired
// expired entry so get service
services, err := get(service)
// now ask the registry
services, err := c.so.Registry.GetService(service)
if err != nil {
return nil, err
// no error then return error
if err == nil {
return services, nil
}
// we didn't have any results so cache
c.cache[service] = c.cp(services)
c.ttls[service] = time.Now().Add(c.ttl)
return services, nil
// not found error then return
if err == registry.ErrNotFound {
return nil, selector.ErrNotFound
}
// other error
// return expired cache as last resort
return c.cp(services), nil
}
func (c *cacheSelector) set(service string, services []*registry.Service) {
@@ -230,8 +255,6 @@ func (c *cacheSelector) update(res *registry.Result) {
// reloads the watcher if Init is called
// and returns when Close is called
func (c *cacheSelector) run() {
go c.tick()
for {
// exit early if already dead
if c.quit() {
@@ -241,6 +264,9 @@ func (c *cacheSelector) run() {
// create new watcher
w, err := c.so.Registry.Watch()
if err != nil {
if c.quit() {
return
}
log.Log(err)
time.Sleep(time.Second)
continue
@@ -248,33 +274,15 @@ func (c *cacheSelector) run() {
// watch for events
if err := c.watch(w); err != nil {
if c.quit() {
return
}
log.Log(err)
continue
}
}
}
// check cache and expire on each tick
func (c *cacheSelector) tick() {
t := time.NewTicker(time.Minute)
for {
select {
case <-t.C:
c.Lock()
for service, expiry := range c.ttls {
if d := time.Since(expiry); d > c.ttl {
// TODO: maybe refresh the cache rather than blowing it away
c.del(service)
}
}
c.Unlock()
case <-c.exit:
return
}
}
}
// watch loops the next event and calls update
// it returns if there's an error
func (c *cacheSelector) watch(w registry.Watcher) error {
@@ -324,6 +332,10 @@ func (c *cacheSelector) Options() selector.Options {
}
func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) {
c.once.Do(func() {
go c.run()
})
sopts := selector.SelectOptions{
Strategy: c.so.Strategy,
}
@@ -401,7 +413,7 @@ func NewSelector(opts ...selector.Option) selector.Selector {
}
}
c := &cacheSelector{
return &cacheSelector{
so: sopts,
ttl: ttl,
cache: make(map[string][]*registry.Service),
@@ -409,7 +421,4 @@ func NewSelector(opts ...selector.Option) selector.Selector {
reload: make(chan bool, 1),
exit: make(chan bool),
}
go c.run()
return c
}

View File

@@ -1,9 +1,6 @@
package selector
import (
"math/rand"
"time"
"github.com/micro/go-micro/registry"
)
@@ -11,10 +8,6 @@ type defaultSelector struct {
so Options
}
func init() {
rand.Seed(time.Now().Unix())
}
func (r *defaultSelector) Init(opts ...Option) error {
for _, o := range opts {
o(&r.so)

View File

@@ -3,6 +3,7 @@ package server
import (
"fmt"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
@@ -181,12 +182,12 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
}
s.Lock()
defer s.Unlock()
_, ok = s.subscribers[sub]
if ok {
return fmt.Errorf("subscriber %v already exists", s)
}
s.subscribers[sub] = nil
s.Unlock()
return nil
}
@@ -232,19 +233,34 @@ func (s *rpcServer) Register() error {
node.Metadata["registry"] = config.Registry.String()
s.RLock()
var endpoints []*registry.Endpoint
for _, e := range s.handlers {
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range s.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
endpoints = append(endpoints, e.Endpoints()...)
handlerList = append(handlerList, n)
}
}
for e, _ := range s.subscribers {
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range s.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
endpoints = append(endpoints, e.Endpoints()...)
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
var endpoints []*registry.Endpoint
for _, n := range handlerList {
endpoints = append(endpoints, s.handlers[n].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
s.RUnlock()
service := &registry.Service{

View File

@@ -3,9 +3,11 @@ package micro
import (
"os"
"os/signal"
"sync"
"syscall"
"time"
log "github.com/micro/go-log"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata"
@@ -15,7 +17,7 @@ import (
type service struct {
opts Options
init chan bool
once sync.Once
}
func newService(opts ...Option) Service {
@@ -30,7 +32,6 @@ func newService(opts ...Option) Service {
return &service{
opts: options,
init: make(chan bool),
}
}
@@ -44,7 +45,10 @@ func (s *service) run(exit chan bool) {
for {
select {
case <-t.C:
s.opts.Server.Register()
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
@@ -56,23 +60,12 @@ func (s *service) run(exit chan bool) {
// which parses command line flags. cmd.Init is only called
// on first Init.
func (s *service) Init(opts ...Option) {
// If <-s.init blocks, Init has not been called yet
// so we can call cmd.Init once.
select {
case <-s.init:
// only process options
for _, o := range opts {
o(&s.opts)
}
default:
// close init
close(s.init)
// process options
for _, o := range opts {
o(&s.opts)
}
// process options
for _, o := range opts {
o(&s.opts)
}
s.once.Do(func() {
// Initialise the command flags, overriding new service
s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
@@ -81,7 +74,7 @@ func (s *service) Init(opts ...Option) {
cmd.Client(&s.opts.Client),
cmd.Server(&s.opts.Server),
)
}
})
}
func (s *service) Options() Options {

View File

@@ -32,7 +32,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
}
expectedPort(t, "44445", lsn2)
lsn, err := tp.Listen(":0")
lsn, err := tp.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Did not expect an error, got %s", err)
}
@@ -45,7 +45,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
func TestHTTPTransportCommunication(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
@@ -111,7 +111,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
func TestHTTPTransportError(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
@@ -181,7 +181,7 @@ func TestHTTPTransportError(t *testing.T) {
func TestHTTPTransportTimeout(t *testing.T) {
tr := NewTransport(Timeout(time.Millisecond * 100))
l, err := tr.Listen(":0")
l, err := tr.Listen("127.0.0.1:0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}