// Package cockroach implements the cockroach store
package cockroach

import (
	"database/sql"
	"fmt"
	"net/url"
	"regexp"
	"strings"
	"time"

	"github.com/lib/pq"
	"github.com/micro/go-micro/v2/store"
	"github.com/pkg/errors"
)

// Defaults that configure falls back to when the corresponding store option
// is empty.
var (
	// DefaultNamespace is the namespace that the sql store will use if no
	// namespace is provided. configure uses the namespace (with runs of
	// non-alphanumeric characters replaced by "_") as the database name.
	DefaultNamespace = "micro"
	// DefaultPrefix is the prefix that the sql store will use if no prefix
	// is provided. configure uses the prefix as the table name.
	DefaultPrefix    = "micro"
)

// sqlStore is the store implementation returned by NewStore. It talks to the
// database through database/sql using the "postgres" driver and keeps one
// prepared statement per operation.
type sqlStore struct {
	// db is the database handle opened by configure; a previous handle is
	// closed once a replacement has been opened and pinged successfully.
	db *sql.DB

	// database and table are interpolated into the SQL text built by initDB.
	// configure derives database from the namespace option and table from
	// the prefix option.
	database string
	table    string

	// Prepared statements created by initDB. Each one is closed and replaced
	// when initDB runs again.
	list       *sql.Stmt // selects key, value and expiry of every row
	readOne    *sql.Stmt // selects the row whose key equals $1
	readMany   *sql.Stmt // selects the rows whose key is LIKE $1
	readOffset *sql.Stmt // like readMany, ordered by key DESC with LIMIT $2 OFFSET $3
	write      *sql.Stmt // inserts a row, updating value and expiry on key conflict
	delete     *sql.Stmt // deletes the row whose key equals $1

	// options holds the options supplied to NewStore and Init.
	options store.Options
}

// Init applies opts on top of the store's current options and then runs
// configure again, which reopens the database handle and re-prepares the
// statements using the updated options. It returns configure's error.
func (s *sqlStore) Init(opts ...store.Option) error {
	for i := range opts {
		opts[i](&s.options)
	}
	return s.configure()
}

// List returns the keys of all records in the store's table that have not
// expired. A row whose expiry lies in the past is left out of the result and
// deleted in a background goroutine. The opts are not consulted.
//
// If scanning or iterating fails part-way, the keys gathered so far are
// returned together with the error.
func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
	var keys []string

	rows, err := s.list.Query()
	switch {
	case err == sql.ErrNoRows:
		return keys, nil
	case err != nil:
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			rec    store.Record
			expiry pq.NullTime
		)
		if scanErr := rows.Scan(&rec.Key, &rec.Value, &expiry); scanErr != nil {
			return keys, scanErr
		}
		if expiry.Valid && expiry.Time.Before(time.Now()) {
			// Stale row: remove it asynchronously and keep it out of the listing.
			go s.Delete(rec.Key)
			continue
		}
		keys = append(keys, rec.Key)
	}

	// Close explicitly (the deferred Close then becomes a no-op) so that an
	// error raised while releasing the rows is reported to the caller.
	if closeErr := rows.Close(); closeErr != nil {
		return keys, closeErr
	}
	if iterErr := rows.Err(); iterErr != nil {
		return keys, iterErr
	}
	return keys, nil
}

// Read looks up records by key. When the Prefix or Suffix read option is set
// the lookup is handed to the pattern-matching read method; otherwise the
// single row stored under key is fetched.
//
// store.ErrNotFound is returned when no row matches, and also when the
// matching row has already expired; an expired row is additionally deleted
// in a background goroutine. For a live row with an expiry, the returned
// record's Expiry is the time remaining until that expiry.
func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
	var options store.ReadOptions
	for _, apply := range opts {
		apply(&options)
	}
	if options.Prefix || options.Suffix {
		return s.read(key, options)
	}

	var (
		records []*store.Record
		expiry  pq.NullTime
		record  = &store.Record{}
	)
	err := s.readOne.QueryRow(key).Scan(&record.Key, &record.Value, &expiry)
	switch {
	case err == sql.ErrNoRows:
		return records, store.ErrNotFound
	case err != nil:
		return records, err
	}

	if expiry.Valid {
		if expiry.Time.Before(time.Now()) {
			// Expired: report it as missing and clean it up asynchronously.
			go s.Delete(key)
			return records, store.ErrNotFound
		}
		record.Expiry = time.Until(expiry.Time)
	}
	return append(records, record), nil
}

