Compare commits

..

23 Commits

Author SHA1 Message Date
Asim Aslam
107b571019 Add go mod 2019-01-30 11:43:40 +00:00
Asim Aslam
89c8e1f4a7 update readme 2019-01-29 09:20:34 +00:00
Asim Aslam
a06cd72337 update image 2019-01-29 09:08:14 +00:00
Asim Aslam
e22fa01935 fix ticker 2019-01-24 16:08:04 +00:00
Asim Aslam
a5015692e3 Merge pull request #400 from micro/interval
Move RegisterInterval into the server
2019-01-24 13:55:05 +00:00
Asim Aslam
539b8c1a3b Move RegisterInterval into the server 2019-01-24 13:22:17 +00:00
Asim Aslam
67a738b504 Merge pull request #399 from unistack-org/master
add context to SubscriberOptions
2019-01-24 13:11:33 +00:00
ac1afea7fc add context to server.SubscriberOptions and broker.SubscribeOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-01-24 15:36:01 +03:00
Asim Aslam
8090f9968d Update headers to remove X- prefix 2019-01-24 10:11:02 +00:00
Asim Aslam
7542aafd29 Update package comment 2019-01-23 18:15:17 +00:00
Asim Aslam
13de868b21 Rename 2019-01-23 18:14:36 +00:00
Asim Aslam
d090a97a3d Merge pull request #396 from micro/error
Fix #394 invalid error handling in rpc_router ServeRequest
2019-01-22 14:28:41 +00:00
Asim Aslam
8a0d5f0489 log if we can't even respond 2019-01-22 13:55:04 +00:00
Asim Aslam
2ed676acf4 handle errors differently 2019-01-22 13:52:18 +00:00
Asim Aslam
d8ba18deff change logging 2019-01-22 12:18:33 +00:00
Asim Aslam
1321782785 in case of reload return nil 2019-01-19 10:20:16 +00:00
Asim Aslam
48b80dd051 replace memory registry 2019-01-18 17:29:17 +00:00
Asim Aslam
943219f203 Merge pull request #393 from micro/legacy
Evolution of Codecs and Methods
2019-01-18 12:40:05 +00:00
Asim Aslam
6468733d98 Use protocol from node metadata 2019-01-18 12:30:39 +00:00
Asim Aslam
9bd32645be Account for old target 2019-01-18 10:43:41 +00:00
Asim Aslam
f41be53ff8 Add ability to process legacy requests 2019-01-18 10:23:36 +00:00
Asim Aslam
2cd2258731 For the legacy 2019-01-18 10:12:57 +00:00
Asim Aslam
9ce9977d21 Don't read unless we have b 2019-01-17 12:09:04 +00:00
38 changed files with 820 additions and 333 deletions

View File

@@ -1,6 +1,6 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable framework for micro service development.
Go Micro is a framework for micro service development.
## Overview
@@ -8,7 +8,7 @@ Go Micro provides the core requirements for distributed systems development incl
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly
but everything can be easily swapped out.
<img src="https://micro.mu/docs/images/go-micro.png" />
<img src="https://micro.mu/docs/images/go-micro.svg" />
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).

View File

@@ -19,7 +19,6 @@ import (
"time"
"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/micro/go-micro/codec/json"
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
@@ -397,7 +396,6 @@ func (h *httpBroker) Connect() error {
return err
}
log.Logf("Broker Listening on %s", l.Addr().String())
addr := h.address
h.address = l.Addr().String()

View File

@@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option {
o.TLSConfig = t
}
}
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {
o.Context = ctx
}
}

View File

