Merge pull request #1044 from micro/store-namespace
change use of store namespace/prefix in sql store
This commit is contained in:
commit
59246e0412
@ -4,6 +4,15 @@ import (
|
|||||||
"github.com/micro/go-micro/config/options"
|
"github.com/micro/go-micro/config/options"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Options struct {
|
||||||
|
// nodes to connect to
|
||||||
|
Nodes []string
|
||||||
|
// Namespace of the store
|
||||||
|
Namespace string
|
||||||
|
// Prefix of the keys used
|
||||||
|
Prefix string
|
||||||
|
}
|
||||||
|
|
||||||
// Nodes is a list of nodes used to back the store
|
// Nodes is a list of nodes used to back the store
|
||||||
func Nodes(a ...string) options.Option {
|
func Nodes(a ...string) options.Option {
|
||||||
return options.WithValue("store.nodes", a)
|
return options.WithValue("store.nodes", a)
|
||||||
|
@ -17,18 +17,22 @@ import (
|
|||||||
|
|
||||||
// DefaultNamespace is the namespace that the sql store
|
// DefaultNamespace is the namespace that the sql store
|
||||||
// will use if no namespace is provided.
|
// will use if no namespace is provided.
|
||||||
const DefaultNamespace = "micro"
|
var (
|
||||||
|
DefaultNamespace = "micro"
|
||||||
|
DefaultPrefix = "micro"
|
||||||
|
)
|
||||||
|
|
||||||
type sqlStore struct {
|
type sqlStore struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
|
||||||
table string
|
database string
|
||||||
|
table string
|
||||||
options.Options
|
options.Options
|
||||||
}
|
}
|
||||||
|
|
||||||
// List all the known records
|
// List all the known records
|
||||||
func (s *sqlStore) List() ([]*store.Record, error) {
|
func (s *sqlStore) List() ([]*store.Record, error) {
|
||||||
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table))
|
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -73,7 +77,7 @@ func (s *sqlStore) List() ([]*store.Record, error) {
|
|||||||
|
|
||||||
// Read all records with keys
|
// Read all records with keys
|
||||||
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
|
func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
|
||||||
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table))
|
q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -105,11 +109,11 @@ func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) {
|
|||||||
|
|
||||||
// Write records
|
// Write records
|
||||||
func (s *sqlStore) Write(rec ...*store.Record) error {
|
func (s *sqlStore) Write(rec ...*store.Record) error {
|
||||||
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry)
|
q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry)
|
||||||
VALUES ($1, $2::bytea, $3)
|
VALUES ($1, $2::bytea, $3)
|
||||||
ON CONFLICT (key)
|
ON CONFLICT (key)
|
||||||
DO UPDATE
|
DO UPDATE
|
||||||
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table))
|
SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -130,7 +134,7 @@ func (s *sqlStore) Write(rec ...*store.Record) error {
|
|||||||
|
|
||||||
// Delete records with keys
|
// Delete records with keys
|
||||||
func (s *sqlStore) Delete(keys ...string) error {
|
func (s *sqlStore) Delete(keys ...string) error {
|
||||||
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table))
|
q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -151,10 +155,21 @@ func (s *sqlStore) initDB(options options.Options) error {
|
|||||||
// Get the store.namespace option, or use sql.DefaultNamespace
|
// Get the store.namespace option, or use sql.DefaultNamespace
|
||||||
namespaceOpt, found := options.Values().Get("store.namespace")
|
namespaceOpt, found := options.Values().Get("store.namespace")
|
||||||
if !found {
|
if !found {
|
||||||
s.table = DefaultNamespace
|
s.database = DefaultNamespace
|
||||||
} else {
|
} else {
|
||||||
if namespace, ok := namespaceOpt.(string); ok {
|
if namespace, ok := namespaceOpt.(string); ok {
|
||||||
s.table = namespace
|
s.database = namespace
|
||||||
|
} else {
|
||||||
|
return errors.New("store.namespace option must be a string")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Get the store.namespace option, or use sql.DefaultNamespace
|
||||||
|
prefixOpt, found := options.Values().Get("store.prefix")
|
||||||
|
if !found {
|
||||||
|
s.table = DefaultPrefix
|
||||||
|
} else {
|
||||||
|
if prefix, ok := prefixOpt.(string); ok {
|
||||||
|
s.table = prefix
|
||||||
} else {
|
} else {
|
||||||
return errors.New("store.namespace option must be a string")
|
return errors.New("store.namespace option must be a string")
|
||||||
}
|
}
|
||||||
@ -171,13 +186,13 @@ func (s *sqlStore) initDB(options options.Options) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create a table for the Store namespace
|
// Create a table for the Store namespace
|
||||||
tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s
|
tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s
|
||||||
(
|
(
|
||||||
key text COLLATE "default" NOT NULL,
|
key text COLLATE "default" NOT NULL,
|
||||||
value bytea,
|
value bytea,
|
||||||
expiry timestamp with time zone,
|
expiry timestamp with time zone,
|
||||||
CONSTRAINT %s_pkey PRIMARY KEY (key)
|
CONSTRAINT %s_pkey PRIMARY KEY (key)
|
||||||
);`, s.table, s.table))
|
);`, s.database, s.table, s.table))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "SQL statement preparation failed")
|
return errors.Wrap(err, "SQL statement preparation failed")
|
||||||
}
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
"github.com/micro/go-micro/config/options"
|
"github.com/micro/go-micro/config/options"
|
||||||
|
"github.com/micro/go-micro/metadata"
|
||||||
"github.com/micro/go-micro/store"
|
"github.com/micro/go-micro/store"
|
||||||
pb "github.com/micro/go-micro/store/service/proto"
|
pb "github.com/micro/go-micro/store/service/proto"
|
||||||
)
|
)
|
||||||
@ -15,16 +16,38 @@ import (
|
|||||||
type serviceStore struct {
|
type serviceStore struct {
|
||||||
options.Options
|
options.Options
|
||||||
|
|
||||||
|
// Namespace to use
|
||||||
|
Namespace string
|
||||||
|
|
||||||
// Addresses of the nodes
|
// Addresses of the nodes
|
||||||
Nodes []string
|
Nodes []string
|
||||||
|
|
||||||
|
// Prefix to use
|
||||||
|
Prefix string
|
||||||
|
|
||||||
// store service client
|
// store service client
|
||||||
Client pb.StoreService
|
Client pb.StoreService
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *serviceStore) Context() context.Context {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
md := make(metadata.Metadata)
|
||||||
|
|
||||||
|
if len(s.Namespace) > 0 {
|
||||||
|
md["Micro-Namespace"] = s.Namespace
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(s.Prefix) > 0 {
|
||||||
|
md["Micro-Prefix"] = s.Prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
return metadata.NewContext(ctx, md)
|
||||||
|
}
|
||||||
|
|
||||||
// Sync all the known records
|
// Sync all the known records
|
||||||
func (s *serviceStore) List() ([]*store.Record, error) {
|
func (s *serviceStore) List() ([]*store.Record, error) {
|
||||||
stream, err := s.Client.List(context.Background(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
|
stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -54,7 +77,7 @@ func (s *serviceStore) List() ([]*store.Record, error) {
|
|||||||
|
|
||||||
// Read a record with key
|
// Read a record with key
|
||||||
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
|
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
|
||||||
rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{
|
rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{
|
||||||
Keys: keys,
|
Keys: keys,
|
||||||
}, client.WithAddress(s.Nodes...))
|
}, client.WithAddress(s.Nodes...))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -84,7 +107,7 @@ func (s *serviceStore) Write(recs ...*store.Record) error {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := s.Client.Write(context.Background(), &pb.WriteRequest{
|
_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
|
||||||
Records: records,
|
Records: records,
|
||||||
}, client.WithAddress(s.Nodes...))
|
}, client.WithAddress(s.Nodes...))
|
||||||
|
|
||||||
@ -93,7 +116,7 @@ func (s *serviceStore) Write(recs ...*store.Record) error {
|
|||||||
|
|
||||||
// Delete a record with key
|
// Delete a record with key
|
||||||
func (s *serviceStore) Delete(keys ...string) error {
|
func (s *serviceStore) Delete(keys ...string) error {
|
||||||
_, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{
|
_, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
|
||||||
Keys: keys,
|
Keys: keys,
|
||||||
}, client.WithAddress(s.Nodes...))
|
}, client.WithAddress(s.Nodes...))
|
||||||
return err
|
return err
|
||||||
@ -104,16 +127,30 @@ func NewStore(opts ...options.Option) store.Store {
|
|||||||
options := options.NewOptions(opts...)
|
options := options.NewOptions(opts...)
|
||||||
|
|
||||||
var nodes []string
|
var nodes []string
|
||||||
|
var namespace string
|
||||||
|
var prefix string
|
||||||
|
|
||||||
n, ok := options.Values().Get("store.nodes")
|
n, ok := options.Values().Get("store.nodes")
|
||||||
if ok {
|
if ok {
|
||||||
nodes = n.([]string)
|
nodes = n.([]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ns, ok := options.Values().Get("store.namespace")
|
||||||
|
if ok {
|
||||||
|
namespace = ns.(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
prx, ok := options.Values().Get("store.prefix")
|
||||||
|
if ok {
|
||||||
|
prefix = prx.(string)
|
||||||
|
}
|
||||||
|
|
||||||
service := &serviceStore{
|
service := &serviceStore{
|
||||||
Options: options,
|
Options: options,
|
||||||
Nodes: nodes,
|
Namespace: namespace,
|
||||||
Client: pb.NewStoreService("go.micro.store", client.DefaultClient),
|
Prefix: prefix,
|
||||||
|
Nodes: nodes,
|
||||||
|
Client: pb.NewStoreService("go.micro.store", client.DefaultClient),
|
||||||
}
|
}
|
||||||
|
|
||||||
return service
|
return service
|
||||||
|
Loading…
Reference in New Issue
Block a user