diff --git a/tunnel/broker/broker.go b/tunnel/broker/broker.go new file mode 100644 index 00000000..0a33c610 --- /dev/null +++ b/tunnel/broker/broker.go @@ -0,0 +1,213 @@ +// Package broker is a tunnel broker +package broker + +import ( + "context" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/tunnel" +) + +type tunBroker struct { + opts broker.Options + tunnel tunnel.Tunnel +} + +type tunSubscriber struct { + topic string + handler broker.Handler + opts broker.SubscribeOptions + + closed chan bool + listener tunnel.Listener +} + +type tunEvent struct { + topic string + message *broker.Message +} + +// used to access tunnel from options context +type tunnelKey struct{} +type tunnelAddr struct{} + +func (t *tunBroker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&t.opts) + } + return nil +} + +func (t *tunBroker) Options() broker.Options { + return t.opts +} + +func (t *tunBroker) Address() string { + return t.tunnel.Address() +} + +func (t *tunBroker) Connect() error { + return t.tunnel.Connect() +} + +func (t *tunBroker) Disconnect() error { + return t.tunnel.Close() +} + +func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error { + // TODO: this is probably inefficient, we might want to just maintain an open connection + // it may be easier to add broadcast to the tunnel + c, err := t.tunnel.Dial(topic) + if err != nil { + return err + } + defer c.Close() + + return c.Send(&transport.Message{ + Header: m.Header, + Body: m.Body, + }) +} + +func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + l, err := t.tunnel.Listen(topic) + if err != nil { + return nil, err + } + + var options broker.SubscribeOptions + for _, o := range opts { + o(&options) + } + + tunSub := &tunSubscriber{ + topic: topic, + handler: h, + opts: options, + closed: make(chan bool), + listener: l, + } + + // start processing + go tunSub.run() + + return tunSub, nil +} + +func (t *tunBroker) String() string { + return "tunnel" +} + +func (t *tunSubscriber) run() { + for { + // accept a new connection + c, err := t.listener.Accept() + if err != nil { + select { + case <-t.closed: + return + default: + continue + } + } + + // receive message + m := new(transport.Message) + if err := c.Recv(m); err != nil { + c.Close() + continue + } + + // close the connection + c.Close() + + // handle the message + go t.handler(&tunEvent{ + topic: t.topic, + message: &broker.Message{ + Header: m.Header, + Body: m.Body, + }, + }) + } +} + +func (t *tunSubscriber) Options() broker.SubscribeOptions { + return t.opts +} + +func (t *tunSubscriber) Topic() string { + return t.topic +} + +func (t *tunSubscriber) Unsubscribe() error { + select { + case <-t.closed: + return nil + default: + close(t.closed) + return t.listener.Close() + } +} + +func (t *tunEvent) Topic() string { + return t.topic +} + +func (t *tunEvent) Message() *broker.Message { + return t.message +} + +func (t *tunEvent) Ack() error { + return nil +} + +func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + Context: context.Background(), + } + for _, o := range opts { + o(&options) + } + t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) + if !ok { + t = tunnel.NewTunnel() + } + + a, ok := options.Context.Value(tunnelAddr{}).(string) + if ok { + // initialise address + t.Init(tunnel.Address(a)) + } + + if len(options.Addrs) > 0 { + // initialise nodes + t.Init(tunnel.Nodes(options.Addrs...)) + } + + return &tunBroker{ + opts: options, + tunnel: t, + } +} + +// WithAddress sets the tunnel address +func WithAddress(a string) broker.Option { + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, tunnelAddr{}, a) + } +} + +// WithTunnel sets the internal tunnel +func WithTunnel(t tunnel.Tunnel) broker.Option { + return func(o *broker.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, tunnelKey{}, t) + } +}