// Package broker is a tunnel broker
package broker

import (
	"context"
	"time"

	"github.com/micro/go-micro/v2/broker"
	"github.com/micro/go-micro/v2/transport"
	"github.com/micro/go-micro/v2/tunnel"
)

type tunBroker struct {
	opts   broker.Options
	tunnel tunnel.Tunnel
}

// tunSubscriber is the subscription handed back by tunBroker.Subscribe.
// It owns a tunnel listener for one topic and feeds received messages
// to the handler.
type tunSubscriber struct {
	// topic the subscription was created for
	topic string
	// handler is called with every message the run loop receives
	handler broker.Handler
	// opts are the subscribe options passed to Subscribe
	opts broker.SubscribeOptions

	// listener accepts the incoming tunnel connections for the topic
	listener tunnel.Listener
	// closed is closed by Unsubscribe to tell the run loop to stop
	closed chan bool
}

// tunEvent is the event value passed to a subscription handler:
// one received message together with the topic it arrived on.
type tunEvent struct {
	// message is the received payload
	message *broker.Message
	// topic is the topic of the subscription that received the message
	topic string
}

// keys used to carry tunnel settings in the broker options context
type (
	// tunnelKey is the key under which WithTunnel stores a tunnel.Tunnel
	tunnelKey struct{}
	// tunnelAddr is the key under which WithAddress stores the address string
	tunnelAddr struct{}
)

// Init applies each of the given options, in order, to the broker's stored
// options. Only the stored options are touched; the tunnel is not
// re-initialised here. It always returns nil.
func (t *tunBroker) Init(opts ...broker.Option) error {
	for i := range opts {
		apply := opts[i]
		apply(&t.opts)
	}
	return nil
}

// Options returns, by value, the options currently held by the broker.
func (t *tunBroker) Options() broker.Options {
	current := t.opts
	return current
}

// Address reports the address of the underlying tunnel.
func (t *tunBroker) Address() string {
	addr := t.tunnel.Address()
	return addr
}

// Connect connects the underlying tunnel and returns whatever error the
// tunnel reports.
func (t *tunBroker) Connect() error {
	if err := t.tunnel.Connect(); err != nil {
		return err
	}
	return nil
}

// Disconnect closes the underlying tunnel and returns the tunnel's error, if
// any. Existing subscriptions are not unsubscribed by this call.
func (t *tunBroker) Disconnect() error {
	err := t.tunnel.Close()
	return err
}

// Publish sends m to topic. It dials the topic on the tunnel in multicast
// mode, sends the message header and body as a single transport message and
// closes the connection again before returning. The publish options are
// currently not used.
//
// It returns the dial error if the connection cannot be opened, otherwise the
// result of the send.
func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error {
	// TODO: this is probably inefficient, we might want to just maintain an open connection
	// it may be easier to add broadcast to the tunnel
	session, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast))
	if err != nil {
		return err
	}
	// the connection only lives for this one message
	defer session.Close()

	msg := &transport.Message{
		Header: m.Header,
		Body:   m.Body,
	}
	return session.Send(msg)
}

// Subscribe registers h as the handler for messages published to topic.
// It opens a multicast listener for the topic on the tunnel, applies the
// subscribe options and starts a goroutine that receives messages and passes
// them to h.
//
// It returns the new subscriber, or the listen error if the listener could not
// be created.
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	listener, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast))
	if err != nil {
		return nil, err
	}

	// start from the zero value and let the caller's options fill it in
	subOpts := broker.SubscribeOptions{}
	for i := range opts {
		opts[i](&subOpts)
	}

	sub := &tunSubscriber{
		topic:    topic,
		handler:  h,
		opts:     subOpts,
		listener: listener,
		closed:   make(chan bool),
	}

	// receive in the background until Unsubscribe is called
	go sub.run()

	return sub, nil
}

// String returns the name of this broker implementation, "tunnel".
func (t *tunBroker) String() string {
	const name = "tunnel"
	return name
}

