Compare commits

...

24 Commits

Author SHA1 Message Date
Asim Aslam
31fc8df2ba add server request body 2019-02-04 13:13:03 +00:00
Asim Aslam
baf7de76bf Merge branch 'master' of github.com:micro/go-micro 2019-02-04 10:29:26 +00:00
Asim Aslam
31b6cad47b make copy before writing 2019-02-04 10:29:10 +00:00
Asim Aslam
686171c26d Merge pull request #413 from qkzsky/qkzsky-rpc-fix
client close: rpc: unable to write error response
2019-02-03 13:13:10 +00:00
kuangzhiqiang
6be205fd40 client close: rpc: unable to write error response
when client close notice: "rpc: unable to write error response..."
2019-02-03 19:12:13 +08:00
Asim Aslam
89014160fc Merge pull request #411 from unistack-org/gossip
registry: gossip unify registry option passing, optimize
2019-02-01 22:21:08 +00:00
422e2002a0 registry: gossip unify registry option passing, optimize
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-02 01:12:39 +03:00
Asim Aslam
cead99ac44 fix nil pointer 2019-02-01 16:01:51 +00:00
Asim Aslam
c03d935ffd fallback for 0.14.0 and older 2019-02-01 15:57:34 +00:00
Asim Aslam
88e12347d0 update mdns to remove race condition 2019-02-01 13:41:11 +00:00
Asim Aslam
652b1067f5 fix data race 2019-02-01 09:05:03 +00:00
Asim Aslam
7888d3e13d use official h2c server 2019-01-31 17:14:36 +00:00
Asim Aslam
b1a31134bd Support micro proxy 2019-01-30 18:42:11 +00:00
Asim Aslam
107b571019 Add go mod 2019-01-30 11:43:40 +00:00
Asim Aslam
89c8e1f4a7 update readme 2019-01-29 09:20:34 +00:00
Asim Aslam
a06cd72337 update image 2019-01-29 09:08:14 +00:00
Asim Aslam
e22fa01935 fix ticker 2019-01-24 16:08:04 +00:00
Asim Aslam
a5015692e3 Merge pull request #400 from micro/interval
Move RegisterInterval into the server
2019-01-24 13:55:05 +00:00
Asim Aslam
539b8c1a3b Move RegisterInterval into the server 2019-01-24 13:22:17 +00:00
Asim Aslam
67a738b504 Merge pull request #399 from unistack-org/master
add context to SubscriberOptions
2019-01-24 13:11:33 +00:00
ac1afea7fc add context to server.SubscriberOptions and broker.SubscribeOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-01-24 15:36:01 +03:00
Asim Aslam
8090f9968d Update headers to remove X- prefix 2019-01-24 10:11:02 +00:00
Asim Aslam
7542aafd29 Update package comment 2019-01-23 18:15:17 +00:00
Asim Aslam
13de868b21 Rename 2019-01-23 18:14:36 +00:00
28 changed files with 835 additions and 384 deletions

View File

@@ -1,6 +1,6 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro) # Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable framework for micro service development. Go Micro is a framework for micro service development.
## Overview ## Overview
@@ -8,7 +8,7 @@ Go Micro provides the core requirements for distributed systems development incl
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly
but everything can be easily swapped out. but everything can be easily swapped out.
<img src="https://micro.mu/docs/images/go-micro.png" /> <img src="https://micro.mu/docs/images/go-micro.svg" />
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins). Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).

View File

@@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option {
o.TLSConfig = t o.TLSConfig = t
} }
} }
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {
o.Context = ctx
}
}

View File

@@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"os"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@@ -213,12 +214,17 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
codec: codec, codec: codec,
} }
// set request codec
if r, ok := req.(*rpcRequest); ok {
r.codec = codec
}
stream := &rpcStream{ stream := &rpcStream{
context: ctx, context: ctx,
request: req, request: req,
response: rsp, response: rsp,
closed: make(chan bool), closed: make(chan bool),
codec: newRpcCodec(msg, c, cf), codec: codec,
} }
ch := make(chan error, 1) ch := make(chan error, 1)
@@ -268,6 +274,18 @@ func (r *rpcClient) Options() Options {
} }
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) { func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
}
// return remote address // return remote address
if len(opts.Address) > 0 { if len(opts.Address) > 0 {
address := opts.Address address := opts.Address
@@ -288,11 +306,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
} }
// get next nodes from the selector // get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...) next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error()) return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error())
} else if err != nil { } else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error()) return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error())
} }
return next, nil return next, nil
@@ -487,8 +505,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
id := uuid.New().String() id := uuid.New().String()
md["Content-Type"] = msg.ContentType() md["Content-Type"] = msg.ContentType()
md["X-Micro-Topic"] = msg.Topic() md["Micro-Topic"] = msg.Topic()
md["X-Micro-Id"] = id md["Micro-Id"] = id
// encode message body // encode message body
cf, err := r.newCodec(msg.ContentType()) cf, err := r.newCodec(msg.ContentType())
@@ -500,8 +518,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
Target: msg.Topic(), Target: msg.Topic(),
Type: codec.Publication, Type: codec.Publication,
Header: map[string]string{ Header: map[string]string{
"X-Micro-Id": id, "Micro-Id": id,
"X-Micro-Topic": msg.Topic(), "Micro-Topic": msg.Topic(),
}, },
}, msg.Payload()); err != nil { }, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())

View File

