Merge pull request #844 from micro/store

Store service implementation
This commit is contained in:
Asim Aslam 2019-10-11 14:49:44 +01:00 committed by GitHub
commit 9bfe4d9bf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1167 additions and 45 deletions

View File

@ -6,6 +6,7 @@ import (
"log" "log"
client "github.com/coreos/etcd/clientv3" client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store" "github.com/micro/go-micro/store"
) )
@ -15,7 +16,10 @@ type ekv struct {
kv client.KV kv client.KV
} }
func (e *ekv) Read(key string) (*store.Record, error) { func (e *ekv) Read(keys ...string) ([]*store.Record, error) {
var values []*mvccpb.KeyValue
for _, key := range keys {
keyval, err := e.kv.Get(context.Background(), key) keyval, err := e.kv.Get(context.Background(), key)
if err != nil { if err != nil {
return nil, err return nil, err
@ -25,20 +29,43 @@ func (e *ekv) Read(key string) (*store.Record, error) {
return nil, store.ErrNotFound return nil, store.ErrNotFound
} }
return &store.Record{ values = append(values, keyval.Kvs...)
Key: string(keyval.Kvs[0].Key), }
Value: keyval.Kvs[0].Value,
}, nil var records []*store.Record
for _, kv := range values {
records = append(records, &store.Record{
Key: string(kv.Key),
Value: kv.Value,
// TODO: implement expiry
})
}
return records, nil
} }
func (e *ekv) Delete(key string) error { func (e *ekv) Delete(keys ...string) error {
var gerr error
for _, key := range keys {
_, err := e.kv.Delete(context.Background(), key) _, err := e.kv.Delete(context.Background(), key)
return err if err != nil {
gerr = err
}
}
return gerr
} }
func (e *ekv) Write(record *store.Record) error { func (e *ekv) Write(records ...*store.Record) error {
var gerr error
for _, record := range records {
// TODO create lease to expire keys
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) _, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
return err if err != nil {
gerr = err
}
}
return gerr
} }
func (e *ekv) Sync() ([]*store.Record, error) { func (e *ekv) Sync() ([]*store.Record, error) {

View File

@ -48,10 +48,13 @@ func (m *memoryStore) Sync() ([]*store.Record, error) {
return values, nil return values, nil
} }
func (m *memoryStore) Read(key string) (*store.Record, error) { func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
var records []*store.Record
for _, key := range keys {
v, ok := m.values[key] v, ok := m.values[key]
if !ok { if !ok {
return nil, store.ErrNotFound return nil, store.ErrNotFound
@ -71,28 +74,35 @@ func (m *memoryStore) Read(key string) (*store.Record, error) {
v.c = time.Now() v.c = time.Now()
} }
return v.r, nil records = append(records, v.r)
}
return records, nil
} }
func (m *memoryStore) Write(r *store.Record) error { func (m *memoryStore) Write(records ...*store.Record) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
for _, r := range records {
// set the record // set the record
m.values[r.Key] = &memoryRecord{ m.values[r.Key] = &memoryRecord{
r: r, r: r,
c: time.Now(), c: time.Now(),
} }
}
return nil return nil
} }
func (m *memoryStore) Delete(key string) error { func (m *memoryStore) Delete(keys ...string) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
for _, key := range keys {
// delete the value // delete the value
delete(m.values, key) delete(m.values, key)
}
return nil return nil
} }

View File

