package nats

import (
	"encoding/json"
	"strings"

	"github.com/apcera/nats"
	"github.com/myodc/go-micro/broker"
)

type nbroker struct {
	addrs []string
	conn  *nats.Conn
}

type subscriber struct {
	s *nats.Subscription
}

func (n *subscriber) Topic() string {
	return n.s.Subject
}

func (n *subscriber) Unsubscribe() error {
	return n.s.Unsubscribe()
}

func (n *nbroker) Address() string {
	if len(n.addrs) > 0 {
		return n.addrs[0]
	}
	return ""
}

func (n *nbroker) Connect() error {
	if n.conn != nil {
		return nil
	}

	opts := nats.DefaultOptions
	opts.Servers = n.addrs
	c, err := opts.Connect()
	if err != nil {
		return err
	}
	n.conn = c
	return nil
}

func (n *nbroker) Disconnect() error {
	n.conn.Close()
	return nil
}

func (n *nbroker) Init() error {
	return nil
}

func (n *nbroker) Publish(topic string, msg *broker.Message) error {
	b, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	return n.conn.Publish(topic, b)
}

func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) {
	sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) {
		var m *broker.Message
		if err := json.Unmarshal(msg.Data, &m); err != nil {
			return
		}
		handler(m)
	})
	if err != nil {
		return nil, err
	}
	return &subscriber{s: sub}, nil
}

func NewBroker(addrs []string, opt ...broker.Option) broker.Broker {
	var cAddrs []string
	for _, addr := range addrs {
		if len(addr) == 0 {
			continue
		}
		if !strings.HasPrefix(addr, "nats://") {
			addr = "nats://" + addr
		}
		cAddrs = append(cAddrs, addr)
	}
	if len(cAddrs) == 0 {
		cAddrs = []string{nats.DefaultURL}
	}
	return &nbroker{
		addrs: cAddrs,
	}
}