Move store to data/store

This commit is contained in:
Asim Aslam
2019-06-19 22:04:13 +01:00
parent a8042adac1
commit 3f910038a3
7 changed files with 6 additions and 6 deletions

View File

@@ -0,0 +1,96 @@
// Package consul is a consul implementation of kv
package consul
import (
"fmt"
"net"
"github.com/hashicorp/consul/api"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/data/store"
)
type ckv struct {
options.Options
client *api.Client
}
func (c *ckv) Read(key string) (*store.Record, error) {
keyval, _, err := c.client.KV().Get(key, nil)
if err != nil {
return nil, err
}
if keyval == nil {
return nil, store.ErrNotFound
}
return &store.Record{
Key: keyval.Key,
Value: keyval.Value,
}, nil
}
func (c *ckv) Delete(key string) error {
_, err := c.client.KV().Delete(key, nil)
return err
}
func (c *ckv) Write(record *store.Record) error {
_, err := c.client.KV().Put(&api.KVPair{
Key: record.Key,
Value: record.Value,
}, nil)
return err
}
func (c *ckv) Dump() ([]*store.Record, error) {
keyval, _, err := c.client.KV().List("/", nil)
if err != nil {
return nil, err
}
if keyval == nil {
return nil, store.ErrNotFound
}
var vals []*store.Record
for _, keyv := range keyval {
vals = append(vals, &store.Record{
Key: keyv.Key,
Value: keyv.Value,
})
}
return vals, nil
}
func (c *ckv) String() string {
return "consul"
}
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
config := api.DefaultConfig()
var nodes []string
if n, ok := options.Values().Get("store.nodes"); ok {
nodes = n.([]string)
}
// set host
if len(nodes) > 0 {
addr, port, err := net.SplitHostPort(nodes[0])
if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" {
port = "8500"
config.Address = fmt.Sprintf("%s:%s", nodes[0], port)
} else if err == nil {
config.Address = fmt.Sprintf("%s:%s", addr, port)
}
}
client, _ := api.NewClient(config)
return &ckv{
Options: options,
client: client,
}
}

View File

@@ -0,0 +1,97 @@
// Package memory is a in-memory store store
package memory
import (
"sync"
"time"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/data/store"
)
type memoryStore struct {
options.Options
sync.RWMutex
values map[string]*memoryRecord
}
type memoryRecord struct {
r *store.Record
c time.Time
}
func (m *memoryStore) Dump() ([]*store.Record, error) {
m.RLock()
defer m.RUnlock()
var values []*store.Record
for _, v := range m.values {
// get expiry
d := v.r.Expiry
t := time.Since(v.c)
// expired
if d > time.Duration(0) && t > d {
continue
}
values = append(values, v.r)
}
return values, nil
}
func (m *memoryStore) Read(key string) (*store.Record, error) {
m.RLock()
defer m.RUnlock()
v, ok := m.values[key]
if !ok {
return nil, store.ErrNotFound
}
// get expiry
d := v.r.Expiry
t := time.Since(v.c)
// expired
if d > time.Duration(0) && t > d {
return nil, store.ErrNotFound
}
return v.r, nil
}
func (m *memoryStore) Write(r *store.Record) error {
m.Lock()
defer m.Unlock()
// set the record
m.values[r.Key] = &memoryRecord{
r: r,
c: time.Now(),
}
return nil
}
func (m *memoryStore) Delete(key string) error {
m.Lock()
defer m.Unlock()
// delete the value
delete(m.values, key)
return nil
}
// NewStore returns a new store.Store
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
return &memoryStore{
Options: options,
values: make(map[string]*memoryRecord),
}
}

15
data/store/options.go Normal file
View File

@@ -0,0 +1,15 @@
package store
import (
"github.com/micro/go-micro/config/options"
)
// Set the nodes used to back the store
func Nodes(a ...string) options.Option {
return options.WithValue("store.nodes", a)
}
// Prefix sets a prefix to any key ids used
func Prefix(p string) options.Option {
return options.WithValue("store.prefix", p)
}

34
data/store/store.go Normal file
View File

@@ -0,0 +1,34 @@
// Package store is an interface for distribute data storage.
package store
import (
"errors"
"time"
"github.com/micro/go-micro/config/options"
)
var (
ErrNotFound = errors.New("not found")
)
// Store is a data storage interface
type Store interface {
// embed options
options.Options
// Dump the known records
Dump() ([]*Record, error)
// Read a record with key
Read(key string) (*Record, error)
// Write a record
Write(r *Record) error
// Delete a record with key
Delete(key string) error
}
// Record represents a data record
type Record struct {
Key string
Value []byte
Expiry time.Duration
}