@ -25,7 +25,7 @@ func TestReadRecordExpire(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if rrec.Expiry >= expire { if rrec[0].Expiry >= expire {
t.Fatal("expiry of read record is not changed") t.Fatal("expiry of read record is not changed")
} }

View File

@ -0,0 +1,89 @@
package handler
import (
"context"
"io"
"time"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto"
)
type Store struct {
Store store.Store
}
func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error {
vals, err := s.Store.Read(req.Keys...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
for _, val := range vals {
rsp.Records = append(rsp.Records, &pb.Record{
Key: val.Key,
Value: val.Value,
Expiry: int64(val.Expiry.Seconds()),
})
}
return nil
}
func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error {
var records []*store.Record
for _, record := range req.Records {
records = append(records, &store.Record{
Key: record.Key,
Value: record.Value,
Expiry: time.Duration(record.Expiry) * time.Second,
})
}
err := s.Store.Write(records...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}
func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error {
err := s.Store.Delete(req.Keys...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}
func (s *Store) Sync(ctx context.Context, req *pb.SyncRequest, stream pb.Store_SyncStream) error {
var vals []*store.Record
var err error
if len(req.Key) > 0 {
vals, err = s.Store.Read(req.Key)
} else {
vals, err = s.Store.Sync()
}
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
rsp := new(pb.SyncResponse)
// TODO: batch sync
for _, val := range vals {
rsp.Records = append(rsp.Records, &pb.Record{
Key: val.Key,
Value: val.Value,
Expiry: int64(val.Expiry.Seconds()),
})
}
err = stream.Send(rsp)
if err == io.EOF {
return nil
}
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}

View File

@ -0,0 +1,207 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: micro/go-micro/store/service/proto/store.proto
package go_micro_store
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
client "github.com/micro/go-micro/client"
server "github.com/micro/go-micro/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ client.Option
var _ server.Option
// Client API for Store service
type StoreService interface {
Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error)
Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error)
Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error)
}
type storeService struct {
c client.Client
name string
}
func NewStoreService(name string, c client.Client) StoreService {
if c == nil {
c = client.NewClient()
}
if len(name) == 0 {
name = "go.micro.store"
}
return &storeService{
c: c,
name: name,
}
}
func (c *storeService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) {
req := c.c.NewRequest(c.name, "Store.Read", in)
out := new(ReadResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeService) Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) {
req := c.c.NewRequest(c.name, "Store.Write", in)
out := new(WriteResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeService) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) {
req := c.c.NewRequest(c.name, "Store.Delete", in)
out := new(DeleteResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeService) Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) {
req := c.c.NewRequest(c.name, "Store.Sync", &SyncRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &storeServiceSync{stream}, nil
}
type Store_SyncService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*SyncResponse, error)
}
type storeServiceSync struct {
stream client.Stream
}
func (x *storeServiceSync) Close() error {
return x.stream.Close()
}
func (x *storeServiceSync) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeServiceSync) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeServiceSync) Recv() (*SyncResponse, error) {
m := new(SyncResponse)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Store service
type StoreHandler interface {
Read(context.Context, *ReadRequest, *ReadResponse) error
Write(context.Context, *WriteRequest, *WriteResponse) error
Delete(context.Context, *DeleteRequest, *DeleteResponse) error
Sync(context.Context, *SyncRequest, Store_SyncStream) error
}
func RegisterStoreHandler(s server.Server, hdlr StoreHandler, opts ...server.HandlerOption) error {
type store interface {
Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error
Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error
Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error
Sync(ctx context.Context, stream server.Stream) error
}
type Store struct {
store
}
h := &storeHandler{hdlr}
return s.Handle(s.NewHandler(&Store{h}, opts...))
}
type storeHandler struct {
StoreHandler
}
func (h *storeHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error {
return h.StoreHandler.Read(ctx, in, out)
}
func (h *storeHandler) Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error {
return h.StoreHandler.Write(ctx, in, out)
}
func (h *storeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error {
return h.StoreHandler.Delete(ctx, in, out)
}
func (h *storeHandler) Sync(ctx context.Context, stream server.Stream) error {
m := new(SyncRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.StoreHandler.Sync(ctx, m, &storeSyncStream{stream})
}
type Store_SyncStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*SyncResponse) error
}
type storeSyncStream struct {
stream server.Stream
}
func (x *storeSyncStream) Close() error {
return x.stream.Close()
}
func (x *storeSyncStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeSyncStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeSyncStream) Send(m *SyncResponse) error {
return x.stream.Send(m)
}

View File

@ -0,0 +1,618 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: micro/go-micro/store/service/proto/store.proto
package go_micro_store
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Record struct {
// key of the record
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// value in the record
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
// timestamp in unix seconds
Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Record) Reset() { *m = Record{} }
func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Record) ProtoMessage() {}
func (*Record) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{0}
}
func (m *Record) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Record.Unmarshal(m, b)
}
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
}
func (m *Record) XXX_Merge(src proto.Message) {
xxx_messageInfo_Record.Merge(m, src)
}
func (m *Record) XXX_Size() int {
return xxx_messageInfo_Record.Size(m)
}
func (m *Record) XXX_DiscardUnknown() {
xxx_messageInfo_Record.DiscardUnknown(m)
}
var xxx_messageInfo_Record proto.InternalMessageInfo
func (m *Record) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Record) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *Record) GetExpiry() int64 {
if m != nil {
return m.Expiry
}
return 0
}
type ReadRequest struct {
Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{1}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReadRequest.Unmarshal(m, b)
}
func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic)
}
func (m *ReadRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadRequest.Merge(m, src)
}
func (m *ReadRequest) XXX_Size() int {
return xxx_messageInfo_ReadRequest.Size(m)
}
func (m *ReadRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ReadRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ReadRequest proto.InternalMessageInfo
func (m *ReadRequest) GetKeys() []string {
if m != nil {
return m.Keys
}
return nil
}
type ReadResponse struct {
Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{2}
}
func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReadResponse.Unmarshal(m, b)
}
func (m *ReadResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReadResponse.Marshal(b, m, deterministic)
}
func (m *ReadResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadResponse.Merge(m, src)
}
func (m *ReadResponse) XXX_Size() int {
return xxx_messageInfo_ReadResponse.Size(m)
}
func (m *ReadResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ReadResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ReadResponse proto.InternalMessageInfo
func (m *ReadResponse) GetRecords() []*Record {
if m != nil {
return m.Records
}
return nil
}
type WriteRequest struct {
Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
func (m *WriteRequest) String() string { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage() {}
func (*WriteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{3}
}
func (m *WriteRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WriteRequest.Unmarshal(m, b)
}
func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic)
}
func (m *WriteRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_WriteRequest.Merge(m, src)
}
func (m *WriteRequest) XXX_Size() int {
return xxx_messageInfo_WriteRequest.Size(m)
}
func (m *WriteRequest) XXX_DiscardUnknown() {
xxx_messageInfo_WriteRequest.DiscardUnknown(m)
}
var xxx_messageInfo_WriteRequest proto.InternalMessageInfo
func (m *WriteRequest) GetRecords() []*Record {
if m != nil {
return m.Records
}
return nil
}
type WriteResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WriteResponse) Reset() { *m = WriteResponse{} }
func (m *WriteResponse) String() string { return proto.CompactTextString(m) }
func (*WriteResponse) ProtoMessage() {}
func (*WriteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{4}
}
func (m *WriteResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WriteResponse.Unmarshal(m, b)
}
func (m *WriteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_WriteResponse.Marshal(b, m, deterministic)
}
func (m *WriteResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_WriteResponse.Merge(m, src)
}
func (m *WriteResponse) XXX_Size() int {
return xxx_messageInfo_WriteResponse.Size(m)
}
func (m *WriteResponse) XXX_DiscardUnknown() {
xxx_messageInfo_WriteResponse.DiscardUnknown(m)
}
var xxx_messageInfo_WriteResponse proto.InternalMessageInfo
type DeleteRequest struct {
Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DeleteRequest) Reset() { *m = DeleteRequest{} }
func (m *DeleteRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteRequest) ProtoMessage() {}
func (*DeleteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{5}
}
func (m *DeleteRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeleteRequest.Unmarshal(m, b)
}
func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic)
}
func (m *DeleteRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_DeleteRequest.Merge(m, src)
}
func (m *DeleteRequest) XXX_Size() int {
return xxx_messageInfo_DeleteRequest.Size(m)
}
func (m *DeleteRequest) XXX_DiscardUnknown() {
xxx_messageInfo_DeleteRequest.DiscardUnknown(m)
}
var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo
func (m *DeleteRequest) GetKeys() []string {
if m != nil {
return m.Keys
}
return nil
}
type DeleteResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{6}
}
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeleteResponse.Unmarshal(m, b)
}
func (m *DeleteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DeleteResponse.Marshal(b, m, deterministic)
}
func (m *DeleteResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_DeleteResponse.Merge(m, src)
}
func (m *DeleteResponse) XXX_Size() int {
return xxx_messageInfo_DeleteResponse.Size(m)
}
func (m *DeleteResponse) XXX_DiscardUnknown() {
xxx_messageInfo_DeleteResponse.DiscardUnknown(m)
}
var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo
type SyncRequest struct {
// optional key
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SyncRequest) Reset() { *m = SyncRequest{} }
func (m *SyncRequest) String() string { return proto.CompactTextString(m) }
func (*SyncRequest) ProtoMessage() {}
func (*SyncRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{7}
}
func (m *SyncRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncRequest.Unmarshal(m, b)
}
func (m *SyncRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncRequest.Marshal(b, m, deterministic)
}
func (m *SyncRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncRequest.Merge(m, src)
}
func (m *SyncRequest) XXX_Size() int {
return xxx_messageInfo_SyncRequest.Size(m)
}
func (m *SyncRequest) XXX_DiscardUnknown() {
xxx_messageInfo_SyncRequest.DiscardUnknown(m)
}
var xxx_messageInfo_SyncRequest proto.InternalMessageInfo
func (m *SyncRequest) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
type SyncResponse struct {
Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SyncResponse) Reset() { *m = SyncResponse{} }
func (m *SyncResponse) String() string { return proto.CompactTextString(m) }
func (*SyncResponse) ProtoMessage() {}
func (*SyncResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{8}
}
func (m *SyncResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncResponse.Unmarshal(m, b)
}
func (m *SyncResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncResponse.Marshal(b, m, deterministic)
}
func (m *SyncResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncResponse.Merge(m, src)
}
func (m *SyncResponse) XXX_Size() int {
return xxx_messageInfo_SyncResponse.Size(m)
}
func (m *SyncResponse) XXX_DiscardUnknown() {
xxx_messageInfo_SyncResponse.DiscardUnknown(m)
}
var xxx_messageInfo_SyncResponse proto.InternalMessageInfo
func (m *SyncResponse) GetRecords() []*Record {
if m != nil {
return m.Records
}
return nil
}
func init() {
proto.RegisterType((*Record)(nil), "go.micro.store.Record")
proto.RegisterType((*ReadRequest)(nil), "go.micro.store.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "go.micro.store.ReadResponse")
proto.RegisterType((*WriteRequest)(nil), "go.micro.store.WriteRequest")
proto.RegisterType((*WriteResponse)(nil), "go.micro.store.WriteResponse")
proto.RegisterType((*DeleteRequest)(nil), "go.micro.store.DeleteRequest")
proto.RegisterType((*DeleteResponse)(nil), "go.micro.store.DeleteResponse")
proto.RegisterType((*SyncRequest)(nil), "go.micro.store.SyncRequest")
proto.RegisterType((*SyncResponse)(nil), "go.micro.store.SyncResponse")
}
func init() {
proto.RegisterFile("micro/go-micro/store/service/proto/store.proto", fileDescriptor_f84ccc98e143ed3e)
}
var fileDescriptor_f84ccc98e143ed3e = []byte{
// 333 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcb, 0x6e, 0xf2, 0x30,
0x10, 0x85, 0x09, 0x81, 0xfc, 0x62, 0xb8, 0xfc, 0x68, 0x54, 0xa1, 0x88, 0xde, 0xd2, 0x74, 0x93,
0x4d, 0x03, 0xa2, 0x2f, 0x50, 0xa9, 0x17, 0xb5, 0x5b, 0xb3, 0xe8, 0x9a, 0x86, 0x11, 0x8a, 0xa0,
0x98, 0x3a, 0x01, 0x35, 0x2f, 0xd4, 0xe7, 0xac, 0x6c, 0x27, 0x69, 0x90, 0x41, 0xaa, 0xba, 0x1b,
0x7b, 0xce, 0x1c, 0x9f, 0xf9, 0x64, 0x08, 0xdf, 0xe3, 0x48, 0xf0, 0xd1, 0x82, 0xdf, 0xe8, 0x22,
0x49, 0xb9, 0xa0, 0x51, 0x42, 0x62, 0x17, 0x47, 0x34, 0xda, 0x08, 0x9e, 0xe6, 0x77, 0xa1, 0xaa,
0xb1, 0xb7, 0xe0, 0x7a, 0x24, 0x54, 0xb7, 0xfe, 0x33, 0x38, 0x8c, 0x22, 0x2e, 0xe6, 0xd8, 0x07,
0x7b, 0x49, 0x99, 0x6b, 0x79, 0x56, 0xd0, 0x62, 0xb2, 0xc4, 0x13, 0x68, 0xee, 0x66, 0xab, 0x2d,
0xb9, 0x75, 0xcf, 0x0a, 0x3a, 0x4c, 0x1f, 0x70, 0x00, 0x0e, 0x7d, 0x6e, 0x62, 0x91, 0xb9, 0xb6,
0x67, 0x05, 0x36, 0xcb, 0x4f, 0xfe, 0x15, 0xb4, 0x19, 0xcd, 0xe6, 0x8c, 0x3e, 0xb6, 0x94, 0xa4,
0x88, 0xd0, 0x58, 0x52, 0x96, 0xb8, 0x96, 0x67, 0x07, 0x2d, 0xa6, 0x6a, 0xff, 0x0e, 0x3a, 0x5a,
0x92, 0x6c, 0xf8, 0x3a, 0x21, 0x1c, 0xc3, 0x3f, 0xa1, 0x1e, 0xd7, 0xb2, 0xf6, 0x64, 0x10, 0xee,
0xc7, 0x0b, 0x75, 0x36, 0x56, 0xc8, 0xa4, 0xc3, 0xab, 0x88, 0x53, 0x2a, 0x5e, 0xa9, 0x38, 0xd4,
0x7f, 0xe7, 0xf0, 0x1f, 0xba, 0xb9, 0x83, 0x0e, 0xe1, 0x5f, 0x43, 0xf7, 0x81, 0x56, 0xf4, 0xe3,
0x79, 0x28, 0x79, 0x1f, 0x7a, 0x85, 0x28, 0x1f, 0xbb, 0x84, 0xf6, 0x34, 0x5b, 0x47, 0xc5, 0x90,
0x41, 0x4f, 0x46, 0xd5, 0x82, 0xbf, 0x2e, 0x3b, 0xf9, 0xaa, 0x43, 0x73, 0x2a, 0x3b, 0x78, 0x0f,
0x0d, 0x09, 0x0e, 0x4f, 0xcd, 0x91, 0x92, 0xf8, 0xf0, 0xec, 0x70, 0x33, 0xcf, 0x5b, 0xc3, 0x27,
0x68, 0xaa, 0xcd, 0xd1, 0x10, 0x56, 0x91, 0x0e, 0xcf, 0x8f, 0x74, 0x4b, 0x9f, 0x17, 0x70, 0x34,
0x0b, 0x34, 0xa4, 0x7b, 0x20, 0x87, 0x17, 0xc7, 0xda, 0xa5, 0xd5, 0x23, 0x34, 0x24, 0x23, 0x73,
0xaf, 0x0a, 0x5a, 0x73, 0xaf, 0x2a, 0x56, 0xbf, 0x36, 0xb6, 0xde, 0x1c, 0xf5, 0xb7, 0x6f, 0xbf,
0x03, 0x00, 0x00, 0xff, 0xff, 0x30, 0xc8, 0x99, 0x52, 0x0d, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// StoreClient is the client API for Store service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StoreClient interface {
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error)
}
type storeClient struct {
cc *grpc.ClientConn
}
func NewStoreClient(cc *grpc.ClientConn) StoreClient {
return &storeClient{cc}
}
func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
out := new(ReadResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
out := new(WriteResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Write", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) {
out := new(DeleteResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Delete", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeClient) Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) {
stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/Sync", opts...)
if err != nil {
return nil, err
}
x := &storeSyncClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Store_SyncClient interface {
Recv() (*SyncResponse, error)
grpc.ClientStream
}
type storeSyncClient struct {
grpc.ClientStream
}
func (x *storeSyncClient) Recv() (*SyncResponse, error) {
m := new(SyncResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StoreServer is the server API for Store service.
type StoreServer interface {
Read(context.Context, *ReadRequest) (*ReadResponse, error)
Write(context.Context, *WriteRequest) (*WriteResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
Sync(*SyncRequest, Store_SyncServer) error
}
func RegisterStoreServer(s *grpc.Server, srv StoreServer) {
s.RegisterService(&_Store_serviceDesc, srv)
}
func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreServer).Read(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.store.Store/Read",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreServer).Read(ctx, req.(*ReadRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Store_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WriteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreServer).Write(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.store.Store/Write",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreServer).Write(ctx, req.(*WriteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Store_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.store.Store/Delete",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreServer).Delete(ctx, req.(*DeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Store_Sync_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SyncRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StoreServer).Sync(m, &storeSyncServer{stream})
}
type Store_SyncServer interface {
Send(*SyncResponse) error
grpc.ServerStream
}
type storeSyncServer struct {
grpc.ServerStream
}
func (x *storeSyncServer) Send(m *SyncResponse) error {
return x.ServerStream.SendMsg(m)
}
var _Store_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.store.Store",
HandlerType: (*StoreServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Read",
Handler: _Store_Read_Handler,
},
{
MethodName: "Write",
Handler: _Store_Write_Handler,
},
{
MethodName: "Delete",
Handler: _Store_Delete_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Sync",
Handler: _Store_Sync_Handler,
ServerStreams: true,
},
},
Metadata: "micro/go-micro/store/service/proto/store.proto",
}

View File

@ -0,0 +1,48 @@
syntax = "proto3";
package go.micro.store;
service Store {
rpc Read(ReadRequest) returns (ReadResponse) {};
rpc Write(WriteRequest) returns (WriteResponse) {};
rpc Delete(DeleteRequest) returns (DeleteResponse) {};
rpc Sync(SyncRequest) returns (stream SyncResponse) {};
}
message Record {
// key of the record
string key = 1;
// value in the record
bytes value = 2;
// timestamp in unix seconds
int64 expiry = 3;
}
message ReadRequest {
repeated string keys = 1;
}
message ReadResponse {
repeated Record records = 1;
}
message WriteRequest {
repeated Record records = 2;
}
message WriteResponse {}
message DeleteRequest {
repeated string keys = 1;
}
message DeleteResponse {}
message SyncRequest {
// optional key
string key = 1;
}
message SyncResponse {
repeated Record records = 1;
}

119
store/service/service.go Normal file
View File

@ -0,0 +1,119 @@
// Package service implements the store service interface
package service
import (
"context"
"io"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto"
)
type serviceStore struct {
options.Options
// Addresses of the nodes
Nodes []string
// store service client
Client pb.StoreService
}
// Sync all the known records
func (s *serviceStore) Sync() ([]*store.Record, error) {
stream, err := s.Client.Sync(context.Background(), &pb.SyncRequest{}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
defer stream.Close()
var records []*store.Record
for {
rsp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return records, err
}
for _, record := range rsp.Records {
records = append(records, &store.Record{
Key: record.Key,
Value: record.Value,
Expiry: time.Duration(record.Expiry) * time.Second,
})
}
}
return records, nil
}
// Read a record with key
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{
Keys: keys,
}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
var records []*store.Record
for _, val := range rsp.Records {
records = append(records, &store.Record{
Key: val.Key,
Value: val.Value,
Expiry: time.Duration(val.Expiry) * time.Second,
})
}
return records, nil
}
// Write a record
func (s *serviceStore) Write(recs ...*store.Record) error {
var records []*pb.Record
for _, record := range recs {
records = append(records, &pb.Record{
Key: record.Key,
Value: record.Value,
Expiry: int64(record.Expiry.Seconds()),
})
}
_, err := s.Client.Write(context.Background(), &pb.WriteRequest{
Records: records,
}, client.WithAddress(s.Nodes...))
return err
}
// Delete a record with key
func (s *serviceStore) Delete(keys ...string) error {
_, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{
Keys: keys,
}, client.WithAddress(s.Nodes...))
return err
}
// NewStore returns a new store service implementation
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
var nodes []string
n, ok := options.Values().Get("store.nodes")
if ok {
nodes = n.([]string)
}
service := &serviceStore{
Options: options,
Nodes: nodes,
Client: pb.NewStoreService("go.micro.store", client.DefaultClient),
}
return service
}

View File

@ -19,11 +19,11 @@ type Store interface {
// Sync all the known records // Sync all the known records
Sync() ([]*Record, error) Sync() ([]*Record, error)
// Read a record with key // Read a record with key
Read(key string) (*Record, error) Read(keys ...string) ([]*Record, error)
// Write a record // Write a record
Write(r *Record) error Write(recs ...*Record) error
// Delete a record with key // Delete a record with key
Delete(key string) error Delete(keys ...string) error
} }
// Record represents a data record // Record represents a data record

View File

@ -39,8 +39,12 @@ func (m *syncMap) Read(key, val interface{}) error {
return err return err
} }
if len(kval) == 0 {
return store.ErrNotFound
}
// decode value // decode value
return json.Unmarshal(kval.Value, val) return json.Unmarshal(kval[0].Value, val)
} }
func (m *syncMap) Write(key, val interface{}) error { func (m *syncMap) Write(key, val interface{}) error {