From 0131e9468f5e2f643e69edfb74e3f0da3f3648e0 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 16 Dec 2019 12:13:18 +0000 Subject: [PATCH] change use of store namespace/prefix in sql store --- store/options.go | 9 ++++++ store/postgresql/postgresql.go | 37 ++++++++++++++++-------- store/service/service.go | 51 +++++++++++++++++++++++++++++----- 3 files changed, 79 insertions(+), 18 deletions(-) diff --git a/store/options.go b/store/options.go index 99014362..f5dbf7e2 100644 --- a/store/options.go +++ b/store/options.go @@ -4,6 +4,15 @@ import ( "github.com/micro/go-micro/config/options" ) +type Options struct { + // nodes to connect to + Nodes []string + // Namespace of the store + Namespace string + // Prefix of the keys used + Prefix string +} + // Nodes is a list of nodes used to back the store func Nodes(a ...string) options.Option { return options.WithValue("store.nodes", a) diff --git a/store/postgresql/postgresql.go b/store/postgresql/postgresql.go index 58fa935c..3cce21a6 100644 --- a/store/postgresql/postgresql.go +++ b/store/postgresql/postgresql.go @@ -17,18 +17,22 @@ import ( // DefaultNamespace is the namespace that the sql store // will use if no namespace is provided. -const DefaultNamespace = "micro" +var ( + DefaultNamespace = "micro" + DefaultPrefix = "micro" +) type sqlStore struct { db *sql.DB - table string + database string + table string options.Options } // List all the known records func (s *sqlStore) List() ([]*store.Record, error) { - q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s;", s.table)) + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table)) if err != nil { return nil, err } @@ -73,7 +77,7 @@ func (s *sqlStore) List() ([]*store.Record, error) { // Read all records with keys func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) { - q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM micro.%s WHERE key = $1;", s.table)) + q, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table)) if err != nil { return nil, err } @@ -105,11 +109,11 @@ func (s *sqlStore) Read(keys ...string) ([]*store.Record, error) { // Write records func (s *sqlStore) Write(rec ...*store.Record) error { - q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO micro.%s(key, value, expiry) + q, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) DO UPDATE - SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.table)) + SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table)) if err != nil { return err } @@ -130,7 +134,7 @@ func (s *sqlStore) Write(rec ...*store.Record) error { // Delete records with keys func (s *sqlStore) Delete(keys ...string) error { - q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM micro.%s WHERE key = $1;", s.table)) + q, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table)) if err != nil { return err } @@ -151,10 +155,21 @@ func (s *sqlStore) initDB(options options.Options) error { // Get the store.namespace option, or use sql.DefaultNamespace namespaceOpt, found := options.Values().Get("store.namespace") if !found { - s.table = DefaultNamespace + s.database = DefaultNamespace } else { if namespace, ok := namespaceOpt.(string); ok { - s.table = namespace + s.database = namespace + } else { + return errors.New("store.namespace option must be a string") + } + } + // Get the store.namespace option, or use sql.DefaultNamespace + prefixOpt, found := options.Values().Get("store.prefix") + if !found { + s.table = DefaultPrefix + } else { + if prefix, ok := prefixOpt.(string); ok { + s.table = prefix } else { return errors.New("store.namespace option must be a string") } @@ -171,13 +186,13 @@ func (s *sqlStore) initDB(options options.Options) error { } // Create a table for the Store namespace - tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS micro.%s + tableq, err := s.db.Prepare(fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s ( key text COLLATE "default" NOT NULL, value bytea, expiry timestamp with time zone, CONSTRAINT %s_pkey PRIMARY KEY (key) - );`, s.table, s.table)) + );`, s.database, s.table, s.table)) if err != nil { return errors.Wrap(err, "SQL statement preparation failed") } diff --git a/store/service/service.go b/store/service/service.go index 1cb486bc..e0432016 100644 --- a/store/service/service.go +++ b/store/service/service.go @@ -8,6 +8,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/config/options" + "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/store" pb "github.com/micro/go-micro/store/service/proto" ) @@ -15,16 +16,38 @@ import ( type serviceStore struct { options.Options + // Namespace to use + Namespace string + // Addresses of the nodes Nodes []string + // Prefix to use + Prefix string + // store service client Client pb.StoreService } +func (s *serviceStore) Context() context.Context { + ctx := context.Background() + + md := make(metadata.Metadata) + + if len(s.Namespace) > 0 { + md["Micro-Namespace"] = s.Namespace + } + + if len(s.Prefix) > 0 { + md["Micro-Prefix"] = s.Prefix + } + + return metadata.NewContext(ctx, md) +} + // Sync all the known records func (s *serviceStore) List() ([]*store.Record, error) { - stream, err := s.Client.List(context.Background(), &pb.ListRequest{}, client.WithAddress(s.Nodes...)) + stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...)) if err != nil { return nil, err } @@ -54,7 +77,7 @@ func (s *serviceStore) List() ([]*store.Record, error) { // Read a record with key func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) { - rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{ + rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{ Keys: keys, }, client.WithAddress(s.Nodes...)) if err != nil { @@ -84,7 +107,7 @@ func (s *serviceStore) Write(recs ...*store.Record) error { }) } - _, err := s.Client.Write(context.Background(), &pb.WriteRequest{ + _, err := s.Client.Write(s.Context(), &pb.WriteRequest{ Records: records, }, client.WithAddress(s.Nodes...)) @@ -93,7 +116,7 @@ func (s *serviceStore) Write(recs ...*store.Record) error { // Delete a record with key func (s *serviceStore) Delete(keys ...string) error { - _, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{ + _, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{ Keys: keys, }, client.WithAddress(s.Nodes...)) return err @@ -104,16 +127,30 @@ func NewStore(opts ...options.Option) store.Store { options := options.NewOptions(opts...) var nodes []string + var namespace string + var prefix string n, ok := options.Values().Get("store.nodes") if ok { nodes = n.([]string) } + ns, ok := options.Values().Get("store.namespace") + if ok { + namespace = ns.(string) + } + + prx, ok := options.Values().Get("store.prefix") + if ok { + prefix = prx.(string) + } + service := &serviceStore{ - Options: options, - Nodes: nodes, - Client: pb.NewStoreService("go.micro.store", client.DefaultClient), + Options: options, + Namespace: namespace, + Prefix: prefix, + Nodes: nodes, + Client: pb.NewStoreService("go.micro.store", client.DefaultClient), } return service