Compare commits

...

24 Commits

Author SHA1 Message Date
Asim Aslam
31fc8df2ba add server request body 2019-02-04 13:13:03 +00:00
Asim Aslam
baf7de76bf Merge branch 'master' of github.com:micro/go-micro 2019-02-04 10:29:26 +00:00
Asim Aslam
31b6cad47b make copy before writing 2019-02-04 10:29:10 +00:00
Asim Aslam
686171c26d Merge pull request #413 from qkzsky/qkzsky-rpc-fix
client close: rpc: unable to write error response
2019-02-03 13:13:10 +00:00
kuangzhiqiang
6be205fd40 client close: rpc: unable to write error response
when client close notice: "rpc: unable to write error response..."
2019-02-03 19:12:13 +08:00
Asim Aslam
89014160fc Merge pull request #411 from unistack-org/gossip
registry: gossip unify registry option passing, optimize
2019-02-01 22:21:08 +00:00
422e2002a0 registry: gossip unify registry option passing, optimize
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-02-02 01:12:39 +03:00
Asim Aslam
cead99ac44 fix nil pointer 2019-02-01 16:01:51 +00:00
Asim Aslam
c03d935ffd fallback for 0.14.0 and older 2019-02-01 15:57:34 +00:00
Asim Aslam
88e12347d0 update mdns to remove race condition 2019-02-01 13:41:11 +00:00
Asim Aslam
652b1067f5 fix data race 2019-02-01 09:05:03 +00:00
Asim Aslam
7888d3e13d use official h2c server 2019-01-31 17:14:36 +00:00
Asim Aslam
b1a31134bd Support micro proxy 2019-01-30 18:42:11 +00:00
Asim Aslam
107b571019 Add go mod 2019-01-30 11:43:40 +00:00
Asim Aslam
89c8e1f4a7 update readme 2019-01-29 09:20:34 +00:00
Asim Aslam
a06cd72337 update image 2019-01-29 09:08:14 +00:00
Asim Aslam
e22fa01935 fix ticker 2019-01-24 16:08:04 +00:00
Asim Aslam
a5015692e3 Merge pull request #400 from micro/interval
Move RegisterInterval into the server
2019-01-24 13:55:05 +00:00
Asim Aslam
539b8c1a3b Move RegisterInterval into the server 2019-01-24 13:22:17 +00:00
Asim Aslam
67a738b504 Merge pull request #399 from unistack-org/master
add context to SubscriberOptions
2019-01-24 13:11:33 +00:00
ac1afea7fc add context to server.SubscriberOptions and broker.SubscribeOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-01-24 15:36:01 +03:00
Asim Aslam
8090f9968d Update headers to remove X- prefix 2019-01-24 10:11:02 +00:00
Asim Aslam
7542aafd29 Update package comment 2019-01-23 18:15:17 +00:00
Asim Aslam
13de868b21 Rename 2019-01-23 18:14:36 +00:00
28 changed files with 835 additions and 384 deletions

View File

@@ -1,6 +1,6 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable framework for micro service development.
Go Micro is a framework for micro service development.
## Overview
@@ -8,7 +8,7 @@ Go Micro provides the core requirements for distributed systems development incl
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly
but everything can be easily swapped out.
<img src="https://micro.mu/docs/images/go-micro.png" />
<img src="https://micro.mu/docs/images/go-micro.svg" />
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).

View File

@@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option {
o.TLSConfig = t
}
}
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {
o.Context = ctx
}
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"net"
"os"
"strconv"
"sync"
"sync/atomic"
@@ -213,12 +214,17 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request
codec: codec,
}
// set request codec
if r, ok := req.(*rpcRequest); ok {
r.codec = codec
}
stream := &rpcStream{
context: ctx,
request: req,
response: rsp,
closed: make(chan bool),
codec: newRpcCodec(msg, c, cf),
codec: codec,
}
ch := make(chan error, 1)
@@ -268,6 +274,18 @@ func (r *rpcClient) Options() Options {
}
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
service := request.Service()
// get proxy
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
service = prx
}
// get proxy address
if prx := os.Getenv("MICRO_PROXY_ADDRESS"); len(prx) > 0 {
opts.Address = prx
}
// return remote address
if len(opts.Address) > 0 {
address := opts.Address
@@ -288,11 +306,11 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
}
// get next nodes from the selector
next, err := r.opts.Selector.Select(request.Service(), opts.SelectOptions...)
next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", "service %s: %v", request.Service(), err.Error())
return nil, errors.NotFound("go.micro.client", "service %s: %v", service, err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", request.Service(), err.Error())
return nil, errors.InternalServerError("go.micro.client", "error selecting %s node: %v", service, err.Error())
}
return next, nil
@@ -487,8 +505,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["X-Micro-Topic"] = msg.Topic()
md["X-Micro-Id"] = id
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// encode message body
cf, err := r.newCodec(msg.ContentType())
@@ -500,8 +518,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
Target: msg.Topic(),
Type: codec.Publication,
Header: map[string]string{
"X-Micro-Id": id,
"X-Micro-Topic": msg.Topic(),
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())

View File

