// Package broker is a tunnel broker
package broker // import "go.unistack.org/micro/v3/network/tunnel/broker"

import (
	"context"
	"fmt"

	"go.unistack.org/micro/v3/broker"
	"go.unistack.org/micro/v3/logger"
	"go.unistack.org/micro/v3/metadata"
	"go.unistack.org/micro/v3/network/transport"
	"go.unistack.org/micro/v3/network/tunnel"
)

type tunBroker struct {
	tunnel tunnel.Tunnel
	opts   broker.Options
}

type tunSubscriber struct {
	listener tunnel.Listener
	handler  broker.Handler
	closed   chan bool
	topic    string
	opts     broker.SubscribeOptions
}

type tunBatchSubscriber struct {
	listener tunnel.Listener
	handler  broker.BatchHandler
	closed   chan bool
	topic    string
	opts     broker.SubscribeOptions
}

type tunEvent struct {
	err     error
	message *broker.Message
	topic   string
}

// used to access tunnel from options context
type (
	tunnelKey  struct{}
	tunnelAddr struct{}
)

func (t *tunBroker) Init(opts ...broker.Option) error {
	for _, o := range opts {
		o(&t.opts)
	}
	return nil
}

func (t *tunBroker) Name() string {
	return t.opts.Name
}

func (t *tunBroker) Options() broker.Options {
	return t.opts
}

func (t *tunBroker) Address() string {
	return t.tunnel.Address()
}

func (t *tunBroker) Connect(ctx context.Context) error {
	return t.tunnel.Connect(ctx)
}

func (t *tunBroker) Disconnect(ctx context.Context) error {
	return t.tunnel.Close(ctx)
}

func (t *tunBroker) BatchPublish(ctx context.Context, msgs []*broker.Message, opts ...broker.PublishOption) error {
	// TODO: this is probably inefficient, we might want to just maintain an open connection
	// it may be easier to add broadcast to the tunnel
	topicMap := make(map[string]tunnel.Session)

	var err error
	for _, msg := range msgs {
		topic, _ := msg.Header.Get(metadata.HeaderTopic)
		c, ok := topicMap[topic]
		if !ok {
			c, err = t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
			if err != nil {
				return err
			}
			defer c.Close()
			topicMap[topic] = c
		}

		if err = c.Send(&transport.Message{
			Header: msg.Header,
			Body:   msg.Body,
		}); err != nil {
			//	msg.SetError(err)
			return err
		}
	}

	return nil
}

func (t *tunBroker) Publish(ctx context.Context, topic string, m *broker.Message, opts ...broker.PublishOption) error {
	// TODO: this is probably inefficient, we might want to just maintain an open connection
	// it may be easier to add broadcast to the tunnel
	c, err := t.tunnel.Dial(ctx, topic, tunnel.DialMode(tunnel.Multicast))
	if err != nil {
		return err
	}
	defer c.Close()

	return c.Send(&transport.Message{
		Header: m.Header,
		Body:   m.Body,
	})
}

func (t *tunBroker) BatchSubscribe(ctx context.Context, topic string, h broker.BatchHandler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
	if err != nil {
		return nil, err
	}

	tunSub := &tunBatchSubscriber{
		topic:    topic,
		handler:  h,
		opts:     broker.NewSubscribeOptions(opts...),
		closed:   make(chan bool),
		listener: l,
	}

	// start processing
	go tunSub.run()

	return tunSub, nil
}

func (t *tunBroker) Subscribe(ctx context.Context, topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	l, err := t.tunnel.Listen(ctx, topic, tunnel.ListenMode(tunnel.Multicast))
	if err != nil {
		return nil, err
	}

	tunSub := &tunSubscriber{
		topic:    topic,
		handler:  h,
		opts:     broker.NewSubscribeOptions(opts...),
		closed:   make(chan bool),
		listener: l,
	}

	// start processing
	go tunSub.run()

	return tunSub, nil
}

func (t *tunBroker) String() string {
	return "tunnel"
}

