Compare commits
16 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
89c8e1f4a7 | ||
|
a06cd72337 | ||
|
e22fa01935 | ||
|
a5015692e3 | ||
|
539b8c1a3b | ||
|
67a738b504 | ||
ac1afea7fc | |||
|
8090f9968d | ||
|
7542aafd29 | ||
|
13de868b21 | ||
|
d090a97a3d | ||
|
8a0d5f0489 | ||
|
2ed676acf4 | ||
|
d8ba18deff | ||
|
1321782785 | ||
|
48b80dd051 |
@@ -1,6 +1,6 @@
|
||||
# Go Micro [](https://opensource.org/licenses/Apache-2.0) [](https://godoc.org/github.com/micro/go-micro) [](https://travis-ci.org/micro/go-micro) [](https://goreportcard.com/report/github.com/micro/go-micro)
|
||||
|
||||
Go Micro is a pluggable framework for micro service development.
|
||||
Go Micro is a framework for micro service development.
|
||||
|
||||
## Overview
|
||||
|
||||
@@ -8,7 +8,7 @@ Go Micro provides the core requirements for distributed systems development incl
|
||||
The **micro** philosophy is sane defaults with a pluggable architecture. We provide defaults to get you started quickly
|
||||
but everything can be easily swapped out.
|
||||
|
||||
<img src="https://micro.mu/docs/images/go-micro.png" />
|
||||
<img src="https://micro.mu/docs/images/go-micro.svg" />
|
||||
|
||||
Plugins are available at [github.com/micro/go-plugins](https://github.com/micro/go-plugins).
|
||||
|
||||
|
@@ -19,7 +19,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/codec/json"
|
||||
merr "github.com/micro/go-micro/errors"
|
||||
"github.com/micro/go-micro/registry"
|
||||
@@ -397,7 +396,6 @@ func (h *httpBroker) Connect() error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Logf("Broker Listening on %s", l.Addr().String())
|
||||
addr := h.address
|
||||
h.address = l.Addr().String()
|
||||
|
||||
|
@@ -111,3 +111,10 @@ func TLSConfig(t *tls.Config) Option {
|
||||
o.TLSConfig = t
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribeContext set context
|
||||
func SubscribeContext(ctx context.Context) SubscribeOption {
|
||||
return func(o *SubscribeOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
@@ -487,8 +487,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
||||
|
||||
id := uuid.New().String()
|
||||
md["Content-Type"] = msg.ContentType()
|
||||
md["X-Micro-Topic"] = msg.Topic()
|
||||
md["X-Micro-Id"] = id
|
||||
md["Micro-Topic"] = msg.Topic()
|
||||
md["Micro-Id"] = id
|
||||
|
||||
// encode message body
|
||||
cf, err := r.newCodec(msg.ContentType())
|
||||
@@ -500,8 +500,8 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
|
||||
Target: msg.Topic(),
|
||||
Type: codec.Publication,
|
||||
Header: map[string]string{
|
||||
"X-Micro-Id": id,
|
||||
"X-Micro-Topic": msg.Topic(),
|
||||
"Micro-Id": id,
|
||||
"Micro-Topic": msg.Topic(),
|
||||
},
|
||||
}, msg.Payload()); err != nil {
|
||||
return errors.InternalServerError("go.micro.client", err.Error())
|
||||
|
@@ -84,6 +84,47 @@ func (rwc *readWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getHeaders(m *codec.Message) {
|
||||
get := func(hdr string) string {
|
||||
if hd := m.Header[hdr]; len(hd) > 0 {
|
||||
return hd
|
||||
}
|
||||
// old
|
||||
return m.Header["X-"+hdr]
|
||||
}
|
||||
|
||||
// check error in header
|
||||
if len(m.Error) == 0 {
|
||||
m.Error = get("Micro-Error")
|
||||
}
|
||||
|
||||
// check endpoint in header
|
||||
if len(m.Endpoint) == 0 {
|
||||
m.Endpoint = get("Micro-Endpoint")
|
||||
}
|
||||
|
||||
// check method in header
|
||||
if len(m.Method) == 0 {
|
||||
m.Method = get("Micro-Method")
|
||||
}
|
||||
|
||||
if len(m.Id) == 0 {
|
||||
m.Id = get("Micro-Id")
|
||||
}
|
||||
}
|
||||
|
||||
func setHeaders(m *codec.Message) {
|
||||
set := func(hdr, v string) {
|
||||
m.Header[hdr] = v
|
||||
m.Header["X-"+hdr] = v
|
||||
}
|
||||
|
||||
set("Micro-Id", m.Id)
|
||||
set("Micro-Service", m.Target)
|
||||
set("Micro-Method", m.Method)
|
||||
set("Micro-Endpoint", m.Endpoint)
|
||||
}
|
||||
|
||||
// setupProtocol sets up the old protocol
|
||||
func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
|
||||
protocol := node.Metadata["protocol"]
|
||||
@@ -133,10 +174,7 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
|
||||
}
|
||||
|
||||
// set the mucp headers
|
||||
m.Header["X-Micro-Id"] = m.Id
|
||||
m.Header["X-Micro-Service"] = m.Target
|
||||
m.Header["X-Micro-Method"] = m.Method
|
||||
m.Header["X-Micro-Endpoint"] = m.Endpoint
|
||||
setHeaders(m)
|
||||
|
||||
// if body is bytes Frame don't encode
|
||||
if body != nil {
|
||||
@@ -171,43 +209,25 @@ func (c *rpcCodec) Write(m *codec.Message, body interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *rpcCodec) ReadHeader(wm *codec.Message, r codec.MessageType) error {
|
||||
var m transport.Message
|
||||
if err := c.client.Recv(&m); err != nil {
|
||||
func (c *rpcCodec) ReadHeader(m *codec.Message, r codec.MessageType) error {
|
||||
var tm transport.Message
|
||||
|
||||
// read message from transport
|
||||
if err := c.client.Recv(&tm); err != nil {
|
||||
return errors.InternalServerError("go.micro.client.transport", err.Error())
|
||||
}
|
||||
c.buf.rbuf.Reset()
|
||||
c.buf.rbuf.Write(m.Body)
|
||||
|
||||
var me codec.Message
|
||||
// set headers
|
||||
me.Header = m.Header
|
||||
c.buf.rbuf.Reset()
|
||||
c.buf.rbuf.Write(tm.Body)
|
||||
|
||||
// set headers from transport
|
||||
m.Header = tm.Header
|
||||
|
||||
// read header
|
||||
err := c.codec.ReadHeader(&me, r)
|
||||
wm.Endpoint = me.Endpoint
|
||||
wm.Method = me.Method
|
||||
wm.Id = me.Id
|
||||
wm.Error = me.Error
|
||||
err := c.codec.ReadHeader(m, r)
|
||||
|
||||
// check error in header
|
||||
if len(me.Error) == 0 {
|
||||
wm.Error = me.Header["X-Micro-Error"]
|
||||
}
|
||||
|
||||
// check endpoint in header
|
||||
if len(me.Endpoint) == 0 {
|
||||
wm.Endpoint = me.Header["X-Micro-Endpoint"]
|
||||
}
|
||||
|
||||
// check method in header
|
||||
if len(me.Method) == 0 {
|
||||
wm.Method = me.Header["X-Micro-Method"]
|
||||
}
|
||||
|
||||
if len(me.Id) == 0 {
|
||||
wm.Id = me.Header["X-Micro-Id"]
|
||||
}
|
||||
// get headers
|
||||
getHeaders(m)
|
||||
|
||||
// return header error
|
||||
if err != nil {
|
||||
|
@@ -404,6 +404,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
|
||||
serverOpts = append(serverOpts, server.RegisterTTL(ttl*time.Second))
|
||||
}
|
||||
|
||||
if val := time.Duration(ctx.GlobalInt("register_interval")); val > 0 {
|
||||
serverOpts = append(serverOpts, server.RegisterInterval(val*time.Second))
|
||||
}
|
||||
|
||||
// client opts
|
||||
if r := ctx.Int("client_retries"); r >= 0 {
|
||||
clientOpts = append(clientOpts, client.Retries(r))
|
||||
|
@@ -29,8 +29,8 @@ func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
|
||||
// service method
|
||||
path := m.Header[":path"]
|
||||
if len(path) == 0 || path[0] != '/' {
|
||||
m.Target = m.Header["X-Micro-Service"]
|
||||
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
||||
m.Target = m.Header["Micro-Service"]
|
||||
m.Endpoint = m.Header["Micro-Endpoint"]
|
||||
} else {
|
||||
// [ , a.package.Foo, Bar]
|
||||
parts := strings.Split(path, "/")
|
||||
|
@@ -1,4 +1,4 @@
|
||||
// Package micro is a pluggable RPC framework for microservices
|
||||
// Package micro is a pluggable framework for microservices
|
||||
package micro
|
||||
|
||||
import (
|
||||
@@ -42,7 +42,7 @@ type Publisher interface {
|
||||
type Option func(*Options)
|
||||
|
||||
var (
|
||||
HeaderPrefix = "X-Micro-"
|
||||
HeaderPrefix = "Micro-"
|
||||
)
|
||||
|
||||
// NewService creates and returns a new Service based on the packages within.
|
Before Width: | Height: | Size: 25 KiB After Width: | Height: | Size: 25 KiB |
@@ -22,9 +22,6 @@ type Options struct {
|
||||
Registry registry.Registry
|
||||
Transport transport.Transport
|
||||
|
||||
// Register loop interval
|
||||
RegisterInterval time.Duration
|
||||
|
||||
// Before and After funcs
|
||||
BeforeStart []func() error
|
||||
BeforeStop []func() error
|
||||
@@ -168,7 +165,7 @@ func RegisterTTL(t time.Duration) Option {
|
||||
// RegisterInterval specifies the interval on which to re-register
|
||||
func RegisterInterval(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.RegisterInterval = t
|
||||
o.Server.Init(server.RegisterInterval(t))
|
||||
}
|
||||
}
|
||||
|
||||
|
51
registry/memory/data.go
Normal file
51
registry/memory/data.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
var (
|
||||
// mock data
|
||||
Data = map[string][]*registry.Service{
|
||||
"foo": []*registry.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.0-123",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
{
|
||||
Id: "foo-1.0.0-321",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.1-321",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.3-345",
|
||||
Address: "localhost",
|
||||
Port: 8888,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
@@ -2,60 +2,24 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type Registry struct {
|
||||
options registry.Options
|
||||
|
||||
sync.RWMutex
|
||||
Services map[string][]*registry.Service
|
||||
Watchers map[string]*Watcher
|
||||
}
|
||||
|
||||
var (
|
||||
// mock data
|
||||
Data = map[string][]*registry.Service{
|
||||
"foo": []*registry.Service{
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.0",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.0-123",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
{
|
||||
Id: "foo-1.0.0-321",
|
||||
Address: "localhost",
|
||||
Port: 9999,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.1",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.1-321",
|
||||
Address: "localhost",
|
||||
Port: 6666,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "foo",
|
||||
Version: "1.0.3",
|
||||
Nodes: []*registry.Node{
|
||||
{
|
||||
Id: "foo-1.0.3-345",
|
||||
Address: "localhost",
|
||||
Port: 8888,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
timeout = time.Millisecond * 10
|
||||
)
|
||||
|
||||
// Setup sets mock data
|
||||
@@ -67,69 +31,130 @@ func (m *Registry) Setup() {
|
||||
m.Services = Data
|
||||
}
|
||||
|
||||
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
func (m *Registry) watch(r *registry.Result) {
|
||||
var watchers []*Watcher
|
||||
|
||||
m.RLock()
|
||||
for _, w := range m.Watchers {
|
||||
watchers = append(watchers, w)
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
for _, w := range watchers {
|
||||
select {
|
||||
case <-w.exit:
|
||||
m.Lock()
|
||||
delete(m.Watchers, w.id)
|
||||
m.Unlock()
|
||||
default:
|
||||
select {
|
||||
case w.res <- r:
|
||||
case <-time.After(timeout):
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Registry) Init(opts ...registry.Option) error {
|
||||
for _, o := range opts {
|
||||
o(&m.options)
|
||||
}
|
||||
|
||||
// add services
|
||||
m.Lock()
|
||||
for k, v := range getServices(m.options.Context) {
|
||||
s := m.Services[k]
|
||||
m.Services[k] = addServices(s, v)
|
||||
}
|
||||
m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Registry) Options() registry.Options {
|
||||
return m.options
|
||||
}
|
||||
|
||||
func (m *Registry) GetService(service string) ([]*registry.Service, error) {
|
||||
m.RLock()
|
||||
s, ok := m.Services[service]
|
||||
if !ok || len(s) == 0 {
|
||||
m.RUnlock()
|
||||
return nil, registry.ErrNotFound
|
||||
}
|
||||
m.RUnlock()
|
||||
return s, nil
|
||||
|
||||
}
|
||||
|
||||
func (m *Registry) ListServices() ([]*registry.Service, error) {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
m.RLock()
|
||||
var services []*registry.Service
|
||||
for _, service := range m.Services {
|
||||
services = append(services, service...)
|
||||
}
|
||||
m.RUnlock()
|
||||
return services, nil
|
||||
}
|
||||
|
||||
func (m *Registry) Register(s *registry.Service, opts ...registry.RegisterOption) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
go m.watch(®istry.Result{Action: "update", Service: s})
|
||||
|
||||
m.Lock()
|
||||
services := addServices(m.Services[s.Name], []*registry.Service{s})
|
||||
m.Services[s.Name] = services
|
||||
m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Registry) Deregister(s *registry.Service) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
go m.watch(®istry.Result{Action: "delete", Service: s})
|
||||
|
||||
m.Lock()
|
||||
services := delServices(m.Services[s.Name], []*registry.Service{s})
|
||||
m.Services[s.Name] = services
|
||||
m.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Registry) Watch(opts ...registry.WatchOption) (registry.Watcher, error) {
|
||||
var wopts registry.WatchOptions
|
||||
var wo registry.WatchOptions
|
||||
for _, o := range opts {
|
||||
o(&wopts)
|
||||
o(&wo)
|
||||
}
|
||||
return &memoryWatcher{exit: make(chan bool), opts: wopts}, nil
|
||||
|
||||
w := &Watcher{
|
||||
exit: make(chan bool),
|
||||
res: make(chan *registry.Result),
|
||||
id: uuid.New().String(),
|
||||
wo: wo,
|
||||
}
|
||||
|
||||
m.Lock()
|
||||
m.Watchers[w.id] = w
|
||||
m.Unlock()
|
||||
return w, nil
|
||||
}
|
||||
|
||||
func (m *Registry) String() string {
|
||||
return "memory"
|
||||
}
|
||||
|
||||
func (m *Registry) Init(opts ...registry.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Registry) Options() registry.Options {
|
||||
return registry.Options{}
|
||||
}
|
||||
|
||||
func NewRegistry(opts ...registry.Option) registry.Registry {
|
||||
options := registry.Options{
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
services := getServices(options.Context)
|
||||
if services == nil {
|
||||
services = make(map[string][]*registry.Service)
|
||||
}
|
||||
|
||||
return &Registry{
|
||||
Services: make(map[string][]*registry.Service),
|
||||
options: options,
|
||||
Services: services,
|
||||
Watchers: make(map[string]*Watcher),
|
||||
}
|
||||
}
|
||||
|
@@ -80,9 +80,8 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func TestMockRegistry(t *testing.T) {
|
||||
func TestMemoryRegistry(t *testing.T) {
|
||||
m := NewRegistry()
|
||||
m.(*Registry).Setup()
|
||||
|
||||
fn := func(k string, v []*registry.Service) {
|
||||
services, err := m.GetService(k)
|
||||
@@ -108,11 +107,6 @@ func TestMockRegistry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// test existing memory data
|
||||
for k, v := range Data {
|
||||
fn(k, v)
|
||||
}
|
||||
|
||||
// register data
|
||||
for _, v := range testData {
|
||||
for _, service := range v {
|
||||
@@ -124,7 +118,6 @@ func TestMockRegistry(t *testing.T) {
|
||||
|
||||
// using test data
|
||||
for k, v := range testData {
|
||||
|
||||
fn(k, v)
|
||||
}
|
||||
|
||||
|
27
registry/memory/options.go
Normal file
27
registry/memory/options.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type servicesKey struct{}
|
||||
|
||||
func getServices(ctx context.Context) map[string][]*registry.Service {
|
||||
s, ok := ctx.Value(servicesKey{}).(map[string][]*registry.Service)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Services is an option that preloads service data
|
||||
func Services(s map[string][]*registry.Service) registry.Option {
|
||||
return func(o *registry.Options) {
|
||||
if o.Context == nil {
|
||||
o.Context = context.Background()
|
||||
}
|
||||
o.Context = context.WithValue(o.Context, servicesKey{}, s)
|
||||
}
|
||||
}
|
37
registry/memory/watcher.go
Normal file
37
registry/memory/watcher.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
type Watcher struct {
|
||||
id string
|
||||
wo registry.WatchOptions
|
||||
res chan *registry.Result
|
||||
exit chan bool
|
||||
}
|
||||
|
||||
func (m *Watcher) Next() (*registry.Result, error) {
|
||||
for {
|
||||
select {
|
||||
case r := <-m.res:
|
||||
if len(m.wo.Service) > 0 && m.wo.Service != r.Service.Name {
|
||||
continue
|
||||
}
|
||||
return r, nil
|
||||
case <-m.exit:
|
||||
return nil, errors.New("watcher stopped")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Watcher) Stop() {
|
||||
select {
|
||||
case <-m.exit:
|
||||
return
|
||||
default:
|
||||
close(m.exit)
|
||||
}
|
||||
}
|
30
registry/memory/watcher_test.go
Normal file
30
registry/memory/watcher_test.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package memory
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
)
|
||||
|
||||
func TestWatcher(t *testing.T) {
|
||||
w := &Watcher{
|
||||
id: "test",
|
||||
res: make(chan *registry.Result),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
|
||||
go func() {
|
||||
w.res <- ®istry.Result{}
|
||||
}()
|
||||
|
||||
_, err := w.Next()
|
||||
if err != nil {
|
||||
t.Fatal("unexpected err", err)
|
||||
}
|
||||
|
||||
w.Stop()
|
||||
|
||||
if _, err := w.Next(); err == nil {
|
||||
t.Fatal("expected error on Next()")
|
||||
}
|
||||
}
|
@@ -326,12 +326,16 @@ func (c *registrySelector) run(name string) {
|
||||
func (c *registrySelector) watch(w registry.Watcher) error {
|
||||
defer w.Stop()
|
||||
|
||||
// reload chan
|
||||
reload := make(chan bool, 1)
|
||||
|
||||
// manage this loop
|
||||
go func() {
|
||||
// wait for exit or reload signal
|
||||
select {
|
||||
case <-c.exit:
|
||||
case <-c.reload:
|
||||
reload <- true
|
||||
}
|
||||
|
||||
// stop the watcher
|
||||
@@ -341,7 +345,12 @@ func (c *registrySelector) watch(w registry.Watcher) error {
|
||||
for {
|
||||
res, err := w.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
select {
|
||||
case <-reload:
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
c.update(res)
|
||||
}
|
||||
|
@@ -1,13 +1,20 @@
|
||||
package server
|
||||
|
||||
import "context"
|
||||
|
||||
type HandlerOption func(*HandlerOptions)
|
||||
|
||||
type HandlerOptions struct {
|
||||
Internal bool
|
||||
Metadata map[string]map[string]string
|
||||
}
|
||||
|
||||
type SubscriberOption func(*SubscriberOptions)
|
||||
|
||||
type SubscriberOptions struct {
|
||||
Queue string
|
||||
Internal bool
|
||||
Context context.Context
|
||||
}
|
||||
|
||||
// EndpointMetadata is a Handler option that allows metadata to be added to
|
||||
@@ -34,6 +41,17 @@ func InternalSubscriber(b bool) SubscriberOption {
|
||||
o.Internal = b
|
||||
}
|
||||
}
|
||||
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
|
||||
opt := SubscriberOptions{
|
||||
Context: context.Background(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
o(&opt)
|
||||
}
|
||||
|
||||
return opt
|
||||
}
|
||||
|
||||
// Shared queue name distributed messages across subscribers
|
||||
func SubscriberQueue(n string) SubscriberOption {
|
||||
@@ -41,3 +59,10 @@ func SubscriberQueue(n string) SubscriberOption {
|
||||
o.Queue = n
|
||||
}
|
||||
}
|
||||
|
||||
// SubscriberContext set context options to allow broker SubscriberOption passed
|
||||
func SubscriberContext(ctx context.Context) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
o.Context = ctx
|
||||
}
|
||||
}
|
||||
|
@@ -27,6 +27,8 @@ type Options struct {
|
||||
|
||||
// The register expiry time
|
||||
RegisterTTL time.Duration
|
||||
// The interval on which to register
|
||||
RegisterInterval time.Duration
|
||||
|
||||
// The router for requests
|
||||
Router Router
|
||||
@@ -168,6 +170,13 @@ func RegisterTTL(t time.Duration) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Register the service with at interval
|
||||
func RegisterInterval(t time.Duration) Option {
|
||||
return func(o *Options) {
|
||||
o.RegisterInterval = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithRouter sets the request router
|
||||
func WithRouter(r Router) Option {
|
||||
return func(o *Options) {
|
||||
|
@@ -66,13 +66,63 @@ func (rwc *readWriteCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getHeader(hdr string, md map[string]string) string {
|
||||
if hd := md[hdr]; len(hd) > 0 {
|
||||
return hd
|
||||
}
|
||||
return md["X-"+hdr]
|
||||
}
|
||||
|
||||
func getHeaders(m *codec.Message) {
|
||||
get := func(hdr, v string) string {
|
||||
if len(v) > 0 {
|
||||
return v
|
||||
}
|
||||
|
||||
if hd := m.Header[hdr]; len(hd) > 0 {
|
||||
return hd
|
||||
}
|
||||
|
||||
// old
|
||||
return m.Header["X-"+hdr]
|
||||
}
|
||||
|
||||
m.Id = get("Micro-Id", m.Id)
|
||||
m.Error = get("Micro-Error", m.Error)
|
||||
m.Endpoint = get("Micro-Endpoint", m.Endpoint)
|
||||
m.Method = get("Micro-Method", m.Method)
|
||||
m.Target = get("Micro-Service", m.Target)
|
||||
|
||||
// TODO: remove this cruft
|
||||
if len(m.Endpoint) == 0 {
|
||||
m.Endpoint = m.Method
|
||||
}
|
||||
}
|
||||
|
||||
func setHeaders(m, r *codec.Message) {
|
||||
set := func(hdr, v string) {
|
||||
if len(v) == 0 {
|
||||
return
|
||||
}
|
||||
m.Header[hdr] = v
|
||||
m.Header["X-"+hdr] = v
|
||||
}
|
||||
|
||||
// set headers
|
||||
set("Micro-Id", r.Id)
|
||||
set("Micro-Service", r.Target)
|
||||
set("Micro-Method", r.Method)
|
||||
set("Micro-Endpoint", r.Endpoint)
|
||||
set("Micro-Error", r.Error)
|
||||
}
|
||||
|
||||
// setupProtocol sets up the old protocol
|
||||
func setupProtocol(msg *transport.Message) codec.NewCodec {
|
||||
service := msg.Header["X-Micro-Service"]
|
||||
method := msg.Header["X-Micro-Method"]
|
||||
endpoint := msg.Header["X-Micro-Endpoint"]
|
||||
protocol := msg.Header["X-Micro-Protocol"]
|
||||
target := msg.Header["X-Micro-Target"]
|
||||
service := getHeader("Micro-Service", msg.Header)
|
||||
method := getHeader("Micro-Method", msg.Header)
|
||||
endpoint := getHeader("Micro-Endpoint", msg.Header)
|
||||
protocol := getHeader("Micro-Protocol", msg.Header)
|
||||
target := getHeader("Micro-Target", msg.Header)
|
||||
|
||||
// if the protocol exists (mucp) do nothing
|
||||
if len(protocol) > 0 {
|
||||
@@ -91,12 +141,12 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
|
||||
|
||||
// no method then set to endpoint
|
||||
if len(method) == 0 {
|
||||
msg.Header["X-Micro-Method"] = method
|
||||
msg.Header["Micro-Method"] = endpoint
|
||||
}
|
||||
|
||||
// no endpoint then set to method
|
||||
if len(endpoint) == 0 {
|
||||
msg.Header["X-Micro-Endpoint"] = method
|
||||
msg.Header["Micro-Endpoint"] = method
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -118,7 +168,7 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
|
||||
}
|
||||
|
||||
func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
||||
// the initieal message
|
||||
// the initial message
|
||||
m := codec.Message{
|
||||
Header: c.req.Header,
|
||||
Body: c.req.Body,
|
||||
@@ -153,25 +203,17 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
|
||||
c.first = false
|
||||
|
||||
// set some internal things
|
||||
m.Target = m.Header["X-Micro-Service"]
|
||||
m.Method = m.Header["X-Micro-Method"]
|
||||
m.Endpoint = m.Header["X-Micro-Endpoint"]
|
||||
m.Id = m.Header["X-Micro-Id"]
|
||||
getHeaders(&m)
|
||||
|
||||
// read header via codec
|
||||
err := c.codec.ReadHeader(&m, codec.Request)
|
||||
|
||||
// set the method/id
|
||||
r.Method = m.Method
|
||||
r.Endpoint = m.Endpoint
|
||||
r.Id = m.Id
|
||||
|
||||
// TODO: remove the old legacy cruft
|
||||
if len(r.Endpoint) == 0 {
|
||||
r.Endpoint = r.Method
|
||||
if err := c.codec.ReadHeader(&m, codec.Request); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
// set message
|
||||
*r = m
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *rpcCodec) ReadBody(b interface{}) error {
|
||||
@@ -206,29 +248,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
||||
m.Header = map[string]string{}
|
||||
}
|
||||
|
||||
// set request id
|
||||
if len(r.Id) > 0 {
|
||||
m.Header["X-Micro-Id"] = r.Id
|
||||
}
|
||||
|
||||
// set target
|
||||
if len(r.Target) > 0 {
|
||||
m.Header["X-Micro-Service"] = r.Target
|
||||
}
|
||||
|
||||
// set request method
|
||||
if len(r.Method) > 0 {
|
||||
m.Header["X-Micro-Method"] = r.Method
|
||||
}
|
||||
|
||||
// set request endpoint
|
||||
if len(r.Endpoint) > 0 {
|
||||
m.Header["X-Micro-Endpoint"] = r.Endpoint
|
||||
}
|
||||
|
||||
if len(r.Error) > 0 {
|
||||
m.Header["X-Micro-Error"] = r.Error
|
||||
}
|
||||
setHeaders(m, r)
|
||||
|
||||
// the body being sent
|
||||
var body []byte
|
||||
@@ -246,6 +266,7 @@ func (c *rpcCodec) Write(r *codec.Message, b interface{}) error {
|
||||
// write an error if it failed
|
||||
m.Error = errors.Wrapf(err, "Unable to encode body").Error()
|
||||
m.Header["X-Micro-Error"] = m.Error
|
||||
m.Header["Micro-Error"] = m.Error
|
||||
// no body to write
|
||||
if err := c.codec.Write(m, nil); err != nil {
|
||||
return err
|
||||
|
@@ -162,27 +162,23 @@ func prepareMethod(method reflect.Method) *methodType {
|
||||
return &methodType{method: method, ArgType: argType, ReplyType: replyType, ContextType: contextType, stream: stream}
|
||||
}
|
||||
|
||||
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, errmsg string, last bool) (err error) {
|
||||
func (router *router) sendResponse(sending sync.Locker, req *request, reply interface{}, cc codec.Writer, last bool) error {
|
||||
msg := new(codec.Message)
|
||||
msg.Type = codec.Response
|
||||
resp := router.getResponse()
|
||||
resp.msg = msg
|
||||
|
||||
// Encode the response header
|
||||
resp.msg.Endpoint = req.msg.Endpoint
|
||||
if errmsg != "" {
|
||||
resp.msg.Error = errmsg
|
||||
reply = invalidRequest
|
||||
}
|
||||
resp.msg.Id = req.msg.Id
|
||||
sending.Lock()
|
||||
err = cc.Write(resp.msg, reply)
|
||||
err := cc.Write(resp.msg, reply)
|
||||
sending.Unlock()
|
||||
router.freeResponse(resp)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) {
|
||||
func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, mtype *methodType, req *request, argv, replyv reflect.Value, cc codec.Writer) error {
|
||||
defer router.freeRequest(req)
|
||||
|
||||
function := mtype.method.Func
|
||||
var returnValues []reflect.Value
|
||||
|
||||
@@ -206,18 +202,13 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
|
||||
return nil
|
||||
}
|
||||
|
||||
errmsg := ""
|
||||
err := fn(ctx, r, replyv.Interface())
|
||||
if err != nil {
|
||||
errmsg = err.Error()
|
||||
// execute handler
|
||||
if err := fn(ctx, r, replyv.Interface()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = router.sendResponse(sending, req, replyv.Interface(), cc, errmsg, true)
|
||||
if err != nil {
|
||||
log.Log("rpc call: unable to send response: ", err)
|
||||
}
|
||||
router.freeRequest(req)
|
||||
return
|
||||
// send response
|
||||
return router.sendResponse(sending, req, replyv.Interface(), cc, true)
|
||||
}
|
||||
|
||||
// declare a local error to see if we errored out already
|
||||
@@ -250,16 +241,15 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
|
||||
// client.Stream request
|
||||
r.stream = true
|
||||
|
||||
errmsg := ""
|
||||
// execute handler
|
||||
if err := fn(ctx, r, stream); err != nil {
|
||||
errmsg = err.Error()
|
||||
return err
|
||||
}
|
||||
|
||||
// this is the last packet, we don't do anything with
|
||||
// the error here (well sendStreamResponse will log it
|
||||
// already)
|
||||
router.sendResponse(sending, req, nil, cc, errmsg, true)
|
||||
router.freeRequest(req)
|
||||
return router.sendResponse(sending, req, nil, cc, true)
|
||||
}
|
||||
|
||||
func (m *methodType) prepareContext(ctx context.Context) reflect.Value {
|
||||
@@ -448,11 +438,9 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
|
||||
}
|
||||
// send a response if we actually managed to read a header.
|
||||
if req != nil {
|
||||
router.sendResponse(sending, req, invalidRequest, rsp.Codec(), err.Error(), true)
|
||||
router.freeRequest(req)
|
||||
}
|
||||
return err
|
||||
}
|
||||
service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
|
||||
return nil
|
||||
return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
|
||||
}
|
||||
|
@@ -10,7 +10,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/micro/go-log"
|
||||
log "github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/codec"
|
||||
"github.com/micro/go-micro/metadata"
|
||||
@@ -120,9 +120,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
|
||||
// internal request
|
||||
request := &rpcRequest{
|
||||
service: msg.Header["X-Micro-Service"],
|
||||
method: msg.Header["X-Micro-Method"],
|
||||
endpoint: msg.Header["X-Micro-Endpoint"],
|
||||
service: getHeader("Micro-Service", msg.Header),
|
||||
method: getHeader("Micro-Method", msg.Header),
|
||||
endpoint: getHeader("Micro-Endpoint", msg.Header),
|
||||
contentType: ct,
|
||||
codec: rcodec,
|
||||
header: msg.Header,
|
||||
@@ -158,12 +158,15 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
|
||||
// TODO: handle error better
|
||||
if err := handler(ctx, request, response); err != nil {
|
||||
// write an error response
|
||||
rcodec.Write(&codec.Message{
|
||||
err = rcodec.Write(&codec.Message{
|
||||
Header: msg.Header,
|
||||
Error: err.Error(),
|
||||
Type: codec.Error,
|
||||
}, nil)
|
||||
|
||||
// could not write the error response
|
||||
if err != nil {
|
||||
log.Logf("rpc: unable to write error response: %v", err)
|
||||
}
|
||||
s.wg.Done()
|
||||
return
|
||||
}
|
||||
@@ -354,6 +357,9 @@ func (s *rpcServer) Register() error {
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.Queue(queue))
|
||||
}
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
opts = append(opts, broker.SubscribeContext(cx))
|
||||
}
|
||||
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -433,36 +439,53 @@ func (s *rpcServer) Start() error {
|
||||
registerDebugHandler(s)
|
||||
config := s.Options()
|
||||
|
||||
// start listening on the transport
|
||||
ts, err := config.Transport.Listen(config.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Logf("Transport Listening on %s", ts.Addr())
|
||||
s.Lock()
|
||||
log.Logf("Transport [%s] Listening on %s", config.Transport.String(), ts.Addr())
|
||||
|
||||
// swap address
|
||||
s.Lock()
|
||||
addr := s.opts.Address
|
||||
s.opts.Address = ts.Addr()
|
||||
s.Unlock()
|
||||
|
||||
exit := make(chan bool, 1)
|
||||
// connect to the broker
|
||||
if err := config.Broker.Connect(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Logf("Broker [%s] Listening on %s", config.Broker.String(), config.Broker.Address())
|
||||
|
||||
// announce self to the world
|
||||
if err := s.Register(); err != nil {
|
||||
log.Log("Server register error: ", err)
|
||||
}
|
||||
|
||||
exit := make(chan bool)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// listen for connections
|
||||
err := ts.Accept(s.ServeConn)
|
||||
|
||||
// check if we're supposed to exit
|
||||
// TODO: listen for messages
|
||||
// msg := broker.Exchange(service).Consume()
|
||||
|
||||
select {
|
||||
// check if we're supposed to exit
|
||||
case <-exit:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// check the error and backoff
|
||||
if err != nil {
|
||||
log.Logf("Accept error: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
default:
|
||||
if err != nil {
|
||||
log.Logf("Accept error: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// no error just exit
|
||||
@@ -471,9 +494,37 @@ func (s *rpcServer) Start() error {
|
||||
}()
|
||||
|
||||
go func() {
|
||||
// wait for exit
|
||||
ch := <-s.exit
|
||||
exit <- true
|
||||
t := new(time.Ticker)
|
||||
|
||||
// only process if it exists
|
||||
if s.opts.RegisterInterval > time.Duration(0) {
|
||||
// new ticker
|
||||
t = time.NewTicker(s.opts.RegisterInterval)
|
||||
}
|
||||
|
||||
// return error chan
|
||||
var ch chan error
|
||||
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
// register self on interval
|
||||
case <-t.C:
|
||||
if err := s.Register(); err != nil {
|
||||
log.Log("Server register error: ", err)
|
||||
}
|
||||
// wait for exit
|
||||
case ch = <-s.exit:
|
||||
t.Stop()
|
||||
close(exit)
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
|
||||
// deregister self
|
||||
if err := s.Deregister(); err != nil {
|
||||
log.Log("Server deregister error: ", err)
|
||||
}
|
||||
|
||||
// wait for requests to finish
|
||||
if wait(s.opts.Context) {
|
||||
@@ -486,14 +537,13 @@ func (s *rpcServer) Start() error {
|
||||
// disconnect the broker
|
||||
config.Broker.Disconnect()
|
||||
|
||||
s.Lock()
|
||||
// swap back address
|
||||
s.Lock()
|
||||
s.opts.Address = addr
|
||||
s.Unlock()
|
||||
}()
|
||||
|
||||
// TODO: subscribe to cruft
|
||||
return config.Broker.Connect()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *rpcServer) Stop() error {
|
||||
|
@@ -21,8 +21,6 @@ type Server interface {
|
||||
NewHandler(interface{}, ...HandlerOption) Handler
|
||||
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
|
||||
Subscribe(Subscriber) error
|
||||
Register() error
|
||||
Deregister() error
|
||||
Start() error
|
||||
Stop() error
|
||||
String() string
|
||||
@@ -114,10 +112,6 @@ type Subscriber interface {
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
type HandlerOption func(*HandlerOptions)
|
||||
|
||||
type SubscriberOption func(*SubscriberOptions)
|
||||
|
||||
var (
|
||||
DefaultAddress = ":0"
|
||||
DefaultName = "go-server"
|
||||
@@ -177,16 +171,6 @@ func Subscribe(s Subscriber) error {
|
||||
return DefaultServer.Subscribe(s)
|
||||
}
|
||||
|
||||
// Register registers the default server with the discovery system
|
||||
func Register() error {
|
||||
return DefaultServer.Register()
|
||||
}
|
||||
|
||||
// Deregister deregisters the default server from the discovery system
|
||||
func Deregister() error {
|
||||
return DefaultServer.Deregister()
|
||||
}
|
||||
|
||||
// Run starts the default server and waits for a kill
|
||||
// signal before exiting. Also registers/deregisters the server
|
||||
func Run() error {
|
||||
@@ -194,18 +178,10 @@ func Run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := DefaultServer.Register(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
|
||||
log.Logf("Received signal %s", <-ch)
|
||||
|
||||
if err := DefaultServer.Deregister(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return Stop()
|
||||
}
|
||||
|
||||
|
55
service.go
55
service.go
@@ -5,10 +5,7 @@ import (
|
||||
"os/signal"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/micro/cli"
|
||||
"github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/client"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
"github.com/micro/go-micro/metadata"
|
||||
@@ -36,27 +33,6 @@ func newService(opts ...Option) Service {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *service) run(exit chan bool) {
|
||||
if s.opts.RegisterInterval <= time.Duration(0) {
|
||||
return
|
||||
}
|
||||
|
||||
t := time.NewTicker(s.opts.RegisterInterval)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
err := s.opts.Server.Register()
|
||||
if err != nil {
|
||||
log.Log("service run Server.Register error: ", err)
|
||||
}
|
||||
case <-exit:
|
||||
t.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Init initialises options. Additionally it calls cmd.Init
|
||||
// which parses command line flags. cmd.Init is only called
|
||||
// on first Init.
|
||||
@@ -67,20 +43,6 @@ func (s *service) Init(opts ...Option) {
|
||||
}
|
||||
|
||||
s.once.Do(func() {
|
||||
// save user action
|
||||
action := s.opts.Cmd.App().Action
|
||||
|
||||
// set service action
|
||||
s.opts.Cmd.App().Action = func(c *cli.Context) {
|
||||
// set register interval
|
||||
if i := time.Duration(c.GlobalInt("register_interval")); i > 0 {
|
||||
s.opts.RegisterInterval = i * time.Second
|
||||
}
|
||||
|
||||
// user action
|
||||
action(c)
|
||||
}
|
||||
|
||||
// Initialise the command flags, overriding new service
|
||||
_ = s.opts.Cmd.Init(
|
||||
cmd.Broker(&s.opts.Broker),
|
||||
@@ -105,7 +67,7 @@ func (s *service) Server() server.Server {
|
||||
}
|
||||
|
||||
func (s *service) String() string {
|
||||
return "go-micro"
|
||||
return "micro"
|
||||
}
|
||||
|
||||
func (s *service) Start() error {
|
||||
@@ -119,10 +81,6 @@ func (s *service) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.opts.Server.Register(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, fn := range s.opts.AfterStart {
|
||||
if err := fn(); err != nil {
|
||||
return err
|
||||
@@ -141,10 +99,6 @@ func (s *service) Stop() error {
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.opts.Server.Deregister(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.opts.Server.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -163,10 +117,6 @@ func (s *service) Run() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// start reg loop
|
||||
ex := make(chan bool)
|
||||
go s.run(ex)
|
||||
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT)
|
||||
|
||||
@@ -177,8 +127,5 @@ func (s *service) Run() error {
|
||||
case <-s.opts.Context.Done():
|
||||
}
|
||||
|
||||
// exit reg loop
|
||||
close(ex)
|
||||
|
||||
return s.Stop()
|
||||
}
|
||||
|
Reference in New Issue
Block a user