@@ -84,6 +84,50 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func getHeaders(m *codec.Message) {
get := func(hdr string) string {
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
// check error in header
if len(m.Error) == 0 {
m.Error = get("Micro-Error")
}
// check endpoint in header
if len(m.Endpoint) == 0 {
m.Endpoint = get("Micro-Endpoint")
}
// check method in header
if len(m.Method) == 0 {
m.Method = get("Micro-Method")
}
if len(m.Id) == 0 {
m.Id = get("Micro-Id")
}
}
func setHeaders(m *codec.Message) {
set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
set("Micro-Id", m.Id)
set("Micro-Service", m.Target)
set("Micro-Method", m.Method)
set("Micro-Endpoint", m.Endpoint)
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
protocol := node.Metadata["protocol"]
@@ -133,10 +177,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
}
// set the mucp headers
m.Header["X-Micro-Id"] = m.Id
m.Header["X-Micro-Service"] = m.Target
m.Header["X-Micro-Method"] = m.Method
m.Header["X-Micro-Endpoint"] = m.Endpoint
setHeaders(m)
// if body is bytes Frame don't encode
if body != nil {
@@ -162,52 +203,32 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
Header: m.Header,
Body: m.Body,
}
// send the request
if err := c.client.Send(&msg); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
return nil
}
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
var m transport.Message
if err := c.client.Recv(&m); err != nil {
func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
var tm transport.Message
// read message from transport
if err := c.client.Recv(&tm); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body)
var me codec.Message
// set headers
me.Header = m.Header
c.buf.rbuf.Reset()
c.buf.rbuf.Write(tm.Body)
// set headers from transport
m.Header = tm.Header
// read header
err := c.codec.ReadHeader(&me, r)
wm.Endpoint = me.Endpoint
wm.Method = me.Method
wm.Id = me.Id
wm.Error = me.Error
err := c.codec.ReadHeader(m, r)
// check error in header
if len(me.Error) == 0 {
wm.Error = me.Header["X-Micro-Error"]
}
// check endpoint in header
if len(me.Endpoint) == 0 {
wm.Endpoint = me.Header["X-Micro-Endpoint"]
}
// check method in header
if len(me.Method) == 0 {
wm.Method = me.Header["X-Micro-Method"]
}
if len(me.Id) == 0 {
wm.Id = me.Header["X-Micro-Id"]
}
// get headers
getHeaders(m)
// return header error
if err != nil {

View File

@@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
}
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
}
// client opts
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))

View File

