* Rename Namespace to DB, Rename Prefix to table, Remove Suffix Option * Rename options * Rename options * Add store_table option * Table per service, not Database per service
		
			
				
	
	
		
			379 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			379 lines
		
	
	
		
			8.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package cockroach implements the cockroach store
 | |
| package cockroach
 | |
| 
 | |
| import (
 | |
| 	"database/sql"
 | |
| 	"fmt"
 | |
| 	"net/url"
 | |
| 	"regexp"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/lib/pq"
 | |
| 	"github.com/micro/go-micro/v2/store"
 | |
| 	"github.com/pkg/errors"
 | |
| )
 | |
| 
 | |
| // DefaultNamespace is the namespace that the sql store
 | |
| // will use if no namespace is provided.
 | |
| var (
 | |
| 	DefaultNamespace = "micro"
 | |
| 	DefaultPrefix    = "micro"
 | |
| )
 | |
| 
 | |
| type sqlStore struct {
 | |
| 	db *sql.DB
 | |
| 
 | |
| 	database string
 | |
| 	table    string
 | |
| 
 | |
| 	list       *sql.Stmt
 | |
| 	readOne    *sql.Stmt
 | |
| 	readMany   *sql.Stmt
 | |
| 	readOffset *sql.Stmt
 | |
| 	write      *sql.Stmt
 | |
| 	delete     *sql.Stmt
 | |
| 
 | |
| 	options store.Options
 | |
| }
 | |
| 
 | |
| func (s *sqlStore) Init(opts ...store.Option) error {
 | |
| 	for _, o := range opts {
 | |
| 		o(&s.options)
 | |
| 	}
 | |
| 	// reconfigure
 | |
| 	return s.configure()
 | |
| }
 | |
| 
 | |
| // List all the known records
 | |
| func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) {
 | |
| 	rows, err := s.list.Query()
 | |
| 	var keys []string
 | |
| 	var timehelper pq.NullTime
 | |
| 	if err != nil {
 | |
| 		if err == sql.ErrNoRows {
 | |
| 			return keys, nil
 | |
| 		}
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer rows.Close()
 | |
| 	for rows.Next() {
 | |
| 		record := &store.Record{}
 | |
| 		if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
 | |
| 			return keys, err
 | |
| 		}
 | |
| 		if timehelper.Valid {
 | |
| 			if timehelper.Time.Before(time.Now()) {
 | |
| 				// record has expired
 | |
| 				go s.Delete(record.Key)
 | |
| 			} else {
 | |
| 				record.Expiry = time.Until(timehelper.Time)
 | |
| 				keys = append(keys, record.Key)
 | |
| 			}
 | |
| 		} else {
 | |
| 			keys = append(keys, record.Key)
 | |
| 		}
 | |
| 
 | |
| 	}
 | |
| 	rowErr := rows.Close()
 | |
| 	if rowErr != nil {
 | |
| 		// transaction rollback or something
 | |
| 		return keys, rowErr
 | |
| 	}
 | |
| 	if err := rows.Err(); err != nil {
 | |
| 		return keys, err
 | |
| 	}
 | |
| 	return keys, nil
 | |
| }
 | |
| 
 | |
| // Read a single key
 | |
| func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
 | |
| 	var options store.ReadOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	if options.Prefix || options.Suffix {
 | |
| 		return s.read(key, options)
 | |
| 	}
 | |
| 
 | |
| 	var records []*store.Record
 | |
| 	var timehelper pq.NullTime
 | |
| 
 | |
| 	row := s.readOne.QueryRow(key)
 | |
| 	record := &store.Record{}
 | |
| 	if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil {
 | |
| 		if err == sql.ErrNoRows {
 | |
| 			return records, store.ErrNotFound
 | |
| 		}
 | |
| 		return records, err
 | |
| 	}
 | |
