Add implementation for internal handlers and subscribers. They are not advertised to discovery
This commit is contained in:
parent
7401c44973
commit
f812613973
@ -17,5 +17,5 @@ func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto
|
||||
}
|
||||
|
||||
func registerDebugHandler(s Server) {
|
||||
s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler}))
|
||||
s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler}, InternalHandler(true)))
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ type Handler interface {
|
||||
Name() string
|
||||
Handler() interface{}
|
||||
Endpoints() []*registry.Endpoint
|
||||
Options() HandlerOptions
|
||||
}
|
||||
|
||||
// Subscriber interface represents a subscription to a given topic using
|
||||
@ -28,4 +29,30 @@ type Subscriber interface {
|
||||
Topic() string
|
||||
Subscriber() interface{}
|
||||
Endpoints() []*registry.Endpoint
|
||||
Options() SubscriberOptions
|
||||
}
|
||||
|
||||
type HandlerOptions struct {
|
||||
Internal bool
|
||||
}
|
||||
|
||||
type SubscriberOptions struct {
|
||||
Internal bool
|
||||
}
|
||||
|
||||
// Internal Handler options specifies that a handler is not advertised
|
||||
// to the discovery system. In the future this may also limit request
|
||||
// to the internal network or authorised user.
|
||||
func InternalHandler(b bool) HandlerOption {
|
||||
return func(o *HandlerOptions) {
|
||||
o.Internal = b
|
||||
}
|
||||
}
|
||||
|
||||
// Internal Subscriber options specifies that a subscriber is not advertised
|
||||
// to the discovery system.
|
||||
func InternalSubscriber(b bool) SubscriberOption {
|
||||
return func(o *SubscriberOptions) {
|
||||
o.Internal = b
|
||||
}
|
||||
}
|
||||
|
@ -10,9 +10,15 @@ type rpcHandler struct {
|
||||
name string
|
||||
handler interface{}
|
||||
endpoints []*registry.Endpoint
|
||||
opts HandlerOptions
|
||||
}
|
||||
|
||||
func newRpcHandler(handler interface{}) Handler {
|
||||
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
|
||||
var options HandlerOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
typ := reflect.TypeOf(handler)
|
||||
hdlr := reflect.ValueOf(handler)
|
||||
name := reflect.Indirect(hdlr).Type().Name()
|
||||
@ -30,6 +36,7 @@ func newRpcHandler(handler interface{}) Handler {
|
||||
name: name,
|
||||
handler: handler,
|
||||
endpoints: endpoints,
|
||||
opts: options,
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,3 +51,7 @@ func (r *rpcHandler) Handler() interface{} {
|
||||
func (r *rpcHandler) Endpoints() []*registry.Endpoint {
|
||||
return r.endpoints
|
||||
}
|
||||
|
||||
func (r *rpcHandler) Options() HandlerOptions {
|
||||
return r.opts
|
||||
}
|
||||
|
@ -114,8 +114,8 @@ func (s *rpcServer) Init(opts ...Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *rpcServer) NewHandler(h interface{}) Handler {
|
||||
return newRpcHandler(h)
|
||||
func (s *rpcServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
|
||||
return newRpcHandler(h, opts...)
|
||||
}
|
||||
|
||||
func (s *rpcServer) Handle(h Handler) error {
|
||||
@ -128,8 +128,8 @@ func (s *rpcServer) Handle(h Handler) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *rpcServer) NewSubscriber(topic string, sb interface{}) Subscriber {
|
||||
return newSubscriber(topic, sb)
|
||||
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
|
||||
return newSubscriber(topic, sb, opts...)
|
||||
}
|
||||
|
||||
func (s *rpcServer) Subscribe(sb Subscriber) error {
|
||||
@ -199,10 +199,16 @@ func (s *rpcServer) Register() error {
|
||||
s.RLock()
|
||||
var endpoints []*registry.Endpoint
|
||||
for _, e := range s.handlers {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
// Only advertise non internal handlers
|
||||
if !e.Options().Internal {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
}
|
||||
}
|
||||
for e, _ := range s.subscribers {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
// Only advertise non internal subscribers
|
||||
if !e.Options().Internal {
|
||||
endpoints = append(endpoints, e.Endpoints()...)
|
||||
}
|
||||
}
|
||||
s.RUnlock()
|
||||
|
||||
|
@ -42,8 +42,8 @@ type Server interface {
|
||||
Options() Options
|
||||
Init(...Option) error
|
||||
Handle(Handler) error
|
||||
NewHandler(interface{}) Handler
|
||||
NewSubscriber(string, interface{}) Subscriber
|
||||
NewHandler(interface{}, ...HandlerOption) Handler
|
||||
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
|
||||
Subscribe(Subscriber) error
|
||||
Register() error
|
||||
Deregister() error
|
||||
@ -82,6 +82,10 @@ type Streamer interface {
|
||||
|
||||
type Option func(*Options)
|
||||
|
||||
type HandlerOption func(*HandlerOptions)
|
||||
|
||||
type SubscriberOption func(*SubscriberOptions)
|
||||
|
||||
var (
|
||||
DefaultAddress = ":0"
|
||||
DefaultName = "go-server"
|
||||
@ -110,8 +114,8 @@ func NewServer(opt ...Option) Server {
|
||||
|
||||
// Creates a new subscriber interface with the given topic
|
||||
// and handler using the default server
|
||||
func NewSubscriber(topic string, h interface{}) Subscriber {
|
||||
return DefaultServer.NewSubscriber(topic, h)
|
||||
func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
|
||||
return DefaultServer.NewSubscriber(topic, h, opts...)
|
||||
}
|
||||
|
||||
// Creates a new handler interface using the default server
|
||||
@ -124,8 +128,8 @@ func NewSubscriber(topic string, h interface{}) Subscriber {
|
||||
// return nil
|
||||
// }
|
||||
//
|
||||
func NewHandler(h interface{}) Handler {
|
||||
return DefaultServer.NewHandler(h)
|
||||
func NewHandler(h interface{}, opts ...HandlerOption) Handler {
|
||||
return DefaultServer.NewHandler(h, opts...)
|
||||
}
|
||||
|
||||
// Registers a handler interface with the default server to
|
||||
|
@ -29,9 +29,15 @@ type subscriber struct {
|
||||
subscriber interface{}
|
||||
handlers []*handler
|
||||
endpoints []*registry.Endpoint
|
||||
opts SubscriberOptions
|
||||
}
|
||||
|
||||
func newSubscriber(topic string, sub interface{}) Subscriber {
|
||||
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
|
||||
var options SubscriberOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
var endpoints []*registry.Endpoint
|
||||
var handlers []*handler
|
||||
|
||||
@ -96,6 +102,7 @@ func newSubscriber(topic string, sub interface{}) Subscriber {
|
||||
subscriber: sub,
|
||||
handlers: handlers,
|
||||
endpoints: endpoints,
|
||||
opts: options,
|
||||
}
|
||||
}
|
||||
|
||||
@ -241,3 +248,7 @@ func (s *subscriber) Subscriber() interface{} {
|
||||
func (s *subscriber) Endpoints() []*registry.Endpoint {
|
||||
return s.endpoints
|
||||
}
|
||||
|
||||
func (s *subscriber) Options() SubscriberOptions {
|
||||
return s.opts
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user