diff --git a/README.md b/README.md index e63ed11b..22f68aec 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,7 @@ type Example struct{} func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { md, _ := c.GetMetadata(ctx) log.Infof("Received Example.Call request with metadata: %v", md) - rsp.Msg = server.Config().Id() + ": Hello " + req.Name + rsp.Msg = server.Options().Id + ": Hello " + req.Name return nil } ``` diff --git a/broker/broker.go b/broker/broker.go index 5099adf3..284f023e 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,6 +1,7 @@ package broker type Broker interface { + Options() Options Address() string Connect() error Disconnect() error @@ -28,7 +29,7 @@ type Publication interface { } type Subscriber interface { - Config() SubscribeOptions + Options() SubscribeOptions Topic() string Unsubscribe() error } diff --git a/broker/http_broker.go b/broker/http_broker.go index 00119176..ba9d58ec 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -28,6 +28,7 @@ type httpBroker struct { id string address string unsubscribe chan *httpSubscriber + opts Options sync.RWMutex subscribers map[string][]*httpSubscriber @@ -85,7 +86,7 @@ func (h *httpPublication) Topic() string { return h.t } -func (h *httpSubscriber) Config() SubscribeOptions { +func (h *httpSubscriber) Options() SubscribeOptions { return h.opts } @@ -213,6 +214,10 @@ func (h *httpBroker) Init(opts ...Option) error { return nil } +func (h *httpBroker) Options() Options { + return h.opts +} + func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { s, err := registry.GetService("topic:" + topic) if err != nil { diff --git a/broker/options.go b/broker/options.go index ea682666..a0309aa1 100644 --- a/broker/options.go +++ b/broker/options.go @@ -1,8 +1,15 @@ package broker -type Options struct{} +type Options struct { -type PublishOptions struct{} + // Other options to be used by broker implementations + Options map[string]string +} + +type PublishOptions struct { + // Other options to be used by broker implementations + Options map[string]string +} type SubscribeOptions struct { // AutoAck defaults to true. When a handler returns @@ -12,6 +19,9 @@ type SubscribeOptions struct { // will create a shared subscription where each // receives a subset of messages. Queue string + + // Other options to be used by broker implementations + Options map[string]string } type Option func(*Options) diff --git a/client/client.go b/client/client.go index 7f02f2b3..906e399f 100644 --- a/client/client.go +++ b/client/client.go @@ -62,10 +62,10 @@ type Streamer interface { Close() error } -type Option func(*options) -type CallOption func(*callOptions) -type PublishOption func(*publishOptions) -type RequestOption func(*requestOptions) +type Option func(*Options) +type CallOption func(*CallOptions) +type PublishOption func(*PublishOptions) +type RequestOption func(*RequestOptions) var ( DefaultClient Client = newRpcClient() diff --git a/client/options.go b/client/options.go index 389b34b1..102c8400 100644 --- a/client/options.go +++ b/client/options.go @@ -8,87 +8,99 @@ import ( "github.com/micro/go-micro/transport" ) -type options struct { - contentType string - broker broker.Broker - codecs map[string]codec.NewCodec - registry registry.Registry - selector selector.Selector - transport transport.Transport - wrappers []Wrapper +type Options struct { + ContentType string + Broker broker.Broker + Codecs map[string]codec.NewCodec + Registry registry.Registry + Selector selector.Selector + Transport transport.Transport + Wrappers []Wrapper + + // Other options to be used by client implementations + Options map[string]string } -type callOptions struct { - selectOptions []selector.SelectOption +type CallOptions struct { + SelectOptions []selector.SelectOption + + // Other options to be used by client implementations + Options map[string]string } -type publishOptions struct{} +type PublishOptions struct { + // Other options to be used by client implementations + Options map[string]string +} -type requestOptions struct { - stream bool +type RequestOptions struct { + Stream bool + + // Other options to be used by client implementations + Options map[string]string } // Broker to be used for pub/sub func Broker(b broker.Broker) Option { - return func(o *options) { - o.broker = b + return func(o *Options) { + o.Broker = b } } // Codec to be used to encode/decode requests for a given content type func Codec(contentType string, c codec.NewCodec) Option { - return func(o *options) { - o.codecs[contentType] = c + return func(o *Options) { + o.Codecs[contentType] = c } } // Default content type of the client func ContentType(ct string) Option { - return func(o *options) { - o.contentType = ct + return func(o *Options) { + o.ContentType = ct } } // Registry to find nodes for a given service func Registry(r registry.Registry) Option { - return func(o *options) { - o.registry = r + return func(o *Options) { + o.Registry = r } } // Transport to use for communication e.g http, rabbitmq, etc func Transport(t transport.Transport) Option { - return func(o *options) { - o.transport = t + return func(o *Options) { + o.Transport = t } } // Select is used to select a node to route a request to func Selector(s selector.Selector) Option { - return func(o *options) { - o.selector = s + return func(o *Options) { + o.Selector = s } } // Adds a Wrapper to a list of options passed into the client func Wrap(w Wrapper) Option { - return func(o *options) { - o.wrappers = append(o.wrappers, w) + return func(o *Options) { + o.Wrappers = append(o.Wrappers, w) } } // Call Options func WithSelectOption(so selector.SelectOption) CallOption { - return func(o *callOptions) { - o.selectOptions = append(o.selectOptions, so) + return func(o *CallOptions) { + o.SelectOptions = append(o.SelectOptions, so) } } // Request Options func StreamingRequest() RequestOption { - return func(o *requestOptions) { - o.stream = true + return func(o *RequestOptions) { + o.Stream = true } } diff --git a/client/rpc_client.go b/client/rpc_client.go index 406e2836..8fcd573f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -18,40 +18,40 @@ import ( type rpcClient struct { once sync.Once - opts options + opts Options } func newRpcClient(opt ...Option) Client { var once sync.Once - opts := options{ - codecs: make(map[string]codec.NewCodec), + opts := Options{ + Codecs: make(map[string]codec.NewCodec), } for _, o := range opt { o(&opts) } - if len(opts.contentType) == 0 { - opts.contentType = defaultContentType + if len(opts.ContentType) == 0 { + opts.ContentType = defaultContentType } - if opts.broker == nil { - opts.broker = broker.DefaultBroker + if opts.Broker == nil { + opts.Broker = broker.DefaultBroker } - if opts.registry == nil { - opts.registry = registry.DefaultRegistry + if opts.Registry == nil { + opts.Registry = registry.DefaultRegistry } - if opts.selector == nil { - opts.selector = selector.NewSelector( - selector.Registry(opts.registry), + if opts.Selector == nil { + opts.Selector = selector.NewSelector( + selector.Registry(opts.Registry), ) } - if opts.transport == nil { - opts.transport = transport.DefaultTransport + if opts.Transport == nil { + opts.Transport = transport.DefaultTransport } rc := &rpcClient{ @@ -62,15 +62,15 @@ func newRpcClient(opt ...Option) Client { c := Client(rc) // wrap in reverse - for i := len(opts.wrappers); i > 0; i-- { - c = opts.wrappers[i-1](c) + for i := len(opts.Wrappers); i > 0; i-- { + c = opts.Wrappers[i-1](c) } return c } func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { - if c, ok := r.opts.codecs[contentType]; ok { + if c, ok := r.opts.Codecs[contentType]; ok { return c, nil } if cf, ok := defaultCodecs[contentType]; ok { @@ -98,7 +98,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r return errors.InternalServerError("go.micro.client", err.Error()) } - c, err := r.opts.transport.Dial(address) + c, err := r.opts.Transport.Dial(address) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -131,7 +131,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, req Request) (St return nil, errors.InternalServerError("go.micro.client", err.Error()) } - c, err := r.opts.transport.Dial(address, transport.WithStream()) + c, err := r.opts.Transport.Dial(address, transport.WithStream()) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } @@ -154,12 +154,12 @@ func (r *rpcClient) CallRemote(ctx context.Context, address string, request Requ } func (r *rpcClient) Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { - var copts callOptions + var copts CallOptions for _, opt := range opts { opt(&copts) } - next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) + next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return errors.NotFound("go.micro.client", err.Error()) } else if err != nil { @@ -179,7 +179,7 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac } err = r.call(ctx, address, request, response) - r.opts.selector.Mark(request.Service(), node, err) + r.opts.Selector.Mark(request.Service(), node, err) return err } @@ -188,12 +188,12 @@ func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Re } func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { - var copts callOptions + var copts CallOptions for _, opt := range opts { opt(&copts) } - next, err := r.opts.selector.Select(request.Service(), copts.selectOptions...) + next, err := r.opts.Selector.Select(request.Service(), copts.SelectOptions...) if err != nil && err == selector.ErrNotFound { return nil, errors.NotFound("go.micro.client", err.Error()) } else if err != nil { @@ -213,7 +213,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOpt } stream, err := r.stream(ctx, address, request) - r.opts.selector.Mark(request.Service(), node, err) + r.opts.Selector.Mark(request.Service(), node, err) return stream, err } @@ -234,24 +234,24 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication, opts ...PublishO return errors.InternalServerError("go.micro.client", err.Error()) } r.once.Do(func() { - r.opts.broker.Connect() + r.opts.Broker.Connect() }) - return r.opts.broker.Publish(p.Topic(), &broker.Message{ + return r.opts.Broker.Publish(p.Topic(), &broker.Message{ Header: md, Body: b.Bytes(), }) } func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { - return newRpcPublication(topic, message, r.opts.contentType) + return newRpcPublication(topic, message, r.opts.ContentType) } func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { return newRpcPublication(topic, message, "application/octet-stream") } func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { - return newRpcRequest(service, method, request, r.opts.contentType, reqOpts...) + return newRpcRequest(service, method, request, r.opts.ContentType, reqOpts...) } func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { diff --git a/client/rpc_request.go b/client/rpc_request.go index 5a5ec0df..ec50a76a 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -5,11 +5,11 @@ type rpcRequest struct { method string contentType string request interface{} - opts requestOptions + opts RequestOptions } func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { - var opts requestOptions + var opts RequestOptions for _, o := range reqOpts { o(&opts) @@ -41,5 +41,5 @@ func (r *rpcRequest) Request() interface{} { } func (r *rpcRequest) Stream() bool { - return r.opts.stream + return r.opts.Stream } diff --git a/cmd/cmd.go b/cmd/cmd.go index 8dd54881..776eb6a3 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -1,29 +1,23 @@ package cmd import ( - "bytes" - "encoding/json" "flag" "fmt" "io" "math/rand" - "net/http" "os" - "runtime" "strings" "text/tabwriter" "text/template" "time" "github.com/codegangsta/cli" - log "github.com/golang/glog" "github.com/micro/go-micro/broker" "github.com/micro/go-micro/client" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/selector" "github.com/micro/go-micro/server" "github.com/micro/go-micro/transport" - "github.com/pborman/uuid" ) var ( @@ -102,12 +96,6 @@ var ( Usage: "Comma-separated list of transport addresses", }, - cli.BoolFlag{ - Name: "enable_ping", - EnvVar: "MICRO_ENABLE_PING", - Usage: "Enable ping", - }, - // logging flags cli.BoolFlag{ Name: "logtostderr", @@ -160,54 +148,6 @@ func init() { rand.Seed(time.Now().Unix()) } -// ping informs micro-services about this thing -func ping() { - type Ping struct { - Id string - Timestamp int64 - Product string - Version string - Arch string - Os string - } - - p := Ping{ - Id: uuid.NewUUID().String(), - Product: "go-micro", - Version: "latest", - Arch: runtime.GOARCH, - Os: runtime.GOOS, - } - - buf := bytes.NewBuffer(nil) - cl := &http.Client{} - - fn := func() { - log.Infof("Ping micro-services.co") - p.Timestamp = time.Now().Unix() - b, err := json.Marshal(p) - if err != nil { - return - } - buf.Reset() - buf.Write(b) - rsp, err := cl.Post("https://micro-services.co/_ping", "application/json", buf) - if err != nil { - return - } - rsp.Body.Close() - } - - // don't ping unless this thing has lived for 30 seconds - time.Sleep(time.Second * 30) - - // only ping every 24 hours, be non invasive - for { - fn() - time.Sleep(time.Hour * 24) - } -} - func Setup(c *cli.Context) error { os.Args = os.Args[:1] @@ -259,10 +199,6 @@ func Setup(c *cli.Context) error { client.DefaultClient = client.NewClient() - if c.Bool("enable_ping") { - go ping() - } - return nil } diff --git a/examples/server/codegen/codegen.go b/examples/server/codegen/codegen.go index 5d485aeb..c236305a 100644 --- a/examples/server/codegen/codegen.go +++ b/examples/server/codegen/codegen.go @@ -14,7 +14,7 @@ type Example struct{} func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { log.Info("Received Example.Call request") - rsp.Msg = server.Config().Id() + ": Hello " + req.Name + rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name return nil } diff --git a/examples/server/handler/example.go b/examples/server/handler/example.go index d109ac60..e79a574f 100644 --- a/examples/server/handler/example.go +++ b/examples/server/handler/example.go @@ -14,7 +14,7 @@ type Example struct{} func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { md, _ := c.GetMetadata(ctx) log.Infof("Received Example.Call request with metadata: %v", md) - rsp.Msg = server.Config().Id() + ": Hello " + req.Name + rsp.Msg = server.DefaultOptions().Id + ": Hello " + req.Name return nil } diff --git a/examples/server/main.go b/examples/server/main.go index 193fd499..ad9485fa 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -31,7 +31,7 @@ func main() { // optionally setup command line usage cmd.Init() - md := server.Config().Metadata() + md := server.DefaultOptions().Metadata md["datacenter"] = "local" server.DefaultServer = server.NewServer( diff --git a/registry/options.go b/registry/options.go index fbd16b34..ab951103 100644 --- a/registry/options.go +++ b/registry/options.go @@ -6,6 +6,9 @@ import ( type Options struct { Timeout time.Duration + + // Other options to be used by registry implementations + Options map[string]string } func Timeout(t time.Duration) Option { diff --git a/selector/options.go b/selector/options.go index 59738edf..d7e0a417 100644 --- a/selector/options.go +++ b/selector/options.go @@ -6,10 +6,16 @@ import ( type Options struct { Registry registry.Registry + + // Other options to be used by broker implementations + Options map[string]string } type SelectOptions struct { Filters []SelectFilter + + // Other options to be used by broker implementations + Options map[string]string } // Option used to initialise the selector diff --git a/server/options.go b/server/options.go index 4ad36d7d..1ddeb7f5 100644 --- a/server/options.go +++ b/server/options.go @@ -7,166 +7,147 @@ import ( "github.com/micro/go-micro/transport" ) -type options struct { - codecs map[string]codec.NewCodec - broker broker.Broker - registry registry.Registry - transport transport.Transport - metadata map[string]string - name string - address string - advertise string - id string - version string - hdlrWrappers []HandlerWrapper - subWrappers []SubscriberWrapper +type Options struct { + Codecs map[string]codec.NewCodec + Broker broker.Broker + Registry registry.Registry + Transport transport.Transport + Metadata map[string]string + Name string + Address string + Advertise string + Id string + Version string + HdlrWrappers []HandlerWrapper + SubWrappers []SubscriberWrapper + + // Extra options settable by users. + // Used for other implementations. + Options map[string]string } -func newOptions(opt ...Option) options { - opts := options{ - codecs: make(map[string]codec.NewCodec), - metadata: map[string]string{}, +func newOptions(opt ...Option) Options { + opts := Options{ + Codecs: make(map[string]codec.NewCodec), + Metadata: map[string]string{}, + Options: map[string]string{}, } for _, o := range opt { o(&opts) } - if opts.broker == nil { - opts.broker = broker.DefaultBroker + if opts.Broker == nil { + opts.Broker = broker.DefaultBroker } - if opts.registry == nil { - opts.registry = registry.DefaultRegistry + if opts.Registry == nil { + opts.Registry = registry.DefaultRegistry } - if opts.transport == nil { - opts.transport = transport.DefaultTransport + if opts.Transport == nil { + opts.Transport = transport.DefaultTransport } - if len(opts.address) == 0 { - opts.address = DefaultAddress + if len(opts.Address) == 0 { + opts.Address = DefaultAddress } - if len(opts.name) == 0 { - opts.name = DefaultName + if len(opts.Name) == 0 { + opts.Name = DefaultName } - if len(opts.id) == 0 { - opts.id = DefaultId + if len(opts.Id) == 0 { + opts.Id = DefaultId } - if len(opts.version) == 0 { - opts.version = DefaultVersion + if len(opts.Version) == 0 { + opts.Version = DefaultVersion } return opts } -func (o options) Name() string { - return o.name -} - -func (o options) Id() string { - return o.name + "-" + o.id -} - -func (o options) Version() string { - return o.version -} - -func (o options) Address() string { - return o.address -} - -func (o options) Advertise() string { - return o.advertise -} - -func (o options) Metadata() map[string]string { - return o.metadata -} - // Server name func Name(n string) Option { - return func(o *options) { - o.name = n + return func(o *Options) { + o.Name = n } } // Unique server id func Id(id string) Option { - return func(o *options) { - o.id = id + return func(o *Options) { + o.Id = id } } // Version of the service func Version(v string) Option { - return func(o *options) { - o.version = v + return func(o *Options) { + o.Version = v } } // Address to bind to - host:port func Address(a string) Option { - return func(o *options) { - o.address = a + return func(o *Options) { + o.Address = a } } // The address to advertise for discovery - host:port func Advertise(a string) Option { - return func(o *options) { - o.advertise = a + return func(o *Options) { + o.Advertise = a } } // Broker to use for pub/sub func Broker(b broker.Broker) Option { - return func(o *options) { - o.broker = b + return func(o *Options) { + o.Broker = b } } // Codec to use to encode/decode requests for a given content type func Codec(contentType string, c codec.NewCodec) Option { - return func(o *options) { - o.codecs[contentType] = c + return func(o *Options) { + o.Codecs[contentType] = c } } // Registry used for discovery func Registry(r registry.Registry) Option { - return func(o *options) { - o.registry = r + return func(o *Options) { + o.Registry = r } } // Transport mechanism for communication e.g http, rabbitmq, etc func Transport(t transport.Transport) Option { - return func(o *options) { - o.transport = t + return func(o *Options) { + o.Transport = t } } // Metadata associated with the server func Metadata(md map[string]string) Option { - return func(o *options) { - o.metadata = md + return func(o *Options) { + o.Metadata = md } } // Adds a handler Wrapper to a list of options passed into the server func WrapHandler(w HandlerWrapper) Option { - return func(o *options) { - o.hdlrWrappers = append(o.hdlrWrappers, w) + return func(o *Options) { + o.HdlrWrappers = append(o.HdlrWrappers, w) } } // Adds a subscriber Wrapper to a list of options passed into the server func WrapSubscriber(w SubscriberWrapper) Option { - return func(o *options) { - o.subWrappers = append(o.subWrappers, w) + return func(o *Options) { + o.SubWrappers = append(o.SubWrappers, w) } } diff --git a/server/rpc_server.go b/server/rpc_server.go index f5f93f1d..6b65b0ed 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -23,7 +23,7 @@ type rpcServer struct { exit chan chan error sync.RWMutex - opts options + opts Options handlers map[string]Handler subscribers map[*subscriber][]broker.Subscriber } @@ -33,9 +33,9 @@ func newRpcServer(opts ...Option) Server { return &rpcServer{ opts: options, rpc: &server{ - name: options.name, + name: options.Name, serviceMap: make(map[string]*service), - hdlrWrappers: options.hdlrWrappers, + hdlrWrappers: options.HdlrWrappers, }, handlers: make(map[string]Handler), subscribers: make(map[*subscriber][]broker.Subscriber), @@ -89,7 +89,7 @@ func (s *rpcServer) accept(sock transport.Socket) { } func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) { - if cf, ok := s.opts.codecs[contentType]; ok { + if cf, ok := s.opts.Codecs[contentType]; ok { return cf, nil } if cf, ok := defaultCodecs[contentType]; ok { @@ -98,7 +98,7 @@ func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) { return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) } -func (s *rpcServer) Config() options { +func (s *rpcServer) Options() Options { s.RLock() opts := s.opts s.RUnlock() @@ -110,8 +110,8 @@ func (s *rpcServer) Init(opts ...Option) { for _, opt := range opts { opt(&s.opts) } - if len(s.opts.id) == 0 { - s.opts.id = s.opts.name + "-" + DefaultId + if len(s.opts.Id) == 0 { + s.opts.Id = s.opts.Name + "-" + DefaultId } s.Unlock() } @@ -159,17 +159,17 @@ func (s *rpcServer) Subscribe(sb Subscriber) error { func (s *rpcServer) Register() error { // parse address for host, port - config := s.Config() + config := s.Options() var advt, host string var port int // check the advertise address first // if it exists then use it, otherwise // use the address - if len(config.Advertise()) > 0 { - advt = config.Advertise() + if len(config.Advertise) > 0 { + advt = config.Advertise } else { - advt = config.Address() + advt = config.Address } parts := strings.Split(advt, ":") @@ -187,13 +187,13 @@ func (s *rpcServer) Register() error { // register service node := ®istry.Node{ - Id: config.Id(), + Id: config.Id, Address: addr, Port: port, - Metadata: config.Metadata(), + Metadata: config.Metadata, } - node.Metadata["transport"] = config.transport.String() + node.Metadata["transport"] = config.Transport.String() s.RLock() var endpoints []*registry.Endpoint @@ -206,14 +206,14 @@ func (s *rpcServer) Register() error { s.RUnlock() service := ®istry.Service{ - Name: config.Name(), - Version: config.Version(), + Name: config.Name, + Version: config.Version, Nodes: []*registry.Node{node}, Endpoints: endpoints, } log.Infof("Registering node: %s", node.Id) - if err := config.registry.Register(service); err != nil { + if err := config.Registry.Register(service); err != nil { return err } @@ -222,7 +222,7 @@ func (s *rpcServer) Register() error { for sb, _ := range s.subscribers { handler := s.createSubHandler(sb, s.opts) - sub, err := config.broker.Subscribe(sb.Topic(), handler) + sub, err := config.Broker.Subscribe(sb.Topic(), handler) if err != nil { return err } @@ -233,17 +233,17 @@ func (s *rpcServer) Register() error { } func (s *rpcServer) Deregister() error { - config := s.Config() + config := s.Options() var advt, host string var port int // check the advertise address first // if it exists then use it, otherwise // use the address - if len(config.Advertise()) > 0 { - advt = config.Advertise() + if len(config.Advertise) > 0 { + advt = config.Advertise } else { - advt = config.Address() + advt = config.Address } parts := strings.Split(advt, ":") @@ -260,19 +260,19 @@ func (s *rpcServer) Deregister() error { } node := ®istry.Node{ - Id: config.Id(), + Id: config.Id, Address: addr, Port: port, } service := ®istry.Service{ - Name: config.Name(), - Version: config.Version(), + Name: config.Name, + Version: config.Version, Nodes: []*registry.Node{node}, } log.Infof("Deregistering node: %s", node.Id) - if err := config.registry.Deregister(service); err != nil { + if err := config.Registry.Deregister(service); err != nil { return err } @@ -290,16 +290,16 @@ func (s *rpcServer) Deregister() error { func (s *rpcServer) Start() error { registerHealthChecker(s) - config := s.Config() + config := s.Options() - ts, err := config.transport.Listen(config.address) + ts, err := config.Transport.Listen(config.Address) if err != nil { return err } log.Infof("Listening on %s", ts.Addr()) s.Lock() - s.opts.address = ts.Addr() + s.opts.Address = ts.Addr() s.Unlock() go ts.Accept(s.accept) @@ -307,11 +307,11 @@ func (s *rpcServer) Start() error { go func() { ch := <-s.exit ch <- ts.Close() - config.broker.Disconnect() + config.Broker.Disconnect() }() // TODO: subscribe to cruft - return config.broker.Connect() + return config.Broker.Connect() } func (s *rpcServer) Stop() error { diff --git a/server/server.go b/server/server.go index b9844601..65449b18 100644 --- a/server/server.go +++ b/server/server.go @@ -39,7 +39,7 @@ import ( ) type Server interface { - Config() options + Options() Options Init(...Option) Handle(Handler) error NewHandler(interface{}) Handler @@ -80,7 +80,7 @@ type Streamer interface { Close() error } -type Option func(*options) +type Option func(*Options) var ( DefaultAddress = ":0" @@ -91,8 +91,8 @@ var ( ) // Returns config options for the default service -func Config() options { - return DefaultServer.Config() +func DefaultOptions() Options { + return DefaultServer.Options() } // Initialises the default server with options passed in @@ -174,8 +174,8 @@ func Run() error { // Starts the default server func Start() error { - config := DefaultServer.Config() - log.Infof("Starting server %s id %s", config.Name(), config.Id()) + config := DefaultServer.Options() + log.Infof("Starting server %s id %s", config.Name, config.Id) return DefaultServer.Start() } diff --git a/server/subscriber.go b/server/subscriber.go index 7cca43cb..5a834497 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -154,7 +154,7 @@ func validateSubscriber(sub Subscriber) error { return nil } -func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler { +func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { return func(p broker.Publication) error { msg := p.Message() ct := msg.Header["Content-Type"] @@ -216,8 +216,8 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handle return nil } - for i := len(opts.subWrappers); i > 0; i-- { - fn = opts.subWrappers[i-1](fn) + for i := len(opts.SubWrappers); i > 0; i-- { + fn = opts.SubWrappers[i-1](fn) } go fn(ctx, &rpcPublication{ diff --git a/transport/http_transport.go b/transport/http_transport.go index 59177806..5adeac73 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -22,7 +22,7 @@ type httpTransportClient struct { ht *httpTransport addr string conn net.Conn - dialOpts dialOptions + dialOpts DialOptions once sync.Once sync.Mutex @@ -87,7 +87,7 @@ func (h *httpTransportClient) Send(m *Message) error { func (h *httpTransportClient) Recv(m *Message) error { var r *http.Request - if !h.dialOpts.stream { + if !h.dialOpts.Stream { rc, ok := <-h.r if !ok { return io.EOF @@ -281,7 +281,7 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) { return nil, err } - var dopts dialOptions + var dopts DialOptions for _, opt := range opts { opt(&dopts) diff --git a/transport/transport.go b/transport/transport.go index 361c712e..ab647186 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -29,23 +29,29 @@ type Transport interface { String() string } -type options struct{} - -type dialOptions struct { - stream bool +type Options struct { + // Other options to be used by broker implementations + Options map[string]string } -type Option func(*options) +type DialOptions struct { + Stream bool -type DialOption func(*dialOptions) + // Other options to be used by broker implementations + Options map[string]string +} + +type Option func(*Options) + +type DialOption func(*DialOptions) var ( DefaultTransport Transport = newHttpTransport([]string{}) ) func WithStream() DialOption { - return func(o *dialOptions) { - o.stream = true + return func(o *DialOptions) { + o.Stream = true } }