| 	if timehelper.Valid {
 | |
| 		if timehelper.Time.Before(time.Now()) {
 | |
| 			// record has expired
 | |
| 			go s.Delete(key)
 | |
| 			return records, store.ErrNotFound
 | |
| 		}
 | |
| 		record.Expiry = time.Until(timehelper.Time)
 | |
| 		records = append(records, record)
 | |
| 	} else {
 | |
| 		records = append(records, record)
 | |
| 	}
 | |
| 
 | |
| 	return records, nil
 | |
| }
 | |
| 
 | |
| // Read Many records
 | |
| func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) {
 | |
| 	pattern := "%"
 | |
| 	if options.Prefix {
 | |
| 		pattern = key + pattern
 | |
| 	}
 | |
| 	if options.Suffix {
 | |
| 		pattern = pattern + key
 | |
| 	}
 | |
| 	var rows *sql.Rows
 | |
| 	var err error
 | |
| 	if options.Limit != 0 {
 | |
| 		rows, err = s.readOffset.Query(pattern, options.Limit, options.Offset)
 | |
| 	} else {
 | |
| 		rows, err = s.readMany.Query(pattern)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		if err == sql.ErrNoRows {
 | |
| 			return []*store.Record{}, nil
 | |
| 		}
 | |
| 		return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed")
 | |
| 	}
 | |
| 	defer rows.Close()
 | |
| 	var records []*store.Record
 | |
| 	var timehelper pq.NullTime
 | |
| 
 | |
| 	for rows.Next() {
 | |
| 		record := &store.Record{}
 | |
| 		if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil {
 | |
| 			return records, err
 | |
| 		}
 | |
| 		if timehelper.Valid {
 | |
| 			if timehelper.Time.Before(time.Now()) {
 | |
| 				// record has expired
 | |
| 				go s.Delete(record.Key)
 | |
| 			} else {
 | |
| 				record.Expiry = time.Until(timehelper.Time)
 | |
| 				records = append(records, record)
 | |
| 			}
 | |
| 		} else {
 | |
| 			records = append(records, record)
 | |
| 		}
 | |
| 	}
 | |
| 	rowErr := rows.Close()
 | |
| 	if rowErr != nil {
 | |
| 		// transaction rollback or something
 | |
| 		return records, rowErr
 | |
| 	}
 | |
| 	if err := rows.Err(); err != nil {
 | |
| 		return records, err
 | |
| 	}
 | |
| 
 | |
| 	return records, nil
 | |
| }
 | |
| 
 | |
| // Write records
 | |
| func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error {
 | |
| 	var err error
 | |
| 	if r.Expiry != 0 {
 | |
| 		_, err = s.write.Exec(r.Key, r.Value, time.Now().Add(r.Expiry))
 | |
| 	} else {
 | |
| 		_, err = s.write.Exec(r.Key, r.Value, nil)
 | |
| 	}
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "Couldn't insert record "+r.Key)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Delete records with keys
 | |
| func (s *sqlStore) Delete(key string, opts ...store.DeleteOption) error {
 | |
| 	result, err := s.delete.Exec(key)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	_, err = result.RowsAffected()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *sqlStore) initDB() error {
 | |
| 	// Create the namespace's database
 | |
| 	_, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	_, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "Couldn't set database")
 | |
| 	}
 | |
| 
 | |
| 	// Create a table for the namespace's prefix
 | |
| 	_, err = s.db.Exec(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s
 | |
| 	(
 | |
| 		key text NOT NULL,
 | |
| 		value bytea,
 | |
| 		expiry timestamp with time zone,
 | |
| 		CONSTRAINT %s_pkey PRIMARY KEY (key)
 | |
| 	);`, s.table, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "Couldn't create table")
 | |
| 	}
 | |
| 
 | |
| 	// Create Index
 | |
| 	_, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key");`, "key_index_"+s.table, s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	list, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "List statement couldn't be prepared")
 | |
| 	}
 | |
| 	if s.list != nil {
 | |
| 		s.list.Close()
 | |
| 	}
 | |
| 	s.list = list
 | |
| 	readOne, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "ReadOne statement couldn't be prepared")
 | |
| 	}
 | |
