Compare commits

...

16 Commits

Author SHA1 Message Date
Asim Aslam
89c8e1f4a7 update readme 2019-01-29 09:20:34 +00:00
Asim Aslam
a06cd72337 update image 2019-01-29 09:08:14 +00:00
Asim Aslam
e22fa01935 fix ticker 2019-01-24 16:08:04 +00:00
Asim Aslam
a5015692e3 Merge pull request #400 from micro/interval
Move RegisterInterval into the server
2019-01-24 13:55:05 +00:00
Asim Aslam
539b8c1a3b Move RegisterInterval into the server 2019-01-24 13:22:17 +00:00
Asim Aslam
67a738b504 Merge pull request #399 from unistack-org/master
add context to SubscriberOptions
2019-01-24 13:11:33 +00:00
ac1afea7fc add context to server.SubscriberOptions and broker.SubscribeOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2019-01-24 15:36:01 +03:00
Asim Aslam
8090f9968d Update headers to remove X- prefix 2019-01-24 10:11:02 +00:00
Asim Aslam
7542aafd29 Update package comment 2019-01-23 18:15:17 +00:00
Asim Aslam
13de868b21 Rename 2019-01-23 18:14:36 +00:00
Asim Aslam
d090a97a3d Merge pull request #396 from micro/error
Fix #394 invalid error handling in rpc_router ServeRequest
2019-01-22 14:28:41 +00:00
Asim Aslam
8a0d5f0489 log if we can't even respond 2019-01-22 13:55:04 +00:00
Asim Aslam
2ed676acf4 handle errors differently 2019-01-22 13:52:18 +00:00
Asim Aslam
d8ba18deff change logging 2019-01-22 12:18:33 +00:00
Asim Aslam
1321782785 in case of reload return nil 2019-01-19 10:20:16 +00:00
Asim Aslam
48b80dd051 replace memory registry 2019-01-18 17:29:17 +00:00
24 changed files with 513 additions and 299 deletions

View File

@@ -1,6 +1,6 @@
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![GoDoc](https://godoc.org/github.com/micro/go-micro?status.svg)](https://godoc.org/github.com/micro/go-micro) [![Travis CI](https://api.travis-ci.org/micro/go-micro.svg?branch=master)](https://travis-ci.org/micro/go-micro) [![Go Report Card](https://goreportcard.com/badge/micro/go-micro)](https://goreportcard.com/report/github.com/micro/go-micro)
Go Micro is a pluggable framework for micro service development.
Go Micro is a framework for micro service development.
## Overview
@@ -8,7 +8,7 @@ Go Micro provides the core requirements for distributed systems development incl
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly
but everything can be easily swapped out.
<img src="https://micro.mu/docs/images/go-micro.png" />
<img src="https://micro.mu/docs/images/go-micro.svg" />
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).

View File

@@ -19,7 +19,6 @@ import (
"time"
"github.com/google/uuid"
"github.com/micro/go-log"
"github.com/micro/go-micro/codec/json"
merr "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry"
@@ -397,7 +396,6 @@ func (h *httpBroker) Connect() error {
return err
}
log.Logf("Broker Listening on %s", l.Addr().String())
addr := h.address
h.address = l.Addr().String()

View File

@@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option {
o.TLSConfig = t
}
}
// SubscribeContext set context
func SubscribeContext(ctx context.Context) SubscribeOption {
return func(o *SubscribeOptions) {
o.Context = ctx
}
}

View File

@@ -487,8 +487,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
id := uuid.New().String()
md["Content-Type"] = msg.ContentType()
md["X-Micro-Topic"] = msg.Topic()
md["X-Micro-Id"] = id
md["Micro-Topic"] = msg.Topic()
md["Micro-Id"] = id
// encode message body
cf, err := r.newCodec(msg.ContentType())
@@ -500,8 +500,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
Target: msg.Topic(),
Type: codec.Publication,
Header: map[string]string{
"X-Micro-Id": id,
"X-Micro-Topic": msg.Topic(),
"Micro-Id": id,
"Micro-Topic": msg.Topic(),
},
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())

