micro/store/service/service.go

120 lines
2.5 KiB
Go
Raw Normal View History

2019-10-11 14:44:42 +01:00
// Package service implements the store service interface
package service
import (
"context"
"io"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto"
)
type serviceStore struct {
options.Options
// Addresses of the nodes
Nodes []string
// store service client
Client pb.StoreService
}
// Sync all the known records
func (s *serviceStore) Sync() ([]*store.Record, error) {
stream, err := s.Client.Sync(context.Background(), &pb.SyncRequest{}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
defer stream.Close()
var records []*store.Record
for {
rsp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return records, err
}
for _, record := range rsp.Records {
records = append(records, &store.Record{
Key: record.Key,
Value: record.Value,
Expiry: time.Duration(record.Expiry) * time.Second,
})
}
}
return records, nil
}
// Read a record with key
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{
Keys: keys,
}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
var records []*store.Record
for _, val := range rsp.Records {
records = append(records, &store.Record{
Key: val.Key,
Value: val.Value,
Expiry: time.Duration(val.Expiry) * time.Second,
})
}
return records, nil
}
// Write a record
func (s *serviceStore) Write(recs ...*store.Record) error {
var records []*pb.Record
for _, record := range recs {
records = append(records, &pb.Record{
Key: record.Key,
Value: record.Value,
Expiry: int64(record.Expiry.Seconds()),
})
}
_, err := s.Client.Write(context.Background(), &pb.WriteRequest{
Records: records,
}, client.WithAddress(s.Nodes...))
return err
}
// Delete a record with key
func (s *serviceStore) Delete(keys ...string) error {
_, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{
Keys: keys,
}, client.WithAddress(s.Nodes...))
return err
}
// NewStore returns a new store service implementation
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
var nodes []string
n, ok := options.Values().Get("store.nodes")
if ok {
nodes = n.([]string)
}
service := &serviceStore{
Options: options,
Nodes: nodes,
Client: pb.NewStoreService("go.micro.store", client.DefaultClient),
}
return service
}