@@ -38,7 +38,9 @@ type Message interface {
type Request interface {
// The service to call
Service() string
// The endpoint to call
// The action to take
Method() string
// The endpoint to invoke
Endpoint() string
// The content type
ContentType() string

View File

@@ -4,10 +4,11 @@ import (
"bytes"
"context"
"fmt"
"net"
"strconv"
"sync"
"time"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/broker"
@@ -56,7 +57,12 @@ func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (r *rpcClient) call(ctx context.Context, address string, req Request, resp interface{}, opts CallOptions) error {
func (r *rpcClient) call(ctx context.Context, node *registry.Node, req Request, resp interface{}, opts CallOptions) error {
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
msg := &transport.Message{
Header: make(map[string]string),
}
@@ -75,9 +81,16 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
// set the accept header
msg.Header["Accept"] = req.ContentType()
cf, err := r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
// setup old protocol
cf := setupProtocol(msg, node)
// no codec specified
if cf == nil {
var err error
cf, err = r.newCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
}
var grr error
@@ -144,7 +157,12 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp
}
}
func (r *rpcClient) stream(ctx context.Context, address string, req Request, opts CallOptions) (Stream, error) {
func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
msg := &transport.Message{
Header: make(map[string]string),
}
@@ -163,9 +181,16 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request, opt
// set the accept header
msg.Header["Accept"] = req.ContentType()
cf, err := r.newCodec(req.ContentType())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
// set old codecs
cf := setupProtocol(msg, node)
// no codec specified
if cf == nil {
var err error
cf, err = r.newCodec(req.ContentType())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
}
dOpts := []transport.DialOption{
@@ -245,9 +270,19 @@ func (r *rpcClient) Options() Options {
func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, error) {
// return remote address
if len(opts.Address) > 0 {
address := opts.Address
port := 0
host, sport, err := net.SplitHostPort(opts.Address)
if err == nil {
address = host
port, _ = strconv.Atoi(sport)
}
return func() (*registry.Node, error) {
return &registry.Node{
Address: opts.Address,
Address: address,
Port: port,
}, nil
}, nil
}
@@ -323,14 +358,8 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
return errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
// set the address
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
// make the call
err = rcall(ctx, address, request, response, callOpts)
err = rcall(ctx, node, request, response, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return err
}
@@ -406,12 +435,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
return nil, errors.InternalServerError("go.micro.client", "error getting next %s node: %v", request.Service(), err.Error())
}
address := node.Address
if node.Port > 0 {
address = fmt.Sprintf("%s:%d", address, node.Port)
}
stream, err := r.stream(ctx, address, request, callOpts)
stream, err := r.stream(ctx, node, request, callOpts)
r.opts.Selector.Mark(request.Service(), node, err)
return stream, err
}
@@ -463,8 +487,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["X-Micro-Topic"] = msg.Topic()
md["X-Micro-Id"] = id
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// encode message body
cf, err := r.newCodec(msg.ContentType())
@@ -476,8 +500,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
Target: msg.Topic(),
Type: codec.Publication,
Header: map[string]string{
"X-Micro-Id": id,
"X-Micro-Topic": msg.Topic(),
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())

View File

@@ -21,10 +21,11 @@ func TestCallAddress(t *testing.T) {
var called bool
service := "test.service"
endpoint := "Test.Endpoint"
address := "10.1.10.1:8080"
address := "10.1.10.1"
port := 8080
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
@@ -35,8 +36,12 @@ func TestCallAddress(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}
if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
if node.Address != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address)
}
if node.Port != port {
return fmt.Errorf("expected address: %d got %d", port, node.Port)
}
// don't do the call
@@ -54,7 +59,7 @@ func TestCallAddress(t *testing.T) {
req := c.NewRequest(service, endpoint, nil)
// test calling remote address
if err := c.Call(context.Background(), req, nil, WithAddress(address)); err != nil {
if err := c.Call(context.Background(), req, nil, WithAddress(fmt.Sprintf("%s:%d", address, port))); err != nil {
t.Fatal("call with address error", err)
}
@@ -67,12 +72,12 @@ func TestCallAddress(t *testing.T) {
func TestCallRetry(t *testing.T) {
service := "test.service"
endpoint := "Test.Endpoint"
address := "10.1.10.1:8080"
address := "10.1.10.1"
var called int
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
called++
if called == 1 {
return errors.InternalServerError("test.error", "retry request")
@@ -108,12 +113,11 @@ func TestCallWrapper(t *testing.T) {
id := "test.1"
service := "test.service"
endpoint := "Test.Endpoint"
host := "10.1.10.1"
address := "10.1.10.1"
port := 8080
address := "10.1.10.1:8080"
wrap := func(cf CallFunc) CallFunc {
return func(ctx context.Context, addr string, req Request, rsp interface{}, opts CallOptions) error {
return func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error {
called = true
if req.Service() != service {
@@ -124,8 +128,8 @@ func TestCallWrapper(t *testing.T) {
return fmt.Errorf("expected service: %s got %s", endpoint, req.Endpoint())
}
if addr != address {
return fmt.Errorf("expected address: %s got %s", address, addr)
if node.Address != address {
return fmt.Errorf("expected address: %s got %s", address, node.Address)
}
// don't do the call
@@ -146,7 +150,7 @@ func TestCallWrapper(t *testing.T) {
Nodes: []*registry.Node{
&registry.Node{
Id: id,
Address: host,
Address: address,
Port: port,
},
},

View File

@@ -12,6 +12,7 @@ import (
"github.com/micro/go-micro/codec/proto"
"github.com/micro/go-micro/codec/protorpc"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
)
@@ -58,6 +59,15 @@ var (
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": raw.NewCodec,
}
// TODO: remove legacy codec list
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
@@ -74,6 +84,68 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func getHeaders(m *codec.Message) {
get := func(hdr string) string {
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
// check error in header
if len(m.Error) == 0 {
m.Error = get("Micro-Error")
}
// check endpoint in header
if len(m.Endpoint) == 0 {
m.Endpoint = get("Micro-Endpoint")
}
// check method in header
if len(m.Method) == 0 {
m.Method = get("Micro-Method")
}
if len(m.Id) == 0 {
m.Id = get("Micro-Id")
}
}
func setHeaders(m *codec.Message) {
set := func(hdr, v string) {
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
set("Micro-Id", m.Id)
set("Micro-Service", m.Target)
set("Micro-Method", m.Method)
set("Micro-Endpoint", m.Endpoint)
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
protocol := node.Metadata["protocol"]
// got protocol
if len(protocol) > 0 {
return nil
}
// no protocol use old codecs
switch msg.Header["Content-Type"] {
case "application/json":
msg.Header["Content-Type"] = "application/json-rpc"
case "application/protobuf":
msg.Header["Content-Type"] = "application/proto-rpc"
}
// now return codec
return defaultCodecs[msg.Header["Content-Type"]]
}
func newRpcCodec(req *transport.Message, client transport.Client, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
wbuf: bytes.NewBuffer(nil),
@@ -102,9 +174,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
}
// set the mucp headers
m.Header["X-Micro-Id"] = m.Id
m.Header["X-Micro-Service"] = m.Target
m.Header["X-Micro-Endpoint"] = m.Endpoint
setHeaders(m)
// if body is bytes Frame don't encode
if body != nil {
@@ -139,37 +209,25 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
return nil
}
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
var m transport.Message
if err := c.client.Recv(&m); err != nil {
func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
var tm transport.Message
// read message from transport
if err := c.client.Recv(&tm); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body)
var me codec.Message
// set headers
me.Header = m.Header
c.buf.rbuf.Reset()
c.buf.rbuf.Write(tm.Body)
// set headers from transport
m.Header = tm.Header
// read header
err := c.codec.ReadHeader(&me, r)
wm.Endpoint = me.Endpoint
wm.Id = me.Id
wm.Error = me.Error
err := c.codec.ReadHeader(m, r)
// check error in header
if len(me.Error) == 0 {
wm.Error = me.Header["X-Micro-Error"]
}
// check method in header
if len(me.Endpoint) == 0 {
wm.Endpoint = me.Header["X-Micro-Endpoint"]
}
if len(me.Id) == 0 {
wm.Id = me.Header["X-Micro-Id"]
}
// get headers
getHeaders(m)
// return header error
if err != nil {

View File

@@ -6,6 +6,7 @@ import (
type rpcRequest struct {
service string
method string
endpoint string
contentType string
codec codec.Codec
@@ -27,6 +28,7 @@ func newRequest(service, endpoint string, request interface{}, contentType strin
return &rpcRequest{
service: service,
method: endpoint,
endpoint: endpoint,
body: request,
contentType: contentType,
@@ -42,6 +44,10 @@ func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Endpoint() string {
return r.endpoint
}

View File

@@ -53,6 +53,7 @@ func (r *rpcStream) Send(msg interface{}) error {
req := codec.Message{
Id: r.id,
Target: r.request.Service(),
Method: r.request.Method(),
Endpoint: r.request.Endpoint(),
Type: codec.Request,
}

View File

@@ -2,10 +2,12 @@ package client
import (
"context"
"github.com/micro/go-micro/registry"
)
// CallFunc represents the individual call func
type CallFunc func(ctx context.Context, address string, req Request, rsp interface{}, opts CallOptions) error
type CallFunc func(ctx context.Context, node *registry.Node, req Request, rsp interface{}, opts CallOptions) error
// CallWrapper is a low level wrapper for the CallFunc
type CallWrapper func(CallFunc) CallFunc

View File

@@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
}
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
}
// client opts
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))

View File

@@ -53,6 +53,7 @@ type Message struct {
Id string
Type MessageType
Target string
Method string
Endpoint string
Error string

View File

@@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Target = m.Header["Micro-Service"]
m.Endpoint = m.Header["Micro-Endpoint"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")

View File

@@ -45,9 +45,9 @@ func newClientCodec(conn io.ReadWriteCloser) *clientCodec {
func (c *clientCodec) Write(m *codec.Message, b interface{}) error {
c.Lock()
c.pending[m.Id] = m.Endpoint
c.pending[m.Id] = m.Method
c.Unlock()
c.req.Method = m.Endpoint
c.req.Method = m.Method
c.req.Params[0] = b
c.req.ID = m.Id
return c.enc.Encode(&c.req)
@@ -66,7 +66,7 @@ func (c *clientCodec) ReadHeader(m *codec.Message) error {
}
c.Lock()
m.Endpoint = c.pending[c.resp.ID]
m.Method = c.pending[c.resp.ID]
delete(c.pending, c.resp.ID)
c.Unlock()

View File

@@ -53,7 +53,7 @@ func (c *serverCodec) ReadHeader(m *codec.Message) error {
if err := c.dec.Decode(&c.req); err != nil {
return err
}
m.Endpoint = c.req.Method
m.Method = c.req.Method
m.Id = fmt.Sprintf("%v", c.req.ID)
c.req.ID = nil
return nil

View File

@@ -18,13 +18,13 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
}
func (c *Codec) ReadBody(b interface{}) error {
if b == nil {
return nil
}
buf, err := ioutil.ReadAll(c.Conn)
if err != nil {
return err
}
if b == nil {
return nil
}
return proto.Unmarshal(buf, b.(proto.Message))
}

View File

@@ -47,7 +47,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
c.Lock()
defer c.Unlock()
// This is protobuf, of course we copy it.
pbr := &Request{ServiceMethod: &m.Endpoint, Seq: id(m.Id)}
pbr := &Request{ServiceMethod: &m.Method, Seq: id(m.Id)}
data, err := proto.Marshal(pbr)
if err != nil {
return err
@@ -73,7 +73,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error {
case codec.Response, codec.Error:
c.Lock()
defer c.Unlock()
rtmp := &Response{ServiceMethod: &m.Endpoint, Seq: id(m.Id), Error: &m.Error}
rtmp := &Response{ServiceMethod: &m.Method, Seq: id(m.Id), Error: &m.Error}
data, err := proto.Marshal(rtmp)
if err != nil {
return err
@@ -126,7 +126,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
if err != nil {
return err
}
m.Endpoint = rtmp.GetServiceMethod()
m.Method = rtmp.GetServiceMethod()
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
case codec.Response:
data, err := ReadNetString(c.rwc)
@@ -138,7 +138,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error {
if err != nil {
return err
}
m.Endpoint = rtmp.GetServiceMethod()
m.Method = rtmp.GetServiceMethod()
m.Id = fmt.Sprintf("%d", rtmp.GetSeq())
m.Error = rtmp.GetError()
case codec.Publication:

17
go.mod Normal file
View File

@@ -0,0 +1,17 @@
module github.com/micro/go-micro
require (
github.com/golang/protobuf v1.2.0
github.com/google/uuid v1.1.0
github.com/hashicorp/consul v1.4.2
github.com/hashicorp/memberlist v0.1.3
github.com/micro/cli v0.1.0
github.com/micro/go-log v0.1.0
github.com/micro/go-rcache v0.1.0
github.com/micro/h2c v1.0.0
github.com/micro/mdns v0.1.0
github.com/micro/util v0.1.0
github.com/mitchellh/hashstructure v1.0.0
github.com/pkg/errors v0.8.1
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3
)

101
go.sum Normal file
View File

@@ -0,0 +1,101 @@
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/go-log/log v0.1.0 h1:wudGTNsiGzrD5ZjgIkVZ517ugi2XRe9Q/xRCzwEO4/U=
github.com/go-log/log v0.1.0/go.mod h1:4mBwpdRMFLiuXZDCwU2lKQFsoSCo72j3HqBK9d81N2M=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/uuid v1.1.0 h1:Jf4mxPC/ziBnoPIdpQdPJ9OeiomAUHLvxmPRSPH9m4s=
github.com/google/uuid v1.1.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/consul v1.4.2 h1:D9iJoJb8Ehe/Zmr+UEE3U3FjOLZ4LUxqFMl4O43BM1U=
github.com/hashicorp/consul v1.4.2/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.0 h1:wvCrVc9TjDls6+YGAF2hAifE1E5U1+b4tH6KdvN3Gig=
github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3 h1:zKjpN5BK/P5lMYrLmBHdBULWbJ0XpYR+7NGzqkZzoD4=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-rootcerts v1.0.0 h1:ueI78wUjYExhCvMLow4icJnayNNFRgy0d9EGs/a1T44=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0 h1:GeH6tui99pF4NJgfnhp+L6+FfobzVW3Ah46sLo0ICXs=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG676r31M=
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/micro/cli v0.0.0-20181223203424-1b0c9793c300/go.mod h1:x9x6qy+tXv17jzYWQup462+j3SIUgDa6vVTzU4IXy/w=
github.com/micro/cli v0.1.0 h1:5DT+QdbAPPQvB3gYTgwze7tFO1m+7DU1sz9XfQczbsc=
github.com/micro/cli v0.1.0/go.mod h1:jRT9gmfVKWSS6pkKcXQ8YhUyj6bzwxK8Fp5b0Y7qNnk=
github.com/micro/go-log v0.1.0 h1:szYSR+yyTsomZM2jyinJC5562DlqffSjHmTZFaeZ2vY=
github.com/micro/go-log v0.1.0/go.mod h1:qaFBF6d6Jk01Gz4cbMkCA2vVuLk3FSaLLjmEGrMCreA=
github.com/micro/go-micro v0.23.0/go.mod h1:3z3lfMkNU9Sr1L/CxL++8pVJmQapRo0N6kNjwYDtOVs=
github.com/micro/go-rcache v0.1.0 h1:YTIgANVHgBe1XOQ/yLICL+s2gbZCAdW+c2ckhekjkuc=
github.com/micro/go-rcache v0.1.0/go.mod h1:INzyZjXO5M+PmN2A33YxD4TaOY61xjFIM4CfSHv+At8=
github.com/micro/h2c v1.0.0 h1:ejw6MS5+WaUoMHRtqkVCCrrVzLMzOFEH52rEyd8Fl2I=
github.com/micro/h2c v1.0.0/go.mod h1:54sOOQW/GRlHhH43vKwOhUb+kHaXhVxR0d3CJhn9alE=
github.com/micro/mdns v0.0.0-20181201230301-9c3770d4057a/go.mod h1:SQG6o/94RinohLuB5noHSevg2Iqg2wXLDUn4lj2LWWo=
github.com/micro/mdns v0.1.0 h1:fuLybUsfynbigJmCot/54i+gwe0hpc/vtCMvWt2WfDI=
github.com/micro/mdns v0.1.0/go.mod h1:KJ0dW7KmicXU2BV++qkLlmHYcVv7/hHnbtguSWt9Aoc=
github.com/micro/util v0.1.0 h1:ghhF5KKRNlKMexzK+cWo6W6uRAZdKy1UKG/9O74NCYc=
github.com/micro/util v0.1.0/go.mod h1:MZgOs0nwxzv9k4xQo4fpF9IwZGF2O96F5/phP9X4/Sw=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.3 h1:1g0r1IvskvgL8rR+AcHzUA+oFmGcQlaIm4IqakufeMM=
github.com/miekg/dns v1.1.3/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0 h1:vKb8ShqSby24Yrqr/yDYkuFz8d0WUjys40rvnGC8aR0=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/hashstructure v1.0.0 h1:ZkRJX1CyOoTkar7p/mLS5TZU4nJ1Rn/F8u9dGS02Q3Y=
github.com/mitchellh/hashstructure v1.0.0/go.mod h1:QjSHrPWS+BGUVBYkbTZWEnOh3G1DutKwClXU/ABz6AQ=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 h1:YbZJ76lQ1BqNhVe7dKTSB67wDrc2VPRR75IyGyyPDX8=
golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY=
golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc h1:WiYx1rIFmx8c0mXAFtv5D/mHyKe1+jmuP7PViuwqwuQ=
golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -1,4 +1,4 @@
// Package micro is a pluggable RPC framework for microservices
// Package micro is a pluggable framework for microservices
package micro
import (
@@ -42,7 +42,7 @@ type Publisher interface {
type Option func(*Options)
var (
HeaderPrefix = "X-Micro-"
HeaderPrefix = "Micro-"
)
// NewService creates and returns a new Service based on the packages within.

View File

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

View File

@@ -22,9 +22,6 @@ type Options struct {
Registry registry.Registry
Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
@@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option {
// RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
o.Server.Init(server.RegisterInterval(t))
}
}

51
registry/memory/data.go Normal file
View File

@@ -0,0 +1,51 @@
package memory
import (
"github.com/micro/go-micro/registry"
)
var (
// mock data
Data = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
)

View File

@@ -2,60 +2,24 @@
package memory
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/registry"
)
type Registry struct {
options registry.Options
sync.RWMutex
Services map[string][]*registry.Service
Watchers map[string]*Watcher
}
var (
// mock data
Data = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
timeout = time.Millisecond * 10
)
// Setup sets mock data
@@ -67,69 +31,130 @@ func (m *Registry) Setup() {
m.Services = Data
}
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
m.Lock()
defer m.Unlock()
func (m *Registry) watch(r *registry.Result) {
var watchers []*Watcher
m.RLock()
for _, w := range m.Watchers {
watchers = append(watchers, w)
}
m.RUnlock()
for _, w := range watchers {
select {
case <-w.exit:
m.Lock()
delete(m.Watchers, w.id)
m.Unlock()
default:
select {
case w.res <- r:
case <-time.After(timeout):
}
}
}
}
func (m *Registry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&m.options)
}
// add services
m.Lock()
for k, v := range getServices(m.options.Context) {
s := m.Services[k]
m.Services[k] = addServices(s, v)
}
m.Unlock()
return nil
}
func (m *Registry) Options() registry.Options {
return m.options
}
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
m.RLock()
s, ok := m.Services[service]
if !ok || len(s) == 0 {
m.RUnlock()
return nil, registry.ErrNotFound
}
m.RUnlock()
return s, nil
}
func (m *Registry) ListServices() ([]*registry.Service, error) {
m.Lock()
defer m.Unlock()
m.RLock()
var services []*registry.Service
for _, service := range m.Services {
services = append(services, service...)
}
m.RUnlock()
return services, nil
}
func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
go m.watch(&registry.Result{Action: "update", Service: s})
m.Lock()
services := addServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services
m.Unlock()
return nil
}
func (m *Registry) Deregister(s *registry.Service) error {
m.Lock()
defer m.Unlock()
go m.watch(&registry.Result{Action: "delete", Service: s})
m.Lock()
services := delServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services
m.Unlock()
return nil
}
func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wopts registry.WatchOptions
var wo registry.WatchOptions
for _, o := range opts {
o(&wopts)
o(&wo)
}
return &memoryWatcher{exit: make(chan bool), opts: wopts}, nil
w := &Watcher{
exit: make(chan bool),
res: make(chan *registry.Result),
id: uuid.New().String(),
wo: wo,
}
m.Lock()
m.Watchers[w.id] = w
m.Unlock()
return w, nil
}
func (m *Registry) String() string {
return "memory"
}
func (m *Registry) Init(opts ...registry.Option) error {
return nil
}
func (m *Registry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry(opts ...registry.Option) registry.Registry {
options := registry.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
services := getServices(options.Context)
if services == nil {
services = make(map[string][]*registry.Service)
}
return &Registry{
Services: make(map[string][]*registry.Service),
options: options,
Services: services,
Watchers: make(map[string]*Watcher),
}
}

View File

@@ -80,9 +80,8 @@ var (
}
)
func TestMockRegistry(t *testing.T) {
func TestMemoryRegistry(t *testing.T) {
m := NewRegistry()
m.(*Registry).Setup()
fn := func(k string, v []*registry.Service) {
services, err := m.GetService(k)
@@ -108,11 +107,6 @@ func TestMockRegistry(t *testing.T) {
}
}
// test existing memory data
for k, v := range Data {
fn(k, v)
}
// register data
for _, v := range testData {
for _, service := range v {
@@ -124,7 +118,6 @@ func TestMockRegistry(t *testing.T) {
// using test data
for k, v := range testData {
fn(k, v)
}

View File

@@ -0,0 +1,27 @@
package memory
import (
"context"
"github.com/micro/go-micro/registry"
)
type servicesKey struct{}
func getServices(ctx context.Context) map[string][]*registry.Service {
s, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service)
if !ok {
return nil
}
return s
}
// Services is an option that preloads service data
func Services(s map[string][]*registry.Service) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, servicesKey{}, s)
}
}

View File

@@ -0,0 +1,37 @@
package memory
import (
"errors"
"github.com/micro/go-micro/registry"
)
type Watcher struct {
id string
wo registry.WatchOptions
res chan *registry.Result
exit chan bool
}
func (m *Watcher) Next() (*registry.Result, error) {
for {
select {
case r := <-m.res:
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
continue
}
return r, nil
case <-m.exit:
return nil, errors.New("watcher stopped")
}
}
}
func (m *Watcher) Stop() {
select {
case <-m.exit:
return
default:
close(m.exit)
}
}

View File

@@ -0,0 +1,30 @@
package memory
import (
"testing"
"github.com/micro/go-micro/registry"
)
func TestWatcher(t *testing.T) {
w := &Watcher{
id: "test",
res: make(chan *registry.Result),
exit: make(chan bool),
}
go func() {
w.res <- &registry.Result{}
}()
_, err := w.Next()
if err != nil {
t.Fatal("unexpected err", err)
}
w.Stop()
if _, err := w.Next(); err == nil {
t.Fatal("expected error on Next()")
}
}

View File

@@ -326,12 +326,16 @@ func (c *registrySelector) run(name string) {
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// reload chan
reload := make(chan bool, 1)
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
reload <- true
}
// stop the watcher
@@ -341,7 +345,12 @@ func (c *registrySelector) watch(w registry.Watcher) error {
for {
res, err := w.Next()
if err != nil {
return err
select {
case <-reload:
return nil
default:
return err
}
}
c.update(res)
}

View File

@@ -1,13 +1,20 @@
package server
import "context"
type HandlerOption func(*HandlerOptions)
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
}
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct {
Queue string
Internal bool
Context context.Context
}
// EndpointMetadata is a Handler option that allows metadata to be added to
@@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption {
o.Internal = b
}
}
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
opt := SubscriberOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
// Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
@@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption {
o.Queue = n
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}

View File

@@ -27,6 +27,8 @@ type Options struct {
// The register expiry time
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// The router for requests
Router Router
@@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option {
}
}
// Register the service with at interval
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
}
}
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {

View File

@@ -41,6 +41,15 @@ var (
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": raw.NewCodec,
}
// TODO: remove legacy codec list
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
func (rwc *readWriteCloser) Read(p []byte) (n int, err error) {
@@ -57,6 +66,92 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func getHeader(hdr string, md map[string]string) string {
if hd := md[hdr]; len(hd) > 0 {
return hd
}
return md["X-"+hdr]
}
func getHeaders(m *codec.Message) {
get := func(hdr, v string) string {
if len(v) > 0 {
return v
}
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
m.Id = get("Micro-Id", m.Id)
m.Error = get("Micro-Error", m.Error)
m.Endpoint = get("Micro-Endpoint", m.Endpoint)
m.Method = get("Micro-Method", m.Method)
m.Target = get("Micro-Service", m.Target)
// TODO: remove this cruft
if len(m.Endpoint) == 0 {
m.Endpoint = m.Method
}
}
func setHeaders(m, r *codec.Message) {
set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
// set headers
set("Micro-Id", r.Id)
set("Micro-Service", r.Target)
set("Micro-Method", r.Method)
set("Micro-Endpoint", r.Endpoint)
set("Micro-Error", r.Error)
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message) codec.NewCodec {
service := getHeader("Micro-Service", msg.Header)
method := getHeader("Micro-Method", msg.Header)
endpoint := getHeader("Micro-Endpoint", msg.Header)
protocol := getHeader("Micro-Protocol", msg.Header)
target := getHeader("Micro-Target", msg.Header)
// if the protocol exists (mucp) do nothing
if len(protocol) > 0 {
return nil
}
// if no service/method/endpoint then it's the old protocol
if len(service) == 0 && len(method) == 0 && len(endpoint) == 0 {
return defaultCodecs[msg.Header["Content-Type"]]
}
// old target method specified
if len(target) > 0 {
return defaultCodecs[msg.Header["Content-Type"]]
}
// no method then set to endpoint
if len(method) == 0 {
msg.Header["Micro-Method"] = endpoint
}
// no endpoint then set to method
if len(endpoint) == 0 {
msg.Header["Micro-Endpoint"] = method
}
return nil
}
func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) codec.Codec {
rwc := &readWriteCloser{
rbuf: bytes.NewBuffer(req.Body),
@@ -73,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
}
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// the initieal message
// the initial message
m := codec.Message{
Header: c.req.Header,
Body: c.req.Body,
@@ -108,18 +203,17 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
c.first = false
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Id = m.Header["X-Micro-Id"]
getHeaders(&m)
// read header via codec
err := c.codec.ReadHeader(&m, codec.Request)
if err := c.codec.ReadHeader(&m, codec.Request); err != nil {
return err
}
// set the method/id
r.Endpoint = m.Endpoint
r.Id = m.Id
// set message
*r = m
return err
return nil
}
func (c *rpcCodec) ReadBody(b interface{}) error {
@@ -141,6 +235,8 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// create a new message
m := &codec.Message{
Target: r.Target,
Method: r.Method,
Endpoint: r.Endpoint,
Id: r.Id,
Error: r.Error,
@@ -152,24 +248,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
m.Header = map[string]string{}
}
// set request id
if len(r.Id) > 0 {
m.Header["X-Micro-Id"] = r.Id
}
// set target
if len(r.Target) > 0 {
m.Header["X-Micro-Service"] = r.Target
}
// set request endpoint
if len(r.Endpoint) > 0 {
m.Header["X-Micro-Endpoint"] = r.Endpoint
}
if len(r.Error) > 0 {
m.Header["X-Micro-Error"] = r.Error
}
setHeaders(m, r)
// the body being sent
var body []byte
@@ -187,6 +266,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error
m.Header["Micro-Error"] = m.Error
// no body to write
if err := c.codec.Write(m, nil); err != nil {
return err

View File

@@ -7,6 +7,7 @@ import (
type rpcRequest struct {
service string
method string
endpoint string
contentType string
socket transport.Socket
@@ -34,6 +35,10 @@ func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Endpoint() string {
return r.endpoint
}

View File

@@ -162,33 +162,30 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
}
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) {
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error {
msg := new(codec.Message)
msg.Type = codec.Response
resp := router.getResponse()
resp.msg = msg
// Encode the response header
resp.msg.Endpoint = req.msg.Endpoint
if errmsg != "" {
resp.msg.Error = errmsg
reply = invalidRequest
}
resp.msg.Id = req.msg.Id
sending.Lock()
err = cc.Write(resp.msg, reply)
err := cc.Write(resp.msg, reply)
sending.Unlock()
router.freeResponse(resp)
return err
}
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) {
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
defer router.freeRequest(req)
function := mtype.method.Func
var returnValues []reflect.Value
r := &rpcRequest{
service: req.msg.Target,
contentType: req.msg.Header["Content-Type"],
method: req.msg.Method,
endpoint: req.msg.Endpoint,
body: req.msg.Body,
}
@@ -205,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil
}
errmsg := ""
err := fn(ctx, r, replyv.Interface())
if err != nil {
errmsg = err.Error()
// execute handler
if err := fn(ctx, r, replyv.Interface()); err != nil {
return err
}
err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true)
if err != nil {
log.Log("rpc call: unable to send response: ", err)
}
router.freeRequest(req)
return
// send response
return router.sendResponse(sending, req, replyv.Interface(), cc, true)
}
// declare a local error to see if we errored out already
@@ -249,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
// client.Stream request
r.stream = true
errmsg := ""
// execute handler
if err := fn(ctx, r, stream); err != nil {
errmsg = err.Error()
return err
}
// this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it
// already)
router.sendResponse(sending, req, nil, cc, errmsg, true)
router.freeRequest(req)
return router.sendResponse(sending, req, nil, cc, true)
}
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
@@ -447,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
}
// send a response if we actually managed to read a header.
if req != nil {
router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
router.freeRequest(req)
}
return err
}
service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
return nil
return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
}

View File

@@ -10,7 +10,7 @@ import (
"sync"
"time"
"github.com/micro/go-log"
log "github.com/micro/go-log"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
@@ -97,25 +97,32 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
ct = DefaultContentType
}
// TODO: needs better error handling
cf, err := s.newCodec(ct)
if err != nil {
sock.Send(&transport.Message{
Header: map[string]string{
"Content-Type": "text/plain",
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
// setup old protocol
cf := setupProtocol(&msg)
// no old codec
if cf == nil {
// TODO: needs better error handling
var err error
if cf, err = s.newCodec(ct); err != nil {
sock.Send(&transport.Message{
Header: map[string]string{
"Content-Type": "text/plain",
},
Body: []byte(err.Error()),
})
s.wg.Done()
return
}
}
rcodec := newRpcCodec(&msg, sock, cf)
// internal request
request := &rpcRequest{
service: msg.Header["X-Micro-Service"],
endpoint: msg.Header["X-Micro-Endpoint"],
service: getHeader("Micro-Service", msg.Header),
method: getHeader("Micro-Method", msg.Header),
endpoint: getHeader("Micro-Endpoint", msg.Header),
contentType: ct,
codec: rcodec,
header: msg.Header,
@@ -151,12 +158,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
// write an error response
rcodec.Write(&codec.Message{
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
s.wg.Done()
return
}
@@ -276,6 +286,7 @@ func (s *rpcServer) Register() error {
node.Metadata["broker"] = config.Broker.String()
node.Metadata["server"] = s.String()
node.Metadata["registry"] = config.Registry.String()
node.Metadata["protocol"] = "mucp"
s.RLock()
// Maps are ordered randomly, sort the keys for consistency
@@ -346,6 +357,9 @@ func (s *rpcServer) Register() error {
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue))
}
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {
return err
@@ -425,36 +439,53 @@ func (s *rpcServer) Start() error {
registerDebugHandler(s)
config := s.Options()
// start listening on the transport
ts, err := config.Transport.Listen(config.Address)
if err != nil {
return err
}
log.Logf("Transport Listening on %s", ts.Addr())
s.Lock()
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
// swap address
s.Lock()
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
exit := make(chan bool, 1)
// connect to the broker
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
exit := make(chan bool)
go func() {
for {
// listen for connections
err := ts.Accept(s.ServeConn)
// check if we're supposed to exit
// TODO: listen for messages
// msg := broker.Exchange(service).Consume()
select {
// check if we're supposed to exit
case <-exit:
return
default:
}
// check the error and backoff
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
default:
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
}
// no error just exit
@@ -463,9 +494,37 @@ func (s *rpcServer) Start() error {
}()
go func() {
// wait for exit
ch := <-s.exit
exit <- true
t := new(time.Ticker)
// only process if it exists
if s.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(s.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
// wait for exit
case ch = <-s.exit:
t.Stop()
close(exit)
break Loop
}
}
// deregister self
if err := s.Deregister(); err != nil {
log.Log("Server deregister error: ", err)
}
// wait for requests to finish
if wait(s.opts.Context) {
@@ -478,14 +537,13 @@ func (s *rpcServer) Start() error {
// disconnect the broker
config.Broker.Disconnect()
s.Lock()
// swap back address
s.Lock()
s.opts.Address = addr
s.Unlock()
}()
// TODO: subscribe to cruft
return config.Broker.Connect()
return nil
}
func (s *rpcServer) Stop() error {

View File

@@ -31,6 +31,8 @@ func (r *rpcStream) Send(msg interface{}) error {
defer r.Unlock()
resp := codec.Message{
Target: r.request.Service(),
Method: r.request.Method(),
Endpoint: r.request.Endpoint(),
Id: r.id,
Type: codec.Response,

View File

@@ -21,8 +21,6 @@ type Server interface {
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Register() error
Deregister() error
Start() error
Stop() error
String() string
@@ -45,6 +43,8 @@ type Message interface {
type Request interface {
// Service name requested
Service() string
// The action requested
Method() string
// Endpoint name requested
Endpoint() string
// Content type provided
@@ -112,10 +112,6 @@ type Subscriber interface {
type Option func(*Options)
type HandlerOption func(*HandlerOptions)
type SubscriberOption func(*SubscriberOptions)
var (
DefaultAddress = ":0"
DefaultName = "go-server"
@@ -175,16 +171,6 @@ func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s)
}
// Register registers the default server with the discovery system
func Register() error {
return DefaultServer.Register()
}
// Deregister deregisters the default server from the discovery system
func Deregister() error {
return DefaultServer.Deregister()
}
// Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server
func Run() error {
@@ -192,18 +178,10 @@ func Run() error {
return err
}
if err := DefaultServer.Register(); err != nil {
return err
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Logf("Received signal %s", <-ch)
if err := DefaultServer.Deregister(); err != nil {
return err
}
return Stop()
}

View File

@@ -5,10 +5,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata"
@@ -36,27 +33,6 @@ func newService(opts ...Option) Service {
}
}
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
@@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) {
}
s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
@@ -105,7 +67,7 @@ func (s *service) Server() server.Server {
}
func (s *service) String() string {
return "go-micro"
return "micro"
}
func (s *service) Start() error {
@@ -119,10 +81,6 @@ func (s *service) Start() error {
return err
}
if err := s.opts.Server.Register(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
@@ -141,10 +99,6 @@ func (s *service) Stop() error {
}
}
if err := s.opts.Server.Deregister(); err != nil {
return err
}
if err := s.opts.Server.Stop(); err != nil {
return err
}
@@ -163,10 +117,6 @@ func (s *service) Run() error {
return err
}
// start reg loop
ex := make(chan bool)
go s.run(ex)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
@@ -177,8 +127,5 @@ func (s *service) Run() error {
case <-s.opts.Context.Done():
}
// exit reg loop
close(ex)
return s.Stop()
}