revert some changes
This commit is contained in:
		| @@ -44,3 +44,27 @@ var ( | |||||||
| func NewBroker(opts ...Option) Broker { | func NewBroker(opts ...Option) Broker { | ||||||
| 	return newHttpBroker(opts...) | 	return newHttpBroker(opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func Init(opts ...Option) error { | ||||||
|  | 	return DefaultBroker.Init(opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func Connect() error { | ||||||
|  | 	return DefaultBroker.Connect() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func Disconnect() error { | ||||||
|  | 	return DefaultBroker.Disconnect() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func Publish(topic string, msg *Message, opts ...PublishOption) error { | ||||||
|  | 	return DefaultBroker.Publish(topic, msg, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { | ||||||
|  | 	return DefaultBroker.Subscribe(topic, handler, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func String() string { | ||||||
|  | 	return DefaultBroker.String() | ||||||
|  | } | ||||||
|   | |||||||
| @@ -76,7 +76,39 @@ var ( | |||||||
| 	DefaultPoolTTL = time.Minute | 	DefaultPoolTTL = time.Minute | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // Makes a synchronous call to a service using the default client | ||||||
|  | func Call(ctx context.Context, request Request, response interface{}, opts ...CallOption) error { | ||||||
|  | 	return DefaultClient.Call(ctx, request, response, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Publishes a publication using the default client. Using the underlying broker | ||||||
|  | // set within the options. | ||||||
|  | func Publish(ctx context.Context, msg Message) error { | ||||||
|  | 	return DefaultClient.Publish(ctx, msg) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Creates a new message using the default client | ||||||
|  | func NewMessage(topic string, payload interface{}) Message { | ||||||
|  | 	return DefaultClient.NewMessage(topic, payload) | ||||||
|  | } | ||||||
|  |  | ||||||
| // Creates a new client with the options passed in | // Creates a new client with the options passed in | ||||||
| func NewClient(opt ...Option) Client { | func NewClient(opt ...Option) Client { | ||||||
| 	return newRpcClient(opt...) | 	return newRpcClient(opt...) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Creates a new request using the default client. Content Type will | ||||||
|  | // be set to the default within options and use the appropriate codec | ||||||
|  | func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { | ||||||
|  | 	return DefaultClient.NewRequest(service, method, request, reqOpts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Creates a streaming connection with a service and returns responses on the | ||||||
|  | // channel passed in. It's up to the user to close the streamer. | ||||||
|  | func NewStream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) { | ||||||
|  | 	return DefaultClient.Stream(ctx, request, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func String() string { | ||||||
|  | 	return DefaultClient.String() | ||||||
|  | } | ||||||
|   | |||||||
| @@ -33,3 +33,32 @@ var ( | |||||||
| func NewRegistry(opts ...Option) Registry { | func NewRegistry(opts ...Option) Registry { | ||||||
| 	return newConsulRegistry(opts...) | 	return newConsulRegistry(opts...) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Register a service node. Additionally supply options such as TTL. | ||||||
|  | func Register(s *Service, opts ...RegisterOption) error { | ||||||
|  | 	return DefaultRegistry.Register(s, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Deregister a service node | ||||||
|  | func Deregister(s *Service) error { | ||||||
|  | 	return DefaultRegistry.Deregister(s) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Retrieve a service. A slice is returned since we separate Name/Version. | ||||||
|  | func GetService(name string) ([]*Service, error) { | ||||||
|  | 	return DefaultRegistry.GetService(name) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // List the services. Only returns service names | ||||||
|  | func ListServices() ([]*Service, error) { | ||||||
|  | 	return DefaultRegistry.ListServices() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Watch returns a watcher which allows you to track updates to the registry. | ||||||
|  | func Watch(opts ...WatchOption) (Watcher, error) { | ||||||
|  | 	return DefaultRegistry.Watch(opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func String() string { | ||||||
|  | 	return DefaultRegistry.String() | ||||||
|  | } | ||||||
|   | |||||||
| @@ -3,7 +3,11 @@ package server | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"syscall" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-log" | ||||||
| 	"github.com/pborman/uuid" | 	"github.com/pborman/uuid" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -63,7 +67,102 @@ var ( | |||||||
| 	DefaultServer  Server = newRpcServer() | 	DefaultServer  Server = newRpcServer() | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // DefaultOptions returns config options for the default service | ||||||
|  | func DefaultOptions() Options { | ||||||
|  | 	return DefaultServer.Options() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Init initialises the default server with options passed in | ||||||
|  | func Init(opt ...Option) { | ||||||
|  | 	if DefaultServer == nil { | ||||||
|  | 		DefaultServer = newRpcServer(opt...) | ||||||
|  | 	} | ||||||
|  | 	DefaultServer.Init(opt...) | ||||||
|  | } | ||||||
|  |  | ||||||
| // NewServer returns a new server with options passed in | // NewServer returns a new server with options passed in | ||||||
| func NewServer(opt ...Option) Server { | func NewServer(opt ...Option) Server { | ||||||
| 	return newRpcServer(opt...) | 	return newRpcServer(opt...) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // NewSubscriber creates a new subscriber interface with the given topic | ||||||
|  | // and handler using the default server | ||||||
|  | func NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber { | ||||||
|  | 	return DefaultServer.NewSubscriber(topic, h, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // NewHandler creates a new handler interface using the default server | ||||||
|  | // Handlers are required to be a public object with public | ||||||
|  | // methods. Call to a service method such as Foo.Bar expects | ||||||
|  | // the type: | ||||||
|  | // | ||||||
|  | //	type Foo struct {} | ||||||
|  | //	func (f *Foo) Bar(ctx, req, rsp) error { | ||||||
|  | //		return nil | ||||||
|  | //	} | ||||||
|  | // | ||||||
|  | func NewHandler(h interface{}, opts ...HandlerOption) Handler { | ||||||
|  | 	return DefaultServer.NewHandler(h, opts...) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Handle registers a handler interface with the default server to | ||||||
|  | // handle inbound requests | ||||||
|  | func Handle(h Handler) error { | ||||||
|  | 	return DefaultServer.Handle(h) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Subscribe registers a subscriber interface with the default server | ||||||
|  | // which subscribes to specified topic with the broker | ||||||
|  | func Subscribe(s Subscriber) error { | ||||||
|  | 	return DefaultServer.Subscribe(s) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Register registers the default server with the discovery system | ||||||
|  | func Register() error { | ||||||
|  | 	return DefaultServer.Register() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Deregister deregisters the default server from the discovery system | ||||||
|  | func Deregister() error { | ||||||
|  | 	return DefaultServer.Deregister() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Run starts the default server and waits for a kill | ||||||
|  | // signal before exiting. Also registers/deregisters the server | ||||||
|  | func Run() error { | ||||||
|  | 	if err := Start(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := DefaultServer.Register(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ch := make(chan os.Signal, 1) | ||||||
|  | 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) | ||||||
|  | 	log.Logf("Received signal %s", <-ch) | ||||||
|  |  | ||||||
|  | 	if err := DefaultServer.Deregister(); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Start starts the default server | ||||||
|  | func Start() error { | ||||||
|  | 	config := DefaultServer.Options() | ||||||
|  | 	log.Logf("Starting server %s id %s", config.Name, config.Id) | ||||||
|  | 	return DefaultServer.Start() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Stop stops the default server | ||||||
|  | func Stop() error { | ||||||
|  | 	log.Logf("Stopping server") | ||||||
|  | 	return DefaultServer.Stop() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // String returns name of Server implementation | ||||||
|  | func String() string { | ||||||
|  | 	return DefaultServer.String() | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user