@@ -84,6 +84,50 @@ func (rwc *readWriteCloser) Close() error {
return nil return nil
} }
func getHeaders(m *codec.Message) {
get := func(hdr string) string {
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
// check error in header
if len(m.Error) == 0 {
m.Error = get("Micro-Error")
}
// check endpoint in header
if len(m.Endpoint) == 0 {
m.Endpoint = get("Micro-Endpoint")
}
// check method in header
if len(m.Method) == 0 {
m.Method = get("Micro-Method")
}
if len(m.Id) == 0 {
m.Id = get("Micro-Id")
}
}
func setHeaders(m *codec.Message) {
set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
set("Micro-Id", m.Id)
set("Micro-Service", m.Target)
set("Micro-Method", m.Method)
set("Micro-Endpoint", m.Endpoint)
}
// setupProtocol sets up the old protocol // setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
protocol := node.Metadata["protocol"] protocol := node.Metadata["protocol"]
@@ -133,10 +177,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
} }
// set the mucp headers // set the mucp headers
m.Header["X-Micro-Id"] = m.Id setHeaders(m)
m.Header["X-Micro-Service"] = m.Target
m.Header["X-Micro-Method"] = m.Method
m.Header["X-Micro-Endpoint"] = m.Endpoint
// if body is bytes Frame don't encode // if body is bytes Frame don't encode
if body != nil { if body != nil {
@@ -162,52 +203,32 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
Header: m.Header, Header: m.Header,
Body: m.Body, Body: m.Body,
} }
// send the request // send the request
if err := c.client.Send(&msg); err != nil { if err := c.client.Send(&msg); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error()) return errors.InternalServerError("go.micro.client.transport", err.Error())
} }
return nil return nil
} }
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error { func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
var m transport.Message var tm transport.Message
if err := c.client.Recv(&m); err != nil {
// read message from transport
if err := c.client.Recv(&tm); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error()) return errors.InternalServerError("go.micro.client.transport", err.Error())
} }
c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body)
var me codec.Message c.buf.rbuf.Reset()
// set headers c.buf.rbuf.Write(tm.Body)
me.Header = m.Header
// set headers from transport
m.Header = tm.Header
// read header // read header
err := c.codec.ReadHeader(&me, r) err := c.codec.ReadHeader(m, r)
wm.Endpoint = me.Endpoint
wm.Method = me.Method
wm.Id = me.Id
wm.Error = me.Error
// check error in header // get headers
if len(me.Error) == 0 { getHeaders(m)
wm.Error = me.Header["X-Micro-Error"]
}
// check endpoint in header
if len(me.Endpoint) == 0 {
wm.Endpoint = me.Header["X-Micro-Endpoint"]
}
// check method in header
if len(me.Method) == 0 {
wm.Method = me.Header["X-Micro-Method"]
}
if len(me.Id) == 0 {
wm.Id = me.Header["X-Micro-Id"]
}
// return header error // return header error
if err != nil { if err != nil {

View File

@@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second)) serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
} }
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
}
// client opts // client opts
if r := ctx.Int("client_retries"); r >= 0 { if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r)) clientOpts = append(clientOpts, client.Retries(r))

View File

