many improvements with options and noop stuff

* add many options helpers
* fix noop client to allow publish messages to topic in broker
* fix noop server to allow registering in registry
* fix noop server to allow subscribe to topic in broker
* fix new service initialization

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-10-16 09:38:57 +03:00
parent a59aae760f
commit 14c97d59c1
39 changed files with 1384 additions and 432 deletions

View File

@@ -1,94 +1,59 @@
package server
import "context"
import (
"reflect"
type HandlerOption func(*HandlerOptions)
"github.com/unistack-org/micro/v3/registry"
)
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
Context context.Context
type rpcHandler struct {
name string
handler interface{}
endpoints []*registry.Endpoint
opts HandlerOptions
}
func NewHandlerOptions(opts ...HandlerOption) HandlerOptions {
options := HandlerOptions{
Context: context.Background(),
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions(opts...)
typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name()
var endpoints []*registry.Endpoint
for m := 0; m < typ.NumMethod(); m++ {
if e := registry.ExtractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}
for _, o := range opts {
o(&options)
}
return options
}
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
AutoAck bool
Queue string
Internal bool
Context context.Context
}
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
options := SubscriberOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// EndpointMetadata is a Handler option that allows metadata to be added to
// individual endpoints.
func EndpointMetadata(name string, md map[string]string) HandlerOption {
return func(o *HandlerOptions) {
o.Metadata[name] = md
return &rpcHandler{
name: name,
handler: handler,
endpoints: endpoints,
opts: options,
}
}
// Internal Handler options specifies that a handler is not advertised
// to the discovery system. In the future this may also limit request
// to the internal network or authorised user.
func InternalHandler(b bool) HandlerOption {
return func(o *HandlerOptions) {
o.Internal = b
}
func (r *rpcHandler) Name() string {
return r.name
}
// Internal Subscriber options specifies that a subscriber is not advertised
// to the discovery system.
func InternalSubscriber(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Internal = b
}
func (r *rpcHandler) Handler() interface{} {
return r.handler
}
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = false
}
func (r *rpcHandler) Endpoints() []*registry.Endpoint {
return r.endpoints
}
// Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}

View File

@@ -1,108 +1,462 @@
package server
import "github.com/unistack-org/micro/v3/registry"
import (
"bytes"
"fmt"
"sort"
"sync"
"time"
type noopServer struct {
h Handler
opts Options
craw "github.com/unistack-org/micro-codec-bytes"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/codec"
cjson "github.com/unistack-org/micro/v3/codec/json"
cjsonrpc "github.com/unistack-org/micro/v3/codec/jsonrpc"
cproto "github.com/unistack-org/micro/v3/codec/proto"
cprotorpc "github.com/unistack-org/micro/v3/codec/protorpc"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/registry"
)
var (
DefaultCodecs = map[string]codec.NewCodec{
"application/json": cjson.NewCodec,
"application/json-rpc": cjsonrpc.NewCodec,
"application/protobuf": cproto.NewCodec,
"application/proto-rpc": cprotorpc.NewCodec,
"application/octet-stream": craw.NewCodec,
}
)
const (
defaultContentType = "application/json"
)
type NoopServer struct {
h Handler
opts Options
rsvc *registry.Service
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
registered bool
started bool
exit chan chan error
wg *sync.WaitGroup
sync.RWMutex
}
type noopHandler struct {
opts HandlerOptions
h interface{}
func (n *NoopServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := n.opts.Codecs[contentType]; ok {
return cf, nil
}
if cf, ok := DefaultCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
type noopSubscriber struct {
topic string
opts SubscriberOptions
h interface{}
}
func (n *noopSubscriber) Topic() string {
return n.topic
}
func (n *noopSubscriber) Subscriber() interface{} {
return n.h
}
func (n *noopSubscriber) Endpoints() []*registry.Endpoint {
return nil
}
func (n *noopSubscriber) Options() SubscriberOptions {
return n.opts
}
func (n *noopHandler) Endpoints() []*registry.Endpoint {
return nil
}
func (n *noopHandler) Handler() interface{} {
return nil
}
func (n *noopHandler) Options() HandlerOptions {
return n.opts
}
func (n *noopHandler) Name() string {
return "noop"
}
func (n *noopServer) Handle(handler Handler) error {
func (n *NoopServer) Handle(handler Handler) error {
n.h = handler
return nil
}
func (n *noopServer) Subscribe(subscriber Subscriber) error {
// n.s = handler
func (n *NoopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := ValidateSubscriber(sb); err != nil {
return err
}
n.Lock()
if _, ok = n.subscribers[sub]; ok {
n.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
n.subscribers[sub] = nil
n.Unlock()
return nil
}
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
options := NewHandlerOptions()
for _, o := range opts {
o(&options)
}
return &noopHandler{opts: options, h: h}
func (n *NoopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRpcHandler(h, opts...)
}
func (n *noopServer) NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
options := NewSubscriberOptions()
for _, o := range opts {
o(&options)
}
return &noopSubscriber{topic: topic, opts: options, h: h}
func (n *NoopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (n *noopServer) Init(opts ...Option) error {
func (n *NoopServer) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
}
if n.handlers == nil {
n.handlers = make(map[string]Handler)
}
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber)
}
if n.exit == nil {
n.exit = make(chan chan error)
}
return nil
}
func (n *noopServer) Start() error {
return nil
}
func (n *noopServer) Stop() error {
return nil
}
func (n *noopServer) Options() Options {
func (n *NoopServer) Options() Options {
return n.opts
}
func (n *noopServer) String() string {
func (n *NoopServer) String() string {
return "noop"
}
func newServer(opts ...Option) Server {
options := NewOptions()
for _, o := range opts {
o(&options)
func (n *NoopServer) Register() error {
n.RLock()
rsvc := n.rsvc
config := n.opts
n.RUnlock()
// if service already filled, reuse it and return early
if rsvc != nil {
if err := DefaultRegisterFunc(rsvc, config); err != nil {
return err
}
return nil
}
return &noopServer{opts: options}
var err error
var service *registry.Service
var cacheService bool
service, err = NewRegistryService(n)
if err != nil {
return err
}
n.RLock()
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range n.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
handlerList = append(handlerList, n)
}
}
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range n.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
for _, h := range handlerList {
endpoints = append(endpoints, n.handlers[h].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
n.RUnlock()
service.Nodes[0].Metadata["protocol"] = "noop"
service.Nodes[0].Metadata["transport"] = "noop"
service.Endpoints = endpoints
n.RLock()
registered := n.registered
n.RUnlock()
if !registered {
if logger.V(logger.InfoLevel) {
logger.Infof("Registry [%s] Registering node: %s", config.Registry.String(), service.Nodes[0].Id)
}
}
// register the service
if err := DefaultRegisterFunc(service, config); err != nil {
return err
}
// already registered? don't need to register subscribers
if registered {
return nil
}
n.Lock()
defer n.Unlock()
cx := config.Context
for sb := range n.subscribers {
handler := n.createSubHandler(sb, config)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if sb.Options().Context != nil {
cx = sb.Options().Context
}
opts = append(opts, broker.SubscribeContext(cx), broker.SubscribeAutoAck(sb.Options().AutoAck))
if logger.V(logger.InfoLevel) {
logger.Infof("Subscribing to topic: %s", sb.Topic())
}
sub, err := config.Broker.Subscribe(cx, sb.Topic(), handler, opts...)
if err != nil {
return err
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
n.registered = true
if cacheService {
n.rsvc = service
}
return nil
}
func (n *NoopServer) Deregister() error {
var err error
n.RLock()
config := n.opts
n.RUnlock()
service, err := NewRegistryService(n)
if err != nil {
return err
}
if logger.V(logger.InfoLevel) {
logger.Infof("Deregistering node: %s", service.Nodes[0].Id)
}
if err := DefaultDeregisterFunc(service, config); err != nil {
return err
}
n.Lock()
n.rsvc = nil
if !n.registered {
n.Unlock()
return nil
}
n.registered = false
cx := config.Context
wg := sync.WaitGroup{}
for sb, subs := range n.subscribers {
for _, sub := range subs {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if logger.V(logger.InfoLevel) {
logger.Infof("Unsubscribing from topic: %s", s.Topic())
}
if err := s.Unsubscribe(cx); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Unsubscribing from topic: %s err: %v", s.Topic(), err)
}
}
}(sub)
}
n.subscribers[sb] = nil
}
wg.Wait()
n.Unlock()
return nil
}
func (n *NoopServer) Start() error {
n.RLock()
if n.started {
n.RUnlock()
return nil
}
config := n.Options()
n.RUnlock()
if logger.V(logger.InfoLevel) {
logger.Infof("Server [noop] Listening on %s", config.Address)
}
n.Lock()
if len(config.Advertise) == 0 {
config.Advertise = config.Address
}
n.Unlock()
// only connect if we're subscribed
if len(n.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Broker [%s] connect error: %v", config.Broker.String(), err)
}
return err
}
if logger.V(logger.InfoLevel) {
logger.Infof("Broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address())
}
}
// use RegisterCheck func before register
if err := config.RegisterCheck(config.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
}
} else {
// announce self to the world
if err := n.Register(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server register error: %v", err)
}
}
}
go func() {
t := new(time.Ticker)
// only process if it exists
if config.RegisterInterval > time.Duration(0) {
// new ticker
t = time.NewTicker(config.RegisterInterval)
}
// return error chan
var ch chan error
Loop:
for {
select {
// register self on interval
case <-t.C:
n.RLock()
registered := n.registered
n.RUnlock()
rerr := config.RegisterCheck(config.Context)
if rerr != nil && registered {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
}
// deregister self in case of error
if err := n.Deregister(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
}
}
} else if rerr != nil && !registered {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
}
continue
}
if err := n.Register(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
// wait for exit
case ch = <-n.exit:
break Loop
}
}
// deregister self
if err := n.Deregister(); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Error("Server deregister error: ", err)
}
}
// wait for waitgroup
if n.wg != nil {
n.wg.Wait()
}
// stop the grpc server
exit := make(chan bool)
go func() {
close(exit)
}()
select {
case <-exit:
}
// close transport
ch <- nil
if logger.V(logger.InfoLevel) {
logger.Infof("Broker [%s] Disconnected from %s", config.Broker.String(), config.Broker.Address())
}
// disconnect broker
if err := config.Broker.Disconnect(config.Context); err != nil {
if logger.V(logger.ErrorLevel) {
logger.Errorf("Broker [%s] disconnect error: %v", config.Broker.String(), err)
}
}
}()
// mark the server as started
n.Lock()
n.started = true
n.Unlock()
return nil
}
func (n *NoopServer) Stop() error {
n.RLock()
if !n.started {
n.RUnlock()
return nil
}
n.RUnlock()
ch := make(chan error)
n.exit <- ch
err := <-ch
n.Lock()
n.rsvc = nil
n.started = false
n.Unlock()
return err
}
type noopcodec struct {
*bytes.Buffer
}
func (c noopcodec) Close() error {
return nil
}

View File

@@ -39,6 +39,10 @@ type Options struct {
RegisterTTL time.Duration
// The interval on which to register
RegisterInterval time.Duration
// RegisterAttempts specify how many times try to register
RegisterAttempts int
// DeegisterAttempts specify how many times try to deregister
DeregisterAttempts int
// The router for requests
Router Router
@@ -61,6 +65,8 @@ func NewOptions(opts ...Option) Options {
RegisterInterval: DefaultRegisterInterval,
RegisterTTL: DefaultRegisterTTL,
RegisterCheck: DefaultRegisterCheck,
Logger: logger.DefaultLogger,
Tracer: tracer.DefaultTracer,
Broker: broker.DefaultBroker,
Registry: registry.DefaultRegistry,
Address: DefaultAddress,
@@ -255,3 +261,94 @@ func WrapSubscriber(w SubscriberWrapper) Option {
o.SubWrappers = append(o.SubWrappers, w)
}
}
type HandlerOption func(*HandlerOptions)
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
Context context.Context
}
func NewHandlerOptions(opts ...HandlerOption) HandlerOptions {
options := HandlerOptions{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
type SubscriberOption func(*SubscriberOptions)
type SubscriberOptions struct {
// AutoAck defaults to true. When a handler returns
// with a nil error the message is acked.
AutoAck bool
Queue string
Internal bool
Context context.Context
}
func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions {
options := SubscriberOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
return options
}
// EndpointMetadata is a Handler option that allows metadata to be added to
// individual endpoints.
func EndpointMetadata(name string, md map[string]string) HandlerOption {
return func(o *HandlerOptions) {
o.Metadata[name] = md
}
}
// Internal Handler options specifies that a handler is not advertised
// to the discovery system. In the future this may also limit request
// to the internal network or authorised user.
func InternalHandler(b bool) HandlerOption {
return func(o *HandlerOptions) {
o.Internal = b
}
}
// Internal Subscriber options specifies that a subscriber is not advertised
// to the discovery system.
func InternalSubscriber(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Internal = b
}
}
// DisableAutoAck will disable auto acking of messages
// after they have been handled.
func DisableAutoAck() SubscriberOption {
return func(o *SubscriberOptions) {
o.AutoAck = false
}
}
// Shared queue name distributed messages across subscribers
func SubscriberQueue(n string) SubscriberOption {
return func(o *SubscriberOptions) {
o.Queue = n
}
}
// SubscriberContext set context options to allow broker SubscriberOption passed
func SubscriberContext(ctx context.Context) SubscriberOption {
return func(o *SubscriberOptions) {
o.Context = ctx
}
}

89
server/registry.go Normal file
View File

@@ -0,0 +1,89 @@
package server
import (
"net"
"time"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry"
"github.com/unistack-org/micro/v3/util/addr"
"github.com/unistack-org/micro/v3/util/backoff"
)
var (
// DefaultRegisterFunc uses backoff to register service
DefaultRegisterFunc = func(service *registry.Service, config Options) error {
var err error
opts := []registry.RegisterOption{
registry.RegisterTTL(config.RegisterTTL),
registry.RegisterDomain(config.Namespace),
}
for i := 0; i <= config.RegisterAttempts; i++ {
err = config.Registry.Register(service, opts...)
if err == nil {
break
}
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
return err
}
// DefaultDeregisterFunc uses backoff to deregister service
DefaultDeregisterFunc = func(service *registry.Service, config Options) error {
var err error
opts := []registry.DeregisterOption{
registry.DeregisterDomain(config.Namespace),
}
for i := 0; i <= config.DeregisterAttempts; i++ {
err = config.Registry.Deregister(service, opts...)
if err == nil {
break
}
// backoff then retry
time.Sleep(backoff.Do(i + 1))
continue
}
return err
}
)
func NewRegistryService(s Server) (*registry.Service, error) {
opts := s.Options()
advt := opts.Address
if len(opts.Advertise) > 0 {
advt = opts.Advertise
}
host, port, err := net.SplitHostPort(advt)
if err != nil {
return nil, err
}
addr, err := addr.Extract(host)
if err != nil {
addr = host
}
node := &registry.Node{
Id: opts.Name + "-" + opts.Id,
Address: net.JoinHostPort(addr, port),
}
node.Metadata = metadata.Copy(opts.Metadata)
node.Metadata["server"] = s.String()
node.Metadata["broker"] = opts.Broker.String()
node.Metadata["registry"] = opts.Registry.String()
return &registry.Service{
Name: opts.Name,
Version: opts.Version,
Nodes: []*registry.Node{node},
Metadata: metadata.New(0),
}, nil
}

90
server/request.go Normal file
View File

@@ -0,0 +1,90 @@
package server
import (
raw "github.com/unistack-org/micro-codec-bytes"
"github.com/unistack-org/micro/v3/codec"
)
type rpcRequest struct {
service string
method string
contentType string
codec codec.Codec
header map[string]string
body []byte
stream bool
payload interface{}
}
type rpcMessage struct {
topic string
contentType string
payload interface{}
header map[string]string
body []byte
codec codec.Codec
}
func (r *rpcRequest) ContentType() string {
return r.contentType
}
func (r *rpcRequest) Service() string {
return r.service
}
func (r *rpcRequest) Method() string {
return r.method
}
func (r *rpcRequest) Endpoint() string {
return r.method
}
func (r *rpcRequest) Codec() codec.Reader {
return r.codec
}
func (r *rpcRequest) Header() map[string]string {
return r.header
}
func (r *rpcRequest) Read() ([]byte, error) {
f := &raw.Frame{}
if err := r.codec.ReadBody(f); err != nil {
return nil, err
}
return f.Data, nil
}
func (r *rpcRequest) Stream() bool {
return r.stream
}
func (r *rpcRequest) Body() interface{} {
return r.payload
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Payload() interface{} {
return r.payload
}
func (r *rpcMessage) Header() map[string]string {
return r.header
}
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Reader {
return r.codec
}

View File

@@ -11,7 +11,7 @@ import (
)
var (
DefaultServer Server = newServer()
DefaultServer Server = &NoopServer{opts: NewOptions()}
)
// Server is a simple micro server abstraction

View File

@@ -1,10 +1,20 @@
package server
import (
"bytes"
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"unicode"
"unicode/utf8"
"github.com/unistack-org/micro/v3/broker"
"github.com/unistack-org/micro/v3/errors"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/registry"
)
const (
@@ -17,6 +27,22 @@ var (
typeOfError = reflect.TypeOf((*error)(nil)).Elem()
)
type handler struct {
method reflect.Value
reqType reflect.Type
ctxType reflect.Type
}
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
opts SubscriberOptions
}
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
@@ -86,3 +112,200 @@ func ValidateSubscriber(sub Subscriber) error {
return nil
}
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var endpoints []*registry.Endpoint
var handlers []*handler
options := NewSubscriberOptions(opts...)
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: "Func",
Request: registry.ExtractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: name + "." + method.Name,
Request: registry.ExtractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
func (n *NoopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
if logger.V(logger.ErrorLevel) {
logger.Error("panic recovered: ", r)
logger.Error(string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = make(map[string]string)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header["Content-Type"] = defaultContentType
ct = defaultContentType
}
cf, err := n.newCodec(ct)
if err != nil {
return err
}
hdr := make(map[string]string, len(msg.Header))
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
ctx := metadata.NewContext(sb.opts.Context, hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err = cf(noopcodec{bytes.NewBuffer(msg.Body)}).ReadBody(req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
err := fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
body: msg.Body,
})
results <- err
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*registry.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() SubscriberOptions {
return s.opts
}