func (t *tunBatchSubscriber) run() {
	for {
		// accept a new connection
		c, err := t.listener.Accept()
		if err != nil {
			select {
			case <-t.closed:
				return
			default:
				continue
			}
		}

		// receive message
		m := new(transport.Message)
		if err := c.Recv(m); err != nil {
			if logger.V(logger.ErrorLevel) {
				logger.Error(t.opts.Context, err.Error())
			}
			if err = c.Close(); err != nil {
				if logger.V(logger.ErrorLevel) {
					logger.Error(t.opts.Context, err.Error())
				}
			}
			continue
		}

		// close the connection
		c.Close()

		evts := broker.Events{&tunEvent{
			topic: t.topic,
			message: &broker.Message{
				Header: m.Header,
				Body:   m.Body,
			},
		}}
		// handle the message
		go func() {
			_ = t.handler(evts)
		}()

	}
}

func (t *tunSubscriber) run() {
	for {
		// accept a new connection
		c, err := t.listener.Accept()
		if err != nil {
			select {
			case <-t.closed:
				return
			default:
				continue
			}
		}

		// receive message
		m := new(transport.Message)
		if err := c.Recv(m); err != nil {
			if logger.V(logger.ErrorLevel) {
				logger.Error(t.opts.Context, err.Error())
			}
			if err = c.Close(); err != nil {
				if logger.V(logger.ErrorLevel) {
					logger.Error(t.opts.Context, err.Error())
				}
			}
			continue
		}

		// close the connection
		c.Close()

		// handle the message
		go func() {
			_ = t.handler(&tunEvent{
				topic: t.topic,
				message: &broker.Message{
					Header: m.Header,
					Body:   m.Body,
				},
			})
		}()
	}
}

func (t *tunBatchSubscriber) Options() broker.SubscribeOptions {
	return t.opts
}

func (t *tunBatchSubscriber) Topic() string {
	return t.topic
}

func (t *tunBatchSubscriber) Unsubscribe(ctx context.Context) error {
	select {
	case <-t.closed:
		return nil
	default:
		close(t.closed)
		return t.listener.Close()
	}
}

func (t *tunSubscriber) Options() broker.SubscribeOptions {
	return t.opts
}

func (t *tunSubscriber) Topic() string {
	return t.topic
}

func (t *tunSubscriber) Unsubscribe(ctx context.Context) error {
	select {
	case <-t.closed:
		return nil
	default:
		close(t.closed)
		return t.listener.Close()
	}
}

func (t *tunEvent) Topic() string {
	return t.topic
}

func (t *tunEvent) Message() *broker.Message {
	return t.message
}

func (t *tunEvent) Ack() error {
	return nil
}

func (t *tunEvent) Error() error {
	return t.err
}

func (t *tunEvent) SetError(err error) {
	t.err = err
}

// NewBroker returns new tunnel broker
func NewBroker(opts ...broker.Option) (broker.Broker, error) {
	options := broker.NewOptions(opts...)

	t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
	if !ok {
		return nil, fmt.Errorf("tunnel not set")
	}

	a, ok := options.Context.Value(tunnelAddr{}).(string)
	if ok {
		// initialise address
		if err := t.Init(tunnel.Address(a)); err != nil {
			return nil, err
		}
	}

	if len(options.Addrs) > 0 {
		// initialise nodes
		if err := t.Init(tunnel.Nodes(options.Addrs...)); err != nil {
			return nil, err
		}
	}

	return &tunBroker{
		opts:   options,
		tunnel: t,
	}, nil
}

// WithAddress sets the tunnel address
func WithAddress(a string) broker.Option {
	return func(o *broker.Options) {
		if o.Context == nil {
			o.Context = context.Background()
		}
		o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
	}
}

// WithTunnel sets the internal tunnel
func WithTunnel(t tunnel.Tunnel) broker.Option {
	return func(o *broker.Options) {
		if o.Context == nil {
			o.Context = context.Background()
		}
		o.Context = context.WithValue(o.Context, tunnelKey{}, t)
	}
}