change use of store namespace/prefix in sql store

This commit is contained in:
Asim Aslam 2019-12-16 12:13:18 +00:00
parent 64e438a8d4
commit 0131e9468f
3 changed files with 79 additions and 18 deletions

View File

@ -4,6 +4,15 @@ import (
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
) )
type Options struct {
// nodes to connect to
Nodes []string
// Namespace of the store
Namespace string
// Prefix of the keys used
Prefix string
}
// Nodes is a list of nodes used to back the store // Nodes is a list of nodes used to back the store
func Nodes(a ...string) options.Option { func Nodes(a ...string) options.Option {
return options.WithValue("store.nodes", a) return options.WithValue("store.nodes", a)

View File

@ -17,18 +17,22 @@ import (
// DefaultNamespace is the namespace that the sql store // DefaultNamespace is the namespace that the sql store
// will use if no namespace is provided. // will use if no namespace is provided.
const DefaultNamespace = "micro" var (
DefaultNamespace = "micro"
DefaultPrefix = "micro"
)
type sqlStore struct { type sqlStore struct {
db *sql.DB db *sql.DB
database string
table string table string
options.Options options.Options
} }
// List all the known records // List all the known records
func (s *sqlStore) List() ([]*store.Record, error) { func (s *sqlStore) List() ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table)) q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -73,7 +77,7 @@ func (s *sqlStore) List() ([]*store.Record, error) {
// Read all records with keys // Read all records with keys
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) { func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table)) q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -105,11 +109,11 @@ func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
// Write records // Write records
func (s *sqlStore) Write(rec ...*store.Record) error { func (s *sqlStore) Write(rec ...*store.Record) error {
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry) q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
VALUES ($1, $2::bytea, $3) VALUES ($1, $2::bytea, $3)
ON CONFLICT (key) ON CONFLICT (key)
DO UPDATE DO UPDATE
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table)) SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table))
if err != nil { if err != nil {
return err return err
} }
@ -130,7 +134,7 @@ func (s *sqlStore) Write(rec ...*store.Record) error {
// Delete records with keys // Delete records with keys
func (s *sqlStore) Delete(keys ...string) error { func (s *sqlStore) Delete(keys ...string) error {
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table)) q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
if err != nil { if err != nil {
return err return err
} }
@ -151,10 +155,21 @@ func (s *sqlStore) initDB(options options.Options) error {
// Get the store.namespace option, or use sql.DefaultNamespace // Get the store.namespace option, or use sql.DefaultNamespace
namespaceOpt, found := options.Values().Get("store.namespace") namespaceOpt, found := options.Values().Get("store.namespace")
if !found { if !found {
s.table = DefaultNamespace s.database = DefaultNamespace
} else { } else {
if namespace, ok := namespaceOpt.(string); ok { if namespace, ok := namespaceOpt.(string); ok {
s.table = namespace s.database = namespace
} else {
return errors.New("store.namespace option must be a string")
}
}
// Get the store.namespace option, or use sql.DefaultNamespace
prefixOpt, found := options.Values().Get("store.prefix")
if !found {
s.table = DefaultPrefix
} else {
if prefix, ok := prefixOpt.(string); ok {
s.table = prefix
} else { } else {
return errors.New("store.namespace option must be a string") return errors.New("store.namespace option must be a string")
} }
@ -171,13 +186,13 @@ func (s *sqlStore) initDB(options options.Options) error {
} }
// Create a table for the Store namespace // Create a table for the Store namespace
tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s
( (
key text COLLATE "default" NOT NULL, key text COLLATE "default" NOT NULL,
value bytea, value bytea,
expiry timestamp with time zone, expiry timestamp with time zone,
CONSTRAINT %s_pkey PRIMARY KEY (key) CONSTRAINT %s_pkey PRIMARY KEY (key)
);`, s.table, s.table)) );`, s.database, s.table, s.table))
if err != nil { if err != nil {
return errors.Wrap(err, "SQL statement preparation failed") return errors.Wrap(err, "SQL statement preparation failed")
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/store" "github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto" pb "github.com/micro/go-micro/store/service/proto"
) )
@ -15,16 +16,38 @@ import (
type serviceStore struct { type serviceStore struct {
options.Options options.Options
// Namespace to use
Namespace string
// Addresses of the nodes // Addresses of the nodes
Nodes []string Nodes []string
// Prefix to use
Prefix string
// store service client // store service client
Client pb.StoreService Client pb.StoreService
} }
func (s *serviceStore) Context() context.Context {
ctx := context.Background()
md := make(metadata.Metadata)
if len(s.Namespace) > 0 {
md["Micro-Namespace"] = s.Namespace
}
if len(s.Prefix) > 0 {
md["Micro-Prefix"] = s.Prefix
}
return metadata.NewContext(ctx, md)
}
// Sync all the known records // Sync all the known records
func (s *serviceStore) List() ([]*store.Record, error) { func (s *serviceStore) List() ([]*store.Record, error) {
stream, err := s.Client.List(context.Background(), &pb.ListRequest{}, client.WithAddress(s.Nodes...)) stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -54,7 +77,7 @@ func (s *serviceStore) List() ([]*store.Record, error) {
// Read a record with key // Read a record with key
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) { func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{ rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{
Keys: keys, Keys: keys,
}, client.WithAddress(s.Nodes...)) }, client.WithAddress(s.Nodes...))
if err != nil { if err != nil {
@ -84,7 +107,7 @@ func (s *serviceStore) Write(recs ...*store.Record) error {
}) })
} }
_, err := s.Client.Write(context.Background(), &pb.WriteRequest{ _, err := s.Client.Write(s.Context(), &pb.WriteRequest{
Records: records, Records: records,
}, client.WithAddress(s.Nodes...)) }, client.WithAddress(s.Nodes...))
@ -93,7 +116,7 @@ func (s *serviceStore) Write(recs ...*store.Record) error {
// Delete a record with key // Delete a record with key
func (s *serviceStore) Delete(keys ...string) error { func (s *serviceStore) Delete(keys ...string) error {
_, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{ _, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
Keys: keys, Keys: keys,
}, client.WithAddress(s.Nodes...)) }, client.WithAddress(s.Nodes...))
return err return err
@ -104,14 +127,28 @@ func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...) options := options.NewOptions(opts...)
var nodes []string var nodes []string
var namespace string
var prefix string
n, ok := options.Values().Get("store.nodes") n, ok := options.Values().Get("store.nodes")
if ok { if ok {
nodes = n.([]string) nodes = n.([]string)
} }
ns, ok := options.Values().Get("store.namespace")
if ok {
namespace = ns.(string)
}
prx, ok := options.Values().Get("store.prefix")
if ok {
prefix = prx.(string)
}
service := &serviceStore{ service := &serviceStore{
Options: options, Options: options,
Namespace: namespace,
Prefix: prefix,
Nodes: nodes, Nodes: nodes,
Client: pb.NewStoreService("go.micro.store", client.DefaultClient), Client: pb.NewStoreService("go.micro.store", client.DefaultClient),
} }