213
									
								
								tunnel/broker/broker.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										213
									
								
								tunnel/broker/broker.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,213 @@ | |||||||
|  | // Package broker is a tunnel broker | ||||||
|  | package broker | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/broker" | ||||||
|  | 	"github.com/micro/go-micro/transport" | ||||||
|  | 	"github.com/micro/go-micro/tunnel" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type tunBroker struct { | ||||||
|  | 	opts   broker.Options | ||||||
|  | 	tunnel tunnel.Tunnel | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type tunSubscriber struct { | ||||||
|  | 	topic   string | ||||||
|  | 	handler broker.Handler | ||||||
|  | 	opts    broker.SubscribeOptions | ||||||
|  |  | ||||||
|  | 	closed   chan bool | ||||||
|  | 	listener tunnel.Listener | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type tunEvent struct { | ||||||
|  | 	topic   string | ||||||
|  | 	message *broker.Message | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // used to access tunnel from options context | ||||||
|  | type tunnelKey struct{} | ||||||
|  | type tunnelAddr struct{} | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Init(opts ...broker.Option) error { | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&t.opts) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Options() broker.Options { | ||||||
|  | 	return t.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Address() string { | ||||||
|  | 	return t.tunnel.Address() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Connect() error { | ||||||
|  | 	return t.tunnel.Connect() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Disconnect() error { | ||||||
|  | 	return t.tunnel.Close() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error { | ||||||
|  | 	// TODO: this is probably inefficient, we might want to just maintain an open connection | ||||||
|  | 	// it may be easier to add broadcast to the tunnel | ||||||
|  | 	c, err := t.tunnel.Dial(topic) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	defer c.Close() | ||||||
|  |  | ||||||
|  | 	return c.Send(&transport.Message{ | ||||||
|  | 		Header: m.Header, | ||||||
|  | 		Body:   m.Body, | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { | ||||||
|  | 	l, err := t.tunnel.Listen(topic) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var options broker.SubscribeOptions | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	tunSub := &tunSubscriber{ | ||||||
|  | 		topic:    topic, | ||||||
|  | 		handler:  h, | ||||||
|  | 		opts:     options, | ||||||
|  | 		closed:   make(chan bool), | ||||||
|  | 		listener: l, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// start processing | ||||||
|  | 	go tunSub.run() | ||||||
|  |  | ||||||
|  | 	return tunSub, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunBroker) String() string { | ||||||
|  | 	return "tunnel" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunSubscriber) run() { | ||||||
|  | 	for { | ||||||
|  | 		// accept a new connection | ||||||
|  | 		c, err := t.listener.Accept() | ||||||
|  | 		if err != nil { | ||||||
|  | 			select { | ||||||
|  | 			case <-t.closed: | ||||||
|  | 				return | ||||||
|  | 			default: | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// receive message | ||||||
|  | 		m := new(transport.Message) | ||||||
|  | 		if err := c.Recv(m); err != nil { | ||||||
|  | 			c.Close() | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// close the connection | ||||||
|  | 		c.Close() | ||||||
|  |  | ||||||
|  | 		// handle the message | ||||||
|  | 		go t.handler(&tunEvent{ | ||||||
|  | 			topic: t.topic, | ||||||
|  | 			message: &broker.Message{ | ||||||
|  | 				Header: m.Header, | ||||||
|  | 				Body:   m.Body, | ||||||
|  | 			}, | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunSubscriber) Options() broker.SubscribeOptions { | ||||||
|  | 	return t.opts | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunSubscriber) Topic() string { | ||||||
|  | 	return t.topic | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunSubscriber) Unsubscribe() error { | ||||||
|  | 	select { | ||||||
|  | 	case <-t.closed: | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		close(t.closed) | ||||||
|  | 		return t.listener.Close() | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunEvent) Topic() string { | ||||||
|  | 	return t.topic | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunEvent) Message() *broker.Message { | ||||||
|  | 	return t.message | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (t *tunEvent) Ack() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewBroker(opts ...broker.Option) broker.Broker { | ||||||
|  | 	options := broker.Options{ | ||||||
|  | 		Context: context.Background(), | ||||||
|  | 	} | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  | 	t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel) | ||||||
|  | 	if !ok { | ||||||
|  | 		t = tunnel.NewTunnel() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	a, ok := options.Context.Value(tunnelAddr{}).(string) | ||||||
|  | 	if ok { | ||||||
|  | 		// initialise address | ||||||
|  | 		t.Init(tunnel.Address(a)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(options.Addrs) > 0 { | ||||||
|  | 		// initialise nodes | ||||||
|  | 		t.Init(tunnel.Nodes(options.Addrs...)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &tunBroker{ | ||||||
|  | 		opts:   options, | ||||||
|  | 		tunnel: t, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // WithAddress sets the tunnel address | ||||||
|  | func WithAddress(a string) broker.Option { | ||||||
|  | 	return func(o *broker.Options) { | ||||||
|  | 		if o.Context == nil { | ||||||
|  | 			o.Context = context.Background() | ||||||
|  | 		} | ||||||
|  | 		o.Context = context.WithValue(o.Context, tunnelAddr{}, a) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // WithTunnel sets the internal tunnel | ||||||
|  | func WithTunnel(t tunnel.Tunnel) broker.Option { | ||||||
|  | 	return func(o *broker.Options) { | ||||||
|  | 		if o.Context == nil { | ||||||
|  | 			o.Context = context.Background() | ||||||
|  | 		} | ||||||
|  | 		o.Context = context.WithValue(o.Context, tunnelKey{}, t) | ||||||
|  | 	} | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user