@@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Target = m.Header["Micro-Service"]
m.Endpoint = m.Header["Micro-Endpoint"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")

17
go.mod Normal file
View File

@@ -0,0 +1,17 @@
module github.com/micro/go-micro
require (
github.com/golang/protobuf v1.2.0
github.com/google/uuid v1.1.0
github.com/hashicorp/consul v1.4.2
github.com/hashicorp/memberlist v0.1.3
github.com/micro/cli v0.1.0
github.com/micro/go-log v0.1.0
github.com/micro/go-rcache v0.1.0
github.com/micro/h2c v1.0.0
github.com/micro/mdns v0.1.0
github.com/micro/util v0.1.0
github.com/mitchellh/hashstructure v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3
)

101
go.sum Normal file
View File

@@ -0,0 +1,101 @@
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/go-log/log v0.1.0 h1:wudGTNsiGzrD5ZjgIkVZ517ugi2XRe9Q/xRCzwEO4/U=
github.com/go-log/log v0.1.0/go.mod h1:4mBwpdRMFLiuXZDCwU2lKQFsoSCo72j3HqBK9d81N2M=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/consul v1.4.2 h1:D9iJoJb8Ehe/Zmr+UEE3U3FjOLZ4LUxqFMl4O43BM1U=
github.com/hashicorp/consul v1.4.2/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0 h1:wvCrVc9TjDls6+YGAF2hAifE1E5U1+b4tH6KdvN3Gig=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0 h1:ueI78wUjYExhCvMLow4icJnayNNFRgy0d9EGs/a1T44=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG676r31M=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
github.com/micro/h2c v1.0.0 h1:ejw6MS5+WaUoMHRtqkVCCrrVzLMzOFEH52rEyd8Fl2I=
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.3 h1:1g0r1IvskvgL8rR+AcHzUA+oFmGcQlaIm4IqakufeMM=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9dGS02Q3Y=
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 h1:YbZJ76lQ1BqNhVe7dKTSB67wDrc2VPRR75IyGyyPDX8=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc h1:WiYx1rIFmx8c0mXAFtv5D/mHyKe1+jmuP7PViuwqwuQ=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -1,4 +1,4 @@
// Package micro is a pluggable RPC framework for microservices
// Package micro is a pluggable framework for microservices
package micro
import (
@@ -42,7 +42,7 @@ type Publisher interface {
type Option func(*Options)
var (
HeaderPrefix = "X-Micro-"
HeaderPrefix = "Micro-"
)
// NewService creates and returns a new Service based on the packages within.

View File

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

View File

@@ -22,9 +22,6 @@ type Options struct {
Registry registry.Registry
Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
@@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option {
// RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
o.Server.Init(server.RegisterInterval(t))
}
}

View File

@@ -1,6 +1,6 @@
# Gossip Registry
Gossip is a zero dependency registry which uses hashicorp/memberlist to broadcast registry information
Gossip is a zero dependency registry which uses github.com/hashicorp/memberlist to broadcast registry information
via the SWIM protocol.
## Usage
@@ -20,5 +20,5 @@ On startup you'll see something like
To join this gossip ring set the registry address using flag or env var
```bash
MICRO_REGISTRY_ADDRESS= 192.168.1.65:56390
MICRO_REGISTRY_ADDRESS=192.168.1.65:56390
```

View File

@@ -0,0 +1,17 @@
package gossip
import (
"context"
"github.com/micro/go-micro/registry"
)
// setRegistryOption returns a function to setup a context with given value
func setRegistryOption(k, v interface{}) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

View File

@@ -7,9 +7,11 @@ import (
"io/ioutil"
"net"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/golang/protobuf/proto"
@@ -21,10 +23,35 @@ import (
"github.com/mitchellh/hashstructure"
)
// use registry.Result int32 values after it switches from string to int32 types
// type actionType int32
// type updateType int32
const (
addAction = "update"
delAction = "delete"
syncAction = "sync"
actionTypeInvalid int32 = iota
actionTypeCreate
actionTypeDelete
actionTypeUpdate
actionTypeSync
)
func actionTypeString(t int32) string {
switch t {
case actionTypeCreate:
return "create"
case actionTypeDelete:
return "delete"
case actionTypeUpdate:
return "update"
case actionTypeSync:
return "sync"
}
return "invalid"
}
const (
updateTypeInvalid int32 = iota
updateTypeService
)
type broadcast struct {
@@ -93,23 +120,18 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// shutdown old member
if g.member != nil {
g.member.Shutdown()
g.Stop()
}
// replace addresses
curAddrs = newAddrs
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// create a new default config
c := memberlist.DefaultLocalConfig()
// log to dev null
c.LogOutput = ioutil.Discard
if optConfig, ok := g.options.Context.Value(contextConfig{}).(*memberlist.Config); ok && optConfig != nil {
c = optConfig
}
@@ -145,15 +167,6 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// set the name
c.Name = strings.Join([]string{"micro", hostname, uuid.New().String()}, "-")
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// log to dev null
c.LogOutput = ioutil.Discard
// set a secret key if secure
if g.options.Secure {
k, ok := g.options.Context.Value(contextSecretKey{}).([]byte)
@@ -164,6 +177,20 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
c.SecretKey = k
}
// create a queue
queue := &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(curAddrs)
},
RetransmitMult: 3,
}
// set the delegate
c.Delegate = &delegate{
updates: g.updates,
queue: queue,
}
// create the memberlist
m, err := memberlist.Create(c)
if err != nil {
@@ -187,29 +214,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
return nil
}
func (*broadcast) UniqueBroadcast() {}
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
up := new(pb.Update)
if err := proto.Unmarshal(other.Message(), up); err != nil {
return false
}
// ids do not match
if b.update.Id == up.Id {
return false
}
// timestamps do not match
if b.update.Timestamp != up.Timestamp {
return false
}
// type does not match
if b.update.Type != up.Type {
return false
}
// invalidates
return true
return false
}
func (b *broadcast) Message() []byte {
@@ -242,7 +250,7 @@ func (d *delegate) NotifyMsg(b []byte) {
}
// only process service action
if up.Type != "service" {
if up.Type != updateTypeService {
return
}
@@ -280,7 +288,7 @@ func (d *delegate) LocalState(join bool) []byte {
d.updates <- &update{
Update: &pb.Update{
Action: syncAction,
Action: actionTypeSync,
},
sync: syncCh,
}
@@ -309,7 +317,7 @@ func (d *delegate) MergeRemoteState(buf []byte, join bool) {
for _, service := range services {
for _, srv := range service {
d.updates <- &update{
Update: &pb.Update{Action: addAction},
Update: &pb.Update{Action: actionTypeCreate},
Service: srv,
sync: nil,
}
@@ -350,6 +358,31 @@ func (g *gossipRegistry) subscribe() (chan *registry.Result, chan bool) {
return next, exit
}
func (g *gossipRegistry) wait() {
ctx := g.options.Context
if c, ok := ctx.Value(contextContext{}).(context.Context); ok && c != nil {
ctx = c
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
select {
// wait on kill signal
case <-ch:
// wait on context cancel
case <-ctx.Done():
}
g.Stop()
}
func (g *gossipRegistry) Stop() {
g.member.Leave(g.interval * 2)
g.member.Shutdown()
}
func (g *gossipRegistry) run() {
var mtx sync.Mutex
updates := map[uint64]*update{}
@@ -367,11 +400,11 @@ func (g *gossipRegistry) run() {
// process all the updates
for k, v := range updates {
// check if expiry time has passed
if d := (v.Update.Timestamp + v.Update.Expires); d < now {
if d := (v.Update.Expires); d < now {
// delete from records
delete(updates, k)
// set to delete
v.Update.Action = delAction
v.Update.Action = actionTypeDelete
// fire a new update
g.updates <- v
}
@@ -384,7 +417,7 @@ func (g *gossipRegistry) run() {
// process the updates
for u := range g.updates {
switch u.Update.Action {
case addAction:
case actionTypeCreate:
g.Lock()
if service, ok := g.services[u.Service.Name]; !ok {
g.services[u.Service.Name] = []*registry.Service{u.Service}
@@ -395,7 +428,7 @@ func (g *gossipRegistry) run() {
g.Unlock()
// publish update to watchers
go g.publish(addAction, []*registry.Service{u.Service})
go g.publish(actionTypeString(actionTypeCreate), []*registry.Service{u.Service})
// we need to expire the node at some point in the future
if u.Update.Expires > 0 {
@@ -406,7 +439,7 @@ func (g *gossipRegistry) run() {
mtx.Unlock()
}
}
case delAction:
case actionTypeDelete:
g.Lock()
if service, ok := g.services[u.Service.Name]; ok {
if services := delServices(service, []*registry.Service{u.Service}); len(services) == 0 {
@@ -418,7 +451,7 @@ func (g *gossipRegistry) run() {
g.Unlock()
// publish update to watchers
go g.publish(delAction, []*registry.Service{u.Service})
go g.publish(actionTypeString(actionTypeDelete), []*registry.Service{u.Service})
// delete from expiry checks
if hash, err := hashstructure.Hash(u.Service, nil); err == nil {
@@ -426,7 +459,7 @@ func (g *gossipRegistry) run() {
delete(updates, hash)
mtx.Unlock()
}
case syncAction:
case actionTypeSync:
// no sync channel provided
if u.sync == nil {
continue
@@ -441,7 +474,7 @@ func (g *gossipRegistry) run() {
}
// publish to watchers
go g.publish(addAction, service)
go g.publish(actionTypeString(actionTypeCreate), service)
}
g.RUnlock()
@@ -480,11 +513,9 @@ func (g *gossipRegistry) Register(s *registry.Service, opts ...registry.Register
}
up := &pb.Update{
Id: uuid.New().String(),
Timestamp: uint64(time.Now().UnixNano()),
Expires: uint64(options.TTL.Nanoseconds()),
Action: "update",
Type: "service",
Expires: uint64(time.Now().Add(options.TTL).UnixNano()),
Action: actionTypeCreate,
Type: updateTypeService,
Metadata: map[string]string{
"Content-Type": "application/json",
},
@@ -519,10 +550,8 @@ func (g *gossipRegistry) Deregister(s *registry.Service) error {
g.Unlock()
up := &pb.Update{
Id: uuid.New().String(),
Timestamp: uint64(time.Now().UnixNano()),
Action: "delete",
Type: "service",
Action: actionTypeDelete,
Type: updateTypeService,
Metadata: map[string]string{
"Content-Type": "application/json",
},
@@ -590,5 +619,7 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
// wait for setup
<-time.After(gossip.interval * 2)
go gossip.wait()
return gossip
}

View File

@@ -0,0 +1,199 @@
package gossip_test
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/hashicorp/memberlist"
micro "github.com/micro/go-micro"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/registry/gossip"
pb "github.com/micro/go-micro/registry/gossip/proto"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server"
)
var (
r1 registry.Registry
r2 registry.Registry
mu sync.Mutex
)
func newConfig() *memberlist.Config {
wc := memberlist.DefaultLANConfig()
wc.DisableTcpPings = false
wc.GossipVerifyIncoming = false
wc.GossipVerifyOutgoing = false
wc.EnableCompression = false
wc.LogOutput = os.Stderr
wc.ProtocolVersion = 4
wc.Name = uuid.New().String()
return wc
}
func newRegistries() {
mu.Lock()
defer mu.Unlock()
if r1 != nil && r2 != nil {
return
}
wc1 := newConfig()
wc2 := newConfig()
rops1 := []registry.Option{gossip.Config(wc1), gossip.Address("127.0.0.1:54321")}
rops2 := []registry.Option{gossip.Config(wc2), gossip.Address("127.0.0.2:54321"), registry.Addrs("127.0.0.1:54321")}
r1 = gossip.NewRegistry(rops1...) // first started without members
r2 = gossip.NewRegistry(rops2...) // second started joining
}
func TestRegistryBroadcast(t *testing.T) {
newRegistries()
svc1 := &registry.Service{Name: "r1-svc", Version: "0.0.0.1"}
svc2 := &registry.Service{Name: "r2-svc", Version: "0.0.0.2"}
<-time.After(1 * time.Second)
if err := r1.Register(svc1); err != nil {
t.Fatal(err)
}
<-time.After(1 * time.Second)
if err := r2.Register(svc2); err != nil {
t.Fatal(err)
}
var found bool
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r2-svc" {
found = true
}
}
if !found {
t.Fatalf("r2-svc not found in r1, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "r1-svc" {
found = true
}
}
if !found {
t.Fatalf("r1-svc not found in r2, broadcast not work")
}
if err := r1.Deregister(svc1); err != nil {
t.Fatal(err)
}
if err := r2.Deregister(svc2); err != nil {
t.Fatal(err)
}
}
func TestServerRegistry(t *testing.T) {
newRegistries()
_, err := newServer("s1", r1, t)
if err != nil {
t.Fatal(err)
}
_, err = newServer("s2", r2, t)
if err != nil {
t.Fatal(err)
}
svcs, err := r1.ListServices()
if err != nil {
t.Fatal(err)
}
if len(svcs) < 1 {
t.Fatalf("r1 svcs unknown %#+v\n", svcs)
}
found := false
for _, svc := range svcs {
if svc.Name == "s2" {
found = true
}
}
if !found {
t.Fatalf("r1 does not have s2, broadcast not work")
}
found = false
svcs, err = r2.ListServices()
if err != nil {
t.Fatal(err)
}
for _, svc := range svcs {
if svc.Name == "s1" {
found = true
}
}
if !found {
t.Fatalf("r2 does not have s1, broadcast not work")
}
}
type testServer struct{}
func (*testServer) Test(ctx context.Context, req *pb.Update, rsp *pb.Update) error {
return nil
}
func newServer(n string, r registry.Registry, t *testing.T) (micro.Service, error) {
h := &testServer{}
var wg sync.WaitGroup
wg.Add(1)
sopts := []server.Option{
server.Name(n),
server.Registry(r),
}
copts := []client.Option{
client.Selector(selector.NewSelector(selector.Registry(r))),
client.Registry(r),
}
srv := micro.NewService(
micro.Server(server.NewServer(sopts...)),
micro.Client(client.NewClient(copts...)),
micro.AfterStart(func() error {
wg.Done()
return nil
}),
)
srv.Server().NewHandler(h)
go func() {
t.Fatal(srv.Run())
}()
wg.Wait()
return srv, nil
}

View File

@@ -12,34 +12,35 @@ type contextSecretKey struct{}
// Secret specifies an encryption key. The value should be either
// 16, 24, or 32 bytes to select AES-128, AES-192, or AES-256.
func Secret(k []byte) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextSecretKey{}, k)
}
return setRegistryOption(contextSecretKey{}, k)
}
type contextAddress struct{}
// Address to bind to - host:port
func Address(a string) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextAddress{}, a)
}
return setRegistryOption(contextAddress{}, a)
}
type contextConfig struct{}
// Config allow to inject a *memberlist.Config struct for configuring gossip
func Config(c *memberlist.Config) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextConfig{}, c)
}
return setRegistryOption(contextConfig{}, c)
}
type contextAdvertise struct{}
// The address to advertise for other gossip members - host:port
func Advertise(a string) registry.Option {
return func(o *registry.Options) {
o.Context = context.WithValue(o.Context, contextAdvertise{}, a)
}
return setRegistryOption(contextAdvertise{}, a)
}
type contextContext struct{}
// Context specifies a context for the registry.
// Can be used to signal shutdown of the registry.
// Can be used for extra option values.
func Context(ctx context.Context) registry.Option {
return setRegistryOption(contextContext{}, ctx)
}

View File

@@ -3,9 +3,12 @@
package gossip
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
fmt "fmt"
math "math"
proto "github.com/golang/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
@@ -20,16 +23,12 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// Update is the message broadcast
type Update struct {
// unique id of update
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
// unix nano timestamp of update
Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// time to live for entry
Expires uint64 `protobuf:"varint,3,opt,name=expires,proto3" json:"expires,omitempty"`
// type of update; service
Type string `protobuf:"bytes,4,opt,name=type,proto3" json:"type,omitempty"`
// what action is taken; add, del, put
Action string `protobuf:"bytes,5,opt,name=action,proto3" json:"action,omitempty"`
Expires uint64 `protobuf:"varint,1,opt,name=expires,proto3" json:"expires,omitempty"`
// type of update
Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"`
// what action is taken
Action int32 `protobuf:"varint,3,opt,name=action,proto3" json:"action,omitempty"`
// any other associated metadata about the data
Metadata map[string]string `protobuf:"bytes,6,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
// the payload data;
@@ -43,16 +42,17 @@ func (m *Update) Reset() { *m = Update{} }
func (m *Update) String() string { return proto.CompactTextString(m) }
func (*Update) ProtoMessage() {}
func (*Update) Descriptor() ([]byte, []int) {
return fileDescriptor_gossip_fd1eb378131a5d12, []int{0}
return fileDescriptor_18cba623e76e57f3, []int{0}
}
func (m *Update) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Update.Unmarshal(m, b)
}
func (m *Update) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Update.Marshal(b, m, deterministic)
}
func (dst *Update) XXX_Merge(src proto.Message) {
xxx_messageInfo_Update.Merge(dst, src)
func (m *Update) XXX_Merge(src proto.Message) {
xxx_messageInfo_Update.Merge(m, src)
}
func (m *Update) XXX_Size() int {
return xxx_messageInfo_Update.Size(m)
@@ -63,20 +63,6 @@ func (m *Update) XXX_DiscardUnknown() {
var xxx_messageInfo_Update proto.InternalMessageInfo
func (m *Update) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *Update) GetTimestamp() uint64 {
if m != nil {
return m.Timestamp
}
return 0
}
func (m *Update) GetExpires() uint64 {
if m != nil {
return m.Expires
@@ -84,18 +70,18 @@ func (m *Update) GetExpires() uint64 {
return 0
}
func (m *Update) GetType() string {
func (m *Update) GetType() int32 {
if m != nil {
return m.Type
}
return ""
return 0
}
func (m *Update) GetAction() string {
func (m *Update) GetAction() int32 {
if m != nil {
return m.Action
}
return ""
return 0
}
func (m *Update) GetMetadata() map[string]string {
@@ -118,25 +104,24 @@ func init() {
}
func init() {
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_gossip_fd1eb378131a5d12)
proto.RegisterFile("github.com/micro/go-micro/registry/gossip/proto/gossip.proto", fileDescriptor_18cba623e76e57f3)
}
var fileDescriptor_gossip_fd1eb378131a5d12 = []byte{
// 251 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x90, 0xcf, 0x4a, 0xc4, 0x30,
0x10, 0x87, 0x69, 0xb6, 0x9b, 0xb5, 0xe3, 0x1f, 0x64, 0x10, 0x09, 0xb2, 0x87, 0xe2, 0xa9, 0x17,
0x5b, 0xd0, 0xcb, 0xa2, 0x5e, 0x3d, 0x7a, 0x09, 0xf8, 0x00, 0xd9, 0x36, 0xd4, 0xa0, 0xd9, 0x84,
0x64, 0x56, 0xec, 0x13, 0xf8, 0xda, 0xb2, 0x69, 0x54, 0xbc, 0x7d, 0xdf, 0xcc, 0x24, 0x99, 0x5f,
0xe0, 0x71, 0x34, 0xf4, 0xba, 0xdf, 0xb6, 0xbd, 0xb3, 0x9d, 0x35, 0x7d, 0x70, 0xdd, 0xe8, 0x6e,
0x66, 0x08, 0x7a, 0x34, 0x91, 0xc2, 0xd4, 0x8d, 0x2e, 0x46, 0xe3, 0x3b, 0x1f, 0x1c, 0xb9, 0x2c,
0x6d, 0x12, 0xe4, 0xb3, 0x5d, 0x7f, 0x31, 0xe0, 0x2f, 0x7e, 0x50, 0xa4, 0xf1, 0x0c, 0x98, 0x19,
0x44, 0x51, 0x17, 0x4d, 0x25, 0x99, 0x19, 0x70, 0x0d, 0x15, 0x19, 0xab, 0x23, 0x29, 0xeb, 0x05,
0xab, 0x8b, 0xa6, 0x94, 0x7f, 0x05, 0x14, 0xb0, 0xd2, 0x9f, 0xde, 0x04, 0x1d, 0xc5, 0x22, 0xf5,
0x7e, 0x14, 0x11, 0x4a, 0x9a, 0xbc, 0x16, 0x65, 0xba, 0x29, 0x31, 0x5e, 0x02, 0x57, 0x3d, 0x19,
0xb7, 0x13, 0xcb, 0x54, 0xcd, 0x86, 0x1b, 0x38, 0xb2, 0x9a, 0xd4, 0xa0, 0x48, 0x09, 0x5e, 0x2f,
0x9a, 0xe3, 0xdb, 0x75, 0x9b, 0xf7, 0x9c, 0xb7, 0x6a, 0x9f, 0x73, 0xfb, 0x69, 0x47, 0x61, 0x92,
0xbf, 0xd3, 0x87, 0x57, 0xd2, 0xa9, 0x55, 0x5d, 0x34, 0x27, 0x32, 0xf1, 0xd5, 0x03, 0x9c, 0xfe,
0x1b, 0xc7, 0x73, 0x58, 0xbc, 0xe9, 0x29, 0x67, 0x3a, 0x20, 0x5e, 0xc0, 0xf2, 0x43, 0xbd, 0xef,
0x75, 0x0a, 0x54, 0xc9, 0x59, 0xee, 0xd9, 0xa6, 0xd8, 0xf2, 0xf4, 0x31, 0x77, 0xdf, 0x01, 0x00,
0x00, 0xff, 0xff, 0x06, 0x6e, 0x00, 0x3c, 0x58, 0x01, 0x00, 0x00,
var fileDescriptor_18cba623e76e57f3 = []byte{
// 227 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x54, 0x8f, 0xc1, 0x4a, 0x03, 0x31,
0x14, 0x45, 0x49, 0xa7, 0x4d, 0xed, 0x53, 0x41, 0x1e, 0x22, 0x41, 0x5c, 0x0c, 0xae, 0x66, 0xe3,
0x0c, 0xe8, 0xa6, 0xa8, 0x5b, 0x97, 0x6e, 0x02, 0x7e, 0x40, 0x3a, 0x0d, 0x31, 0xe8, 0x34, 0x21,
0x79, 0x15, 0xf3, 0xa9, 0xfe, 0x8d, 0x34, 0x89, 0x42, 0x77, 0xe7, 0x24, 0x37, 0xdc, 0x1b, 0x78,
0x36, 0x96, 0xde, 0xf7, 0x9b, 0x7e, 0x74, 0xd3, 0x30, 0xd9, 0x31, 0xb8, 0xc1, 0xb8, 0xbb, 0x02,
0x41, 0x1b, 0x1b, 0x29, 0xa4, 0xc1, 0xb8, 0x18, 0xad, 0x1f, 0x7c, 0x70, 0xe4, 0xaa, 0xf4, 0x59,
0x90, 0x17, 0xbb, 0xfd, 0x61, 0xc0, 0xdf, 0xfc, 0x56, 0x91, 0x46, 0x01, 0x4b, 0xfd, 0xed, 0x6d,
0xd0, 0x51, 0xb0, 0x96, 0x75, 0x73, 0xf9, 0xa7, 0x88, 0x30, 0xa7, 0xe4, 0xb5, 0x98, 0xb5, 0xac,
0x5b, 0xc8, 0xcc, 0x78, 0x05, 0x5c, 0x8d, 0x64, 0xdd, 0x4e, 0x34, 0xf9, 0xb4, 0x1a, 0xae, 0xe1,
0x64, 0xd2, 0xa4, 0xb6, 0x8a, 0x94, 0xe0, 0x6d, 0xd3, 0x9d, 0xde, 0xdf, 0xf4, 0xb5, 0xb9, 0xf4,
0xf4, 0xaf, 0xf5, 0xfa, 0x65, 0x47, 0x21, 0xc9, 0xff, 0xf4, 0xa1, 0x25, 0xbf, 0x5a, 0xb6, 0xac,
0x3b, 0x93, 0x99, 0xaf, 0x9f, 0xe0, 0xfc, 0x28, 0x8e, 0x17, 0xd0, 0x7c, 0xe8, 0x94, 0x07, 0xae,
0xe4, 0x01, 0xf1, 0x12, 0x16, 0x5f, 0xea, 0x73, 0x5f, 0xd6, 0xad, 0x64, 0x91, 0xc7, 0xd9, 0x9a,
0x6d, 0x78, 0xfe, 0xea, 0xc3, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xd6, 0x63, 0x7b, 0x1b, 0x2a,
0x01, 0x00, 0x00,
}

View File

@@ -4,16 +4,12 @@ package gossip;
// Update is the message broadcast
message Update {
// unique id of update
string id = 1;
// unix nano timestamp of update
uint64 timestamp = 2;
// time to live for entry
uint64 expires = 3;
// type of update; service
string type = 4;
// what action is taken; add, del, put
string action = 5;
uint64 expires = 1;
// type of update
int32 type = 2;
// what action is taken
int32 action = 3;
// any other associated metadata about the data
map<string, string> metadata = 6;
// the payload data;

View File

@@ -70,21 +70,30 @@ func addNodes(old, neu []*registry.Node) []*registry.Node {
}
func addServices(old, neu []*registry.Service) []*registry.Service {
var srv []*registry.Service
for _, s := range neu {
var seen bool
for i, o := range old {
for _, o := range old {
if o.Version == s.Version {
s.Nodes = addNodes(o.Nodes, s.Nodes)
sp := new(registry.Service)
// make copy
*sp = *o
// set nodes
sp.Nodes = addNodes(o.Nodes, s.Nodes)
// mark as seen
seen = true
old[i] = s
srv = append(srv, sp)
break
}
}
if !seen {
old = append(old, s)
srv = append(srv, cp([]*registry.Service{s})...)
}
}
return old
return srv
}
func delNodes(old, del []*registry.Node) []*registry.Node {

View File

@@ -2,6 +2,7 @@
package registry
import (
"context"
"net"
"strings"
"sync"
@@ -194,20 +195,20 @@ func (m *mdnsRegistry) Deregister(service *Service) error {
}
func (m *mdnsRegistry) GetService(service string) ([]*Service, error) {
p := mdns.DefaultParams(service)
p.Timeout = m.opts.Timeout
entryCh := make(chan *mdns.ServiceEntry, 10)
p.Entries = entryCh
exit := make(chan bool)
defer close(exit)
serviceMap := make(map[string]*Service)
entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool)
p := mdns.DefaultParams(service)
// set context with timeout
p.Context, _ = context.WithTimeout(context.Background(), m.opts.Timeout)
// set entries channel
p.Entries = entries
go func() {
for {
select {
case e := <-entryCh:
case e := <-entries:
// list record so skip
if p.Service == "_services" {
continue
@@ -243,16 +244,21 @@ func (m *mdnsRegistry) GetService(service string) ([]*Service, error) {
})
serviceMap[txt.Version] = s
case <-exit:
case <-p.Context.Done():
close(done)
return
}
}
}()
// execute the query
if err := mdns.Query(p); err != nil {
return nil, err
}
// wait for completion
<-done
// create list and return
var services []*Service
@@ -264,21 +270,22 @@ func (m *mdnsRegistry) GetService(service string) ([]*Service, error) {
}
func (m *mdnsRegistry) ListServices() ([]*Service, error) {
p := mdns.DefaultParams("_services")
p.Timeout = m.opts.Timeout
entryCh := make(chan *mdns.ServiceEntry, 10)
p.Entries = entryCh
exit := make(chan bool)
defer close(exit)
serviceMap := make(map[string]bool)
entries := make(chan *mdns.ServiceEntry, 10)
done := make(chan bool)
p := mdns.DefaultParams("_services")
// set context with timeout
p.Context, _ = context.WithTimeout(context.Background(), m.opts.Timeout)
// set entries channel
p.Entries = entries
var services []*Service
go func() {
for {
select {
case e := <-entryCh:
case e := <-entries:
if e.TTL == 0 {
continue
}
@@ -288,16 +295,21 @@ func (m *mdnsRegistry) ListServices() ([]*Service, error) {
serviceMap[name] = true
services = append(services, &Service{Name: name})
}
case <-exit:
case <-p.Context.Done():
close(done)
return
}
}
}()
// execute query
if err := mdns.Query(p); err != nil {
return nil, err
}
// wait till done
<-done
return services, nil
}

View File

@@ -1,13 +1,20 @@
package server
import "context"
type HandlerOption func(*HandlerOptions)
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
}
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct {
Queue string
Internal bool
Context context.Context
}
// EndpointMetadata is a Handler option that allows metadata to be added to
@@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption {
o.Internal = b
}
}
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
opt := SubscriberOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
// Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
@@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption {
o.Queue = n
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}

View File

@@ -27,6 +27,8 @@ type Options struct {
// The register expiry time
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// The router for requests
Router Router
@@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option {
}
}
// Register the service with at interval
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
}
}
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {

View File

@@ -66,13 +66,63 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func getHeader(hdr string, md map[string]string) string {
if hd := md[hdr]; len(hd) > 0 {
return hd
}
return md["X-"+hdr]
}
func getHeaders(m *codec.Message) {
get := func(hdr, v string) string {
if len(v) > 0 {
return v
}
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
m.Id = get("Micro-Id", m.Id)
m.Error = get("Micro-Error", m.Error)
m.Endpoint = get("Micro-Endpoint", m.Endpoint)
m.Method = get("Micro-Method", m.Method)
m.Target = get("Micro-Service", m.Target)
// TODO: remove this cruft
if len(m.Endpoint) == 0 {
m.Endpoint = m.Method
}
}
func setHeaders(m, r *codec.Message) {
set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
// set headers
set("Micro-Id", r.Id)
set("Micro-Service", r.Target)
set("Micro-Method", r.Method)
set("Micro-Endpoint", r.Endpoint)
set("Micro-Error", r.Error)
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message) codec.NewCodec {
service := msg.Header["X-Micro-Service"]
method := msg.Header["X-Micro-Method"]
endpoint := msg.Header["X-Micro-Endpoint"]
protocol := msg.Header["X-Micro-Protocol"]
target := msg.Header["X-Micro-Target"]
service := getHeader("Micro-Service", msg.Header)
method := getHeader("Micro-Method", msg.Header)
endpoint := getHeader("Micro-Endpoint", msg.Header)
protocol := getHeader("Micro-Protocol", msg.Header)
target := getHeader("Micro-Target", msg.Header)
// if the protocol exists (mucp) do nothing
if len(protocol) > 0 {
@@ -91,12 +141,12 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
// no method then set to endpoint
if len(method) == 0 {
msg.Header["X-Micro-Method"] = method
msg.Header["Micro-Method"] = endpoint
}
// no endpoint then set to method
if len(endpoint) == 0 {
msg.Header["X-Micro-Endpoint"] = method
msg.Header["Micro-Endpoint"] = method
}
return nil
@@ -118,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
}
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// the initieal message
// the initial message
m := codec.Message{
Header: c.req.Header,
Body: c.req.Body,
@@ -153,25 +203,22 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
c.first = false
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Id = m.Header["X-Micro-Id"]
getHeaders(&m)
// read header via codec
err := c.codec.ReadHeader(&m, codec.Request)
// set the method/id
r.Method = m.Method
r.Endpoint = m.Endpoint
r.Id = m.Id
// TODO: remove the old legacy cruft
if len(r.Endpoint) == 0 {
r.Endpoint = r.Method
if err := c.codec.ReadHeader(&m, codec.Request); err != nil {
return err
}
return err
// fallback for 0.14 and older
if len(m.Endpoint) == 0 {
m.Endpoint = m.Method
}
// set message
*r = m
return nil
}
func (c *rpcCodec) ReadBody(b interface{}) error {
@@ -206,29 +253,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
m.Header = map[string]string{}
}
// set request id
if len(r.Id) > 0 {
m.Header["X-Micro-Id"] = r.Id
}
// set target
if len(r.Target) > 0 {
m.Header["X-Micro-Service"] = r.Target
}
// set request method
if len(r.Method) > 0 {
m.Header["X-Micro-Method"] = r.Method
}
// set request endpoint
if len(r.Endpoint) > 0 {
m.Header["X-Micro-Endpoint"] = r.Endpoint
}
if len(r.Error) > 0 {
m.Header["X-Micro-Error"] = r.Error
}
setHeaders(m, r)
// the body being sent
var body []byte
@@ -246,6 +271,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error
m.Header["Micro-Error"] = m.Error
// no body to write
if err := c.codec.Write(m, nil); err != nil {
return err

View File

@@ -47,6 +47,11 @@ func (r *rpcRequest) Header() map[string]string {
return r.header
}
func (r *rpcRequest) Body() interface{} {
// TODO: convert to interface value
return r.body
}
func (r *rpcRequest) Read() ([]byte, error) {
// got a body
if r.body != nil {

View File

@@ -10,7 +10,7 @@ import (
"sync"
"time"
"github.com/micro/go-log"
log "github.com/micro/go-log"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
@@ -120,9 +120,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// internal request
request := &rpcRequest{
service: msg.Header["X-Micro-Service"],
method: msg.Header["X-Micro-Method"],
endpoint: msg.Header["X-Micro-Endpoint"],
service: getHeader("Micro-Service", msg.Header),
method: getHeader("Micro-Method", msg.Header),
endpoint: getHeader("Micro-Endpoint", msg.Header),
contentType: ct,
codec: rcodec,
header: msg.Header,
@@ -157,15 +157,17 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
// write an error response
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
if err != lastStreamResponseError {
// write an error response
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
}
s.wg.Done()
return
@@ -274,12 +276,18 @@ func (s *rpcServer) Register() error {
return err
}
// make copy of metadata
md := make(metadata.Metadata)
for k, v := range config.Metadata {
md[k] = v
}
// register service
node := &registry.Node{
Id: config.Name + "-" + config.Id,
Address: addr,
Port: port,
Metadata: config.Metadata,
Metadata: md,
}
node.Metadata["transport"] = config.Transport.String()
@@ -357,6 +365,9 @@ func (s *rpcServer) Register() error {
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue))
}
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {
return err
@@ -436,6 +447,7 @@ func (s *rpcServer) Start() error {
registerDebugHandler(s)
config := s.Options()
// start listening on the transport
ts, err := config.Transport.Listen(config.Address)
if err != nil {
return err
@@ -443,30 +455,45 @@ func (s *rpcServer) Start() error {
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
s.Lock()
// swap address
s.Lock()
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
exit := make(chan bool, 1)
// connect to the broker
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
exit := make(chan bool)
go func() {
for {
// listen for connections
err := ts.Accept(s.ServeConn)
// check if we're supposed to exit
// TODO: listen for messages
// msg := broker.Exchange(service).Consume()
select {
// check if we're supposed to exit
case <-exit:
return
default:
}
// check the error and backoff
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
default:
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
}
// no error just exit
@@ -475,9 +502,37 @@ func (s *rpcServer) Start() error {
}()
go func() {
// wait for exit
ch := <-s.exit
exit <- true
t := new(time.Ticker)
// only process if it exists
if s.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(s.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
// wait for exit
case ch = <-s.exit:
t.Stop()
close(exit)
break Loop
}
}
// deregister self
if err := s.Deregister(); err != nil {
log.Log("Server deregister error: ", err)
}
// wait for requests to finish
if wait(s.opts.Context) {
@@ -490,18 +545,12 @@ func (s *rpcServer) Start() error {
// disconnect the broker
config.Broker.Disconnect()
s.Lock()
// swap back address
s.Lock()
s.opts.Address = addr
s.Unlock()
}()
// TODO: subscribe to cruft
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
return nil
}

View File

@@ -21,8 +21,6 @@ type Server interface {
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Register() error
Deregister() error
Start() error
Stop() error
String() string
@@ -53,6 +51,8 @@ type Request interface {
ContentType() string
// Header of the request
Header() map[string]string
// Body is the initial decoded value
Body() interface{}
// Read the undecoded request body
Read() ([]byte, error)
// The encoded message stream
@@ -114,10 +114,6 @@ type Subscriber interface {
type Option func(*Options)
type HandlerOption func(*HandlerOptions)
type SubscriberOption func(*SubscriberOptions)
var (
DefaultAddress = ":0"
DefaultName = "go-server"
@@ -177,16 +173,6 @@ func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s)
}
// Register registers the default server with the discovery system
func Register() error {
return DefaultServer.Register()
}
// Deregister deregisters the default server from the discovery system
func Deregister() error {
return DefaultServer.Deregister()
}
// Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server
func Run() error {
@@ -194,18 +180,10 @@ func Run() error {
return err
}
if err := DefaultServer.Register(); err != nil {
return err
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Logf("Received signal %s", <-ch)
if err := DefaultServer.Deregister(); err != nil {
return err
}
return Stop()
}

View File

@@ -5,10 +5,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata"
@@ -36,27 +33,6 @@ func newService(opts ...Option) Service {
}
}
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
@@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) {
}
s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
@@ -105,7 +67,7 @@ func (s *service) Server() server.Server {
}
func (s *service) String() string {
return "go-micro"
return "micro"
}
func (s *service) Start() error {
@@ -119,10 +81,6 @@ func (s *service) Start() error {
return err
}
if err := s.opts.Server.Register(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
@@ -141,10 +99,6 @@ func (s *service) Stop() error {
}
}
if err := s.opts.Server.Deregister(); err != nil {
return err
}
if err := s.opts.Server.Stop(); err != nil {
return err
}
@@ -163,10 +117,6 @@ func (s *service) Run() error {
return err
}
// start reg loop
ex := make(chan bool)
go s.run(ex)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
@@ -177,8 +127,5 @@ func (s *service) Run() error {
case <-s.opts.Context.Done():
}
// exit reg loop
close(ex)
return s.Stop()
}

View File

@@ -13,11 +13,11 @@ import (
"sync"
"time"
"github.com/micro/h2c"
maddr "github.com/micro/util/go/lib/addr"
mnet "github.com/micro/util/go/lib/net"
mls "github.com/micro/util/go/lib/tls"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
type buffer struct {
@@ -35,7 +35,7 @@ type httpTransportClient struct {
dialOpts DialOptions
once sync.Once
sync.Mutex
sync.RWMutex
r chan *http.Request
bl []*http.Request
buff *bufio.Reader
@@ -133,11 +133,11 @@ func (h *httpTransportClient) Recv(m *Message) error {
r = rc
}
h.Lock()
defer h.Unlock()
h.RLock()
if h.buff == nil {
return io.EOF
}
h.RUnlock()
// set timeout if its greater than 0
if h.ht.opts.Timeout > time.Duration(0) {
@@ -424,10 +424,7 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
// insecure connection use h2c
if !(h.ht.opts.Secure || h.ht.opts.TLSConfig != nil) {
srv.Handler = &h2c.HandlerH2C{
Handler: mux,
H2Server: &http2.Server{},
}
srv.Handler = h2c.NewHandler(mux, &http2.Server{})
}
// begin serving