Merge pull request #42 from micro/internal_handler

Add implementation for internal handlers and subscribers.
This commit is contained in:
Asim 2016-01-08 15:00:53 +00:00
commit fb25558142
6 changed files with 74 additions and 15 deletions

View File

@ -17,5 +17,5 @@ func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto
} }
func registerDebugHandler(s Server) { func registerDebugHandler(s Server) {
s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler})) s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler}, InternalHandler(true)))
} }

View File

@ -20,6 +20,7 @@ type Handler interface {
Name() string Name() string
Handler() interface{} Handler() interface{}
Endpoints() []*registry.Endpoint Endpoints() []*registry.Endpoint
Options() HandlerOptions
} }
// Subscriber interface represents a subscription to a given topic using // Subscriber interface represents a subscription to a given topic using
@ -28,4 +29,30 @@ type Subscriber interface {
Topic() string Topic() string
Subscriber() interface{} Subscriber() interface{}
Endpoints() []*registry.Endpoint Endpoints() []*registry.Endpoint
Options() SubscriberOptions
}
type HandlerOptions struct {
Internal bool
}
type SubscriberOptions struct {
Internal bool
}
// Internal Handler options specifies that a handler is not advertised
// to the discovery system. In the future this may also limit request
// to the internal network or authorised user.
func InternalHandler(b bool) HandlerOption {
return func(o *HandlerOptions) {
o.Internal = b
}
}
// Internal Subscriber options specifies that a subscriber is not advertised
// to the discovery system.
func InternalSubscriber(b bool) SubscriberOption {
return func(o *SubscriberOptions) {
o.Internal = b
}
} }

View File

@ -10,9 +10,15 @@ type rpcHandler struct {
name string name string
handler interface{} handler interface{}
endpoints []*registry.Endpoint endpoints []*registry.Endpoint
opts HandlerOptions
} }
func newRpcHandler(handler interface{}) Handler { func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
var options HandlerOptions
for _, o := range opts {
o(&options)
}
typ := reflect.TypeOf(handler) typ := reflect.TypeOf(handler)
hdlr := reflect.ValueOf(handler) hdlr := reflect.ValueOf(handler)
name := reflect.Indirect(hdlr).Type().Name() name := reflect.Indirect(hdlr).Type().Name()
@ -30,6 +36,7 @@ func newRpcHandler(handler interface{}) Handler {
name: name, name: name,
handler: handler, handler: handler,
endpoints: endpoints, endpoints: endpoints,
opts: options,
} }
} }
@ -44,3 +51,7 @@ func (r *rpcHandler) Handler() interface{} {
func (r *rpcHandler) Endpoints() []*registry.Endpoint { func (r *rpcHandler) Endpoints() []*registry.Endpoint {
return r.endpoints return r.endpoints
} }
func (r *rpcHandler) Options() HandlerOptions {
return r.opts
}

View File

@ -114,8 +114,8 @@ func (s *rpcServer) Init(opts ...Option) error {
return nil return nil
} }
func (s *rpcServer) NewHandler(h interface{}) Handler { func (s *rpcServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRpcHandler(h) return newRpcHandler(h, opts...)
} }
func (s *rpcServer) Handle(h Handler) error { func (s *rpcServer) Handle(h Handler) error {
@ -128,8 +128,8 @@ func (s *rpcServer) Handle(h Handler) error {
return nil return nil
} }
func (s *rpcServer) NewSubscriber(topic string, sb interface{}) Subscriber { func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, sb) return newSubscriber(topic, sb, opts...)
} }
func (s *rpcServer) Subscribe(sb Subscriber) error { func (s *rpcServer) Subscribe(sb Subscriber) error {
@ -199,11 +199,17 @@ func (s *rpcServer) Register() error {
s.RLock() s.RLock()
var endpoints []*registry.Endpoint var endpoints []*registry.Endpoint
for _, e := range s.handlers { for _, e := range s.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
endpoints = append(endpoints, e.Endpoints()...) endpoints = append(endpoints, e.Endpoints()...)
} }
}
for e, _ := range s.subscribers { for e, _ := range s.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
endpoints = append(endpoints, e.Endpoints()...) endpoints = append(endpoints, e.Endpoints()...)
} }
}
s.RUnlock() s.RUnlock()
service := &registry.Service{ service := &registry.Service{

View File

@ -42,8 +42,8 @@ type Server interface {
Options() Options Options() Options
Init(...Option) error Init(...Option) error
Handle(Handler) error Handle(Handler) error
NewHandler(interface{}) Handler NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}) Subscriber NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error Subscribe(Subscriber) error
Register() error Register() error
Deregister() error Deregister() error
@ -82,6 +82,10 @@ type Streamer interface {
type Option func(*Options) type Option func(*Options)
type HandlerOption func(*HandlerOptions)
type SubscriberOption func(*SubscriberOptions)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
DefaultName = "go-server" DefaultName = "go-server"
@ -110,8 +114,8 @@ func NewServer(opt ...Option) Server {
// Creates a new subscriber interface with the given topic // Creates a new subscriber interface with the given topic
// and handler using the default server // and handler using the default server
func NewSubscriber(topic string, h interface{}) Subscriber { func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber {
return DefaultServer.NewSubscriber(topic, h) return DefaultServer.NewSubscriber(topic, h, opts...)
} }
// Creates a new handler interface using the default server // Creates a new handler interface using the default server
@ -124,8 +128,8 @@ func NewSubscriber(topic string, h interface{}) Subscriber {
// return nil // return nil
// } // }
// //
func NewHandler(h interface{}) Handler { func NewHandler(h interface{}, opts ...HandlerOption) Handler {
return DefaultServer.NewHandler(h) return DefaultServer.NewHandler(h, opts...)
} }
// Registers a handler interface with the default server to // Registers a handler interface with the default server to

View File

@ -29,9 +29,15 @@ type subscriber struct {
subscriber interface{} subscriber interface{}
handlers []*handler handlers []*handler
endpoints []*registry.Endpoint endpoints []*registry.Endpoint
opts SubscriberOptions
} }
func newSubscriber(topic string, sub interface{}) Subscriber { func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var options SubscriberOptions
for _, o := range opts {
o(&options)
}
var endpoints []*registry.Endpoint var endpoints []*registry.Endpoint
var handlers []*handler var handlers []*handler
@ -96,6 +102,7 @@ func newSubscriber(topic string, sub interface{}) Subscriber {
subscriber: sub, subscriber: sub,
handlers: handlers, handlers: handlers,
endpoints: endpoints, endpoints: endpoints,
opts: options,
} }
} }
@ -241,3 +248,7 @@ func (s *subscriber) Subscriber() interface{} {
func (s *subscriber) Endpoints() []*registry.Endpoint { func (s *subscriber) Endpoints() []*registry.Endpoint {
return s.endpoints return s.endpoints
} }
func (s *subscriber) Options() SubscriberOptions {
return s.opts
}