// run is the subscriber's processing loop; Subscribe starts it in its own
// goroutine. It accepts connections from the listener, reads one message from
// each, closes the connection and passes the message to the handler in a new
// goroutine. Whatever the handler returns is discarded.
//
// The loop exits when Accept fails and Unsubscribe has closed t.closed. While
// the subscriber is still open a failed Accept is retried, but only after a
// short pause: retrying immediately would spin on the CPU for as long as the
// listener keeps returning errors (presumably the case once the underlying
// tunnel has been closed without Unsubscribe being called - confirm against
// the tunnel implementation).
func (t *tunSubscriber) run() {
	// pause between retries after a failed Accept
	const retryDelay = 100 * time.Millisecond

	for {
		// accept a new connection
		c, err := t.listener.Accept()
		if err != nil {
			select {
			case <-t.closed:
				// unsubscribed: stop processing
				return
			case <-time.After(retryDelay):
				// still subscribed: back off briefly, then try again
				continue
			}
		}

		// receive message
		m := new(transport.Message)
		err = c.Recv(m)

		// only one message is read per connection, so close it whether or
		// not the receive succeeded
		c.Close()

		if err != nil {
			continue
		}

		// handle the message without blocking the accept loop
		go t.handler(&tunEvent{
			topic: t.topic,
			message: &broker.Message{
				Header: m.Header,
				Body:   m.Body,
			},
		})
	}
}

// Options returns, by value, the subscribe options this subscription was
// created with.
func (t *tunSubscriber) Options() broker.SubscribeOptions {
	current := t.opts
	return current
}

// Topic returns the topic this subscription listens on.
func (t *tunSubscriber) Topic() string {
	name := t.topic
	return name
}

// Unsubscribe stops the subscription. The first call closes t.closed, which
// signals the run loop to stop, and then closes the listener, returning the
// listener's error. Later calls see the closed channel and return nil
// without doing anything.
//
// NOTE(review): the check and the close are separate steps with no lock
// visible here, so two concurrent calls could both reach close(t.closed) -
// confirm whether callers can invoke this concurrently.
func (t *tunSubscriber) Unsubscribe() error {
	select {
	case <-t.closed:
		// already unsubscribed
		return nil
	default:
	}

	close(t.closed)
	return t.listener.Close()
}

// Topic returns the topic the event's message was received on.
func (t *tunEvent) Topic() string {
	name := t.topic
	return name
}

// Message returns the message carried by the event.
func (t *tunEvent) Message() *broker.Message {
	msg := t.message
	return msg
}

// Ack does nothing for tunnel events and always returns nil.
func (t *tunEvent) Ack() error {
	var ackErr error // never set: there is nothing to acknowledge
	return ackErr
}

// Error always returns nil; a tunnel event does not record an error.
func (t *tunEvent) Error() error {
	var evErr error // never set
	return evErr
}

// NewBroker returns a broker.Broker that runs over a tunnel.
//
// The options are applied on top of a background context. A tunnel supplied
// with WithTunnel is used if present, otherwise a new one is created. An
// address supplied with WithAddress, and any addresses in the options' Addrs,
// are then passed to the tunnel's Init as its address and nodes respectively.
// Errors returned by the tunnel's Init are not checked.
func NewBroker(opts ...broker.Option) broker.Broker {
	options := broker.Options{Context: context.Background()}
	for i := range opts {
		opts[i](&options)
	}

	// prefer a tunnel handed in through the context, fall back to a fresh one
	tun, found := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
	if !found {
		tun = tunnel.NewTunnel()
	}

	// initialise address
	if addr, set := options.Context.Value(tunnelAddr{}).(string); set {
		tun.Init(tunnel.Address(addr))
	}

	// initialise nodes
	if nodes := options.Addrs; len(nodes) > 0 {
		tun.Init(tunnel.Nodes(nodes...))
	}

	return &tunBroker{tunnel: tun, opts: options}
}

// WithAddress returns an option that records a as the tunnel address in the
// options context, creating a background context first if none is set.
func WithAddress(a string) broker.Option {
	return func(o *broker.Options) {
		parent := o.Context
		if parent == nil {
			parent = context.Background()
		}
		o.Context = context.WithValue(parent, tunnelAddr{}, a)
	}
}

// WithTunnel returns an option that records t in the options context as the
// tunnel the broker should use, creating a background context first if none
// is set.
func WithTunnel(t tunnel.Tunnel) broker.Option {
	return func(o *broker.Options) {
		parent := o.Context
		if parent == nil {
			parent = context.Background()
		}
		o.Context = context.WithValue(parent, tunnelKey{}, t)
	}
}