| @@ -6,11 +6,13 @@ import ( | ||||
|  | ||||
| type serverKey struct{} | ||||
|  | ||||
| // FromContext returns Server from context | ||||
| func FromContext(ctx context.Context) (Server, bool) { | ||||
| 	c, ok := ctx.Value(serverKey{}).(Server) | ||||
| 	return c, ok | ||||
| } | ||||
|  | ||||
| // NewContext stores Server to context | ||||
| func NewContext(ctx context.Context, s Server) context.Context { | ||||
| 	return context.WithValue(ctx, serverKey{}, s) | ||||
| } | ||||
|   | ||||
| @@ -32,7 +32,7 @@ const ( | ||||
| 	defaultContentType = "application/json" | ||||
| ) | ||||
|  | ||||
| type NoopServer struct { | ||||
| type noopServer struct { | ||||
| 	h           Handler | ||||
| 	opts        Options | ||||
| 	rsvc        *registry.Service | ||||
| @@ -45,7 +45,12 @@ type NoopServer struct { | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) newCodec(contentType string) (codec.NewCodec, error) { | ||||
| // NewServer returns new noop server | ||||
| func NewServer(opts ...Option) Server { | ||||
| 	return &noopServer{opts: NewOptions(opts...)} | ||||
| } | ||||
|  | ||||
| func (n *noopServer) newCodec(contentType string) (codec.NewCodec, error) { | ||||
| 	if cf, ok := n.opts.Codecs[contentType]; ok { | ||||
| 		return cf, nil | ||||
| 	} | ||||
| @@ -55,12 +60,12 @@ func (n *NoopServer) newCodec(contentType string) (codec.NewCodec, error) { | ||||
| 	return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Handle(handler Handler) error { | ||||
| func (n *noopServer) Handle(handler Handler) error { | ||||
| 	n.h = handler | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Subscribe(sb Subscriber) error { | ||||
| func (n *noopServer) Subscribe(sb Subscriber) error { | ||||
| 	sub, ok := sb.(*subscriber) | ||||
| 	if !ok { | ||||
| 		return fmt.Errorf("invalid subscriber: expected *subscriber") | ||||
| @@ -84,15 +89,15 @@ func (n *NoopServer) Subscribe(sb Subscriber) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { | ||||
| func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler { | ||||
| 	return newRpcHandler(h, opts...) | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { | ||||
| func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { | ||||
| 	return newSubscriber(topic, sb, opts...) | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Init(opts ...Option) error { | ||||
| func (n *noopServer) Init(opts ...Option) error { | ||||
| 	for _, o := range opts { | ||||
| 		o(&n.opts) | ||||
| 	} | ||||
| @@ -110,15 +115,15 @@ func (n *NoopServer) Init(opts ...Option) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Options() Options { | ||||
| func (n *noopServer) Options() Options { | ||||
| 	return n.opts | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) String() string { | ||||
| func (n *noopServer) String() string { | ||||
| 	return "noop" | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Register() error { | ||||
| func (n *noopServer) Register() error { | ||||
| 	n.RLock() | ||||
| 	rsvc := n.rsvc | ||||
| 	config := n.opts | ||||
| @@ -233,7 +238,7 @@ func (n *NoopServer) Register() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Deregister() error { | ||||
| func (n *noopServer) Deregister() error { | ||||
| 	var err error | ||||
|  | ||||
| 	n.RLock() | ||||
| @@ -293,7 +298,7 @@ func (n *NoopServer) Deregister() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Start() error { | ||||
| func (n *noopServer) Start() error { | ||||
| 	n.RLock() | ||||
| 	if n.started { | ||||
| 		n.RUnlock() | ||||
| @@ -433,7 +438,7 @@ func (n *NoopServer) Start() error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) Stop() error { | ||||
| func (n *noopServer) Stop() error { | ||||
| 	n.RLock() | ||||
| 	if !n.started { | ||||
| 		n.RUnlock() | ||||
|   | ||||
| @@ -85,7 +85,7 @@ func NewOptions(opts ...Option) Options { | ||||
| 	return options | ||||
| } | ||||
|  | ||||
| // Server name | ||||
| // Name sets the server name option | ||||
| func Name(n string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Name = n | ||||
| @@ -99,7 +99,7 @@ func Namespace(n string) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Logger | ||||
| // Logger sets the logger option | ||||
| func Logger(l logger.Logger) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Logger = l | ||||
| @@ -127,7 +127,7 @@ func Address(a string) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // The address to advertise for discovery - host:port | ||||
| // Advertise the address to advertise for discovery - host:port | ||||
| func Advertise(a string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Advertise = a | ||||
| @@ -199,14 +199,14 @@ func RegisterCheck(fn func(context.Context) error) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Register the service with a TTL | ||||
| // RegisterTTL registers service with a TTL | ||||
| func RegisterTTL(t time.Duration) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.RegisterTTL = t | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Register the service with at interval | ||||
| // RegisterInterval registers service with at interval | ||||
| func RegisterInterval(t time.Duration) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.RegisterInterval = t | ||||
| @@ -250,28 +250,31 @@ func Wait(wg *sync.WaitGroup) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Adds a handler Wrapper to a list of options passed into the server | ||||
| // WrapHandler adds a handler Wrapper to a list of options passed into the server | ||||
| func WrapHandler(w HandlerWrapper) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.HdlrWrappers = append(o.HdlrWrappers, w) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Adds a subscriber Wrapper to a list of options passed into the server | ||||
| // WrapSubscriber adds a subscriber Wrapper to a list of options passed into the server | ||||
| func WrapSubscriber(w SubscriberWrapper) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.SubWrappers = append(o.SubWrappers, w) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // HandlerOption func | ||||
| type HandlerOption func(*HandlerOptions) | ||||
|  | ||||
| // HandlerOptions struct | ||||
| type HandlerOptions struct { | ||||
| 	Internal bool | ||||
| 	Metadata map[string]map[string]string | ||||
| 	Context  context.Context | ||||
| } | ||||
|  | ||||
| // NewHandlerOptions creates new HandlerOptions | ||||
| func NewHandlerOptions(opts ...HandlerOption) HandlerOptions { | ||||
| 	options := HandlerOptions{ | ||||
| 		Context: context.Background(), | ||||
| @@ -284,8 +287,10 @@ func NewHandlerOptions(opts ...HandlerOption) HandlerOptions { | ||||
| 	return options | ||||
| } | ||||
|  | ||||
| // SubscriberOption func | ||||
| type SubscriberOption func(*SubscriberOptions) | ||||
|  | ||||
| // SubscriberOptions struct | ||||
| type SubscriberOptions struct { | ||||
| 	// AutoAck defaults to true. When a handler returns | ||||
| 	// with a nil error the message is acked. | ||||
| @@ -295,6 +300,7 @@ type SubscriberOptions struct { | ||||
| 	Context  context.Context | ||||
| } | ||||
|  | ||||
| // NewSubscriberOptions create new SubscriberOptions | ||||
| func NewSubscriberOptions(opts ...SubscriberOption) SubscriberOptions { | ||||
| 	options := SubscriberOptions{ | ||||
| 		AutoAck: true, | ||||
| @@ -316,7 +322,7 @@ func EndpointMetadata(name string, md map[string]string) HandlerOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Internal Handler options specifies that a handler is not advertised | ||||
| // InternalHandler options specifies that a handler is not advertised | ||||
| // to the discovery system. In the future this may also limit request | ||||
| // to the internal network or authorised user. | ||||
| func InternalHandler(b bool) HandlerOption { | ||||
| @@ -325,7 +331,7 @@ func InternalHandler(b bool) HandlerOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Internal Subscriber options specifies that a subscriber is not advertised | ||||
| // InternalSubscriber options specifies that a subscriber is not advertised | ||||
| // to the discovery system. | ||||
| func InternalSubscriber(b bool) SubscriberOption { | ||||
| 	return func(o *SubscriberOptions) { | ||||
| @@ -341,7 +347,7 @@ func DisableAutoAck() SubscriberOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Shared queue name distributed messages across subscribers | ||||
| // SubscriberQueue sets the shared queue name distributed messages across subscribers | ||||
| func SubscriberQueue(n string) SubscriberOption { | ||||
| 	return func(o *SubscriberOptions) { | ||||
| 		o.Queue = n | ||||
|   | ||||
| @@ -52,6 +52,7 @@ var ( | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| // NewRegistryService returns *registry.Service from Server | ||||
| func NewRegistryService(s Server) (*registry.Service, error) { | ||||
| 	opts := s.Options() | ||||
|  | ||||
|   | ||||
| @@ -11,7 +11,7 @@ import ( | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	DefaultServer Server = &NoopServer{opts: NewOptions()} | ||||
| 	DefaultServer Server = NewServer() | ||||
| ) | ||||
|  | ||||
| // Server is a simple micro server abstraction | ||||
|   | ||||
| @@ -59,6 +59,7 @@ func isExportedOrBuiltinType(t reflect.Type) bool { | ||||
| 	return isExported(t.Name()) || t.PkgPath() == "" | ||||
| } | ||||
|  | ||||
| // ValidateSubscriber func | ||||
| func ValidateSubscriber(sub Subscriber) error { | ||||
| 	typ := reflect.TypeOf(sub.Subscriber()) | ||||
| 	var argType reflect.Type | ||||
| @@ -184,7 +185,7 @@ func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subs | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (n *NoopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { | ||||
| func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { | ||||
| 	return func(p broker.Event) (err error) { | ||||
| 		defer func() { | ||||
| 			if r := recover(); r != nil { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user