// read returns the unexpired records whose key starts with key (Prefix),
// ends with key (Suffix), or both, using a SQL LIKE match.
//
// The key is matched literally: the LIKE metacharacters `%` and `_`, and the
// escape character `\` itself, are backslash-escaped before the pattern is
// built. Without this a prefix such as "user_" would also match "userA...".
//
// When options.Limit is non-zero the limited statement is executed with
// options.Limit and options.Offset; otherwise every match is returned and
// options.Offset is not applied. Rows whose expiry has passed are skipped and
// deleted in a background goroutine; live rows with an expiry get their
// Expiry set to the time remaining.
//
// If the query itself fails, an empty slice and a wrapped error are returned.
// If scanning or iterating fails part-way, the records gathered so far are
// returned together with the error.
func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) {
	// Backslash is the default LIKE escape character, so escaping it first
	// and then the two wildcards makes the key match only itself.
	literal := strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(key)

	pattern := "%"
	if options.Prefix {
		pattern = literal + pattern
	}
	if options.Suffix {
		pattern = pattern + literal
	}

	var rows *sql.Rows
	var err error
	if options.Limit != 0 {
		rows, err = s.readOffset.Query(pattern, options.Limit, options.Offset)
	} else {
		rows, err = s.readMany.Query(pattern)
	}
	if err != nil {
		if err == sql.ErrNoRows {
			return []*store.Record{}, nil
		}
		return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed")
	}
	defer rows.Close()

	var records []*store.Record
	for rows.Next() {
		var expiry pq.NullTime
		record := &store.Record{}
		if err := rows.Scan(&record.Key, &record.Value, &expiry); err != nil {
			return records, err
		}
		if expiry.Valid {
			if expiry.Time.Before(time.Now()) {
				// Expired: drop it from the result and clean it up asynchronously.
				go s.Delete(record.Key)
				continue
			}
			record.Expiry = time.Until(expiry.Time)
		}
		records = append(records, record)
	}

	// Close explicitly so an error raised while releasing the rows (for
	// example a rolled-back transaction) is reported to the caller.
	if err := rows.Close(); err != nil {
		return records, err
	}
	if err := rows.Err(); err != nil {
		return records, err
	}

	return records, nil
}

// Write stores r through the prepared write statement, which inserts the row
// or, when the key already exists, overwrites its value and expiry. A zero
// r.Expiry is stored as a NULL expiry; otherwise the expiry is the current
// time plus r.Expiry. The opts are not consulted.
func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
	// A nil interface value is sent to the driver as NULL.
	var expiry interface{}
	if r.Expiry != 0 {
		expiry = time.Now().Add(r.Expiry)
	}

	if _, err := s.write.Exec(r.Key, r.Value, expiry); err != nil {
		return errors.Wrap(err, "Couldn't insert record "+r.Key)
	}
	return nil
}

// Delete removes the row stored under key via the prepared delete statement.
// The number of affected rows is not inspected, so deleting a key that does
// not exist is not reported as an error. The opts are not consulted.
func (s *sqlStore) Delete(key string, opts ...store.DeleteOption) error {
	res, err := s.delete.Exec(key)
	if err != nil {
		return err
	}

	// Only a failure to obtain the row count is surfaced; the count itself
	// is discarded.
	_, err = res.RowsAffected()
	return err
}

// initDB creates the database, table and key index if they do not exist yet
// and then prepares one statement per store operation against s.db.
//
// Statements are prepared in a fixed order (list, readOne, readMany,
// readOffset, write, delete). Each newly prepared statement immediately
// replaces the one held by the store, closing the previous one if present.
// If a Prepare call fails, the statements before it have already been
// replaced and the remaining ones are left untouched.
func (s *sqlStore) initDB() error {
	// Make sure the namespace's database exists, then switch to it.
	if _, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database)); err != nil {
		return err
	}
	if _, err := s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database)); err != nil {
		return errors.Wrap(err, "Couldn't set database")
	}

	// One table per prefix, keyed by the record key.
	createTable := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s
	(
		key text NOT NULL,
		value bytea,
		expiry timestamp with time zone,
		CONSTRAINT %s_pkey PRIMARY KEY (key)
	);`, s.table, s.table)
	if _, err := s.db.Exec(createTable); err != nil {
		return errors.Wrap(err, "Couldn't create table")
	}

	createIndex := fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key");`, "key_index_"+s.table, s.database, s.table)
	if _, err := s.db.Exec(createIndex); err != nil {
		return err
	}

	// Every entry pairs the SQL text with the struct field that holds the
	// prepared statement; label is the prefix of the error message.
	statements := []struct {
		label string
		query string
		slot  **sql.Stmt
	}{
		{"List", fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table), &s.list},
		{"ReadOne", fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table), &s.readOne},
		{"ReadMany", fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", s.database, s.table), &s.readMany},
		{"ReadOffset", fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", s.database, s.table), &s.readOffset},
		{"Write", fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
		VALUES ($1, $2::bytea, $3)
		ON CONFLICT (key)
		DO UPDATE
		SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table), &s.write},
		{"Delete", fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table), &s.delete},
	}

	for _, st := range statements {
		prepared, err := s.db.Prepare(st.query)
		if err != nil {
			return errors.Wrap(err, st.label+" statement couldn't be prepared")
		}
		if previous := *st.slot; previous != nil {
			previous.Close()
		}
		*st.slot = prepared
	}

	return nil
}