| 	if s.readOne != nil {
 | |
| 		s.readOne.Close()
 | |
| 	}
 | |
| 	s.readOne = readOne
 | |
| 	readMany, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "ReadMany statement couldn't be prepared")
 | |
| 	}
 | |
| 	if s.readMany != nil {
 | |
| 		s.readMany.Close()
 | |
| 	}
 | |
| 	s.readMany = readMany
 | |
| 	readOffset, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "ReadOffset statement couldn't be prepared")
 | |
| 	}
 | |
| 	if s.readOffset != nil {
 | |
| 		s.readOffset.Close()
 | |
| 	}
 | |
| 	s.readOffset = readOffset
 | |
| 	write, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
 | |
| 		VALUES ($1, $2::bytea, $3)
 | |
| 		ON CONFLICT (key)
 | |
| 		DO UPDATE
 | |
| 		SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "Write statement couldn't be prepared")
 | |
| 	}
 | |
| 	if s.write != nil {
 | |
| 		s.write.Close()
 | |
| 	}
 | |
| 	s.write = write
 | |
| 	delete, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "Delete statement couldn't be prepared")
 | |
| 	}
 | |
| 	if s.delete != nil {
 | |
| 		s.delete.Close()
 | |
| 	}
 | |
| 	s.delete = delete
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *sqlStore) configure() error {
 | |
| 	if len(s.options.Nodes) == 0 {
 | |
| 		s.options.Nodes = []string{"localhost:26257"}
 | |
| 	}
 | |
| 
 | |
| 	namespace := s.options.Database
 | |
| 	if len(namespace) == 0 {
 | |
| 		namespace = DefaultNamespace
 | |
| 	}
 | |
| 
 | |
| 	prefix := s.options.Table
 | |
| 	if len(prefix) == 0 {
 | |
| 		prefix = DefaultPrefix
 | |
| 	}
 | |
| 
 | |
| 	// store.namespace must only contain letters, numbers and underscores
 | |
| 	reg, err := regexp.Compile("[^a-zA-Z0-9]+")
 | |
| 	if err != nil {
 | |
| 		return errors.New("error compiling regex for namespace")
 | |
| 	}
 | |
| 	namespace = reg.ReplaceAllString(namespace, "_")
 | |
| 	prefix = reg.ReplaceAllString(prefix, "_")
 | |
| 
 | |
| 	source := s.options.Nodes[0]
 | |
| 	// check if it is a standard connection string eg: host=%s port=%d user=%s password=%s dbname=%s sslmode=disable
 | |
| 	// if err is nil which means it would be a URL like postgre://xxxx?yy=zz
 | |
| 	_, err = url.Parse(source)
 | |
| 	if err != nil {
 | |
| 		if !strings.Contains(source, " ") {
 | |
| 			source = fmt.Sprintf("host=%s", source)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// create source from first node
 | |
| 	db, err := sql.Open("postgres", source)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if err := db.Ping(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if s.db != nil {
 | |
| 		s.db.Close()
 | |
| 	}
 | |
| 
 | |
| 	// save the values
 | |
| 	s.db = db
 | |
| 	s.database = namespace
 | |
| 	s.table = prefix
 | |
| 
 | |
| 	// initialise the database
 | |
| 	return s.initDB()
 | |
| }
 | |
| 
 | |
| func (s *sqlStore) String() string {
 | |
| 	return "cockroach"
 | |
| }
 | |
| 
 | |
| func (s *sqlStore) Options() store.Options {
 | |
| 	return s.options
 | |
| }
 | |
| 
 | |
| // NewStore returns a new micro Store backed by sql
 | |
| func NewStore(opts ...store.Option) store.Store {
 | |
| 	var options store.Options
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	// new store
 | |
| 	s := new(sqlStore)
 | |
| 	// set the options
 | |
| 	s.options = options
 | |
| 
 | |
| 	// best-effort configure the store
 | |
| 	s.configure()
 | |
| 
 | |
| 	// return store
 | |
| 	return s
 | |
| }
 |