@@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
// service method // service method
path := m.Header[":path"] path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' { if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"] m.Target = m.Header["Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"] m.Endpoint = m.Header["Micro-Endpoint"]
} else { } else {
// [ , a.package.Foo, Bar] // [ , a.package.Foo, Bar]
parts := strings.Split(path, "/") parts := strings.Split(path, "/")

17
go.mod Normal file
View File

@@ -0,0 +1,17 @@
module github.com/micro/go-micro
require (
github.com/golang/protobuf v1.2.0
github.com/google/uuid v1.1.0
github.com/hashicorp/consul v1.4.2
github.com/hashicorp/memberlist v0.1.3
github.com/micro/cli v0.1.0
github.com/micro/go-log v0.1.0
github.com/micro/go-rcache v0.1.0
github.com/micro/h2c v1.0.0
github.com/micro/mdns v0.1.0
github.com/micro/util v0.1.0
github.com/mitchellh/hashstructure v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3
)

101
go.sum Normal file
View File

@@ -0,0 +1,101 @@
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/go-log/log v0.1.0 h1:wudGTNsiGzrD5ZjgIkVZ517ugi2XRe9Q/xRCzwEO4/U=
github.com/go-log/log v0.1.0/go.mod h1:4mBwpdRMFLiuXZDCwU2lKQFsoSCo72j3HqBK9d81N2M=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/consul v1.4.2 h1:D9iJoJb8Ehe/Zmr+UEE3U3FjOLZ4LUxqFMl4O43BM1U=
github.com/hashicorp/consul v1.4.2/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0 h1:wvCrVc9TjDls6+YGAF2hAifE1E5U1+b4tH6KdvN3Gig=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0 h1:ueI78wUjYExhCvMLow4icJnayNNFRgy0d9EGs/a1T44=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG676r31M=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
github.com/micro/h2c v1.0.0 h1:ejw6MS5+WaUoMHRtqkVCCrrVzLMzOFEH52rEyd8Fl2I=
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.3 h1:1g0r1IvskvgL8rR+AcHzUA+oFmGcQlaIm4IqakufeMM=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9dGS02Q3Y=
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 h1:YbZJ76lQ1BqNhVe7dKTSB67wDrc2VPRR75IyGyyPDX8=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc h1:WiYx1rIFmx8c0mXAFtv5D/mHyKe1+jmuP7PViuwqwuQ=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -1,4 +1,4 @@
// Package micro is a pluggable RPC framework for microservices // Package micro is a pluggable framework for microservices
package micro package micro
import ( import (
@@ -42,7 +42,7 @@ type Publisher interface {
type Option func(*Options) type Option func(*Options)
var ( var (
HeaderPrefix = "X-Micro-" HeaderPrefix = "Micro-"
) )
// NewService creates and returns a new Service based on the packages within. // NewService creates and returns a new Service based on the packages within.

View File

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

View File

@@ -22,9 +22,6 @@ type Options struct {
Registry registry.Registry Registry registry.Registry
Transport transport.Transport Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs // Before and After funcs
BeforeStart []func() error BeforeStart []func() error
BeforeStop []func() error BeforeStop []func() error
@@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option {
// RegisterInterval specifies the interval on which to re-register // RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option { func RegisterInterval(t time.Duration) Option {
return func(o *Options) { return func(o *Options) {
o.RegisterInterval = t o.Server.Init(server.RegisterInterval(t))
} }
} }

View File

@@ -1,6 +1,6 @@
# Gossip Registry # Gossip Registry
Gossip is a zero dependency registry which uses hashicorp/memberlist to broadcast registry information Gossip is a zero dependency registry which uses github.com/hashicorp/memberlist to broadcast registry information
via the SWIM protocol. via the SWIM protocol.
## Usage ## Usage
@@ -20,5 +20,5 @@ On startup you'll see something like
To join this gossip ring set the registry address using flag or env var To join this gossip ring set the registry address using flag or env var
```bash ```bash
MICRO_REGISTRY_ADDRESS= 192.168.1.65:56390 MICRO_REGISTRY_ADDRESS=192.168.1.65:56390
``` ```

View File

@@ -0,0 +1,17 @@
package gossip
import (
"context"
"github.com/micro/go-micro/registry"
)
// setRegistryOption returns a function to setup a context with given value
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@@ -7,9 +7,11 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"os" "os"
"os/signal"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"syscall"
"time" "time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@@ -21,10 +23,35 @@ import (
"github.com/mitchellh/hashstructure" "github.com/mitchellh/hashstructure"
) )
// use registry.Result int32 values after it switches from string to int32 types
// type actionType int32
// type updateType int32
const ( const (
addAction = "update" actionTypeInvalid int32 = iota
delAction = "delete" actionTypeCreate
syncAction = "sync" actionTypeDelete
actionTypeUpdate
actionTypeSync
)
func actionTypeString(t int32) string {
switch t {
case actionTypeCreate:
return "create"
case actionTypeDelete:
return "delete"
case actionTypeUpdate:
return "update"
case actionTypeSync:
return "sync"
}
return "invalid"
}
const (
updateTypeInvalid int32 = iota
updateTypeService
) )
type broadcast struct { type broadcast struct {
@@ -93,23 +120,18 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// shutdown old member // shutdown old member
if g.member != nil { if g.member != nil {
g.member.Shutdown() g.Stop()
} }
// replace addresses // replace addresses
curAddrs = newAddrs curAddrs = newAddrs
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// create a new default config // create a new default config
c := memberlist.DefaultLocalConfig() c := memberlist.DefaultLocalConfig()
// log to dev null
c.LogOutput = ioutil.Discard
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil { if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig c = optConfig
} }
@@ -145,15 +167,6 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// set the name // set the name
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-") c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// log to dev null
c.LogOutput = ioutil.Discard
// set a secret key if secure // set a secret key if secure
if g.options.Secure { if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte) k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
@@ -164,6 +177,20 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k c.SecretKey = k
} }
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// create the memberlist // create the memberlist
m, err := memberlist.Create(c) m, err := memberlist.Create(c)
if err != nil { if err != nil {
@@ -187,29 +214,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
return nil return nil
} }
func (*broadcast) UniqueBroadcast() {}
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool { func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
up := new(pb.Update) return false
if err := proto.Unmarshal(other.Message(), up); err != nil {
return false
}
// ids do not match
if b.update.Id == up.Id {
return false
}
// timestamps do not match
if b.update.Timestamp != up.Timestamp {
return false
}
// type does not match
if b.update.Type != up.Type {
return false
}
// invalidates
return true
} }
func (b *broadcast) Message() []byte { func (b *broadcast) Message() []byte {
@@ -242,7 +250,7 @@ func (d *delegate) NotifyMsg(b []byte) {
} }
// only process service action // only process service action
if up.Type != "service" { if up.Type != updateTypeService {
return return
} }
@@ -280,7 +288,7 @@ func (d *delegate) LocalState(join bool) []byte {
d.updates <- &update{ d.updates <- &update{
Update: &pb.Update{ Update: &pb.Update{
Action: syncAction, Action: actionTypeSync,
}, },
sync: syncCh, sync: syncCh,
} }
@@ -309,7 +317,7 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
for _, service := range services { for _, service := range services {
for _, srv := range service { for _, srv := range service {
d.updates <- &update{ d.updates <- &update{
Update: &pb.Update{Action: addAction}, Update: &pb.Update{Action: actionTypeCreate},
Service: srv, Service: srv,
sync: nil, sync: nil,
} }
@@ -350,6 +358,31 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
return next, exit return next, exit
} }
func (g *gossipRegistry) wait() {
ctx := g.options.Context
if c, ok := ctx.Value(contextContext{}).(context.Context); ok && c != nil {
ctx = c
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-ctx.Done():
}
g.Stop()
}
func (g *gossipRegistry) Stop() {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
}
func (g *gossipRegistry) run() { func (g *gossipRegistry) run() {
var mtx sync.Mutex var mtx sync.Mutex
updates := map[uint64]*update{} updates := map[uint64]*update{}
@@ -367,11 +400,11 @@ func (g *gossipRegistry) run() {
// process all the updates // process all the updates
for k, v := range updates { for k, v := range updates {
// check if expiry time has passed // check if expiry time has passed
if d := (v.Update.Timestamp + v.Update.Expires); d < now { if d := (v.Update.Expires); d < now {
// delete from records // delete from records
delete(updates, k) delete(updates, k)
// set to delete // set to delete
v.Update.Action = delAction v.Update.Action = actionTypeDelete
// fire a new update // fire a new update
g.updates <- v g.updates <- v
} }
@@ -384,7 +417,7 @@ func (g *gossipRegistry) run() {
// process the updates // process the updates
for u := range g.updates { for u := range g.updates {
switch u.Update.Action { switch u.Update.Action {
case addAction: case actionTypeCreate:
g.Lock() g.Lock()
if service, ok := g.services[u.Service.Name]; !ok { if service, ok := g.services[u.Service.Name]; !ok {
g.services[u.Service.Name] = []*registry.Service{u.Service} g.services[u.Service.Name] = []*registry.Service{u.Service}
@@ -395,7 +428,7 @@ func (g *gossipRegistry) run() {
g.Unlock() g.Unlock()
// publish update to watchers // publish update to watchers
go g.publish(addAction, []*registry.Service{u.Service}) go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service})
// we need to expire the node at some point in the future // we need to expire the node at some point in the future
if u.Update.Expires > 0 { if u.Update.Expires > 0 {
@@ -406,7 +439,7 @@ func (g *gossipRegistry) run() {
mtx.Unlock() mtx.Unlock()
} }
} }
case delAction: case actionTypeDelete:
g.Lock() g.Lock()
if service, ok := g.services[u.Service.Name]; ok { if service, ok := g.services[u.Service.Name]; ok {
if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 { if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 {
@@ -418,7 +451,7 @@ func (g *gossipRegistry) run() {
g.Unlock() g.Unlock()
// publish update to watchers // publish update to watchers
go g.publish(delAction, []*registry.Service{u.Service}) go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service})
// delete from expiry checks // delete from expiry checks
if hash, err := hashstructure.Hash(u.Service, nil); err == nil { if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
@@ -426,7 +459,7 @@ func (g *gossipRegistry) run() {
delete(updates, hash) delete(updates, hash)
mtx.Unlock() mtx.Unlock()
} }
case syncAction: case actionTypeSync:
// no sync channel provided // no sync channel provided
if u.sync == nil { if u.sync == nil {
continue continue
@@ -441,7 +474,7 @@ func (g *gossipRegistry) run() {
} }
// publish to watchers // publish to watchers
go g.publish(addAction, service) go g.publish(actionTypeString(actionTypeCreate), service)
} }
g.RUnlock() g.RUnlock()
@@ -480,11 +513,9 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
} }
up := &pb.Update{ up := &pb.Update{
Id: uuid.New().String(), Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
Timestamp: uint64(time.Now().UnixNano()), Action: actionTypeCreate,
Expires: uint64(options.TTL.Nanoseconds()), Type: updateTypeService,
Action: "update",
Type: "service",
Metadata: map[string]string{ Metadata: map[string]string{
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
@@ -519,10 +550,8 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error {
g.Unlock() g.Unlock()
up := &pb.Update{ up := &pb.Update{
Id: uuid.New().String(), Action: actionTypeDelete,
Timestamp: uint64(time.Now().UnixNano()), Type: updateTypeService,
Action: "delete",
Type: "service",
Metadata: map[string]string{ Metadata: map[string]string{
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
@@ -590,5 +619,7 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
// wait for setup // wait for setup
<-time.After(gossip.interval * 2) <-time.After(gossip.interval * 2)
go gossip.wait()
return gossip return gossip
} }

View File

@@ -0,0 +1,199 @@
package gossip_test
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
micro "github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/gossip"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
)
var (
r1 registry.Registry
r2 registry.Registry
mu sync.Mutex
)
func newConfig() *memberlist.Config {
wc := memberlist.DefaultLANConfig()
wc.DisableTcpPings = false
wc.GossipVerifyIncoming = false
wc.GossipVerifyOutgoing = false
wc.EnableCompression = false
wc.LogOutput = os.Stderr
wc.ProtocolVersion = 4
wc.Name = uuid.New().String()
return wc
}
func newRegistries() {
mu.Lock()
defer mu.Unlock()
if r1 != nil && r2 != nil {
return
}
wc1 := newConfig()
wc2 := newConfig()
rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")}
rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")}
r1 = gossip.NewRegistry(rops1...) // first started without members
r2 = gossip.NewRegistry(rops2...) // second started joining
}
func TestRegistryBroadcast(t *testing.T) {
newRegistries()
svc1 := &registry.Service{Name: "r1-svc", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "r2-svc", Version: "0.0.0.2"}
<-time.After(1 * time.Second)
if err := r1.Register(svc1); err != nil {
t.Fatal(err)
}
<-time.After(1 * time.Second)
if err := r2.Register(svc2); err != nil {
t.Fatal(err)
}
var found bool
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r2-svc" {
found = true
}
}
if !found {
t.Fatalf("r2-svc not found in r1, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r1-svc" {
found = true
}
}
if !found {
t.Fatalf("r1-svc not found in r2, broadcast not work")
}
if err := r1.Deregister(svc1); err != nil {
t.Fatal(err)
}
if err := r2.Deregister(svc2); err != nil {
t.Fatal(err)
}
}
func TestServerRegistry(t *testing.T) {
newRegistries()
_, err := newServer("s1", r1, t)
if err != nil {
t.Fatal(err)
}
_, err = newServer("s2", r2, t)
if err != nil {
t.Fatal(err)
}
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
if len(svcs) < 1 {
t.Fatalf("r1 svcs unknown %#+v\n", svcs)
}
found := false
for _, svc := range svcs {
if svc.Name == "s2" {
found = true
}
}
if !found {
t.Fatalf("r1 does not have s2, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "s1" {
found = true
}
}
if !found {
t.Fatalf("r2 does not have s1, broadcast not work")
}
}
type testServer struct{}
func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error {
return nil
}
func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) {
h := &testServer{}
var wg sync.WaitGroup
wg.Add(1)
sopts := []server.Option{
server.Name(n),
server.Registry(r),
}
copts := []client.Option{
client.Selector(selector.NewSelector(selector.Registry(r))),
client.Registry(r),
}
srv := micro.NewService(
micro.Server(server.NewServer(sopts...)),
micro.Client(client.NewClient(copts...)),
micro.AfterStart(func() error {
wg.Done()
return nil
}),
)
srv.Server().NewHandler(h)
go func() {
t.Fatal(srv.Run())
}()
wg.Wait()
return srv, nil
}

View File

@@ -12,34 +12,35 @@ type contextSecretKey struct{}
// Secret specifies an encryption key. The value should be either // Secret specifies an encryption key. The value should be either
// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256. // 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
func Secret(k []byte) registry.Option { func Secret(k []byte) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextSecretKey{}, k)
o.Context = context.WithValue(o.Context, contextSecretKey{}, k)
}
} }
type contextAddress struct{} type contextAddress struct{}
// Address to bind to - host:port // Address to bind to - host:port
func Address(a string) registry.Option { func Address(a string) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextAddress{}, a)
o.Context = context.WithValue(o.Context, contextAddress{}, a)
}
} }
type contextConfig struct{} type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip // Config allow to inject a *memberlist.Config struct for configuring gossip
func Config(c *memberlist.Config) registry.Option { func Config(c *memberlist.Config) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextConfig{}, c)
o.Context = context.WithValue(o.Context, contextConfig{}, c)
}
} }
type contextAdvertise struct{} type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port // The address to advertise for other gossip members - host:port
func Advertise(a string) registry.Option { func Advertise(a string) registry.Option {
return func(o *registry.Options) { return setRegistryOption(contextAdvertise{}, a)
o.Context = context.WithValue(o.Context, contextAdvertise{}, a) }
}
type contextContext struct{}
// Context specifies a context for the registry.
// Can be used to signal shutdown of the registry.
// Can be used for extra option values.
func Context(ctx context.Context) registry.Option {
return setRegistryOption(contextContext{}, ctx)
} }

View File

@@ -3,9 +3,12 @@
package gossip package gossip
import proto "github.com/golang/protobuf/proto" import (
import fmt "fmt" fmt "fmt"
import math "math" math "math"
proto "github.com/golang/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal var _ = proto.Marshal
@@ -20,16 +23,12 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Update is the message broadcast // Update is the message broadcast
type Update struct { type Update struct {
// unique id of update
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// unix nano timestamp of update
Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// time to live for entry // time to live for entry
Expires uint64 `protobuf:"varint,3,opt,name=expires,proto3" json:"expires,omitempty"` Expires uint64 `protobuf:"varint,1,opt,name=expires,proto3" json:"expires,omitempty"`
// type of update; service // type of update
Type string `protobuf:"bytes,4,opt,name=type,proto3" json:"type,omitempty"` Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"`
// what action is taken; add, del, put // what action is taken
Action string `protobuf:"bytes,5,opt,name=action,proto3" json:"action,omitempty"` Action int32 `protobuf:"varint,3,opt,name=action,proto3" json:"action,omitempty"`
// any other associated metadata about the data // any other associated metadata about the data
Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// the payload data; // the payload data;
@@ -43,16 +42,17 @@ func (m *Update) Reset() { *m = Update{} }
func (m *Update) String() string { return proto.CompactTextString(m) } func (m *Update) String() string { return proto.CompactTextString(m) }
func (*Update) ProtoMessage() {} func (*Update) ProtoMessage() {}
func (*Update) Descriptor() ([]byte, []int) { func (*Update) Descriptor() ([]byte, []int) {
return fileDescriptor_gossip_fd1eb378131a5d12, []int{0} return fileDescriptor_18cba623e76e57f3, []int{0}
} }
func (m *Update) XXX_Unmarshal(b []byte) error { func (m *Update) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Update.Unmarshal(m, b) return xxx_messageInfo_Update.Unmarshal(m, b)
} }
func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Update.Marshal(b, m, deterministic) return xxx_messageInfo_Update.Marshal(b, m, deterministic)
} }
func (dst *Update) XXX_Merge(src proto.Message) { func (m *Update) XXX_Merge(src proto.Message) {
xxx_messageInfo_Update.Merge(dst, src) xxx_messageInfo_Update.Merge(m, src)
} }
func (m *Update) XXX_Size() int { func (m *Update) XXX_Size() int {
return xxx_messageInfo_Update.Size(m) return xxx_messageInfo_Update.Size(m)
@@ -63,20 +63,6 @@ func (m *Update) XXX_DiscardUnknown() {
var xxx_messageInfo_Update proto.InternalMessageInfo var xxx_messageInfo_Update proto.InternalMessageInfo
func (m *Update) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *Update) GetTimestamp() uint64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *Update) GetExpires() uint64 { func (m *Update) GetExpires() uint64 {
if m != nil { if m != nil {
return m.Expires return m.Expires
@@ -84,18 +70,18 @@ func (m *Update) GetExpires() uint64 {
return 0 return 0
} }
func (m *Update) GetType() string { func (m *Update) GetType() int32 {
if m != nil { if m != nil {
return m.Type return m.Type
} }
return "" return 0
} }
func (m *Update) GetAction() string { func (m *Update) GetAction() int32 {
if m != nil { if m != nil {
return m.Action return m.Action
} }
return "" return 0
} }
func (m *Update) GetMetadata() map[string]string { func (m *Update) GetMetadata() map[string]string {
@@ -118,25 +104,24 @@ func init() {
} }
func init() { func init() {
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_gossip_fd1eb378131a5d12) proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3)
} }
var fileDescriptor_gossip_fd1eb378131a5d12 = []byte{ var fileDescriptor_18cba623e76e57f3 = []byte{
// 251 bytes of a gzipped FileDescriptorProto // 227 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xcf, 0x4a, 0xc4, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8f, 0xc1, 0x4a, 0x03, 0x31,
0x10, 0x87, 0x69, 0xb6, 0x9b, 0xb5, 0xe3, 0x1f, 0x64, 0x10, 0x09, 0xb2, 0x87, 0xe2, 0xa9, 0x17, 0x14, 0x45, 0x49, 0xa7, 0x4d, 0xed, 0x53, 0x41, 0x1e, 0x22, 0x41, 0x5c, 0x0c, 0xae, 0x66, 0xe3,
0x5b, 0xd0, 0xcb, 0xa2, 0x5e, 0x3d, 0x7a, 0x09, 0xf8, 0x00, 0xd9, 0x36, 0xd4, 0xa0, 0xd9, 0x84, 0x0c, 0xe8, 0xa6, 0xa8, 0x5b, 0x97, 0x6e, 0x02, 0x7e, 0x40, 0x3a, 0x0d, 0x31, 0xe8, 0x34, 0x21,
0x64, 0x56, 0xec, 0x13, 0xf8, 0xda, 0xb2, 0x69, 0x54, 0xbc, 0x7d, 0xdf, 0xcc, 0x24, 0x99, 0x5f, 0x79, 0x15, 0xf3, 0xa9, 0xfe, 0x8d, 0x34, 0x89, 0x42, 0x77, 0xe7, 0x24, 0x37, 0xdc, 0x1b, 0x78,
0xe0, 0x71, 0x34, 0xf4, 0xba, 0xdf, 0xb6, 0xbd, 0xb3, 0x9d, 0x35, 0x7d, 0x70, 0xdd, 0xe8, 0x6e, 0x36, 0x96, 0xde, 0xf7, 0x9b, 0x7e, 0x74, 0xd3, 0x30, 0xd9, 0x31, 0xb8, 0xc1, 0xb8, 0xbb, 0x02,
0x66, 0x08, 0x7a, 0x34, 0x91, 0xc2, 0xd4, 0x8d, 0x2e, 0x46, 0xe3, 0x3b, 0x1f, 0x1c, 0xb9, 0x2c, 0x41, 0x1b, 0x1b, 0x29, 0xa4, 0xc1, 0xb8, 0x18, 0xad, 0x1f, 0x7c, 0x70, 0xe4, 0xaa, 0xf4, 0x59,
0x6d, 0x12, 0xe4, 0xb3, 0x5d, 0x7f, 0x31, 0xe0, 0x2f, 0x7e, 0x50, 0xa4, 0xf1, 0x0c, 0x98, 0x19, 0x90, 0x17, 0xbb, 0xfd, 0x61, 0xc0, 0xdf, 0xfc, 0x56, 0x91, 0x46, 0x01, 0x4b, 0xfd, 0xed, 0x6d,
0x44, 0x51, 0x17, 0x4d, 0x25, 0x99, 0x19, 0x70, 0x0d, 0x15, 0x19, 0xab, 0x23, 0x29, 0xeb, 0x05, 0xd0, 0x51, 0xb0, 0x96, 0x75, 0x73, 0xf9, 0xa7, 0x88, 0x30, 0xa7, 0xe4, 0xb5, 0x98, 0xb5, 0xac,
0xab, 0x8b, 0xa6, 0x94, 0x7f, 0x05, 0x14, 0xb0, 0xd2, 0x9f, 0xde, 0x04, 0x1d, 0xc5, 0x22, 0xf5, 0x5b, 0xc8, 0xcc, 0x78, 0x05, 0x5c, 0x8d, 0x64, 0xdd, 0x4e, 0x34, 0xf9, 0xb4, 0x1a, 0xae, 0xe1,
0x7e, 0x14, 0x11, 0x4a, 0x9a, 0xbc, 0x16, 0x65, 0xba, 0x29, 0x31, 0x5e, 0x02, 0x57, 0x3d, 0x19, 0x64, 0xd2, 0xa4, 0xb6, 0x8a, 0x94, 0xe0, 0x6d, 0xd3, 0x9d, 0xde, 0xdf, 0xf4, 0xb5, 0xb9, 0xf4,
0xb7, 0x13, 0xcb, 0x54, 0xcd, 0x86, 0x1b, 0x38, 0xb2, 0x9a, 0xd4, 0xa0, 0x48, 0x09, 0x5e, 0x2f, 0xf4, 0xaf, 0xf5, 0xfa, 0x65, 0x47, 0x21, 0xc9, 0xff, 0xf4, 0xa1, 0x25, 0xbf, 0x5a, 0xb6, 0xac,
0x9a, 0xe3, 0xdb, 0x75, 0x9b, 0xf7, 0x9c, 0xb7, 0x6a, 0x9f, 0x73, 0xfb, 0x69, 0x47, 0x61, 0x92, 0x3b, 0x93, 0x99, 0xaf, 0x9f, 0xe0, 0xfc, 0x28, 0x8e, 0x17, 0xd0, 0x7c, 0xe8, 0x94, 0x07, 0xae,
0xbf, 0xd3, 0x87, 0x57, 0xd2, 0xa9, 0x55, 0x5d, 0x34, 0x27, 0x32, 0xf1, 0xd5, 0x03, 0x9c, 0xfe, 0xe4, 0x01, 0xf1, 0x12, 0x16, 0x5f, 0xea, 0x73, 0x5f, 0xd6, 0xad, 0x64, 0x91, 0xc7, 0xd9, 0x9a,
0x1b, 0xc7, 0x73, 0x58, 0xbc, 0xe9, 0x29, 0x67, 0x3a, 0x20, 0x5e, 0xc0, 0xf2, 0x43, 0xbd, 0xef, 0x6d, 0x78, 0xfe, 0xea, 0xc3, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x63, 0x7b, 0x1b, 0x2a,
0x75, 0x0a, 0x54, 0xc9, 0x59, 0xee, 0xd9, 0xa6, 0xd8, 0xf2, 0xf4, 0x31, 0x77, 0xdf, 0x01, 0x00, 0x01, 0x00, 0x00,
0x00, 0xff, 0xff, 0x06, 0x6e, 0x00, 0x3c, 0x58, 0x01, 0x00, 0x00,
} }

View File

@@ -4,16 +4,12 @@ package gossip;
// Update is the message broadcast // Update is the message broadcast
message Update { message Update {
// unique id of update
string id = 1;
// unix nano timestamp of update
uint64 timestamp = 2;
// time to live for entry // time to live for entry
uint64 expires = 3; uint64 expires = 1;
// type of update; service // type of update
string type = 4; int32 type = 2;
// what action is taken; add, del, put // what action is taken
string action = 5; int32 action = 3;
// any other associated metadata about the data // any other associated metadata about the data
map<string, string> metadata = 6; map<string, string> metadata = 6;
// the payload data; // the payload data;

View File

@@ -70,21 +70,30 @@ func addNodes(old, neu []*registry.Node) []*registry.Node {
} }
func addServices(old, neu []*registry.Service) []*registry.Service { func addServices(old, neu []*registry.Service) []*registry.Service {
var srv []*registry.Service
for _, s := range neu { for _, s := range neu {
var seen bool var seen bool
for i, o := range old { for _, o := range old {
if o.Version == s.Version { if o.Version == s.Version {
s.Nodes = addNodes(o.Nodes, s.Nodes) sp := new(registry.Service)
// make copy
*sp = *o
// set nodes
sp.Nodes = addNodes(o.Nodes, s.Nodes)
// mark as seen
seen = true seen = true
old[i] = s srv = append(srv, sp)
break break
} }
} }
if !seen { if !seen {
old = append(old, s) srv = append(srv, cp([]*registry.Service{s})...)
} }
} }
return old
return srv
} }
func delNodes(old, del []*registry.Node) []*registry.Node { func delNodes(old, del []*registry.Node) []*registry.Node {

View File

@@ -2,6 +2,7 @@
package registry package registry
import ( import (
"context"
"net" "net"
"strings" "strings"
"sync" "sync"
@@ -194,20 +195,20 @@ func (m *mdnsRegistry) Deregister(service *Service) error {
} }
func (m *mdnsRegistry) GetService(service string) ([]*Service, error) { func (m *mdnsRegistry) GetService(service string) ([]*Service, error) {
p := mdns.DefaultParams(service)
p.Timeout = m.opts.Timeout
entryCh := make(chan *mdns.ServiceEntry, 10)
p.Entries = entryCh
exit := make(chan bool)
defer close(exit)
serviceMap := make(map[string]*Service) serviceMap := make(map[string]*Service)
entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool)
p := mdns.DefaultParams(service)
// set context with timeout
p.Context, _ = context.WithTimeout(context.Background(), m.opts.Timeout)
// set entries channel
p.Entries = entries
go func() { go func() {
for { for {
select { select {
case e := <-entryCh: case e := <-entries:
// list record so skip // list record so skip
if p.Service == "_services" { if p.Service == "_services" {
continue continue
@@ -243,16 +244,21 @@ func (m *mdnsRegistry) GetService(service string) ([]*Service, error) {
}) })
serviceMap[txt.Version] = s serviceMap[txt.Version] = s
case <-exit: case <-p.Context.Done():
close(done)
return return
} }
} }
}() }()
// execute the query
if err := mdns.Query(p); err != nil { if err := mdns.Query(p); err != nil {
return nil, err return nil, err
} }
// wait for completion
<-done
// create list and return // create list and return
var services []*Service var services []*Service
@@ -264,21 +270,22 @@ func (m *mdnsRegistry) GetService(service string) ([]*Service, error) {
} }
func (m *mdnsRegistry) ListServices() ([]*Service, error) { func (m *mdnsRegistry) ListServices() ([]*Service, error) {
p := mdns.DefaultParams("_services")
p.Timeout = m.opts.Timeout
entryCh := make(chan *mdns.ServiceEntry, 10)
p.Entries = entryCh
exit := make(chan bool)
defer close(exit)
serviceMap := make(map[string]bool) serviceMap := make(map[string]bool)
entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool)
p := mdns.DefaultParams("_services")
// set context with timeout
p.Context, _ = context.WithTimeout(context.Background(), m.opts.Timeout)
// set entries channel
p.Entries = entries
var services []*Service var services []*Service
go func() { go func() {
for { for {
select { select {
case e := <-entryCh: case e := <-entries:
if e.TTL == 0 { if e.TTL == 0 {
continue continue
} }
@@ -288,16 +295,21 @@ func (m *mdnsRegistry) ListServices() ([]*Service, error) {
serviceMap[name] = true serviceMap[name] = true
services = append(services, &Service{Name: name}) services = append(services, &Service{Name: name})
} }
case <-exit: case <-p.Context.Done():
close(done)
return return
} }
} }
}() }()
// execute query
if err := mdns.Query(p); err != nil { if err := mdns.Query(p); err != nil {
return nil, err return nil, err
} }
// wait till done
<-done
return services, nil return services, nil
} }

View File

@@ -1,13 +1,20 @@
package server package server
import "context"
type HandlerOption func(*HandlerOptions)
type HandlerOptions struct { type HandlerOptions struct {
Internal bool Internal bool
Metadata map[string]map[string]string Metadata map[string]map[string]string
} }
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct { type SubscriberOptions struct {
Queue string Queue string
Internal bool Internal bool
Context context.Context
} }
// EndpointMetadata is a Handler option that allows metadata to be added to // EndpointMetadata is a Handler option that allows metadata to be added to
@@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption {
o.Internal = b o.Internal = b
} }
} }
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
opt := SubscriberOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
// Shared queue name distributed messages across subscribers // Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption { func SubscriberQueue(n string) SubscriberOption {
@@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption {
o.Queue = n o.Queue = n
} }
} }
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}

View File

@@ -27,6 +27,8 @@ type Options struct {
// The register expiry time // The register expiry time
RegisterTTL time.Duration RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// The router for requests // The router for requests
Router Router Router Router
@@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option {
} }
} }
// Register the service with at interval
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
}
}
// WithRouter sets the request router // WithRouter sets the request router
func WithRouter(r Router) Option { func WithRouter(r Router) Option {
return func(o *Options) { return func(o *Options) {

View File

@@ -66,13 +66,63 @@ func (rwc *readWriteCloser) Close() error {
return nil return nil
} }
func getHeader(hdr string, md map[string]string) string {
if hd := md[hdr]; len(hd) > 0 {
return hd
}
return md["X-"+hdr]
}
func getHeaders(m *codec.Message) {
get := func(hdr, v string) string {
if len(v) > 0 {
return v
}
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
m.Id = get("Micro-Id", m.Id)
m.Error = get("Micro-Error", m.Error)
m.Endpoint = get("Micro-Endpoint", m.Endpoint)
m.Method = get("Micro-Method", m.Method)
m.Target = get("Micro-Service", m.Target)
// TODO: remove this cruft
if len(m.Endpoint) == 0 {
m.Endpoint = m.Method
}
}
func setHeaders(m, r *codec.Message) {
set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
// set headers
set("Micro-Id", r.Id)
set("Micro-Service", r.Target)
set("Micro-Method", r.Method)
set("Micro-Endpoint", r.Endpoint)
set("Micro-Error", r.Error)
}
// setupProtocol sets up the old protocol // setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message) codec.NewCodec { func setupProtocol(msg *transport.Message) codec.NewCodec {
service := msg.Header["X-Micro-Service"] service := getHeader("Micro-Service", msg.Header)
method := msg.Header["X-Micro-Method"] method := getHeader("Micro-Method", msg.Header)
endpoint := msg.Header["X-Micro-Endpoint"] endpoint := getHeader("Micro-Endpoint", msg.Header)
protocol := msg.Header["X-Micro-Protocol"] protocol := getHeader("Micro-Protocol", msg.Header)
target := msg.Header["X-Micro-Target"] target := getHeader("Micro-Target", msg.Header)
// if the protocol exists (mucp) do nothing // if the protocol exists (mucp) do nothing
if len(protocol) > 0 { if len(protocol) > 0 {
@@ -91,12 +141,12 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
// no method then set to endpoint // no method then set to endpoint
if len(method) == 0 { if len(method) == 0 {
msg.Header["X-Micro-Method"] = method msg.Header["Micro-Method"] = endpoint
} }
// no endpoint then set to method // no endpoint then set to method
if len(endpoint) == 0 { if len(endpoint) == 0 {
msg.Header["X-Micro-Endpoint"] = method msg.Header["Micro-Endpoint"] = method
} }
return nil return nil
@@ -118,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
} }
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error { func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// the initieal message // the initial message
m := codec.Message{ m := codec.Message{
Header: c.req.Header, Header: c.req.Header,
Body: c.req.Body, Body: c.req.Body,
@@ -153,25 +203,22 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
c.first = false c.first = false
// set some internal things // set some internal things
m.Target = m.Header["X-Micro-Service"] getHeaders(&m)
m.Method = m.Header["X-Micro-Method"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Id = m.Header["X-Micro-Id"]
// read header via codec // read header via codec
err := c.codec.ReadHeader(&m, codec.Request) if err := c.codec.ReadHeader(&m, codec.Request); err != nil {
return err
// set the method/id
r.Method = m.Method
r.Endpoint = m.Endpoint
r.Id = m.Id
// TODO: remove the old legacy cruft
if len(r.Endpoint) == 0 {
r.Endpoint = r.Method
} }
return err // fallback for 0.14 and older
if len(m.Endpoint) == 0 {
m.Endpoint = m.Method
}
// set message
*r = m
return nil
} }
func (c *rpcCodec) ReadBody(b interface{}) error { func (c *rpcCodec) ReadBody(b interface{}) error {
@@ -206,29 +253,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
m.Header = map[string]string{} m.Header = map[string]string{}
} }
// set request id setHeaders(m, r)
if len(r.Id) > 0 {
m.Header["X-Micro-Id"] = r.Id
}
// set target
if len(r.Target) > 0 {
m.Header["X-Micro-Service"] = r.Target
}
// set request method
if len(r.Method) > 0 {
m.Header["X-Micro-Method"] = r.Method
}
// set request endpoint
if len(r.Endpoint) > 0 {
m.Header["X-Micro-Endpoint"] = r.Endpoint
}
if len(r.Error) > 0 {
m.Header["X-Micro-Error"] = r.Error
}
// the body being sent // the body being sent
var body []byte var body []byte
@@ -246,6 +271,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// write an error if it failed // write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error() m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error m.Header["X-Micro-Error"] = m.Error
m.Header["Micro-Error"] = m.Error
// no body to write // no body to write
if err := c.codec.Write(m, nil); err != nil { if err := c.codec.Write(m, nil); err != nil {
return err return err

View File

@@ -47,6 +47,11 @@ func (r *rpcRequest) Header() map[string]string {
return r.header return r.header
} }
func (r *rpcRequest) Body() interface{} {
// TODO: convert to interface value
return r.body
}
func (r *rpcRequest) Read() ([]byte, error) { func (r *rpcRequest) Read() ([]byte, error) {
// got a body // got a body
if r.body != nil { if r.body != nil {

View File

@@ -10,7 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/micro/go-log" log "github.com/micro/go-log"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
@@ -120,9 +120,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// internal request // internal request
request := &rpcRequest{ request := &rpcRequest{
service: msg.Header["X-Micro-Service"], service: getHeader("Micro-Service", msg.Header),
method: msg.Header["X-Micro-Method"], method: getHeader("Micro-Method", msg.Header),
endpoint: msg.Header["X-Micro-Endpoint"], endpoint: getHeader("Micro-Endpoint", msg.Header),
contentType: ct, contentType: ct,
codec: rcodec, codec: rcodec,
header: msg.Header, header: msg.Header,
@@ -157,15 +157,17 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// TODO: handle error better // TODO: handle error better
if err := handler(ctx, request, response); err != nil { if err := handler(ctx, request, response); err != nil {
// write an error response if err != lastStreamResponseError {
err = rcodec.Write(&codec.Message{ // write an error response
Header: msg.Header, err = rcodec.Write(&codec.Message{
Error: err.Error(), Header: msg.Header,
Type: codec.Error, Error: err.Error(),
}, nil) Type: codec.Error,
// could not write the error response }, nil)
if err != nil { // could not write the error response
log.Logf("rpc: unable to write error response: %v", err) if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
} }
s.wg.Done() s.wg.Done()
return return
@@ -274,12 +276,18 @@ func (s *rpcServer) Register() error {
return err return err
} }
// make copy of metadata
md := make(metadata.Metadata)
for k, v := range config.Metadata {
md[k] = v
}
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: config.Name + "-" + config.Id, Id: config.Name + "-" + config.Id,
Address: addr, Address: addr,
Port: port, Port: port,
Metadata: config.Metadata, Metadata: md,
} }
node.Metadata["transport"] = config.Transport.String() node.Metadata["transport"] = config.Transport.String()
@@ -357,6 +365,9 @@ func (s *rpcServer) Register() error {
if queue := sb.Options().Queue; len(queue) > 0 { if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue)) opts = append(opts, broker.Queue(queue))
} }
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil { if err != nil {
return err return err
@@ -436,6 +447,7 @@ func (s *rpcServer) Start() error {
registerDebugHandler(s) registerDebugHandler(s)
config := s.Options() config := s.Options()
// start listening on the transport
ts, err := config.Transport.Listen(config.Address) ts, err := config.Transport.Listen(config.Address)
if err != nil { if err != nil {
return err return err
@@ -443,30 +455,45 @@ func (s *rpcServer) Start() error {
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr()) log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
s.Lock()
// swap address // swap address
s.Lock()
addr := s.opts.Address addr := s.opts.Address
s.opts.Address = ts.Addr() s.opts.Address = ts.Addr()
s.Unlock() s.Unlock()
exit := make(chan bool, 1) // connect to the broker
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
exit := make(chan bool)
go func() { go func() {
for { for {
// listen for connections
err := ts.Accept(s.ServeConn) err := ts.Accept(s.ServeConn)
// check if we're supposed to exit // TODO: listen for messages
// msg := broker.Exchange(service).Consume()
select { select {
// check if we're supposed to exit
case <-exit: case <-exit:
return return
default:
}
// check the error and backoff // check the error and backoff
if err != nil { default:
log.Logf("Accept error: %v", err) if err != nil {
time.Sleep(time.Second) log.Logf("Accept error: %v", err)
continue time.Sleep(time.Second)
continue
}
} }
// no error just exit // no error just exit
@@ -475,9 +502,37 @@ func (s *rpcServer) Start() error {
}() }()
go func() { go func() {
// wait for exit t := new(time.Ticker)
ch := <-s.exit
exit <- true // only process if it exists
if s.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(s.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
// wait for exit
case ch = <-s.exit:
t.Stop()
close(exit)
break Loop
}
}
// deregister self
if err := s.Deregister(); err != nil {
log.Log("Server deregister error: ", err)
}
// wait for requests to finish // wait for requests to finish
if wait(s.opts.Context) { if wait(s.opts.Context) {
@@ -490,18 +545,12 @@ func (s *rpcServer) Start() error {
// disconnect the broker // disconnect the broker
config.Broker.Disconnect() config.Broker.Disconnect()
s.Lock()
// swap back address // swap back address
s.Lock()
s.opts.Address = addr s.opts.Address = addr
s.Unlock() s.Unlock()
}() }()
// TODO: subscribe to cruft
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
return nil return nil
} }

View File

@@ -21,8 +21,6 @@ type Server interface {
NewHandler(interface{}, ...HandlerOption) Handler NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error Subscribe(Subscriber) error
Register() error
Deregister() error
Start() error Start() error
Stop() error Stop() error
String() string String() string
@@ -53,6 +51,8 @@ type Request interface {
ContentType() string ContentType() string
// Header of the request // Header of the request
Header() map[string]string Header() map[string]string
// Body is the initial decoded value
Body() interface{}
// Read the undecoded request body // Read the undecoded request body
Read() ([]byte, error) Read() ([]byte, error)
// The encoded message stream // The encoded message stream
@@ -114,10 +114,6 @@ type Subscriber interface {
type Option func(*Options) type Option func(*Options)
type HandlerOption func(*HandlerOptions)
type SubscriberOption func(*SubscriberOptions)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
DefaultName = "go-server" DefaultName = "go-server"
@@ -177,16 +173,6 @@ func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s) return DefaultServer.Subscribe(s)
} }
// Register registers the default server with the discovery system
func Register() error {
return DefaultServer.Register()
}
// Deregister deregisters the default server from the discovery system
func Deregister() error {
return DefaultServer.Deregister()
}
// Run starts the default server and waits for a kill // Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server // signal before exiting. Also registers/deregisters the server
func Run() error { func Run() error {
@@ -194,18 +180,10 @@ func Run() error {
return err return err
} }
if err := DefaultServer.Register(); err != nil {
return err
}
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Logf("Received signal %s", <-ch) log.Logf("Received signal %s", <-ch)
if err := DefaultServer.Deregister(); err != nil {
return err
}
return Stop() return Stop()
} }

View File

@@ -5,10 +5,7 @@ import (
"os/signal" "os/signal"
"sync" "sync"
"syscall" "syscall"
"time"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd" "github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata" "github.com/micro/go-micro/metadata"
@@ -36,27 +33,6 @@ func newService(opts ...Option) Service {
} }
} }
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}
// Init initialises options. Additionally it calls cmd.Init // Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called // which parses command line flags. cmd.Init is only called
// on first Init. // on first Init.
@@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) {
} }
s.once.Do(func() { s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service // Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init( _ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker), cmd.Broker(&s.opts.Broker),
@@ -105,7 +67,7 @@ func (s *service) Server() server.Server {
} }
func (s *service) String() string { func (s *service) String() string {
return "go-micro" return "micro"
} }
func (s *service) Start() error { func (s *service) Start() error {
@@ -119,10 +81,6 @@ func (s *service) Start() error {
return err return err
} }
if err := s.opts.Server.Register(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart { for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil { if err := fn(); err != nil {
return err return err
@@ -141,10 +99,6 @@ func (s *service) Stop() error {
} }
} }
if err := s.opts.Server.Deregister(); err != nil {
return err
}
if err := s.opts.Server.Stop(); err != nil { if err := s.opts.Server.Stop(); err != nil {
return err return err
} }
@@ -163,10 +117,6 @@ func (s *service) Run() error {
return err return err
} }
// start reg loop
ex := make(chan bool)
go s.run(ex)
ch := make(chan os.Signal, 1) ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
@@ -177,8 +127,5 @@ func (s *service) Run() error {
case <-s.opts.Context.Done(): case <-s.opts.Context.Done():
} }
// exit reg loop
close(ex)
return s.Stop() return s.Stop()
} }

View File

@@ -13,11 +13,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/micro/h2c"
maddr "github.com/micro/util/go/lib/addr" maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net" mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls" mls "github.com/micro/util/go/lib/tls"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
) )
type buffer struct { type buffer struct {
@@ -35,7 +35,7 @@ type httpTransportClient struct {
dialOpts DialOptions dialOpts DialOptions
once sync.Once once sync.Once
sync.Mutex sync.RWMutex
r chan *http.Request r chan *http.Request
bl []*http.Request bl []*http.Request
buff *bufio.Reader buff *bufio.Reader
@@ -133,11 +133,11 @@ func (h *httpTransportClient) Recv(m *Message) error {
r = rc r = rc
} }
h.Lock() h.RLock()
defer h.Unlock()
if h.buff == nil { if h.buff == nil {
return io.EOF return io.EOF
} }
h.RUnlock()
// set timeout if its greater than 0 // set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) { if h.ht.opts.Timeout > time.Duration(0) {
@@ -424,10 +424,7 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
// insecure connection use h2c // insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) { if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = &h2c.HandlerH2C{ srv.Handler = h2c.NewHandler(mux, &http2.Server{})
Handler: mux,
H2Server: &http2.Server{},
}
} }
// begin serving // begin serving