View File

@@ -84,6 +84,47 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func getHeaders(m *codec.Message) {
get := func(hdr string) string {
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
// check error in header
if len(m.Error) == 0 {
m.Error = get("Micro-Error")
}
// check endpoint in header
if len(m.Endpoint) == 0 {
m.Endpoint = get("Micro-Endpoint")
}
// check method in header
if len(m.Method) == 0 {
m.Method = get("Micro-Method")
}
if len(m.Id) == 0 {
m.Id = get("Micro-Id")
}
}
func setHeaders(m *codec.Message) {
set := func(hdr, v string) {
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
set("Micro-Id", m.Id)
set("Micro-Service", m.Target)
set("Micro-Method", m.Method)
set("Micro-Endpoint", m.Endpoint)
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
protocol := node.Metadata["protocol"]
@@ -133,10 +174,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
}
// set the mucp headers
m.Header["X-Micro-Id"] = m.Id
m.Header["X-Micro-Service"] = m.Target
m.Header["X-Micro-Method"] = m.Method
m.Header["X-Micro-Endpoint"] = m.Endpoint
setHeaders(m)
// if body is bytes Frame don't encode
if body != nil {
@@ -171,43 +209,25 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
return nil
}
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
var m transport.Message
if err := c.client.Recv(&m); err != nil {
func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
var tm transport.Message
// read message from transport
if err := c.client.Recv(&tm); err != nil {
return errors.InternalServerError("go.micro.client.transport", err.Error())
}
c.buf.rbuf.Reset()
c.buf.rbuf.Write(m.Body)
var me codec.Message
// set headers
me.Header = m.Header
c.buf.rbuf.Reset()
c.buf.rbuf.Write(tm.Body)
// set headers from transport
m.Header = tm.Header
// read header
err := c.codec.ReadHeader(&me, r)
wm.Endpoint = me.Endpoint
wm.Method = me.Method
wm.Id = me.Id
wm.Error = me.Error
err := c.codec.ReadHeader(m, r)
// check error in header
if len(me.Error) == 0 {
wm.Error = me.Header["X-Micro-Error"]
}
// check endpoint in header
if len(me.Endpoint) == 0 {
wm.Endpoint = me.Header["X-Micro-Endpoint"]
}
// check method in header
if len(me.Method) == 0 {
wm.Method = me.Header["X-Micro-Method"]
}
if len(me.Id) == 0 {
wm.Id = me.Header["X-Micro-Id"]
}
// get headers
getHeaders(m)
// return header error
if err != nil {

View File

@@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
}
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
}
// client opts
if r := ctx.Int("client_retries"); r >= 0 {
clientOpts = append(clientOpts, client.Retries(r))

View File

@@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["X-Micro-Service"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Target = m.Header["Micro-Service"]
m.Endpoint = m.Header["Micro-Endpoint"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")

View File

@@ -1,4 +1,4 @@
// Package micro is a pluggable RPC framework for microservices
// Package micro is a pluggable framework for microservices
package micro
import (
@@ -42,7 +42,7 @@ type Publisher interface {
type Option func(*Options)
var (
HeaderPrefix = "X-Micro-"
HeaderPrefix = "Micro-"
)
// NewService creates and returns a new Service based on the packages within.

View File

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

View File

@@ -22,9 +22,6 @@ type Options struct {
Registry registry.Registry
Transport transport.Transport
// Register loop interval
RegisterInterval time.Duration
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
@@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option {
// RegisterInterval specifies the interval on which to re-register
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
o.Server.Init(server.RegisterInterval(t))
}
}

51
registry/memory/data.go Normal file
View File

@@ -0,0 +1,51 @@
package memory
import (
"github.com/micro/go-micro/registry"
)
var (
// mock data
Data = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
)

View File

@@ -2,60 +2,24 @@
package memory
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/registry"
)
type Registry struct {
options registry.Options
sync.RWMutex
Services map[string][]*registry.Service
Watchers map[string]*Watcher
}
var (
// mock data
Data = map[string][]*registry.Service{
"foo": []*registry.Service{
{
Name: "foo",
Version: "1.0.0",
Nodes: []*registry.Node{
{
Id: "foo-1.0.0-123",
Address: "localhost",
Port: 9999,
},
{
Id: "foo-1.0.0-321",
Address: "localhost",
Port: 9999,
},
},
},
{
Name: "foo",
Version: "1.0.1",
Nodes: []*registry.Node{
{
Id: "foo-1.0.1-321",
Address: "localhost",
Port: 6666,
},
},
},
{
Name: "foo",
Version: "1.0.3",
Nodes: []*registry.Node{
{
Id: "foo-1.0.3-345",
Address: "localhost",
Port: 8888,
},
},
},
},
}
timeout = time.Millisecond * 10
)
// Setup sets mock data
@@ -67,69 +31,130 @@ func (m *Registry) Setup() {
m.Services = Data
}
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
m.Lock()
defer m.Unlock()
func (m *Registry) watch(r *registry.Result) {
var watchers []*Watcher
m.RLock()
for _, w := range m.Watchers {
watchers = append(watchers, w)
}
m.RUnlock()
for _, w := range watchers {
select {
case <-w.exit:
m.Lock()
delete(m.Watchers, w.id)
m.Unlock()
default:
select {
case w.res <- r:
case <-time.After(timeout):
}
}
}
}
func (m *Registry) Init(opts ...registry.Option) error {
for _, o := range opts {
o(&m.options)
}
// add services
m.Lock()
for k, v := range getServices(m.options.Context) {
s := m.Services[k]
m.Services[k] = addServices(s, v)
}
m.Unlock()
return nil
}
func (m *Registry) Options() registry.Options {
return m.options
}
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
m.RLock()
s, ok := m.Services[service]
if !ok || len(s) == 0 {
m.RUnlock()
return nil, registry.ErrNotFound
}
m.RUnlock()
return s, nil
}
func (m *Registry) ListServices() ([]*registry.Service, error) {
m.Lock()
defer m.Unlock()
m.RLock()
var services []*registry.Service
for _, service := range m.Services {
services = append(services, service...)
}
m.RUnlock()
return services, nil
}
func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
m.Lock()
defer m.Unlock()
go m.watch(&registry.Result{Action: "update", Service: s})
m.Lock()
services := addServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services
m.Unlock()
return nil
}
func (m *Registry) Deregister(s *registry.Service) error {
m.Lock()
defer m.Unlock()
go m.watch(&registry.Result{Action: "delete", Service: s})
m.Lock()
services := delServices(m.Services[s.Name], []*registry.Service{s})
m.Services[s.Name] = services
m.Unlock()
return nil
}
func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
var wopts registry.WatchOptions
var wo registry.WatchOptions
for _, o := range opts {
o(&wopts)
o(&wo)
}
return &memoryWatcher{exit: make(chan bool), opts: wopts}, nil
w := &Watcher{
exit: make(chan bool),
res: make(chan *registry.Result),
id: uuid.New().String(),
wo: wo,
}
m.Lock()
m.Watchers[w.id] = w
m.Unlock()
return w, nil
}
func (m *Registry) String() string {
return "memory"
}
func (m *Registry) Init(opts ...registry.Option) error {
return nil
}
func (m *Registry) Options() registry.Options {
return registry.Options{}
}
func NewRegistry(opts ...registry.Option) registry.Registry {
options := registry.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
services := getServices(options.Context)
if services == nil {
services = make(map[string][]*registry.Service)
}
return &Registry{
Services: make(map[string][]*registry.Service),
options: options,
Services: services,
Watchers: make(map[string]*Watcher),
}
}

View File

@@ -80,9 +80,8 @@ var (
}
)
func TestMockRegistry(t *testing.T) {
func TestMemoryRegistry(t *testing.T) {
m := NewRegistry()
m.(*Registry).Setup()
fn := func(k string, v []*registry.Service) {
services, err := m.GetService(k)
@@ -108,11 +107,6 @@ func TestMockRegistry(t *testing.T) {
}
}
// test existing memory data
for k, v := range Data {
fn(k, v)
}
// register data
for _, v := range testData {
for _, service := range v {
@@ -124,7 +118,6 @@ func TestMockRegistry(t *testing.T) {
// using test data
for k, v := range testData {
fn(k, v)
}

View File

@@ -0,0 +1,27 @@
package memory
import (
"context"
"github.com/micro/go-micro/registry"
)
type servicesKey struct{}
func getServices(ctx context.Context) map[string][]*registry.Service {
s, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service)
if !ok {
return nil
}
return s
}
// Services is an option that preloads service data
func Services(s map[string][]*registry.Service) registry.Option {
return func(o *registry.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, servicesKey{}, s)
}
}

View File

@@ -0,0 +1,37 @@
package memory
import (
"errors"
"github.com/micro/go-micro/registry"
)
type Watcher struct {
id string
wo registry.WatchOptions
res chan *registry.Result
exit chan bool
}
func (m *Watcher) Next() (*registry.Result, error) {
for {
select {
case r := <-m.res:
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
continue
}
return r, nil
case <-m.exit:
return nil, errors.New("watcher stopped")
}
}
}
func (m *Watcher) Stop() {
select {
case <-m.exit:
return
default:
close(m.exit)
}
}

View File

@@ -0,0 +1,30 @@
package memory
import (
"testing"
"github.com/micro/go-micro/registry"
)
func TestWatcher(t *testing.T) {
w := &Watcher{
id: "test",
res: make(chan *registry.Result),
exit: make(chan bool),
}
go func() {
w.res <- &registry.Result{}
}()
_, err := w.Next()
if err != nil {
t.Fatal("unexpected err", err)
}
w.Stop()
if _, err := w.Next(); err == nil {
t.Fatal("expected error on Next()")
}
}

View File

@@ -326,12 +326,16 @@ func (c *registrySelector) run(name string) {
func (c *registrySelector) watch(w registry.Watcher) error {
defer w.Stop()
// reload chan
reload := make(chan bool, 1)
// manage this loop
go func() {
// wait for exit or reload signal
select {
case <-c.exit:
case <-c.reload:
reload <- true
}
// stop the watcher
@@ -341,7 +345,12 @@ func (c *registrySelector) watch(w registry.Watcher) error {
for {
res, err := w.Next()
if err != nil {
return err
select {
case <-reload:
return nil
default:
return err
}
}
c.update(res)
}

View File

@@ -1,13 +1,20 @@
package server
import "context"
type HandlerOption func(*HandlerOptions)
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
}
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct {
Queue string
Internal bool
Context context.Context
}
// EndpointMetadata is a Handler option that allows metadata to be added to
@@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption {
o.Internal = b
}
}
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
opt := SubscriberOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&opt)
}
return opt
}
// Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
@@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption {
o.Queue = n
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}

View File

@@ -27,6 +27,8 @@ type Options struct {
// The register expiry time
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// The router for requests
Router Router
@@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option {
}
}
// Register the service with at interval
func RegisterInterval(t time.Duration) Option {
return func(o *Options) {
o.RegisterInterval = t
}
}
// WithRouter sets the request router
func WithRouter(r Router) Option {
return func(o *Options) {

View File

@@ -66,13 +66,63 @@ func (rwc *readWriteCloser) Close() error {
return nil
}
func getHeader(hdr string, md map[string]string) string {
if hd := md[hdr]; len(hd) > 0 {
return hd
}
return md["X-"+hdr]
}
func getHeaders(m *codec.Message) {
get := func(hdr, v string) string {
if len(v) > 0 {
return v
}
if hd := m.Header[hdr]; len(hd) > 0 {
return hd
}
// old
return m.Header["X-"+hdr]
}
m.Id = get("Micro-Id", m.Id)
m.Error = get("Micro-Error", m.Error)
m.Endpoint = get("Micro-Endpoint", m.Endpoint)
m.Method = get("Micro-Method", m.Method)
m.Target = get("Micro-Service", m.Target)
// TODO: remove this cruft
if len(m.Endpoint) == 0 {
m.Endpoint = m.Method
}
}
func setHeaders(m, r *codec.Message) {
set := func(hdr, v string) {
if len(v) == 0 {
return
}
m.Header[hdr] = v
m.Header["X-"+hdr] = v
}
// set headers
set("Micro-Id", r.Id)
set("Micro-Service", r.Target)
set("Micro-Method", r.Method)
set("Micro-Endpoint", r.Endpoint)
set("Micro-Error", r.Error)
}
// setupProtocol sets up the old protocol
func setupProtocol(msg *transport.Message) codec.NewCodec {
service := msg.Header["X-Micro-Service"]
method := msg.Header["X-Micro-Method"]
endpoint := msg.Header["X-Micro-Endpoint"]
protocol := msg.Header["X-Micro-Protocol"]
target := msg.Header["X-Micro-Target"]
service := getHeader("Micro-Service", msg.Header)
method := getHeader("Micro-Method", msg.Header)
endpoint := getHeader("Micro-Endpoint", msg.Header)
protocol := getHeader("Micro-Protocol", msg.Header)
target := getHeader("Micro-Target", msg.Header)
// if the protocol exists (mucp) do nothing
if len(protocol) > 0 {
@@ -91,12 +141,12 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
// no method then set to endpoint
if len(method) == 0 {
msg.Header["X-Micro-Method"] = method
msg.Header["Micro-Method"] = endpoint
}
// no endpoint then set to method
if len(endpoint) == 0 {
msg.Header["X-Micro-Endpoint"] = method
msg.Header["Micro-Endpoint"] = method
}
return nil
@@ -118,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
}
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// the initieal message
// the initial message
m := codec.Message{
Header: c.req.Header,
Body: c.req.Body,
@@ -153,25 +203,17 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
c.first = false
// set some internal things
m.Target = m.Header["X-Micro-Service"]
m.Method = m.Header["X-Micro-Method"]
m.Endpoint = m.Header["X-Micro-Endpoint"]
m.Id = m.Header["X-Micro-Id"]
getHeaders(&m)
// read header via codec
err := c.codec.ReadHeader(&m, codec.Request)
// set the method/id
r.Method = m.Method
r.Endpoint = m.Endpoint
r.Id = m.Id
// TODO: remove the old legacy cruft
if len(r.Endpoint) == 0 {
r.Endpoint = r.Method
if err := c.codec.ReadHeader(&m, codec.Request); err != nil {
return err
}
return err
// set message
*r = m
return nil
}
func (c *rpcCodec) ReadBody(b interface{}) error {
@@ -206,29 +248,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
m.Header = map[string]string{}
}
// set request id
if len(r.Id) > 0 {
m.Header["X-Micro-Id"] = r.Id
}
// set target
if len(r.Target) > 0 {
m.Header["X-Micro-Service"] = r.Target
}
// set request method
if len(r.Method) > 0 {
m.Header["X-Micro-Method"] = r.Method
}
// set request endpoint
if len(r.Endpoint) > 0 {
m.Header["X-Micro-Endpoint"] = r.Endpoint
}
if len(r.Error) > 0 {
m.Header["X-Micro-Error"] = r.Error
}
setHeaders(m, r)
// the body being sent
var body []byte
@@ -246,6 +266,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
// write an error if it failed
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
m.Header["X-Micro-Error"] = m.Error
m.Header["Micro-Error"] = m.Error
// no body to write
if err := c.codec.Write(m, nil); err != nil {
return err

View File

@@ -162,27 +162,23 @@ func prepareMethod(method reflect.Method) *methodType {
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
}
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) {
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error {
msg := new(codec.Message)
msg.Type = codec.Response
resp := router.getResponse()
resp.msg = msg
// Encode the response header
resp.msg.Endpoint = req.msg.Endpoint
if errmsg != "" {
resp.msg.Error = errmsg
reply = invalidRequest
}
resp.msg.Id = req.msg.Id
sending.Lock()
err = cc.Write(resp.msg, reply)
err := cc.Write(resp.msg, reply)
sending.Unlock()
router.freeResponse(resp)
return err
}
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) {
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
defer router.freeRequest(req)
function := mtype.method.Func
var returnValues []reflect.Value
@@ -206,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
return nil
}
errmsg := ""
err := fn(ctx, r, replyv.Interface())
if err != nil {
errmsg = err.Error()
// execute handler
if err := fn(ctx, r, replyv.Interface()); err != nil {
return err
}
err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true)
if err != nil {
log.Log("rpc call: unable to send response: ", err)
}
router.freeRequest(req)
return
// send response
return router.sendResponse(sending, req, replyv.Interface(), cc, true)
}
// declare a local error to see if we errored out already
@@ -250,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
// client.Stream request
r.stream = true
errmsg := ""
// execute handler
if err := fn(ctx, r, stream); err != nil {
errmsg = err.Error()
return err
}
// this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it
// already)
router.sendResponse(sending, req, nil, cc, errmsg, true)
router.freeRequest(req)
return router.sendResponse(sending, req, nil, cc, true)
}
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
@@ -448,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
}
// send a response if we actually managed to read a header.
if req != nil {
router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
router.freeRequest(req)
}
return err
}
service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
return nil
return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
}

View File

@@ -10,7 +10,7 @@ import (
"sync"
"time"
"github.com/micro/go-log"
log "github.com/micro/go-log"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
@@ -120,9 +120,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// internal request
request := &rpcRequest{
service: msg.Header["X-Micro-Service"],
method: msg.Header["X-Micro-Method"],
endpoint: msg.Header["X-Micro-Endpoint"],
service: getHeader("Micro-Service", msg.Header),
method: getHeader("Micro-Method", msg.Header),
endpoint: getHeader("Micro-Endpoint", msg.Header),
contentType: ct,
codec: rcodec,
header: msg.Header,
@@ -158,12 +158,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
// TODO: handle error better
if err := handler(ctx, request, response); err != nil {
// write an error response
rcodec.Write(&codec.Message{
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
s.wg.Done()
return
}
@@ -354,6 +357,9 @@ func (s *rpcServer) Register() error {
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue))
}
if cx := sb.Options().Context; cx != nil {
opts = append(opts, broker.SubscribeContext(cx))
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {
return err
@@ -433,36 +439,53 @@ func (s *rpcServer) Start() error {
registerDebugHandler(s)
config := s.Options()
// start listening on the transport
ts, err := config.Transport.Listen(config.Address)
if err != nil {
return err
}
log.Logf("Transport Listening on %s", ts.Addr())
s.Lock()
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
// swap address
s.Lock()
addr := s.opts.Address
s.opts.Address = ts.Addr()
s.Unlock()
exit := make(chan bool, 1)
// connect to the broker
if err := config.Broker.Connect(); err != nil {
return err
}
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
// announce self to the world
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
exit := make(chan bool)
go func() {
for {
// listen for connections
err := ts.Accept(s.ServeConn)
// check if we're supposed to exit
// TODO: listen for messages
// msg := broker.Exchange(service).Consume()
select {
// check if we're supposed to exit
case <-exit:
return
default:
}
// check the error and backoff
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
default:
if err != nil {
log.Logf("Accept error: %v", err)
time.Sleep(time.Second)
continue
}
}
// no error just exit
@@ -471,9 +494,37 @@ func (s *rpcServer) Start() error {
}()
go func() {
// wait for exit
ch := <-s.exit
exit <- true
t := new(time.Ticker)
// only process if it exists
if s.opts.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(s.opts.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
if err := s.Register(); err != nil {
log.Log("Server register error: ", err)
}
// wait for exit
case ch = <-s.exit:
t.Stop()
close(exit)
break Loop
}
}
// deregister self
if err := s.Deregister(); err != nil {
log.Log("Server deregister error: ", err)
}
// wait for requests to finish
if wait(s.opts.Context) {
@@ -486,14 +537,13 @@ func (s *rpcServer) Start() error {
// disconnect the broker
config.Broker.Disconnect()
s.Lock()
// swap back address
s.Lock()
s.opts.Address = addr
s.Unlock()
}()
// TODO: subscribe to cruft
return config.Broker.Connect()
return nil
}
func (s *rpcServer) Stop() error {

View File

@@ -21,8 +21,6 @@ type Server interface {
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Register() error
Deregister() error
Start() error
Stop() error
String() string
@@ -114,10 +112,6 @@ type Subscriber interface {
type Option func(*Options)
type HandlerOption func(*HandlerOptions)
type SubscriberOption func(*SubscriberOptions)
var (
DefaultAddress = ":0"
DefaultName = "go-server"
@@ -177,16 +171,6 @@ func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s)
}
// Register registers the default server with the discovery system
func Register() error {
return DefaultServer.Register()
}
// Deregister deregisters the default server from the discovery system
func Deregister() error {
return DefaultServer.Deregister()
}
// Run starts the default server and waits for a kill
// signal before exiting. Also registers/deregisters the server
func Run() error {
@@ -194,18 +178,10 @@ func Run() error {
return err
}
if err := DefaultServer.Register(); err != nil {
return err
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
log.Logf("Received signal %s", <-ch)
if err := DefaultServer.Deregister(); err != nil {
return err
}
return Stop()
}

View File

@@ -5,10 +5,7 @@ import (
"os/signal"
"sync"
"syscall"
"time"
"github.com/micro/cli"
"github.com/micro/go-log"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/metadata"
@@ -36,27 +33,6 @@ func newService(opts ...Option) Service {
}
}
func (s *service) run(exit chan bool) {
if s.opts.RegisterInterval <= time.Duration(0) {
return
}
t := time.NewTicker(s.opts.RegisterInterval)
for {
select {
case <-t.C:
err := s.opts.Server.Register()
if err != nil {
log.Log("service run Server.Register error: ", err)
}
case <-exit:
t.Stop()
return
}
}
}
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
@@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) {
}
s.once.Do(func() {
// save user action
action := s.opts.Cmd.App().Action
// set service action
s.opts.Cmd.App().Action = func(c *cli.Context) {
// set register interval
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
s.opts.RegisterInterval = i * time.Second
}
// user action
action(c)
}
// Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
@@ -105,7 +67,7 @@ func (s *service) Server() server.Server {
}
func (s *service) String() string {
return "go-micro"
return "micro"
}
func (s *service) Start() error {
@@ -119,10 +81,6 @@ func (s *service) Start() error {
return err
}
if err := s.opts.Server.Register(); err != nil {
return err
}
for _, fn := range s.opts.AfterStart {
if err := fn(); err != nil {
return err
@@ -141,10 +99,6 @@ func (s *service) Stop() error {
}
}
if err := s.opts.Server.Deregister(); err != nil {
return err
}
if err := s.opts.Server.Stop(); err != nil {
return err
}
@@ -163,10 +117,6 @@ func (s *service) Run() error {
return err
}
// start reg loop
ex := make(chan bool)
go s.run(ex)
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
@@ -177,8 +127,5 @@ func (s *service) Run() error {
case <-s.opts.Context.Done():
}
// exit reg loop
close(ex)
return s.Stop()
}