Merge pull request #34 from micro/options

Public Options so people can actually implement the interfaces
This commit is contained in:
Asim 2015-12-31 18:19:12 +00:00
commit 851b62c904
20 changed files with 229 additions and 269 deletions

View File

@ -93,7 +93,7 @@ type Example struct{}
func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
md, _ := c.GetMetadata(ctx) md, _ := c.GetMetadata(ctx)
log.Infof("Received Example.Call request with metadata: %v", md) log.Infof("Received Example.Call request with metadata: %v", md)
rsp.Msg = server.Config().Id() + ": Hello " + req.Name rsp.Msg = server.Options().Id + ": Hello " + req.Name
return nil return nil
} }
``` ```

View File

@ -1,6 +1,7 @@
package broker package broker
type Broker interface { type Broker interface {
Options() Options
Address() string Address() string
Connect() error Connect() error
Disconnect() error Disconnect() error
@ -28,7 +29,7 @@ type Publication interface {
} }
type Subscriber interface { type Subscriber interface {
Config() SubscribeOptions Options() SubscribeOptions
Topic() string Topic() string
Unsubscribe() error Unsubscribe() error
} }

View File

@ -28,6 +28,7 @@ type httpBroker struct {
id string id string
address string address string
unsubscribe chan *httpSubscriber unsubscribe chan *httpSubscriber
opts Options
sync.RWMutex sync.RWMutex
subscribers map[string][]*httpSubscriber subscribers map[string][]*httpSubscriber
@ -85,7 +86,7 @@ func (h *httpPublication) Topic() string {
return h.t return h.t
} }
func (h *httpSubscriber) Config() SubscribeOptions { func (h *httpSubscriber) Options() SubscribeOptions {
return h.opts return h.opts
} }
@ -213,6 +214,10 @@ func (h *httpBroker) Init(opts ...Option) error {
return nil return nil
} }
func (h *httpBroker) Options() Options {
return h.opts
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
s, err := registry.GetService("topic:" + topic) s, err := registry.GetService("topic:" + topic)
if err != nil { if err != nil {

View File

@ -1,8 +1,15 @@
package broker package broker
type Options struct{} type Options struct {
type PublishOptions struct{} // Other options to be used by broker implementations
Options map[string]string
}
type PublishOptions struct {
// Other options to be used by broker implementations
Options map[string]string
}
type SubscribeOptions struct { type SubscribeOptions struct {
// AutoAck defaults to true. When a handler returns // AutoAck defaults to true. When a handler returns
@ -12,6 +19,9 @@ type SubscribeOptions struct {
// will create a shared subscription where each // will create a shared subscription where each
// receives a subset of messages. // receives a subset of messages.
Queue string Queue string
// Other options to be used by broker implementations
Options map[string]string
} }
type Option func(*Options) type Option func(*Options)

View File

@ -62,10 +62,10 @@ type Streamer interface {
Close() error Close() error
} }
type Option func(*options) type Option func(*Options)
type CallOption func(*callOptions) type CallOption func(*CallOptions)
type PublishOption func(*publishOptions) type PublishOption func(*PublishOptions)
type RequestOption func(*requestOptions) type RequestOption func(*RequestOptions)
var ( var (
DefaultClient Client = newRpcClient() DefaultClient Client = newRpcClient()

View File

@ -8,87 +8,99 @@ import (
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
type options struct { type Options struct {
contentType string ContentType string
broker broker.Broker Broker broker.Broker
codecs map[string]codec.NewCodec Codecs map[string]codec.NewCodec
registry registry.Registry Registry registry.Registry
selector selector.Selector Selector selector.Selector
transport transport.Transport Transport transport.Transport
wrappers []Wrapper Wrappers []Wrapper
// Other options to be used by client implementations
Options map[string]string
} }
type callOptions struct { type CallOptions struct {
selectOptions []selector.SelectOption SelectOptions []selector.SelectOption
// Other options to be used by client implementations
Options map[string]string
} }
type publishOptions struct{} type PublishOptions struct {
// Other options to be used by client implementations
Options map[string]string
}
type requestOptions struct { type RequestOptions struct {
stream bool Stream bool
// Other options to be used by client implementations
Options map[string]string
} }
// Broker to be used for pub/sub // Broker to be used for pub/sub
func Broker(b broker.Broker) Option { func Broker(b broker.Broker) Option {
return func(o *options) { return func(o *Options) {
o.broker = b o.Broker = b
} }
} }
// Codec to be used to encode/decode requests for a given content type // Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c codec.NewCodec) Option { func Codec(contentType string, c codec.NewCodec) Option {
return func(o *options) { return func(o *Options) {
o.codecs[contentType] = c o.Codecs[contentType] = c
} }
} }
// Default content type of the client // Default content type of the client
func ContentType(ct string) Option { func ContentType(ct string) Option {
return func(o *options) { return func(o *Options) {
o.contentType = ct o.ContentType = ct
} }
} }
// Registry to find nodes for a given service // Registry to find nodes for a given service
func Registry(r registry.Registry) Option { func Registry(r registry.Registry) Option {
return func(o *options) { return func(o *Options) {
o.registry = r o.Registry = r
} }
} }
// Transport to use for communication e.g http, rabbitmq, etc // Transport to use for communication e.g http, rabbitmq, etc
func Transport(t transport.Transport) Option { func Transport(t transport.Transport) Option {
return func(o *options) { return func(o *Options) {
o.transport = t o.Transport = t
} }
} }
// Select is used to select a node to route a request to // Select is used to select a node to route a request to
func Selector(s selector.Selector) Option { func Selector(s selector.Selector) Option {
return func(o *options) { return func(o *Options) {
o.selector = s o.Selector = s
} }
} }
// Adds a Wrapper to a list of options passed into the client // Adds a Wrapper to a list of options passed into the client
func Wrap(w Wrapper) Option { func Wrap(w Wrapper) Option {
return func(o *options) { return func(o *Options) {
o.wrappers = append(o.wrappers, w) o.Wrappers = append(o.Wrappers, w)
} }
} }
// Call Options // Call Options
func WithSelectOption(so selector.SelectOption) CallOption { func WithSelectOption(so selector.SelectOption) CallOption {
return func(o *callOptions) { return func(o *CallOptions) {
o.selectOptions = append(o.selectOptions, so) o.SelectOptions = append(o.SelectOptions, so)
} }
} }
// Request Options // Request Options
func StreamingRequest() RequestOption { func StreamingRequest() RequestOption {
return func(o *requestOptions) { return func(o *RequestOptions) {
o.stream = true o.Stream = true
} }
} }

View File

@ -18,40 +18,40 @@ import (
type rpcClient struct { type rpcClient struct {
once sync.Once once sync.Once
opts options opts Options
} }
func newRpcClient(opt ...Option) Client { func newRpcClient(opt ...Option) Client {
var once sync.Once var once sync.Once
opts := options{ opts := Options{
codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.NewCodec),
} }
for _, o := range opt { for _, o := range opt {
o(&opts) o(&opts)
} }
if len(opts.contentType) == 0 { if len(opts.ContentType) == 0 {
opts.contentType = defaultContentType opts.ContentType = defaultContentType
} }
if opts.broker == nil { if opts.Broker == nil {
opts.broker = broker.DefaultBroker opts.Broker = broker.DefaultBroker
} }
if opts.registry == nil { if opts.Registry == nil {
opts.registry = registry.DefaultRegistry opts.Registry = registry.DefaultRegistry
} }
if opts.selector == nil { if opts.Selector == nil {
opts.selector = selector.NewSelector( opts.Selector = selector.NewSelector(
selector.Registry(opts.registry), selector.Registry(opts.Registry),
) )
} }
if opts.transport == nil { if opts.Transport == nil {
opts.transport = transport.DefaultTransport opts.Transport = transport.DefaultTransport
} }
rc := &rpcClient{ rc := &rpcClient{
@ -62,15 +62,15 @@ func newRpcClient(opt ...Option) Client {
c := Client(rc) c := Client(rc)
// wrap in reverse // wrap in reverse
for i := len(opts.wrappers); i > 0; i-- { for i := len(opts.Wrappers); i > 0; i-- {
c = opts.wrappers[i-1](c) c = opts.Wrappers[i-1](c)
} }
return c return c
} }
func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) {
if c, ok := r.opts.codecs[contentType]; ok { if c, ok := r.opts.Codecs[contentType]; ok {
return c, nil return c, nil
} }
if cf, ok := defaultCodecs[contentType]; ok { if cf, ok := defaultCodecs[contentType]; ok {
@ -98,7 +98,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
c, err := r.opts.transport.Dial(address) c, err := r.opts.Transport.Dial(address)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -131,7 +131,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St
return nil, errors.InternalServerError("go.micro.client", err.Error()) return nil, errors.InternalServerError("go.micro.client", err.Error())
} }
c, err := r.opts.transport.Dial(address, transport.WithStream()) c, err := r.opts.Transport.Dial(address, transport.WithStream())
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -154,12 +154,12 @@ func (r *rpcClient) CallRemote(ctx context.Context, address string, request Requ
} }
func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error {
var copts callOptions var copts CallOptions
for _, opt := range opts { for _, opt := range opts {
opt(&copts) opt(&copts)
} }
next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error()) return errors.NotFound("go.micro.client", err.Error())
} else if err != nil { } else if err != nil {
@ -179,7 +179,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac
} }
err = r.call(ctx, address, request, response) err = r.call(ctx, address, request, response)
r.opts.selector.Mark(request.Service(), node, err) r.opts.Selector.Mark(request.Service(), node, err)
return err return err
} }
@ -188,12 +188,12 @@ func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Re
} }
func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) {
var copts callOptions var copts CallOptions
for _, opt := range opts { for _, opt := range opts {
opt(&copts) opt(&copts)
} }
next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...)
if err != nil && err == selector.ErrNotFound { if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error()) return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil { } else if err != nil {
@ -213,7 +213,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt
} }
stream, err := r.stream(ctx, address, request) stream, err := r.stream(ctx, address, request)
r.opts.selector.Mark(request.Service(), node, err) r.opts.Selector.Mark(request.Service(), node, err)
return stream, err return stream, err
} }
@ -234,24 +234,24 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishO
return errors.InternalServerError("go.micro.client", err.Error()) return errors.InternalServerError("go.micro.client", err.Error())
} }
r.once.Do(func() { r.once.Do(func() {
r.opts.broker.Connect() r.opts.Broker.Connect()
}) })
return r.opts.broker.Publish(p.Topic(), &broker.Message{ return r.opts.Broker.Publish(p.Topic(), &broker.Message{
Header: md, Header: md,
Body: b.Bytes(), Body: b.Bytes(),
}) })
} }
func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { func (r *rpcClient) NewPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, r.opts.contentType) return newRpcPublication(topic, message, r.opts.ContentType)
} }
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream") return newRpcPublication(topic, message, "application/octet-stream")
} }
func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {
return newRpcRequest(service, method, request, r.opts.contentType, reqOpts...) return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...)
} }
func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request {

View File

@ -5,11 +5,11 @@ type rpcRequest struct {
method string method string
contentType string contentType string
request interface{} request interface{}
opts requestOptions opts RequestOptions
} }
func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request {
var opts requestOptions var opts RequestOptions
for _, o := range reqOpts { for _, o := range reqOpts {
o(&opts) o(&opts)
@ -41,5 +41,5 @@ func (r *rpcRequest) Request() interface{} {
} }
func (r *rpcRequest) Stream() bool { func (r *rpcRequest) Stream() bool {
return r.opts.stream return r.opts.Stream
} }

View File

@ -1,29 +1,23 @@
package cmd package cmd
import ( import (
"bytes"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"net/http"
"os" "os"
"runtime"
"strings" "strings"
"text/tabwriter" "text/tabwriter"
"text/template" "text/template"
"time" "time"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
log "github.com/golang/glog"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector" "github.com/micro/go-micro/selector"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
"github.com/pborman/uuid"
) )
var ( var (
@ -102,12 +96,6 @@ var (
Usage: "Comma-separated list of transport addresses", Usage: "Comma-separated list of transport addresses",
}, },
cli.BoolFlag{
Name: "enable_ping",
EnvVar: "MICRO_ENABLE_PING",
Usage: "Enable ping",
},
// logging flags // logging flags
cli.BoolFlag{ cli.BoolFlag{
Name: "logtostderr", Name: "logtostderr",
@ -160,54 +148,6 @@ func init() {
rand.Seed(time.Now().Unix()) rand.Seed(time.Now().Unix())
} }
// ping informs micro-services about this thing
func ping() {
type Ping struct {
Id string
Timestamp int64
Product string
Version string
Arch string
Os string
}
p := Ping{
Id: uuid.NewUUID().String(),
Product: "go-micro",
Version: "latest",
Arch: runtime.GOARCH,
Os: runtime.GOOS,
}
buf := bytes.NewBuffer(nil)
cl := &http.Client{}
fn := func() {
log.Infof("Ping micro-services.co")
p.Timestamp = time.Now().Unix()
b, err := json.Marshal(p)
if err != nil {
return
}
buf.Reset()
buf.Write(b)
rsp, err := cl.Post("https://micro-services.co/_ping", "application/json", buf)
if err != nil {
return
}
rsp.Body.Close()
}
// don't ping unless this thing has lived for 30 seconds
time.Sleep(time.Second * 30)
// only ping every 24 hours, be non invasive
for {
fn()
time.Sleep(time.Hour * 24)
}
}
func Setup(c *cli.Context) error { func Setup(c *cli.Context) error {
os.Args = os.Args[:1] os.Args = os.Args[:1]
@ -259,10 +199,6 @@ func Setup(c *cli.Context) error {
client.DefaultClient = client.NewClient() client.DefaultClient = client.NewClient()
if c.Bool("enable_ping") {
go ping()
}
return nil return nil
} }

View File

@ -14,7 +14,7 @@ type Example struct{}
func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
log.Info("Received Example.Call request") log.Info("Received Example.Call request")
rsp.Msg = server.Config().Id() + ": Hello " + req.Name rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name
return nil return nil
} }

View File

@ -14,7 +14,7 @@ type Example struct{}
func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
md, _ := c.GetMetadata(ctx) md, _ := c.GetMetadata(ctx)
log.Infof("Received Example.Call request with metadata: %v", md) log.Infof("Received Example.Call request with metadata: %v", md)
rsp.Msg = server.Config().Id() + ": Hello " + req.Name rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name
return nil return nil
} }

View File

@ -31,7 +31,7 @@ func main() {
// optionally setup command line usage // optionally setup command line usage
cmd.Init() cmd.Init()
md := server.Config().Metadata() md := server.DefaultOptions().Metadata
md["datacenter"] = "local" md["datacenter"] = "local"
server.DefaultServer = server.NewServer( server.DefaultServer = server.NewServer(

View File

@ -6,6 +6,9 @@ import (
type Options struct { type Options struct {
Timeout time.Duration Timeout time.Duration
// Other options to be used by registry implementations
Options map[string]string
} }
func Timeout(t time.Duration) Option { func Timeout(t time.Duration) Option {

View File

@ -6,10 +6,16 @@ import (
type Options struct { type Options struct {
Registry registry.Registry Registry registry.Registry
// Other options to be used by broker implementations
Options map[string]string
} }
type SelectOptions struct { type SelectOptions struct {
Filters []SelectFilter Filters []SelectFilter
// Other options to be used by broker implementations
Options map[string]string
} }
// Option used to initialise the selector // Option used to initialise the selector

View File

@ -7,166 +7,147 @@ import (
"github.com/micro/go-micro/transport" "github.com/micro/go-micro/transport"
) )
type options struct { type Options struct {
codecs map[string]codec.NewCodec Codecs map[string]codec.NewCodec
broker broker.Broker Broker broker.Broker
registry registry.Registry Registry registry.Registry
transport transport.Transport Transport transport.Transport
metadata map[string]string Metadata map[string]string
name string Name string
address string Address string
advertise string Advertise string
id string Id string
version string Version string
hdlrWrappers []HandlerWrapper HdlrWrappers []HandlerWrapper
subWrappers []SubscriberWrapper SubWrappers []SubscriberWrapper
// Extra options settable by users.
// Used for other implementations.
Options map[string]string
} }
func newOptions(opt ...Option) options { func newOptions(opt ...Option) Options {
opts := options{ opts := Options{
codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.NewCodec),
metadata: map[string]string{}, Metadata: map[string]string{},
Options: map[string]string{},
} }
for _, o := range opt { for _, o := range opt {
o(&opts) o(&opts)
} }
if opts.broker == nil { if opts.Broker == nil {
opts.broker = broker.DefaultBroker opts.Broker = broker.DefaultBroker
} }
if opts.registry == nil { if opts.Registry == nil {
opts.registry = registry.DefaultRegistry opts.Registry = registry.DefaultRegistry
} }
if opts.transport == nil { if opts.Transport == nil {
opts.transport = transport.DefaultTransport opts.Transport = transport.DefaultTransport
} }
if len(opts.address) == 0 { if len(opts.Address) == 0 {
opts.address = DefaultAddress opts.Address = DefaultAddress
} }
if len(opts.name) == 0 { if len(opts.Name) == 0 {
opts.name = DefaultName opts.Name = DefaultName
} }
if len(opts.id) == 0 { if len(opts.Id) == 0 {
opts.id = DefaultId opts.Id = DefaultId
} }
if len(opts.version) == 0 { if len(opts.Version) == 0 {
opts.version = DefaultVersion opts.Version = DefaultVersion
} }
return opts return opts
} }
func (o options) Name() string {
return o.name
}
func (o options) Id() string {
return o.name + "-" + o.id
}
func (o options) Version() string {
return o.version
}
func (o options) Address() string {
return o.address
}
func (o options) Advertise() string {
return o.advertise
}
func (o options) Metadata() map[string]string {
return o.metadata
}
// Server name // Server name
func Name(n string) Option { func Name(n string) Option {
return func(o *options) { return func(o *Options) {
o.name = n o.Name = n
} }
} }
// Unique server id // Unique server id
func Id(id string) Option { func Id(id string) Option {
return func(o *options) { return func(o *Options) {
o.id = id o.Id = id
} }
} }
// Version of the service // Version of the service
func Version(v string) Option { func Version(v string) Option {
return func(o *options) { return func(o *Options) {
o.version = v o.Version = v
} }
} }
// Address to bind to - host:port // Address to bind to - host:port
func Address(a string) Option { func Address(a string) Option {
return func(o *options) { return func(o *Options) {
o.address = a o.Address = a
} }
} }
// The address to advertise for discovery - host:port // The address to advertise for discovery - host:port
func Advertise(a string) Option { func Advertise(a string) Option {
return func(o *options) { return func(o *Options) {
o.advertise = a o.Advertise = a
} }
} }
// Broker to use for pub/sub // Broker to use for pub/sub
func Broker(b broker.Broker) Option { func Broker(b broker.Broker) Option {
return func(o *options) { return func(o *Options) {
o.broker = b o.Broker = b
} }
} }
// Codec to use to encode/decode requests for a given content type // Codec to use to encode/decode requests for a given content type
func Codec(contentType string, c codec.NewCodec) Option { func Codec(contentType string, c codec.NewCodec) Option {
return func(o *options) { return func(o *Options) {
o.codecs[contentType] = c o.Codecs[contentType] = c
} }
} }
// Registry used for discovery // Registry used for discovery
func Registry(r registry.Registry) Option { func Registry(r registry.Registry) Option {
return func(o *options) { return func(o *Options) {
o.registry = r o.Registry = r
} }
} }
// Transport mechanism for communication e.g http, rabbitmq, etc // Transport mechanism for communication e.g http, rabbitmq, etc
func Transport(t transport.Transport) Option { func Transport(t transport.Transport) Option {
return func(o *options) { return func(o *Options) {
o.transport = t o.Transport = t
} }
} }
// Metadata associated with the server // Metadata associated with the server
func Metadata(md map[string]string) Option { func Metadata(md map[string]string) Option {
return func(o *options) { return func(o *Options) {
o.metadata = md o.Metadata = md
} }
} }
// Adds a handler Wrapper to a list of options passed into the server // Adds a handler Wrapper to a list of options passed into the server
func WrapHandler(w HandlerWrapper) Option { func WrapHandler(w HandlerWrapper) Option {
return func(o *options) { return func(o *Options) {
o.hdlrWrappers = append(o.hdlrWrappers, w) o.HdlrWrappers = append(o.HdlrWrappers, w)
} }
} }
// Adds a subscriber Wrapper to a list of options passed into the server // Adds a subscriber Wrapper to a list of options passed into the server
func WrapSubscriber(w SubscriberWrapper) Option { func WrapSubscriber(w SubscriberWrapper) Option {
return func(o *options) { return func(o *Options) {
o.subWrappers = append(o.subWrappers, w) o.SubWrappers = append(o.SubWrappers, w)
} }
} }

View File

@ -23,7 +23,7 @@ type rpcServer struct {
exit chan chan error exit chan chan error
sync.RWMutex sync.RWMutex
opts options opts Options
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
} }
@ -33,9 +33,9 @@ func newRpcServer(opts ...Option) Server {
return &rpcServer{ return &rpcServer{
opts: options, opts: options,
rpc: &server{ rpc: &server{
name: options.name, name: options.Name,
serviceMap: make(map[string]*service), serviceMap: make(map[string]*service),
hdlrWrappers: options.hdlrWrappers, hdlrWrappers: options.HdlrWrappers,
}, },
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber), subscribers: make(map[*subscriber][]broker.Subscriber),
@ -89,7 +89,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
} }
func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) { func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := s.opts.codecs[contentType]; ok { if cf, ok := s.opts.Codecs[contentType]; ok {
return cf, nil return cf, nil
} }
if cf, ok := defaultCodecs[contentType]; ok { if cf, ok := defaultCodecs[contentType]; ok {
@ -98,7 +98,7 @@ func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) {
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
} }
func (s *rpcServer) Config() options { func (s *rpcServer) Options() Options {
s.RLock() s.RLock()
opts := s.opts opts := s.opts
s.RUnlock() s.RUnlock()
@ -110,8 +110,8 @@ func (s *rpcServer) Init(opts ...Option) {
for _, opt := range opts { for _, opt := range opts {
opt(&s.opts) opt(&s.opts)
} }
if len(s.opts.id) == 0 { if len(s.opts.Id) == 0 {
s.opts.id = s.opts.name + "-" + DefaultId s.opts.Id = s.opts.Name + "-" + DefaultId
} }
s.Unlock() s.Unlock()
} }
@ -159,17 +159,17 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
func (s *rpcServer) Register() error { func (s *rpcServer) Register() error {
// parse address for host, port // parse address for host, port
config := s.Config() config := s.Options()
var advt, host string var advt, host string
var port int var port int
// check the advertise address first // check the advertise address first
// if it exists then use it, otherwise // if it exists then use it, otherwise
// use the address // use the address
if len(config.Advertise()) > 0 { if len(config.Advertise) > 0 {
advt = config.Advertise() advt = config.Advertise
} else { } else {
advt = config.Address() advt = config.Address
} }
parts := strings.Split(advt, ":") parts := strings.Split(advt, ":")
@ -187,13 +187,13 @@ func (s *rpcServer) Register() error {
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: config.Id(), Id: config.Id,
Address: addr, Address: addr,
Port: port, Port: port,
Metadata: config.Metadata(), Metadata: config.Metadata,
} }
node.Metadata["transport"] = config.transport.String() node.Metadata["transport"] = config.Transport.String()
s.RLock() s.RLock()
var endpoints []*registry.Endpoint var endpoints []*registry.Endpoint
@ -206,14 +206,14 @@ func (s *rpcServer) Register() error {
s.RUnlock() s.RUnlock()
service := &registry.Service{ service := &registry.Service{
Name: config.Name(), Name: config.Name,
Version: config.Version(), Version: config.Version,
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
Endpoints: endpoints, Endpoints: endpoints,
} }
log.Infof("Registering node: %s", node.Id) log.Infof("Registering node: %s", node.Id)
if err := config.registry.Register(service); err != nil { if err := config.Registry.Register(service); err != nil {
return err return err
} }
@ -222,7 +222,7 @@ func (s *rpcServer) Register() error {
for sb, _ := range s.subscribers { for sb, _ := range s.subscribers {
handler := s.createSubHandler(sb, s.opts) handler := s.createSubHandler(sb, s.opts)
sub, err := config.broker.Subscribe(sb.Topic(), handler) sub, err := config.Broker.Subscribe(sb.Topic(), handler)
if err != nil { if err != nil {
return err return err
} }
@ -233,17 +233,17 @@ func (s *rpcServer) Register() error {
} }
func (s *rpcServer) Deregister() error { func (s *rpcServer) Deregister() error {
config := s.Config() config := s.Options()
var advt, host string var advt, host string
var port int var port int
// check the advertise address first // check the advertise address first
// if it exists then use it, otherwise // if it exists then use it, otherwise
// use the address // use the address
if len(config.Advertise()) > 0 { if len(config.Advertise) > 0 {
advt = config.Advertise() advt = config.Advertise
} else { } else {
advt = config.Address() advt = config.Address
} }
parts := strings.Split(advt, ":") parts := strings.Split(advt, ":")
@ -260,19 +260,19 @@ func (s *rpcServer) Deregister() error {
} }
node := &registry.Node{ node := &registry.Node{
Id: config.Id(), Id: config.Id,
Address: addr, Address: addr,
Port: port, Port: port,
} }
service := &registry.Service{ service := &registry.Service{
Name: config.Name(), Name: config.Name,
Version: config.Version(), Version: config.Version,
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
log.Infof("Deregistering node: %s", node.Id) log.Infof("Deregistering node: %s", node.Id)
if err := config.registry.Deregister(service); err != nil { if err := config.Registry.Deregister(service); err != nil {
return err return err
} }
@ -290,16 +290,16 @@ func (s *rpcServer) Deregister() error {
func (s *rpcServer) Start() error { func (s *rpcServer) Start() error {
registerHealthChecker(s) registerHealthChecker(s)
config := s.Config() config := s.Options()
ts, err := config.transport.Listen(config.address) ts, err := config.Transport.Listen(config.Address)
if err != nil { if err != nil {
return err return err
} }
log.Infof("Listening on %s", ts.Addr()) log.Infof("Listening on %s", ts.Addr())
s.Lock() s.Lock()
s.opts.address = ts.Addr() s.opts.Address = ts.Addr()
s.Unlock() s.Unlock()
go ts.Accept(s.accept) go ts.Accept(s.accept)
@ -307,11 +307,11 @@ func (s *rpcServer) Start() error {
go func() { go func() {
ch := <-s.exit ch := <-s.exit
ch <- ts.Close() ch <- ts.Close()
config.broker.Disconnect() config.Broker.Disconnect()
}() }()
// TODO: subscribe to cruft // TODO: subscribe to cruft
return config.broker.Connect() return config.Broker.Connect()
} }
func (s *rpcServer) Stop() error { func (s *rpcServer) Stop() error {

View File

@ -39,7 +39,7 @@ import (
) )
type Server interface { type Server interface {
Config() options Options() Options
Init(...Option) Init(...Option)
Handle(Handler) error Handle(Handler) error
NewHandler(interface{}) Handler NewHandler(interface{}) Handler
@ -80,7 +80,7 @@ type Streamer interface {
Close() error Close() error
} }
type Option func(*options) type Option func(*Options)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
@ -91,8 +91,8 @@ var (
) )
// Returns config options for the default service // Returns config options for the default service
func Config() options { func DefaultOptions() Options {
return DefaultServer.Config() return DefaultServer.Options()
} }
// Initialises the default server with options passed in // Initialises the default server with options passed in
@ -174,8 +174,8 @@ func Run() error {
// Starts the default server // Starts the default server
func Start() error { func Start() error {
config := DefaultServer.Config() config := DefaultServer.Options()
log.Infof("Starting server %s id %s", config.Name(), config.Id()) log.Infof("Starting server %s id %s", config.Name, config.Id)
return DefaultServer.Start() return DefaultServer.Start()
} }

View File

@ -154,7 +154,7 @@ func validateSubscriber(sub Subscriber) error {
return nil return nil
} }
func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Publication) error { return func(p broker.Publication) error {
msg := p.Message() msg := p.Message()
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
@ -216,8 +216,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle
return nil return nil
} }
for i := len(opts.subWrappers); i > 0; i-- { for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.subWrappers[i-1](fn) fn = opts.SubWrappers[i-1](fn)
} }
go fn(ctx, &rpcPublication{ go fn(ctx, &rpcPublication{

View File

@ -22,7 +22,7 @@ type httpTransportClient struct {
ht *httpTransport ht *httpTransport
addr string addr string
conn net.Conn conn net.Conn
dialOpts dialOptions dialOpts DialOptions
once sync.Once once sync.Once
sync.Mutex sync.Mutex
@ -87,7 +87,7 @@ func (h *httpTransportClient) Send(m *Message) error {
func (h *httpTransportClient) Recv(m *Message) error { func (h *httpTransportClient) Recv(m *Message) error {
var r *http.Request var r *http.Request
if !h.dialOpts.stream { if !h.dialOpts.Stream {
rc, ok := <-h.r rc, ok := <-h.r
if !ok { if !ok {
return io.EOF return io.EOF
@ -281,7 +281,7 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
return nil, err return nil, err
} }
var dopts dialOptions var dopts DialOptions
for _, opt := range opts { for _, opt := range opts {
opt(&dopts) opt(&dopts)

View File

@ -29,23 +29,29 @@ type Transport interface {
String() string String() string
} }
type options struct{} type Options struct {
// Other options to be used by broker implementations
type dialOptions struct { Options map[string]string
stream bool
} }
type Option func(*options) type DialOptions struct {
Stream bool
type DialOption func(*dialOptions) // Other options to be used by broker implementations
Options map[string]string
}
type Option func(*Options)
type DialOption func(*DialOptions)
var ( var (
DefaultTransport Transport = newHttpTransport([]string{}) DefaultTransport Transport = newHttpTransport([]string{})
) )
func WithStream() DialOption { func WithStream() DialOption {
return func(o *dialOptions) { return func(o *DialOptions) {
o.stream = true o.Stream = true
} }
} }