From 359b8bc5030514b4c2a3178acb467dada3d3e823 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 30 Apr 2020 22:51:25 +0100 Subject: [PATCH] Add opts to service proto (#1517) * Add opts to service proto * Support database/table opts --- store/cockroach/cockroach.go | 559 ++++++++++++++------------ store/file/file.go | 190 +++++---- store/memory/memory.go | 257 ++++++------ store/memory/memory_test.go | 6 +- store/service/proto/store.pb.go | 492 ++++++----------------- store/service/proto/store.pb.micro.go | 2 +- store/service/proto/store.proto | 32 +- store/service/service.go | 66 ++- 8 files changed, 774 insertions(+), 830 deletions(-) diff --git a/store/cockroach/cockroach.go b/store/cockroach/cockroach.go index 4aa68ac5..144ce44d 100644 --- a/store/cockroach/cockroach.go +++ b/store/cockroach/cockroach.go @@ -7,6 +7,7 @@ import ( "net/url" "regexp" "strings" + "sync" "time" "github.com/lib/pq" @@ -21,214 +22,51 @@ var ( DefaultTable = "micro" ) +var ( + statements = map[string]string{ + "list": "SELECT key, value, expiry FROM %s.%s;", + "read": "SELECT key, value, expiry FROM %s.%s WHERE key = $1;", + "readMany": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", + "readOffset": "SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", + "write": "INSERT INTO %s.%s(key, value, expiry) VALUES ($1, $2::bytea, $3) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;", + "delete": "DELETE FROM %s.%s WHERE key = $1;", + } +) + type sqlStore struct { - db *sql.DB - - database string - table string - - list *sql.Stmt - readOne *sql.Stmt - readMany *sql.Stmt - readOffset *sql.Stmt - write *sql.Stmt - delete *sql.Stmt - options store.Options + db *sql.DB + + sync.RWMutex + // known databases + databases map[string]bool } -func (s *sqlStore) Close() error { - closeStmt(s.delete) - closeStmt(s.list) - closeStmt(s.readMany) - closeStmt(s.readOffset) - closeStmt(s.readOne) - closeStmt(s.write) - if s.db != nil { - return s.db.Close() +func (s *sqlStore) createDB(database, table string) { + if len(database) == 0 { + database = s.options.Database } - return nil + if len(table) == 0 { + table = s.options.Table + } + + s.Lock() + _, ok := s.databases[database+":"+table] + if !ok { + s.initDB(database, table) + s.databases[database+":"+table] = true + } + s.Unlock() } -func (s *sqlStore) Init(opts ...store.Option) error { - for _, o := range opts { - o(&s.options) - } - // reconfigure - return s.configure() -} - -// List all the known records -func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) { - rows, err := s.list.Query() - var keys []string - var timehelper pq.NullTime - if err != nil { - if err == sql.ErrNoRows { - return keys, nil - } - return nil, err - } - defer rows.Close() - for rows.Next() { - record := &store.Record{} - if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { - return keys, err - } - if timehelper.Valid { - if timehelper.Time.Before(time.Now()) { - // record has expired - go s.Delete(record.Key) - } else { - record.Expiry = time.Until(timehelper.Time) - keys = append(keys, record.Key) - } - } else { - keys = append(keys, record.Key) - } - - } - rowErr := rows.Close() - if rowErr != nil { - // transaction rollback or something - return keys, rowErr - } - if err := rows.Err(); err != nil { - return keys, err - } - return keys, nil -} - -// Read a single key -func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - var options store.ReadOptions - for _, o := range opts { - o(&options) - } - - if options.Prefix || options.Suffix { - return s.read(key, options) - } - - var records []*store.Record - var timehelper pq.NullTime - - row := s.readOne.QueryRow(key) - record := &store.Record{} - if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { - if err == sql.ErrNoRows { - return records, store.ErrNotFound - } - return records, err - } - if timehelper.Valid { - if timehelper.Time.Before(time.Now()) { - // record has expired - go s.Delete(key) - return records, store.ErrNotFound - } - record.Expiry = time.Until(timehelper.Time) - records = append(records, record) - } else { - records = append(records, record) - } - - return records, nil -} - -// Read Many records -func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) { - pattern := "%" - if options.Prefix { - pattern = key + pattern - } - if options.Suffix { - pattern = pattern + key - } - var rows *sql.Rows - var err error - if options.Limit != 0 { - rows, err = s.readOffset.Query(pattern, options.Limit, options.Offset) - } else { - rows, err = s.readMany.Query(pattern) - } - if err != nil { - if err == sql.ErrNoRows { - return []*store.Record{}, nil - } - return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed") - } - defer rows.Close() - var records []*store.Record - var timehelper pq.NullTime - - for rows.Next() { - record := &store.Record{} - if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { - return records, err - } - if timehelper.Valid { - if timehelper.Time.Before(time.Now()) { - // record has expired - go s.Delete(record.Key) - } else { - record.Expiry = time.Until(timehelper.Time) - records = append(records, record) - } - } else { - records = append(records, record) - } - } - rowErr := rows.Close() - if rowErr != nil { - // transaction rollback or something - return records, rowErr - } - if err := rows.Err(); err != nil { - return records, err - } - - return records, nil -} - -// Write records -func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error { - var err error - if r.Expiry != 0 { - _, err = s.write.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) - } else { - _, err = s.write.Exec(r.Key, r.Value, nil) - } - - if err != nil { - return errors.Wrap(err, "Couldn't insert record "+r.Key) - } - - return nil -} - -// Delete records with keys -func (s *sqlStore) Delete(key string, opts ...store.DeleteOption) error { - result, err := s.delete.Exec(key) - if err != nil { - return err - } - _, err = result.RowsAffected() - if err != nil { - return err - } - - return nil -} - -func (s *sqlStore) initDB() error { +func (s *sqlStore) initDB(database, table string) error { // Create the namespace's database - _, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", s.database)) + _, err := s.db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s ;", database)) if err != nil { return err } - _, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", s.database)) + _, err = s.db.Exec(fmt.Sprintf("SET DATABASE = %s ;", database)) if err != nil { return errors.Wrap(err, "Couldn't set database") } @@ -240,58 +78,17 @@ func (s *sqlStore) initDB() error { value bytea, expiry timestamp with time zone, CONSTRAINT %s_pkey PRIMARY KEY (key) - );`, s.table, s.table)) + );`, table, table)) if err != nil { return errors.Wrap(err, "Couldn't create table") } // Create Index - _, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key");`, "key_index_"+s.table, s.database, s.table)) + _, err = s.db.Exec(fmt.Sprintf(`CREATE INDEX IF NOT EXISTS "%s" ON %s.%s USING btree ("key");`, "key_index_"+table, database, table)) if err != nil { return err } - list, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s;", s.database, s.table)) - if err != nil { - return errors.Wrap(err, "List statement couldn't be prepared") - } - closeStmt(s.list) - s.list = list - readOne, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key = $1;", s.database, s.table)) - if err != nil { - return errors.Wrap(err, "ReadOne statement couldn't be prepared") - } - closeStmt(s.readOne) - s.readOne = readOne - readMany, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1;", s.database, s.table)) - if err != nil { - return errors.Wrap(err, "ReadMany statement couldn't be prepared") - } - closeStmt(s.readMany) - s.readMany = readMany - readOffset, err := s.db.Prepare(fmt.Sprintf("SELECT key, value, expiry FROM %s.%s WHERE key LIKE $1 ORDER BY key DESC LIMIT $2 OFFSET $3;", s.database, s.table)) - if err != nil { - return errors.Wrap(err, "ReadOffset statement couldn't be prepared") - } - closeStmt(s.readOffset) - s.readOffset = readOffset - write, err := s.db.Prepare(fmt.Sprintf(`INSERT INTO %s.%s(key, value, expiry) - VALUES ($1, $2::bytea, $3) - ON CONFLICT (key) - DO UPDATE - SET value = EXCLUDED.value, expiry = EXCLUDED.expiry;`, s.database, s.table)) - if err != nil { - return errors.Wrap(err, "Write statement couldn't be prepared") - } - closeStmt(s.write) - s.write = write - delete, err := s.db.Prepare(fmt.Sprintf("DELETE FROM %s.%s WHERE key = $1;", s.database, s.table)) - if err != nil { - return errors.Wrap(err, "Delete statement couldn't be prepared") - } - closeStmt(s.delete) - s.delete = delete - return nil } @@ -344,21 +141,286 @@ func (s *sqlStore) configure() error { // save the values s.db = db - s.database = database - s.table = table // initialise the database - return s.initDB() + return s.initDB(s.options.Database, s.options.Table) } -func (s *sqlStore) String() string { - return "cockroach" +func (s *sqlStore) prepare(database, table, query string) (*sql.Stmt, error) { + st, ok := statements[query] + if !ok { + return nil, errors.New("unsupported statement") + } + if len(database) == 0 { + database = s.options.Database + } + if len(table) == 0 { + table = s.options.Table + } + + q := fmt.Sprintf(st, database, table) + stmt, err := s.db.Prepare(q) + if err != nil { + return nil, err + } + return stmt, nil +} + +func (s *sqlStore) Close() error { + if s.db != nil { + return s.db.Close() + } + return nil +} + +func (s *sqlStore) Init(opts ...store.Option) error { + for _, o := range opts { + o(&s.options) + } + // reconfigure + return s.configure() +} + +// List all the known records +func (s *sqlStore) List(opts ...store.ListOption) ([]string, error) { + var options store.ListOptions + for _, o := range opts { + o(&options) + } + + // create the db if not exists + s.createDB(options.Database, options.Table) + + st, err := s.prepare(options.Database, options.Table, "list") + if err != nil { + return nil, err + } + defer st.Close() + + rows, err := st.Query() + if err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + defer rows.Close() + + var keys []string + var timehelper pq.NullTime + + for rows.Next() { + record := &store.Record{} + if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { + return keys, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(record.Key) + } else { + record.Expiry = time.Until(timehelper.Time) + keys = append(keys, record.Key) + } + } else { + keys = append(keys, record.Key) + } + + } + rowErr := rows.Close() + if rowErr != nil { + // transaction rollback or something + return keys, rowErr + } + if err := rows.Err(); err != nil { + return keys, err + } + return keys, nil +} + +// Read a single key +func (s *sqlStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { + var options store.ReadOptions + for _, o := range opts { + o(&options) + } + + // create the db if not exists + s.createDB(options.Database, options.Table) + + if options.Prefix || options.Suffix { + return s.read(key, options) + } + + var records []*store.Record + var timehelper pq.NullTime + + st, err := s.prepare(options.Database, options.Table, "read") + if err != nil { + return nil, err + } + defer st.Close() + + row := st.QueryRow(key) + record := &store.Record{} + if err := row.Scan(&record.Key, &record.Value, &timehelper); err != nil { + if err == sql.ErrNoRows { + return records, store.ErrNotFound + } + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(key) + return records, store.ErrNotFound + } + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } else { + records = append(records, record) + } + + return records, nil +} + +// Read Many records +func (s *sqlStore) read(key string, options store.ReadOptions) ([]*store.Record, error) { + pattern := "%" + if options.Prefix { + pattern = key + pattern + } + if options.Suffix { + pattern = pattern + key + } + + var rows *sql.Rows + var err error + + if options.Limit != 0 { + st, err := s.prepare(options.Database, options.Table, "readOffset") + if err != nil { + return nil, err + } + defer st.Close() + + rows, err = st.Query(pattern, options.Limit, options.Offset) + } else { + st, err := s.prepare(options.Database, options.Table, "readMany") + if err != nil { + return nil, err + } + defer st.Close() + + rows, err = st.Query(pattern) + } + if err != nil { + if err == sql.ErrNoRows { + return []*store.Record{}, nil + } + return []*store.Record{}, errors.Wrap(err, "sqlStore.read failed") + } + + defer rows.Close() + + var records []*store.Record + var timehelper pq.NullTime + + for rows.Next() { + record := &store.Record{} + if err := rows.Scan(&record.Key, &record.Value, &timehelper); err != nil { + return records, err + } + if timehelper.Valid { + if timehelper.Time.Before(time.Now()) { + // record has expired + go s.Delete(record.Key) + } else { + record.Expiry = time.Until(timehelper.Time) + records = append(records, record) + } + } else { + records = append(records, record) + } + } + rowErr := rows.Close() + if rowErr != nil { + // transaction rollback or something + return records, rowErr + } + if err := rows.Err(); err != nil { + return records, err + } + + return records, nil +} + +// Write records +func (s *sqlStore) Write(r *store.Record, opts ...store.WriteOption) error { + var options store.WriteOptions + for _, o := range opts { + o(&options) + } + + // create the db if not exists + s.createDB(options.Database, options.Table) + + st, err := s.prepare(options.Database, options.Table, "write") + if err != nil { + return err + } + defer st.Close() + + if r.Expiry != 0 { + _, err = st.Exec(r.Key, r.Value, time.Now().Add(r.Expiry)) + } else { + _, err = st.Exec(r.Key, r.Value, nil) + } + + if err != nil { + return errors.Wrap(err, "Couldn't insert record "+r.Key) + } + + return nil +} + +// Delete records with keys +func (s *sqlStore) Delete(key string, opts ...store.DeleteOption) error { + var options store.DeleteOptions + for _, o := range opts { + o(&options) + } + + // create the db if not exists + s.createDB(options.Database, options.Table) + + st, err := s.prepare(options.Database, options.Table, "delete") + if err != nil { + return err + } + defer st.Close() + + result, err := st.Exec(key) + if err != nil { + return err + } + + _, err = result.RowsAffected() + if err != nil { + return err + } + + return nil } func (s *sqlStore) Options() store.Options { return s.options } +func (s *sqlStore) String() string { + return "cockroach" +} + // NewStore returns a new micro Store backed by sql func NewStore(opts ...store.Option) store.Store { var options store.Options @@ -370,16 +432,11 @@ func NewStore(opts ...store.Option) store.Store { s := new(sqlStore) // set the options s.options = options - + // mark known databases + s.databases = make(map[string]bool) // best-effort configure the store s.configure() // return store return s } - -func closeStmt(s *sql.Stmt) { - if s != nil { - s.Close() - } -} diff --git a/store/file/file.go b/store/file/file.go index 5f0fc95e..5abd6da0 100644 --- a/store/file/file.go +++ b/store/file/file.go @@ -7,10 +7,10 @@ import ( "path/filepath" "sort" "strings" + "sync" "time" "github.com/micro/go-micro/v2/store" - "github.com/pkg/errors" bolt "go.etcd.io/bbolt" ) @@ -29,18 +29,25 @@ var ( // NewStore returns a memory store func NewStore(opts ...store.Option) store.Store { - s := &fileStore{} + s := &fileStore{ + handles: make(map[string]*fileHandle), + } s.init(opts...) return s } type fileStore struct { - options store.Options - dir string - fileName string - dbPath string + options store.Options + dir string + // the database handle - db *bolt.DB + sync.RWMutex + handles map[string]*fileHandle +} + +type fileHandle struct { + key string + db *bolt.DB } // record stored by us @@ -50,8 +57,12 @@ type record struct { ExpiresAt time.Time } -func (m *fileStore) delete(key string) error { - return m.db.Update(func(tx *bolt.Tx) error { +func key(database, table string) string { + return database + ":" + table +} + +func (m *fileStore) delete(fd *fileHandle, key string) error { + return fd.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) if b == nil { return nil @@ -76,42 +87,63 @@ func (m *fileStore) init(opts ...store.Option) error { // create a directory /tmp/micro dir := filepath.Join(DefaultDir, m.options.Database) - // create the database handle - fname := m.options.Table + ".db" // Ignoring this as the folder might exist. // Reads/Writes updates will return with sensible error messages // about the dir not existing in case this cannot create the path anyway os.MkdirAll(dir, 0700) - m.dir = dir - m.fileName = fname - m.dbPath = filepath.Join(dir, fname) - - // close existing handle - if m.db != nil { - m.db.Close() - } - - // create new db handle - db, err := bolt.Open(m.dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) - if err != nil { - return err - } - - // set the new db - m.db = db - - // create the table - return db.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists([]byte(dataBucket)) - return err - }) + return nil } -func (m *fileStore) list(limit, offset uint) []string { +func (f *fileStore) getDB(database, table string) (*fileHandle, error) { + if len(database) == 0 { + database = f.options.Database + } + if len(table) == 0 { + table = f.options.Table + } + + k := key(database, table) + + f.RLock() + fd, ok := f.handles[k] + f.RUnlock() + + // return the file handle + if ok { + return fd, nil + } + + // create a directory /tmp/micro + dir := filepath.Join(DefaultDir, database) + // create the database handle + fname := table + ".db" + // make the dir + os.MkdirAll(dir, 0700) + // database path + dbPath := filepath.Join(dir, fname) + + // create new db handle + db, err := bolt.Open(dbPath, 0700, &bolt.Options{Timeout: 5 * time.Second}) + if err != nil { + return nil, err + } + + f.Lock() + fd = &fileHandle{ + key: k, + db: db, + } + f.handles[k] = fd + f.Unlock() + + return fd, nil +} + +func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string { var allItems []string - m.db.View(func(tx *bolt.Tx) error { + fd.db.View(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) // nothing to read if b == nil { @@ -162,10 +194,10 @@ func (m *fileStore) list(limit, offset uint) []string { return allKeys } -func (m *fileStore) get(k string) (*store.Record, error) { +func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) { var value []byte - m.db.View(func(tx *bolt.Tx) error { + fd.db.View(func(tx *bolt.Tx) error { // @todo this is still very experimental... b := tx.Bucket([]byte(dataBucket)) if b == nil { @@ -200,7 +232,7 @@ func (m *fileStore) get(k string) (*store.Record, error) { return newRecord, nil } -func (m *fileStore) set(r *store.Record) error { +func (m *fileStore) set(fd *fileHandle, r *store.Record) error { // copy the incoming record and then // convert the expiry in to a hard timestamp item := &record{} @@ -213,7 +245,7 @@ func (m *fileStore) set(r *store.Record) error { // marshal the data data, _ := json.Marshal(item) - return m.db.Update(func(tx *bolt.Tx) error { + return fd.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket([]byte(dataBucket)) if b == nil { var err error @@ -226,53 +258,63 @@ func (m *fileStore) set(r *store.Record) error { }) } -func (m *fileStore) Close() error { - if m.db != nil { - return m.db.Close() +func (f *fileStore) Close() error { + f.Lock() + defer f.Unlock() + for k, v := range f.handles { + v.db.Close() + delete(f.handles, k) } return nil } -func (m *fileStore) Init(opts ...store.Option) error { - return m.init(opts...) +func (f *fileStore) Init(opts ...store.Option) error { + return f.init(opts...) } func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { - deleteOptions := store.DeleteOptions{} + var deleteOptions store.DeleteOptions for _, o := range opts { o(&deleteOptions) } - return m.delete(key) + + fd, err := m.getDB(deleteOptions.Database, deleteOptions.Table) + if err != nil { + return err + } + + return m.delete(fd, key) } func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { - readOpts := store.ReadOptions{} + var readOpts store.ReadOptions for _, o := range opts { o(&readOpts) } + fd, err := m.getDB(readOpts.Database, readOpts.Table) + if err != nil { + return nil, err + } + var keys []string // Handle Prefix / suffix // TODO: do range scan here rather than listing all keys if readOpts.Prefix || readOpts.Suffix { - var opts []store.ListOption - if readOpts.Prefix { - opts = append(opts, store.ListPrefix(key)) - } - if readOpts.Suffix { - opts = append(opts, store.ListSuffix(key)) - } + // list the keys + k := m.list(fd, readOpts.Limit, readOpts.Offset) - opts = append(opts, store.ListLimit(readOpts.Limit)) - opts = append(opts, store.ListOffset(readOpts.Offset)) - - k, err := m.List(opts...) - if err != nil { - return nil, errors.Wrap(err, "FileStore: Read couldn't List()") + // check for prefix and suffix + for _, v := range k { + if readOpts.Prefix && !strings.HasPrefix(v, key) { + continue + } + if readOpts.Suffix && !strings.HasSuffix(v, key) { + continue + } + keys = append(keys, v) } - - keys = k } else { keys = []string{key} } @@ -280,7 +322,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, var results []*store.Record for _, k := range keys { - r, err := m.get(k) + r, err := m.get(fd, k) if err != nil { return results, err } @@ -291,11 +333,16 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, } func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { - writeOpts := store.WriteOptions{} + var writeOpts store.WriteOptions for _, o := range opts { o(&writeOpts) } + fd, err := m.getDB(writeOpts.Database, writeOpts.Table) + if err != nil { + return err + } + if len(opts) > 0 { // Copy the record before applying options, or the incoming record will be mutated newRecord := store.Record{} @@ -310,10 +357,10 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { newRecord.Expiry = writeOpts.TTL } - return m.set(&newRecord) + return m.set(fd, &newRecord) } - return m.set(r) + return m.set(fd, r) } func (m *fileStore) Options() store.Options { @@ -321,14 +368,19 @@ func (m *fileStore) Options() store.Options { } func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { - listOptions := store.ListOptions{} + var listOptions store.ListOptions for _, o := range opts { o(&listOptions) } + fd, err := m.getDB(listOptions.Database, listOptions.Table) + if err != nil { + return nil, err + } + // TODO apply prefix/suffix in range query - allKeys := m.list(listOptions.Limit, listOptions.Offset) + allKeys := m.list(fd, listOptions.Limit, listOptions.Offset) if len(listOptions.Prefix) > 0 { var prefixKeys []string diff --git a/store/memory/memory.go b/store/memory/memory.go index 97d19072..cbe6e38d 100644 --- a/store/memory/memory.go +++ b/store/memory/memory.go @@ -2,12 +2,12 @@ package memory import ( + "path/filepath" "sort" "strings" "time" "github.com/micro/go-micro/v2/store" - "github.com/patrickmn/go-cache" "github.com/pkg/errors" ) @@ -15,8 +15,11 @@ import ( // NewStore returns a memory store func NewStore(opts ...store.Option) store.Store { s := &memoryStore{ - options: store.Options{}, - store: cache.New(cache.NoExpiration, 5*time.Minute), + options: store.Options{ + Database: "micro", + Table: "micro", + }, + store: cache.New(cache.NoExpiration, 5*time.Minute), } for _, o := range opts { o(&s.options) @@ -30,6 +33,101 @@ type memoryStore struct { store *cache.Cache } +type internalRecord struct { + key string + value []byte + expiresAt time.Time +} + +func (m *memoryStore) key(prefix, key string) string { + return filepath.Join(prefix, key) +} + +func (m *memoryStore) prefix(database, table string) string { + if len(database) == 0 { + database = m.options.Database + } + if len(table) == 0 { + table = m.options.Table + } + return filepath.Join(database, table) +} + +func (m *memoryStore) get(prefix, key string) (*store.Record, error) { + key = m.key(prefix, key) + + var storedRecord *internalRecord + r, found := m.store.Get(key) + if !found { + return nil, store.ErrNotFound + } + + storedRecord, ok := r.(*internalRecord) + if !ok { + return nil, errors.New("Retrieved a non *internalRecord from the cache") + } + + // Copy the record on the way out + newRecord := &store.Record{} + newRecord.Key = strings.TrimPrefix(storedRecord.key, prefix+"/") + newRecord.Value = make([]byte, len(storedRecord.value)) + copy(newRecord.Value, storedRecord.value) + if !storedRecord.expiresAt.IsZero() { + newRecord.Expiry = time.Until(storedRecord.expiresAt) + } + + return newRecord, nil +} + +func (m *memoryStore) set(prefix string, r *store.Record) { + key := m.key(prefix, r.Key) + + // copy the incoming record and then + // convert the expiry in to a hard timestamp + i := &internalRecord{} + i.key = r.Key + i.value = make([]byte, len(r.Value)) + copy(i.value, r.Value) + + if r.Expiry != 0 { + i.expiresAt = time.Now().Add(r.Expiry) + } + + m.store.Set(key, i, r.Expiry) +} + +func (m *memoryStore) delete(prefix, key string) { + key = m.key(prefix, key) + m.store.Delete(key) +} + +func (m *memoryStore) list(prefix string, limit, offset uint) []string { + allItems := m.store.Items() + allKeys := make([]string, len(allItems)) + i := 0 + + for k := range allItems { + if !strings.HasPrefix(k, prefix+"/") { + continue + } + allKeys[i] = strings.TrimPrefix(k, prefix+"/") + i++ + } + + if limit != 0 || offset != 0 { + sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) + min := func(i, j uint) uint { + if i < j { + return i + } + return j + } + return allKeys[offset:min(limit, uint(len(allKeys)))] + } + + return allKeys +} + func (m *memoryStore) Close() error { m.store.Flush() return nil @@ -53,31 +151,33 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor o(&readOpts) } + prefix := m.prefix(readOpts.Database, readOpts.Table) + var keys []string // Handle Prefix / suffix if readOpts.Prefix || readOpts.Suffix { - var opts []store.ListOption - if readOpts.Prefix { - opts = append(opts, store.ListPrefix(key)) + k := m.list(prefix, readOpts.Limit, readOpts.Offset) + + for _, kk := range k { + if readOpts.Prefix && !strings.HasPrefix(kk, key) { + continue + } + + if readOpts.Suffix && !strings.HasSuffix(kk, key) { + continue + } + + keys = append(keys, kk) } - if readOpts.Suffix { - opts = append(opts, store.ListSuffix(key)) - } - opts = append(opts, store.ListLimit(readOpts.Limit)) - opts = append(opts, store.ListOffset(readOpts.Offset)) - k, err := m.List(opts...) - if err != nil { - return nil, errors.Wrap(err, "Memory: Read couldn't List()") - } - keys = k } else { keys = []string{key} } var results []*store.Record + for _, k := range keys { - r, err := m.get(k) + r, err := m.get(prefix, k) if err != nil { return results, err } @@ -87,40 +187,14 @@ func (m *memoryStore) Read(key string, opts ...store.ReadOption) ([]*store.Recor return results, nil } -func (m *memoryStore) get(k string) (*store.Record, error) { - if len(m.options.Table) > 0 { - k = m.options.Table + "/" + k - } - if len(m.options.Database) > 0 { - k = m.options.Database + "/" + k - } - var storedRecord *internalRecord - r, found := m.store.Get(k) - if !found { - return nil, store.ErrNotFound - } - storedRecord, ok := r.(*internalRecord) - if !ok { - return nil, errors.New("Retrieved a non *internalRecord from the cache") - } - // Copy the record on the way out - newRecord := &store.Record{} - newRecord.Key = storedRecord.key - newRecord.Value = make([]byte, len(storedRecord.value)) - copy(newRecord.Value, storedRecord.value) - if !storedRecord.expiresAt.IsZero() { - newRecord.Expiry = time.Until(storedRecord.expiresAt) - } - - return newRecord, nil -} - func (m *memoryStore) Write(r *store.Record, opts ...store.WriteOption) error { writeOpts := store.WriteOptions{} for _, o := range opts { o(&writeOpts) } + prefix := m.prefix(writeOpts.Database, writeOpts.Table) + if len(opts) > 0 { // Copy the record before applying options, or the incoming record will be mutated newRecord := store.Record{} @@ -135,52 +209,25 @@ func (m *memoryStore) Write(r *store.Record, opts ...store.WriteOption) error { if writeOpts.TTL != 0 { newRecord.Expiry = writeOpts.TTL } - m.set(&newRecord) - } else { - m.set(r) + m.set(prefix, &newRecord) + return nil } + + // set + m.set(prefix, r) + return nil } -func (m *memoryStore) set(r *store.Record) { - key := r.Key - if len(m.options.Table) > 0 { - key = m.options.Table + "/" + key - } - if len(m.options.Database) > 0 { - key = m.options.Database + "/" + key - } - - // copy the incoming record and then - // convert the expiry in to a hard timestamp - i := &internalRecord{} - i.key = r.Key - i.value = make([]byte, len(r.Value)) - copy(i.value, r.Value) - if r.Expiry != 0 { - i.expiresAt = time.Now().Add(r.Expiry) - } - - m.store.Set(key, i, r.Expiry) -} - func (m *memoryStore) Delete(key string, opts ...store.DeleteOption) error { deleteOptions := store.DeleteOptions{} for _, o := range opts { o(&deleteOptions) } - m.delete(key) - return nil -} -func (m *memoryStore) delete(key string) { - if len(m.options.Table) > 0 { - key = m.options.Table + "/" + key - } - if len(m.options.Database) > 0 { - key = m.options.Database + "/" + key - } - m.store.Delete(key) + prefix := m.prefix(deleteOptions.Database, deleteOptions.Table) + m.delete(prefix, key) + return nil } func (m *memoryStore) Options() store.Options { @@ -193,59 +240,29 @@ func (m *memoryStore) List(opts ...store.ListOption) ([]string, error) { for _, o := range opts { o(&listOptions) } - allKeys := m.list(listOptions.Limit, listOptions.Offset) + + prefix := m.prefix(listOptions.Database, listOptions.Table) + keys := m.list(prefix, listOptions.Limit, listOptions.Offset) if len(listOptions.Prefix) > 0 { var prefixKeys []string - for _, k := range allKeys { + for _, k := range keys { if strings.HasPrefix(k, listOptions.Prefix) { prefixKeys = append(prefixKeys, k) } } - allKeys = prefixKeys + keys = prefixKeys } + if len(listOptions.Suffix) > 0 { var suffixKeys []string - for _, k := range allKeys { + for _, k := range keys { if strings.HasSuffix(k, listOptions.Suffix) { suffixKeys = append(suffixKeys, k) } } - allKeys = suffixKeys + keys = suffixKeys } - return allKeys, nil -} - -func (m *memoryStore) list(limit, offset uint) []string { - allItems := m.store.Items() - allKeys := make([]string, len(allItems)) - i := 0 - for k := range allItems { - if len(m.options.Database) > 0 { - k = strings.TrimPrefix(k, m.options.Database+"/") - } - if len(m.options.Table) > 0 { - k = strings.TrimPrefix(k, m.options.Table+"/") - } - allKeys[i] = k - i++ - } - if limit != 0 || offset != 0 { - sort.Slice(allKeys, func(i, j int) bool { return allKeys[i] < allKeys[j] }) - min := func(i, j uint) uint { - if i < j { - return i - } - return j - } - return allKeys[offset:min(limit, uint(len(allKeys)))] - } - return allKeys -} - -type internalRecord struct { - key string - value []byte - expiresAt time.Time + return keys, nil } diff --git a/store/memory/memory_test.go b/store/memory/memory_test.go index 8d51ea0d..4598a667 100644 --- a/store/memory/memory_test.go +++ b/store/memory/memory_test.go @@ -259,13 +259,13 @@ func basictest(s store.Store, t *testing.T) { t.Error(err) } else { if len(results) != 5 { - t.Error("Expected 5 results, got ", len(results)) + t.Fatal("Expected 5 results, got ", len(results)) } if results[0].Key != "a0" { - t.Errorf("Expected a0, got %s", results[0].Key) + t.Fatalf("Expected a0, got %s", results[0].Key) } if results[4].Key != "a4" { - t.Errorf("Expected a4, got %s", results[4].Key) + t.Fatalf("Expected a4, got %s", results[4].Key) } } if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil { diff --git a/store/service/proto/store.pb.go b/store/service/proto/store.pb.go index e6ea5285..71af1132 100644 --- a/store/service/proto/store.pb.go +++ b/store/service/proto/store.pb.go @@ -1,15 +1,11 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: store/service/proto/store.proto +// source: github.com/micro/go-micro/store/service/proto/store.proto package go_micro_store import ( - context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" math "math" ) @@ -40,7 +36,7 @@ func (m *Record) Reset() { *m = Record{} } func (m *Record) String() string { return proto.CompactTextString(m) } func (*Record) ProtoMessage() {} func (*Record) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{0} + return fileDescriptor_42854049893ccb13, []int{0} } func (m *Record) XXX_Unmarshal(b []byte) error { @@ -83,10 +79,12 @@ func (m *Record) GetExpiry() int64 { } type ReadOptions struct { - Prefix bool `protobuf:"varint,1,opt,name=prefix,proto3" json:"prefix,omitempty"` - Suffix bool `protobuf:"varint,2,opt,name=suffix,proto3" json:"suffix,omitempty"` - Limit uint64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` - Offset uint64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` + Database string `protobuf:"bytes,1,opt,name=database,proto3" json:"database,omitempty"` + Table string `protobuf:"bytes,2,opt,name=table,proto3" json:"table,omitempty"` + Prefix bool `protobuf:"varint,3,opt,name=prefix,proto3" json:"prefix,omitempty"` + Suffix bool `protobuf:"varint,4,opt,name=suffix,proto3" json:"suffix,omitempty"` + Limit uint64 `protobuf:"varint,5,opt,name=limit,proto3" json:"limit,omitempty"` + Offset uint64 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -96,7 +94,7 @@ func (m *ReadOptions) Reset() { *m = ReadOptions{} } func (m *ReadOptions) String() string { return proto.CompactTextString(m) } func (*ReadOptions) ProtoMessage() {} func (*ReadOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{1} + return fileDescriptor_42854049893ccb13, []int{1} } func (m *ReadOptions) XXX_Unmarshal(b []byte) error { @@ -117,6 +115,20 @@ func (m *ReadOptions) XXX_DiscardUnknown() { var xxx_messageInfo_ReadOptions proto.InternalMessageInfo +func (m *ReadOptions) GetDatabase() string { + if m != nil { + return m.Database + } + return "" +} + +func (m *ReadOptions) GetTable() string { + if m != nil { + return m.Table + } + return "" +} + func (m *ReadOptions) GetPrefix() bool { if m != nil { return m.Prefix @@ -157,7 +169,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{2} + return fileDescriptor_42854049893ccb13, []int{2} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { @@ -203,7 +215,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{3} + return fileDescriptor_42854049893ccb13, []int{3} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { @@ -232,10 +244,12 @@ func (m *ReadResponse) GetRecords() []*Record { } type WriteOptions struct { + Database string `protobuf:"bytes,1,opt,name=database,proto3" json:"database,omitempty"` + Table string `protobuf:"bytes,2,opt,name=table,proto3" json:"table,omitempty"` // time.Time - Expiry int64 `protobuf:"varint,1,opt,name=expiry,proto3" json:"expiry,omitempty"` + Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"` // time.Duration - Ttl int64 `protobuf:"varint,2,opt,name=ttl,proto3" json:"ttl,omitempty"` + Ttl int64 `protobuf:"varint,4,opt,name=ttl,proto3" json:"ttl,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -245,7 +259,7 @@ func (m *WriteOptions) Reset() { *m = WriteOptions{} } func (m *WriteOptions) String() string { return proto.CompactTextString(m) } func (*WriteOptions) ProtoMessage() {} func (*WriteOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{4} + return fileDescriptor_42854049893ccb13, []int{4} } func (m *WriteOptions) XXX_Unmarshal(b []byte) error { @@ -266,6 +280,20 @@ func (m *WriteOptions) XXX_DiscardUnknown() { var xxx_messageInfo_WriteOptions proto.InternalMessageInfo +func (m *WriteOptions) GetDatabase() string { + if m != nil { + return m.Database + } + return "" +} + +func (m *WriteOptions) GetTable() string { + if m != nil { + return m.Table + } + return "" +} + func (m *WriteOptions) GetExpiry() int64 { if m != nil { return m.Expiry @@ -292,7 +320,7 @@ func (m *WriteRequest) Reset() { *m = WriteRequest{} } func (m *WriteRequest) String() string { return proto.CompactTextString(m) } func (*WriteRequest) ProtoMessage() {} func (*WriteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{5} + return fileDescriptor_42854049893ccb13, []int{5} } func (m *WriteRequest) XXX_Unmarshal(b []byte) error { @@ -337,7 +365,7 @@ func (m *WriteResponse) Reset() { *m = WriteResponse{} } func (m *WriteResponse) String() string { return proto.CompactTextString(m) } func (*WriteResponse) ProtoMessage() {} func (*WriteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{6} + return fileDescriptor_42854049893ccb13, []int{6} } func (m *WriteResponse) XXX_Unmarshal(b []byte) error { @@ -359,6 +387,8 @@ func (m *WriteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_WriteResponse proto.InternalMessageInfo type DeleteOptions struct { + Database string `protobuf:"bytes,1,opt,name=database,proto3" json:"database,omitempty"` + Table string `protobuf:"bytes,2,opt,name=table,proto3" json:"table,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -368,7 +398,7 @@ func (m *DeleteOptions) Reset() { *m = DeleteOptions{} } func (m *DeleteOptions) String() string { return proto.CompactTextString(m) } func (*DeleteOptions) ProtoMessage() {} func (*DeleteOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{7} + return fileDescriptor_42854049893ccb13, []int{7} } func (m *DeleteOptions) XXX_Unmarshal(b []byte) error { @@ -389,6 +419,20 @@ func (m *DeleteOptions) XXX_DiscardUnknown() { var xxx_messageInfo_DeleteOptions proto.InternalMessageInfo +func (m *DeleteOptions) GetDatabase() string { + if m != nil { + return m.Database + } + return "" +} + +func (m *DeleteOptions) GetTable() string { + if m != nil { + return m.Table + } + return "" +} + type DeleteRequest struct { Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` Options *DeleteOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` @@ -401,7 +445,7 @@ func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } func (*DeleteRequest) ProtoMessage() {} func (*DeleteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{8} + return fileDescriptor_42854049893ccb13, []int{8} } func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { @@ -446,7 +490,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } func (*DeleteResponse) ProtoMessage() {} func (*DeleteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{9} + return fileDescriptor_42854049893ccb13, []int{9} } func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { @@ -468,10 +512,12 @@ func (m *DeleteResponse) XXX_DiscardUnknown() { var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo type ListOptions struct { - Prefix string `protobuf:"bytes,1,opt,name=prefix,proto3" json:"prefix,omitempty"` - Suffix string `protobuf:"bytes,2,opt,name=suffix,proto3" json:"suffix,omitempty"` - Limit uint64 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` - Offset uint64 `protobuf:"varint,4,opt,name=offset,proto3" json:"offset,omitempty"` + Database string `protobuf:"bytes,1,opt,name=database,proto3" json:"database,omitempty"` + Table string `protobuf:"bytes,2,opt,name=table,proto3" json:"table,omitempty"` + Prefix string `protobuf:"bytes,3,opt,name=prefix,proto3" json:"prefix,omitempty"` + Suffix string `protobuf:"bytes,4,opt,name=suffix,proto3" json:"suffix,omitempty"` + Limit uint64 `protobuf:"varint,5,opt,name=limit,proto3" json:"limit,omitempty"` + Offset uint64 `protobuf:"varint,6,opt,name=offset,proto3" json:"offset,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -481,7 +527,7 @@ func (m *ListOptions) Reset() { *m = ListOptions{} } func (m *ListOptions) String() string { return proto.CompactTextString(m) } func (*ListOptions) ProtoMessage() {} func (*ListOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{10} + return fileDescriptor_42854049893ccb13, []int{10} } func (m *ListOptions) XXX_Unmarshal(b []byte) error { @@ -502,6 +548,20 @@ func (m *ListOptions) XXX_DiscardUnknown() { var xxx_messageInfo_ListOptions proto.InternalMessageInfo +func (m *ListOptions) GetDatabase() string { + if m != nil { + return m.Database + } + return "" +} + +func (m *ListOptions) GetTable() string { + if m != nil { + return m.Table + } + return "" +} + func (m *ListOptions) GetPrefix() string { if m != nil { return m.Prefix @@ -541,7 +601,7 @@ func (m *ListRequest) Reset() { *m = ListRequest{} } func (m *ListRequest) String() string { return proto.CompactTextString(m) } func (*ListRequest) ProtoMessage() {} func (*ListRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{11} + return fileDescriptor_42854049893ccb13, []int{11} } func (m *ListRequest) XXX_Unmarshal(b []byte) error { @@ -580,7 +640,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} } func (m *ListResponse) String() string { return proto.CompactTextString(m) } func (*ListResponse) ProtoMessage() {} func (*ListResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{12} + return fileDescriptor_42854049893ccb13, []int{12} } func (m *ListResponse) XXX_Unmarshal(b []byte) error { @@ -618,7 +678,7 @@ func (m *DatabasesRequest) Reset() { *m = DatabasesRequest{} } func (m *DatabasesRequest) String() string { return proto.CompactTextString(m) } func (*DatabasesRequest) ProtoMessage() {} func (*DatabasesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{13} + return fileDescriptor_42854049893ccb13, []int{13} } func (m *DatabasesRequest) XXX_Unmarshal(b []byte) error { @@ -650,7 +710,7 @@ func (m *DatabasesResponse) Reset() { *m = DatabasesResponse{} } func (m *DatabasesResponse) String() string { return proto.CompactTextString(m) } func (*DatabasesResponse) ProtoMessage() {} func (*DatabasesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{14} + return fileDescriptor_42854049893ccb13, []int{14} } func (m *DatabasesResponse) XXX_Unmarshal(b []byte) error { @@ -689,7 +749,7 @@ func (m *TablesRequest) Reset() { *m = TablesRequest{} } func (m *TablesRequest) String() string { return proto.CompactTextString(m) } func (*TablesRequest) ProtoMessage() {} func (*TablesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{15} + return fileDescriptor_42854049893ccb13, []int{15} } func (m *TablesRequest) XXX_Unmarshal(b []byte) error { @@ -728,7 +788,7 @@ func (m *TablesResponse) Reset() { *m = TablesResponse{} } func (m *TablesResponse) String() string { return proto.CompactTextString(m) } func (*TablesResponse) ProtoMessage() {} func (*TablesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_1ba364858f5c3cdb, []int{16} + return fileDescriptor_42854049893ccb13, []int{16} } func (m *TablesResponse) XXX_Unmarshal(b []byte) error { @@ -776,332 +836,48 @@ func init() { proto.RegisterType((*TablesResponse)(nil), "go.micro.store.TablesResponse") } -func init() { proto.RegisterFile("store/service/proto/store.proto", fileDescriptor_1ba364858f5c3cdb) } - -var fileDescriptor_1ba364858f5c3cdb = []byte{ - // 563 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x5d, 0x8f, 0xd2, 0x40, - 0x14, 0xa5, 0xb4, 0x74, 0xe9, 0x85, 0x45, 0x9c, 0x18, 0x42, 0x90, 0x55, 0x9c, 0xa7, 0x26, 0x26, - 0x65, 0xc5, 0xf8, 0xf1, 0x68, 0x22, 0x1a, 0x35, 0x26, 0x26, 0xa3, 0xd1, 0xc4, 0xb7, 0x02, 0x83, - 0x69, 0x60, 0x77, 0x6a, 0x67, 0x20, 0xcb, 0x0f, 0xf4, 0x7f, 0x99, 0xf9, 0x2a, 0xa5, 0xb4, 0x3e, - 0xf8, 0x36, 0xf7, 0xcc, 0x9d, 0x73, 0xee, 0xb9, 0xf7, 0xb6, 0xf0, 0x98, 0x0b, 0x96, 0xd1, 0x29, - 0xa7, 0xd9, 0x3e, 0x59, 0xd2, 0x69, 0x9a, 0x31, 0xc1, 0xa6, 0x0a, 0x8b, 0xd4, 0x19, 0xf5, 0x7e, - 0xb1, 0xe8, 0x26, 0x59, 0x66, 0x2c, 0x52, 0x28, 0xfe, 0x00, 0x3e, 0xa1, 0x4b, 0x96, 0xad, 0x50, - 0x1f, 0xdc, 0x0d, 0x3d, 0x0c, 0x9d, 0x89, 0x13, 0x06, 0x44, 0x1e, 0xd1, 0x03, 0x68, 0xed, 0xe3, - 0xed, 0x8e, 0x0e, 0x9b, 0x13, 0x27, 0xec, 0x12, 0x1d, 0xa0, 0x01, 0xf8, 0xf4, 0x2e, 0x4d, 0xb2, - 0xc3, 0xd0, 0x9d, 0x38, 0xa1, 0x4b, 0x4c, 0x84, 0x37, 0xd0, 0x21, 0x34, 0x5e, 0x7d, 0x49, 0x45, - 0xc2, 0x6e, 0xb9, 0x4c, 0x4b, 0x33, 0xba, 0x4e, 0xee, 0x14, 0x63, 0x9b, 0x98, 0x48, 0xe2, 0x7c, - 0xb7, 0x96, 0x78, 0x53, 0xe3, 0x3a, 0x92, 0x62, 0xdb, 0xe4, 0x26, 0x11, 0x8a, 0xd5, 0x23, 0x3a, - 0x90, 0xd9, 0x6c, 0xbd, 0xe6, 0x54, 0x0c, 0x3d, 0x05, 0x9b, 0x08, 0x7f, 0xd7, 0x62, 0x84, 0xfe, - 0xde, 0x51, 0x2e, 0x2a, 0x6a, 0x7f, 0x01, 0x17, 0x4c, 0x57, 0xa2, 0x74, 0x3a, 0xb3, 0x87, 0xd1, - 0xa9, 0xf3, 0xa8, 0x50, 0x2c, 0xb1, 0xb9, 0xf8, 0x0d, 0x74, 0x35, 0x2f, 0x4f, 0xd9, 0x2d, 0xa7, - 0xe8, 0x1a, 0x2e, 0x32, 0xd5, 0x1e, 0x3e, 0x74, 0x26, 0x6e, 0xd8, 0x99, 0x0d, 0xce, 0x69, 0xe4, - 0x35, 0xb1, 0x69, 0xf8, 0x35, 0x74, 0x7f, 0x64, 0x89, 0xa0, 0x85, 0x3e, 0x98, 0x76, 0x39, 0xc5, - 0x76, 0xc9, 0x92, 0x85, 0xd8, 0xaa, 0xe2, 0x5c, 0x22, 0x8f, 0x78, 0x6f, 0x5e, 0x5a, 0x53, 0x11, - 0xf8, 0x9a, 0x54, 0xbd, 0xac, 0x97, 0x36, 0x59, 0xe8, 0x65, 0xd9, 0xf2, 0xb8, 0xfc, 0xa0, 0x58, - 0xd8, 0xd1, 0xf3, 0x3d, 0xb8, 0x34, 0xba, 0xda, 0xb4, 0x04, 0xe6, 0x74, 0x4b, 0xf3, 0x54, 0xfc, - 0xd3, 0x02, 0xf5, 0xfd, 0x7e, 0x55, 0x16, 0xbf, 0x2a, 0x8b, 0x9f, 0x50, 0x1e, 0xd5, 0xfb, 0xd0, - 0xb3, 0xdc, 0x46, 0x7e, 0x03, 0x9d, 0xcf, 0x09, 0x17, 0xd5, 0x8b, 0x14, 0xd4, 0x2c, 0x52, 0xf0, - 0x9f, 0x8b, 0x34, 0xd7, 0x62, 0xd6, 0x58, 0x61, 0x6d, 0x9c, 0xea, 0xb5, 0x29, 0x94, 0x76, 0x34, - 0x11, 0x42, 0x57, 0xb3, 0x98, 0xb5, 0x41, 0xe0, 0x6d, 0xe8, 0x41, 0xb6, 0xc2, 0x0d, 0x03, 0xa2, - 0xce, 0x9f, 0xbc, 0xb6, 0xd3, 0x6f, 0x62, 0x04, 0xfd, 0x79, 0x2c, 0xe2, 0x45, 0xcc, 0x29, 0x37, - 0xa2, 0xf8, 0x19, 0xdc, 0x2f, 0x60, 0x86, 0x62, 0x0c, 0xc1, 0xca, 0x82, 0x6a, 0xf7, 0x02, 0x72, - 0x04, 0xf0, 0x53, 0xb8, 0xfc, 0x16, 0x2f, 0xb6, 0x39, 0x07, 0x1a, 0x41, 0xdb, 0xde, 0x9a, 0x3e, - 0xe5, 0x31, 0x0e, 0xa1, 0x67, 0x93, 0x0d, 0xf9, 0x00, 0x7c, 0xa1, 0x10, 0xc3, 0x6c, 0xa2, 0xd9, - 0x1f, 0x17, 0x5a, 0x5f, 0xa5, 0x4d, 0xf4, 0x16, 0x3c, 0xf9, 0x21, 0xa0, 0xca, 0xcf, 0xc6, 0x88, - 0x8e, 0xc6, 0xd5, 0x97, 0x66, 0x8e, 0x0d, 0xf4, 0x1e, 0x5a, 0x6a, 0xb3, 0x50, 0xf5, 0x26, 0x5a, - 0x9a, 0xab, 0x9a, 0xdb, 0x9c, 0xe7, 0x23, 0xf8, 0x7a, 0x47, 0x50, 0xcd, 0x56, 0x59, 0xa6, 0x47, - 0x75, 0xd7, 0x39, 0xd5, 0x3b, 0xf0, 0xe4, 0xa4, 0x50, 0xe5, 0x5c, 0x6b, 0x7d, 0x15, 0x87, 0x8b, - 0x1b, 0xd7, 0x0e, 0x22, 0x10, 0xe4, 0x23, 0x43, 0x93, 0x33, 0xd5, 0xd2, 0x84, 0x47, 0x4f, 0xfe, - 0x91, 0x51, 0x74, 0xa9, 0xc7, 0x74, 0xee, 0xf2, 0x64, 0xd6, 0xe7, 0x2e, 0x4f, 0xa7, 0x8b, 0x1b, - 0x0b, 0x5f, 0xfd, 0xec, 0x9f, 0xff, 0x0d, 0x00, 0x00, 0xff, 0xff, 0xa8, 0x04, 0x5f, 0x7c, 0x0f, - 0x06, 0x00, 0x00, +func init() { + proto.RegisterFile("github.com/micro/go-micro/store/service/proto/store.proto", fileDescriptor_42854049893ccb13) } -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// StoreClient is the client API for Store service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type StoreClient interface { - Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) - Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) - Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) - List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (Store_ListClient, error) - Databases(ctx context.Context, in *DatabasesRequest, opts ...grpc.CallOption) (*DatabasesResponse, error) - Tables(ctx context.Context, in *TablesRequest, opts ...grpc.CallOption) (*TablesResponse, error) -} - -type storeClient struct { - cc *grpc.ClientConn -} - -func NewStoreClient(cc *grpc.ClientConn) StoreClient { - return &storeClient{cc} -} - -func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) { - out := new(ReadResponse) - err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *storeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) { - out := new(WriteResponse) - err := c.cc.Invoke(ctx, "/go.micro.store.Store/Write", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *storeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { - out := new(DeleteResponse) - err := c.cc.Invoke(ctx, "/go.micro.store.Store/Delete", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *storeClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (Store_ListClient, error) { - stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/List", opts...) - if err != nil { - return nil, err - } - x := &storeListClient{stream} - if err := x.ClientStream.SendMsg(in); err != nil { - return nil, err - } - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type Store_ListClient interface { - Recv() (*ListResponse, error) - grpc.ClientStream -} - -type storeListClient struct { - grpc.ClientStream -} - -func (x *storeListClient) Recv() (*ListResponse, error) { - m := new(ListResponse) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *storeClient) Databases(ctx context.Context, in *DatabasesRequest, opts ...grpc.CallOption) (*DatabasesResponse, error) { - out := new(DatabasesResponse) - err := c.cc.Invoke(ctx, "/go.micro.store.Store/Databases", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *storeClient) Tables(ctx context.Context, in *TablesRequest, opts ...grpc.CallOption) (*TablesResponse, error) { - out := new(TablesResponse) - err := c.cc.Invoke(ctx, "/go.micro.store.Store/Tables", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// StoreServer is the server API for Store service. -type StoreServer interface { - Read(context.Context, *ReadRequest) (*ReadResponse, error) - Write(context.Context, *WriteRequest) (*WriteResponse, error) - Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) - List(*ListRequest, Store_ListServer) error - Databases(context.Context, *DatabasesRequest) (*DatabasesResponse, error) - Tables(context.Context, *TablesRequest) (*TablesResponse, error) -} - -// UnimplementedStoreServer can be embedded to have forward compatible implementations. -type UnimplementedStoreServer struct { -} - -func (*UnimplementedStoreServer) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Read not implemented") -} -func (*UnimplementedStoreServer) Write(ctx context.Context, req *WriteRequest) (*WriteResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Write not implemented") -} -func (*UnimplementedStoreServer) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented") -} -func (*UnimplementedStoreServer) List(req *ListRequest, srv Store_ListServer) error { - return status.Errorf(codes.Unimplemented, "method List not implemented") -} -func (*UnimplementedStoreServer) Databases(ctx context.Context, req *DatabasesRequest) (*DatabasesResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Databases not implemented") -} -func (*UnimplementedStoreServer) Tables(ctx context.Context, req *TablesRequest) (*TablesResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Tables not implemented") -} - -func RegisterStoreServer(s *grpc.Server, srv StoreServer) { - s.RegisterService(&_Store_serviceDesc, srv) -} - -func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReadRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(StoreServer).Read(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.store.Store/Read", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(StoreServer).Read(ctx, req.(*ReadRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Store_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(WriteRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(StoreServer).Write(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.store.Store/Write", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(StoreServer).Write(ctx, req.(*WriteRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Store_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DeleteRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(StoreServer).Delete(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.store.Store/Delete", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(StoreServer).Delete(ctx, req.(*DeleteRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Store_List_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(ListRequest) - if err := stream.RecvMsg(m); err != nil { - return err - } - return srv.(StoreServer).List(m, &storeListServer{stream}) -} - -type Store_ListServer interface { - Send(*ListResponse) error - grpc.ServerStream -} - -type storeListServer struct { - grpc.ServerStream -} - -func (x *storeListServer) Send(m *ListResponse) error { - return x.ServerStream.SendMsg(m) -} - -func _Store_Databases_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DatabasesRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(StoreServer).Databases(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.store.Store/Databases", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(StoreServer).Databases(ctx, req.(*DatabasesRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Store_Tables_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TablesRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(StoreServer).Tables(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/go.micro.store.Store/Tables", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(StoreServer).Tables(ctx, req.(*TablesRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Store_serviceDesc = grpc.ServiceDesc{ - ServiceName: "go.micro.store.Store", - HandlerType: (*StoreServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Read", - Handler: _Store_Read_Handler, - }, - { - MethodName: "Write", - Handler: _Store_Write_Handler, - }, - { - MethodName: "Delete", - Handler: _Store_Delete_Handler, - }, - { - MethodName: "Databases", - Handler: _Store_Databases_Handler, - }, - { - MethodName: "Tables", - Handler: _Store_Tables_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "List", - Handler: _Store_List_Handler, - ServerStreams: true, - }, - }, - Metadata: "store/service/proto/store.proto", +var fileDescriptor_42854049893ccb13 = []byte{ + // 598 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xdd, 0x8e, 0xd2, 0x40, + 0x14, 0xa6, 0xdb, 0xd2, 0xa5, 0x87, 0x1f, 0x71, 0x62, 0x48, 0x83, 0xac, 0xa9, 0x73, 0xd5, 0xc4, + 0x58, 0x56, 0x8c, 0x1a, 0xef, 0x34, 0xa2, 0x51, 0x63, 0x62, 0x32, 0x1a, 0x4d, 0xbc, 0x2b, 0x30, + 0x60, 0x5d, 0xd8, 0x62, 0x67, 0x20, 0xcb, 0xc3, 0xf8, 0x38, 0xbe, 0x97, 0x99, 0x3f, 0x28, 0xd0, + 0xee, 0x85, 0xbb, 0x77, 0x73, 0xce, 0x1c, 0xbe, 0x39, 0xdf, 0x4f, 0x03, 0xbc, 0x9c, 0x25, 0xfc, + 0xe7, 0x6a, 0x14, 0x8d, 0xd3, 0x45, 0x7f, 0x91, 0x8c, 0xb3, 0xb4, 0x3f, 0x4b, 0x1f, 0xab, 0x03, + 0xe3, 0x69, 0x46, 0xfb, 0x8c, 0x66, 0xeb, 0x64, 0x4c, 0xfb, 0xcb, 0x2c, 0xe5, 0xba, 0x17, 0xc9, + 0x33, 0x6a, 0xcd, 0xd2, 0x48, 0x4e, 0x46, 0xb2, 0x8b, 0xdf, 0x83, 0x4b, 0xe8, 0x38, 0xcd, 0x26, + 0xa8, 0x0d, 0xf6, 0x05, 0xdd, 0xf8, 0x56, 0x60, 0x85, 0x1e, 0x11, 0x47, 0x74, 0x0f, 0xaa, 0xeb, + 0x78, 0xbe, 0xa2, 0xfe, 0x49, 0x60, 0x85, 0x0d, 0xa2, 0x0a, 0xd4, 0x01, 0x97, 0x5e, 0x2d, 0x93, + 0x6c, 0xe3, 0xdb, 0x81, 0x15, 0xda, 0x44, 0x57, 0xf8, 0x8f, 0x05, 0x75, 0x42, 0xe3, 0xc9, 0xe7, + 0x25, 0x4f, 0xd2, 0x4b, 0x86, 0xba, 0x50, 0x9b, 0xc4, 0x3c, 0x1e, 0xc5, 0x8c, 0x6a, 0xd0, 0x6d, + 0x2d, 0x90, 0x79, 0x3c, 0x9a, 0x2b, 0x64, 0x8f, 0xa8, 0x42, 0x20, 0x2f, 0x33, 0x3a, 0x4d, 0xae, + 0x24, 0x72, 0x8d, 0xe8, 0x4a, 0xf4, 0xd9, 0x6a, 0x2a, 0xfa, 0x8e, 0xea, 0xab, 0x4a, 0xa0, 0xcc, + 0x93, 0x45, 0xc2, 0xfd, 0x6a, 0x60, 0x85, 0x0e, 0x51, 0x85, 0x98, 0x4e, 0xa7, 0x53, 0x46, 0xb9, + 0xef, 0xca, 0xb6, 0xae, 0xf0, 0x37, 0xb5, 0x1e, 0xa1, 0xbf, 0x57, 0x94, 0xf1, 0x02, 0xba, 0xcf, + 0xe0, 0x34, 0x55, 0xbb, 0xcb, 0xb5, 0xea, 0x83, 0xfb, 0xd1, 0xbe, 0x58, 0x51, 0x8e, 0x1e, 0x31, + 0xb3, 0xf8, 0x15, 0x34, 0x14, 0x2e, 0x5b, 0xa6, 0x97, 0x8c, 0xa2, 0x73, 0x38, 0xcd, 0xa4, 0xa2, + 0xcc, 0xb7, 0x02, 0x3b, 0xac, 0x0f, 0x3a, 0xc7, 0x30, 0xe2, 0x9a, 0x98, 0x31, 0xfc, 0x0b, 0x1a, + 0xdf, 0xb3, 0x84, 0xd3, 0x1b, 0x29, 0x57, 0xe4, 0x89, 0x20, 0xc9, 0xf9, 0x5c, 0xca, 0x66, 0x13, + 0x71, 0xc4, 0x6b, 0xfd, 0x96, 0x91, 0x21, 0x02, 0x57, 0xad, 0x21, 0x5f, 0x2a, 0x5f, 0x56, 0x4f, + 0xa1, 0xe7, 0x87, 0x22, 0xf5, 0x0e, 0x7f, 0x90, 0xa7, 0xb2, 0x53, 0xe9, 0x0e, 0x34, 0xf5, 0xbb, + 0x4a, 0x26, 0xfc, 0x1a, 0x9a, 0x43, 0x3a, 0xa7, 0x37, 0x60, 0x8d, 0x7f, 0x18, 0x88, 0x72, 0x4f, + 0x5f, 0x1c, 0xae, 0x7b, 0x76, 0xb8, 0xee, 0xde, 0x12, 0xbb, 0x7d, 0xdb, 0xd0, 0x32, 0xd8, 0x7a, + 0x61, 0x91, 0xef, 0x4f, 0x09, 0xe3, 0xb7, 0x95, 0x6f, 0xaf, 0x24, 0xdf, 0xde, 0x7f, 0xe6, 0x7b, + 0xa8, 0xd6, 0x33, 0x5a, 0xe4, 0xd2, 0x6c, 0x15, 0xa7, 0x39, 0x47, 0x66, 0xc7, 0x3b, 0x84, 0x86, + 0x42, 0xd1, 0x69, 0x46, 0xe0, 0x5c, 0xd0, 0x8d, 0x50, 0xcf, 0x0e, 0x3d, 0x22, 0xcf, 0x1f, 0x9d, + 0x9a, 0xd5, 0x3e, 0xc1, 0x08, 0xda, 0x43, 0xcd, 0x97, 0xe9, 0x47, 0xf1, 0x13, 0xb8, 0x9b, 0xeb, + 0x69, 0x88, 0x1e, 0x78, 0x46, 0x18, 0xf5, 0x49, 0x78, 0x64, 0xd7, 0xc0, 0x8f, 0xa0, 0xf9, 0x55, + 0xa8, 0x63, 0x30, 0xae, 0xd3, 0x15, 0x87, 0xd0, 0x32, 0xc3, 0x1a, 0xbc, 0x03, 0xae, 0x14, 0xd7, + 0x20, 0xeb, 0x6a, 0xf0, 0xd7, 0x86, 0xea, 0x17, 0x41, 0x13, 0xbd, 0x01, 0x47, 0x7c, 0x9f, 0xa8, + 0xf0, 0x6b, 0xd6, 0x8f, 0x76, 0x7b, 0xc5, 0x97, 0xda, 0xfa, 0x0a, 0x7a, 0x07, 0x55, 0x19, 0x5f, + 0x54, 0x1c, 0x77, 0x03, 0x73, 0x56, 0x72, 0xbb, 0xc5, 0xf9, 0x00, 0xae, 0x8a, 0x15, 0x2a, 0x09, + 0xa2, 0x41, 0x7a, 0x50, 0x76, 0xbd, 0x85, 0x7a, 0x0b, 0x8e, 0x70, 0x0a, 0x15, 0xfa, 0x5a, 0xca, + 0x2b, 0x6f, 0x2e, 0xae, 0x9c, 0x5b, 0x88, 0x80, 0xb7, 0xb5, 0x0c, 0x05, 0x47, 0xaf, 0x1e, 0x38, + 0xdc, 0x7d, 0x78, 0xcd, 0x44, 0x9e, 0xa5, 0xb2, 0xe9, 0x98, 0xe5, 0x9e, 0xd7, 0xc7, 0x2c, 0xf7, + 0xdd, 0xc5, 0x95, 0x91, 0x2b, 0xff, 0xb6, 0x9e, 0xfe, 0x0b, 0x00, 0x00, 0xff, 0xff, 0xab, 0xcb, + 0x6e, 0xac, 0xf3, 0x06, 0x00, 0x00, } diff --git a/store/service/proto/store.pb.micro.go b/store/service/proto/store.pb.micro.go index 0e622f09..668a5a87 100644 --- a/store/service/proto/store.pb.micro.go +++ b/store/service/proto/store.pb.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: store/service/proto/store.proto +// source: github.com/micro/go-micro/store/service/proto/store.proto package go_micro_store diff --git a/store/service/proto/store.proto b/store/service/proto/store.proto index 460cf049..22056ef9 100644 --- a/store/service/proto/store.proto +++ b/store/service/proto/store.proto @@ -21,10 +21,12 @@ message Record { } message ReadOptions { - bool prefix = 1; - bool suffix = 2; - uint64 limit = 3; - uint64 offset = 4; + string database = 1; + string table = 2; + bool prefix = 3; + bool suffix = 4; + uint64 limit = 5; + uint64 offset = 6; } message ReadRequest { @@ -37,10 +39,12 @@ message ReadResponse { } message WriteOptions { + string database = 1; + string table = 2; // time.Time - int64 expiry = 1; + int64 expiry = 3; // time.Duration - int64 ttl = 2; + int64 ttl = 4; } message WriteRequest { @@ -50,7 +54,10 @@ message WriteRequest { message WriteResponse {} -message DeleteOptions {} +message DeleteOptions { + string database = 1; + string table = 2; +} message DeleteRequest { string key = 1; @@ -60,12 +67,15 @@ message DeleteRequest { message DeleteResponse {} message ListOptions { - string prefix = 1; - string suffix = 2; - uint64 limit = 3; - uint64 offset = 4; + string database = 1; + string table = 2; + string prefix = 3; + string suffix = 4; + uint64 limit = 5; + uint64 offset = 6; } + message ListRequest { ListOptions options = 1; } diff --git a/store/service/service.go b/store/service/service.go index f812082b..45775eb1 100644 --- a/store/service/service.go +++ b/store/service/service.go @@ -46,23 +46,27 @@ func (s *serviceStore) Init(opts ...store.Option) error { func (s *serviceStore) Context() context.Context { ctx := context.Background() - md := make(metadata.Metadata) - - if len(s.Database) > 0 { - md["Micro-Database"] = s.Database - } - - if len(s.Table) > 0 { - md["Micro-Table"] = s.Table - } - return metadata.NewContext(ctx, md) } // Sync all the known records func (s *serviceStore) List(opts ...store.ListOption) ([]string, error) { - stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...)) + var options store.ListOptions + for _, o := range opts { + o(&options) + } + + listOpts := &pb.ListOptions{ + Database: options.Database, + Table: options.Table, + Prefix: options.Prefix, + Suffix: options.Suffix, + Limit: uint64(options.Limit), + Offset: uint64(options.Offset), + } + + stream, err := s.Client.List(s.Context(), &pb.ListRequest{Options: listOpts}, client.WithAddress(s.Nodes...)) if err != nil && errors.Equal(err, errors.NotFound("", "")) { return nil, store.ErrNotFound } else if err != nil { @@ -96,11 +100,18 @@ func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Reco o(&options) } + readOpts := &pb.ReadOptions{ + Database: options.Database, + Table: options.Table, + Prefix: options.Prefix, + Suffix: options.Suffix, + Limit: uint64(options.Limit), + Offset: uint64(options.Offset), + } + rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{ - Key: key, - Options: &pb.ReadOptions{ - Prefix: options.Prefix, - }, + Key: key, + Options: readOpts, }, client.WithAddress(s.Nodes...)) if err != nil && errors.Equal(err, errors.NotFound("", "")) { return nil, store.ErrNotFound @@ -123,13 +134,23 @@ func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Reco // Write a record func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) error { + var options store.WriteOptions + for _, o := range opts { + o(&options) + } + + writeOpts := &pb.WriteOptions{ + Database: options.Database, + Table: options.Table, + } + _, err := s.Client.Write(s.Context(), &pb.WriteRequest{ Record: &pb.Record{ Key: record.Key, Value: record.Value, Expiry: int64(record.Expiry.Seconds()), }, - }, client.WithAddress(s.Nodes...)) + Options: writeOpts}, client.WithAddress(s.Nodes...)) if err != nil && errors.Equal(err, errors.NotFound("", "")) { return store.ErrNotFound } @@ -139,8 +160,19 @@ func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) er // Delete a record with key func (s *serviceStore) Delete(key string, opts ...store.DeleteOption) error { + var options store.DeleteOptions + for _, o := range opts { + o(&options) + } + + deleteOpts := &pb.DeleteOptions{ + Database: options.Database, + Table: options.Table, + } + _, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{ - Key: key, + Key: key, + Options: deleteOpts, }, client.WithAddress(s.Nodes...)) if err != nil && errors.Equal(err, errors.NotFound("", "")) { return store.ErrNotFound