// unsafeIdentChars matches every run of characters other than ASCII letters
// and digits. Such runs are replaced by "_" in the database and table names,
// because both are interpolated unquoted into the SQL text built by initDB.
var unsafeIdentChars = regexp.MustCompile("[^a-zA-Z0-9]+")

// configure (re)builds the store's connection state from s.options.
//
// It defaults the node list to "localhost:26257", the namespace to
// DefaultNamespace and the prefix to DefaultPrefix, sanitizes namespace and
// prefix into SQL-safe identifiers, opens and pings a database handle for the
// first node, swaps it in for any previous handle (closing the old one), and
// finally calls initDB to create the schema and prepare the statements.
//
// The first node may be given in three forms:
//   - a "postgres://" or "postgresql://" URL, passed to the driver unchanged;
//   - a key=value connection string (anything containing "=" or a space),
//     passed to the driver unchanged;
//   - a bare "host" or "host:port" address, which is turned into a
//     "postgres://host:port" URL so the driver sees host and port separately.
//
// On error the store's previous handle, database and table are left as they
// were; the freshly opened handle is closed if it cannot be pinged.
func (s *sqlStore) configure() error {
	if len(s.options.Nodes) == 0 {
		s.options.Nodes = []string{"localhost:26257"}
	}

	namespace := s.options.Namespace
	if len(namespace) == 0 {
		namespace = DefaultNamespace
	}

	prefix := s.options.Prefix
	if len(prefix) == 0 {
		prefix = DefaultPrefix
	}

	// Database and table names must only contain letters, numbers and
	// underscores, since they are spliced into SQL statements as-is.
	namespace = unsafeIdentChars.ReplaceAllString(namespace, "_")
	prefix = unsafeIdentChars.ReplaceAllString(prefix, "_")

	// Create the source from the first node. url.Parse cannot be used to
	// tell the forms apart: it accepts "localhost:26257" as scheme
	// "localhost", so the address form is detected by exclusion instead.
	source := s.options.Nodes[0]
	isURL := strings.HasPrefix(source, "postgres://") || strings.HasPrefix(source, "postgresql://")
	isKeyValue := strings.ContainsAny(source, "= ")
	if source != "" && !isURL && !isKeyValue {
		source = (&url.URL{Scheme: "postgres", Host: source}).String()
	}

	db, err := sql.Open("postgres", source)
	if err != nil {
		return err
	}

	if err := db.Ping(); err != nil {
		// Do not leak the handle that was just opened.
		db.Close()
		return err
	}

	if s.db != nil {
		s.db.Close()
	}

	// save the values
	s.db = db
	s.database = namespace
	s.table = prefix

	// initialise the database
	return s.initDB()
}

// String returns the name of this store implementation.
func (s *sqlStore) String() string {
	const name = "cockroach"
	return name
}

// Options returns, by value, the options the store currently holds.
func (s *sqlStore) Options() store.Options {
	current := s.options
	return current
}

// NewStore returns a new micro Store backed by sql, with opts applied to its
// options.
//
// Configuration is attempted once on a best-effort basis and its error is
// discarded. If it fails before initDB has run, the store is returned without
// a database handle or prepared statements; Init runs configure again and
// does report the error.
func NewStore(opts ...store.Option) store.Store {
	s := &sqlStore{}
	for _, apply := range opts {
		apply(&s.options)
	}

	// best-effort configure the store
	s.configure()

	return s
}