From 3e9f556e4ac5aa023ce25679f3af5a49566e938f Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 31 May 2017 20:45:36 +0100 Subject: [PATCH] working function solution --- function.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++++ go-micro.go | 5 ++++ 2 files changed, 72 insertions(+) create mode 100644 function.go diff --git a/function.go b/function.go new file mode 100644 index 00000000..631c04ac --- /dev/null +++ b/function.go @@ -0,0 +1,67 @@ +package micro + +import ( + "github.com/micro/go-micro/server" + "golang.org/x/net/context" +) + +type function struct { + cancel context.CancelFunc + Service +} + +func fnHandlerWrapper(f Function) server.HandlerWrapper { + return func(h server.HandlerFunc) server.HandlerFunc { + return func(ctx context.Context, req server.Request, rsp interface{}) error { + defer f.Done() + return h(ctx, req, rsp) + } + } +} + +func fnSubWrapper(f Function) server.SubscriberWrapper { + return func(s server.SubscriberFunc) server.SubscriberFunc { + return func(ctx context.Context, msg server.Publication) error { + defer f.Done() + return s(ctx, msg) + } + } +} + +func newFunction(opts ...Option) Function { + ctx, cancel := context.WithCancel(context.Background()) + opts = append(opts, Context(ctx)) + service := newService(opts...) + + fn := &function{ + cancel: cancel, + Service: service, + } + + service.Server().Init( + // ensure the service waits for requests to finish + server.Wait(true), + // wrap handlers and subscribers to finish execution + server.WrapHandler(fnHandlerWrapper(fn)), + server.WrapSubscriber(fnSubWrapper(fn)), + ) + + return fn +} + +func (f *function) Done() error { + f.cancel() + return nil +} + +func (f *function) Handle(v interface{}) error { + return f.Service.Server().Handle( + f.Service.Server().NewHandler(v), + ) +} + +func (f *function) Subscribe(topic string, v interface{}) error { + return f.Service.Server().Subscribe( + f.Service.Server().NewSubscriber(topic, v), + ) +} diff --git a/go-micro.go b/go-micro.go index 56c5630a..818b3476 100644 --- a/go-micro.go +++ b/go-micro.go @@ -61,6 +61,11 @@ func NewContext(ctx context.Context, s Service) context.Context { return context.WithValue(ctx, serviceKey{}, s) } +// NewFunction returns a new Function for a one time executing Service +func NewFunction(opts ...Option) Function { + return newFunction(opts...) +} + // NewPublisher returns a new Publisher func NewPublisher(topic string, c client.Client) Publisher { if c == nil {