Merge branch 'master' of https://github.com/micro/go-micro into certmagic

This commit is contained in:
Jake Sanders 2019-10-11 16:25:28 +01:00
commit 09a202ccf0
23 changed files with 1390 additions and 94 deletions

View File

@ -110,12 +110,21 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
var grr error var grr error
cc, err := g.pool.getConn(address, grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)), grpcDialOptions := []grpc.DialOption{
grpc.WithTimeout(opts.DialTimeout), g.secure(), grpc.WithDefaultCallOptions(grpc.ForceCodec(cf)),
grpc.WithTimeout(opts.DialTimeout),
g.secure(),
grpc.WithDefaultCallOptions( grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(maxRecvMsgSize), grpc.MaxCallRecvMsgSize(maxRecvMsgSize),
grpc.MaxCallSendMsgSize(maxSendMsgSize), grpc.MaxCallSendMsgSize(maxSendMsgSize),
)) ),
}
if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
cc, err := g.pool.getConn(address, grpcDialOptions...)
if err != nil { if err != nil {
return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -127,7 +136,11 @@ func (g *grpcClient) call(ctx context.Context, node *registry.Node, req client.R
ch := make(chan error, 1) ch := make(chan error, 1)
go func() { go func() {
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpc.CallContentSubtype(cf.Name())) grpcCallOptions := []grpc.CallOption{grpc.CallContentSubtype(cf.Name())}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
err := cc.Invoke(ctx, methodToGRPC(req.Service(), req.Endpoint()), req.Body(), rsp, grpcCallOptions...)
ch <- microError(err) ch <- microError(err)
}() }()
@ -175,7 +188,16 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
wc := wrapCodec{cf} wc := wrapCodec{cf}
cc, err := grpc.DialContext(dialCtx, address, grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)), g.secure()) grpcDialOptions := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(wc)),
g.secure(),
}
if opts := g.getGrpcDialOptions(); opts != nil {
grpcDialOptions = append(grpcDialOptions, opts...)
}
cc, err := grpc.DialContext(dialCtx, address, grpcDialOptions...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err))
} }
@ -186,7 +208,11 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client
ServerStreams: true, ServerStreams: true,
} }
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint())) grpcCallOptions := []grpc.CallOption{}
if opts := g.getGrpcCallOptions(); opts != nil {
grpcCallOptions = append(grpcCallOptions, opts...)
}
st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...)
if err != nil { if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err))
} }
@ -514,6 +540,46 @@ func (g *grpcClient) String() string {
return "grpc" return "grpc"
} }
func (g *grpcClient) getGrpcDialOptions() []grpc.DialOption {
if g.opts.CallOptions.Context == nil {
return nil
}
v := g.opts.CallOptions.Context.Value(grpcDialOptions{})
if v == nil {
return nil
}
opts, ok := v.([]grpc.DialOption)
if !ok {
return nil
}
return opts
}
func (g *grpcClient) getGrpcCallOptions() []grpc.CallOption {
if g.opts.CallOptions.Context == nil {
return nil
}
v := g.opts.CallOptions.Context.Value(grpcCallOptions{})
if v == nil {
return nil
}
opts, ok := v.([]grpc.CallOption)
if !ok {
return nil
}
return opts
}
func newClient(opts ...client.Option) client.Client { func newClient(opts ...client.Option) client.Client {
options := client.Options{ options := client.Options{
Codecs: make(map[string]codec.NewCodec), Codecs: make(map[string]codec.NewCodec),

View File

@ -6,6 +6,7 @@ import (
"crypto/tls" "crypto/tls"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
@ -23,6 +24,8 @@ type codecsKey struct{}
type tlsAuth struct{} type tlsAuth struct{}
type maxRecvMsgSizeKey struct{} type maxRecvMsgSizeKey struct{}
type maxSendMsgSizeKey struct{} type maxSendMsgSizeKey struct{}
type grpcDialOptions struct{}
type grpcCallOptions struct{}
// gRPC Codec to be used to encode/decode requests for a given content type // gRPC Codec to be used to encode/decode requests for a given content type
func Codec(contentType string, c encoding.Codec) client.Option { func Codec(contentType string, c encoding.Codec) client.Option {
@ -72,3 +75,27 @@ func MaxSendMsgSize(s int) client.Option {
o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s) o.Context = context.WithValue(o.Context, maxSendMsgSizeKey{}, s)
} }
} }
//
// DialOptions to be used to configure gRPC dial options
//
func DialOptions(opts ...grpc.DialOption) client.CallOption {
return func(o *client.CallOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, grpcDialOptions{}, opts)
}
}
//
// CallOptions to be used to configure gRPC call options
//
func CallOptions(opts ...grpc.CallOption) client.CallOption {
return func(o *client.CallOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, grpcCallOptions{}, opts)
}
}

View File

@ -470,7 +470,7 @@ func (n *network) prunePeerRoutes(peer *node) error {
// lookup all routes routable via gw // lookup all routes routable via gw
q = []router.QueryOption{ q = []router.QueryOption{
router.QueryGateway(peer.id), router.QueryGateway(peer.address),
} }
if err := n.pruneRoutes(q...); err != nil { if err := n.pruneRoutes(q...); err != nil {
return err return err
@ -497,6 +497,25 @@ func (n *network) prune() {
log.Debugf("Network failed pruning peer %s routes: %v", id, err) log.Debugf("Network failed pruning peer %s routes: %v", id, err)
} }
} }
// get a list of all routes
routes, err := n.options.Router.Table().List()
if err != nil {
log.Debugf("Network failed listing routes: %v", err)
continue
}
// collect all the router IDs in the routing table
routers := make(map[string]bool)
for _, route := range routes {
if _, ok := routers[route.Router]; !ok {
routers[route.Router] = true
// if the router is NOT in our peer graph, delete all routes originated by it
if peerNode := n.node.GetPeerNode(route.Router); peerNode == nil {
if err := n.pruneRoutes(router.QueryRouter(route.Router)); err != nil {
log.Debugf("Network failed deleting routes by %s: %v", route.Router, err)
}
}
}
}
} }
} }
} }

View File

@ -27,16 +27,14 @@ func flatten(n network.Node, visited map[string]bool) []network.Node {
visited = make(map[string]bool) visited = make(map[string]bool)
} }
// check if already visited
if visited[n.Id()] == true {
return nil
}
// create new list of nodes // create new list of nodes
var nodes []network.Node var nodes []network.Node
// check if already visited
if visited[n.Id()] == false {
// append the current node // append the current node
nodes = append(nodes, n) nodes = append(nodes, n)
}
// set to visited // set to visited
visited[n.Id()] = true visited[n.Id()] = true

View File

@ -144,7 +144,7 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
} }
// reset the status // reset the status
if c.getStatus(); err != nil { if err := c.getStatus(); err != nil {
c.setStatus(nil) c.setStatus(nil)
} }

View File

@ -8,6 +8,7 @@ import (
"errors" "errors"
"net" "net"
"path" "path"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -20,7 +21,7 @@ import (
) )
var ( var (
prefix = "/micro/registry" prefix = "/micro/registry/"
) )
type etcdRegistry struct { type etcdRegistry struct {
@ -148,7 +149,7 @@ func (e *etcdRegistry) registerNode(s *registry.Service, node *registry.Node, op
defer cancel() defer cancel()
// look for the existing key // look for the existing key
rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id)) rsp, err := e.client.Get(ctx, nodePath(s.Name, node.Id), clientv3.WithSerializable())
if err != nil { if err != nil {
return err return err
} }
@ -310,7 +311,7 @@ func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) rsp, err := e.client.Get(ctx, servicePath(name)+"/", clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -344,6 +345,7 @@ func (e *etcdRegistry) GetService(name string) ([]*registry.Service, error) {
for _, service := range serviceMap { for _, service := range serviceMap {
services = append(services, service) services = append(services, service)
} }
return services, nil return services, nil
} }
@ -354,7 +356,7 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout) ctx, cancel := context.WithTimeout(context.Background(), e.options.Timeout)
defer cancel() defer cancel()
rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) rsp, err := e.client.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -381,6 +383,9 @@ func (e *etcdRegistry) ListServices() ([]*registry.Service, error) {
services = append(services, service) services = append(services, service)
} }
// sort the services
sort.Slice(services, func(i, j int) bool { return services[i].Name < services[j].Name })
return services, nil return services, nil
} }

View File

@ -319,23 +319,13 @@ func (r *router) advertiseTable() error {
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
// list routing table routes to announce // do full table flush
routes, err := r.table.List() events, err := r.flushRouteEvents(Update)
if err != nil { if err != nil {
return fmt.Errorf("failed listing routes: %s", err) return fmt.Errorf("failed flushing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*Event, len(routes))
for i, route := range routes {
event := &Event{
Type: Update,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
} }
// advertise all routes as Update events to subscribers // advertise routes to subscribers
if len(events) > 0 { if len(events) > 0 {
log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id) log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id)
r.advertWg.Add(1) r.advertWg.Add(1)
@ -692,6 +682,7 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) {
return nil, fmt.Errorf("failed listing routes: %s", err) return nil, fmt.Errorf("failed listing routes: %s", err)
} }
if r.options.Advertise == AdvertiseAll {
// build a list of events to advertise // build a list of events to advertise
events := make([]*Event, len(routes)) events := make([]*Event, len(routes))
for i, route := range routes { for i, route := range routes {
@ -702,6 +693,48 @@ func (r *router) flushRouteEvents(evType EventType) ([]*Event, error) {
} }
events[i] = event events[i] = event
} }
return events, nil
}
// routeMap stores optimal routes per service
bestRoutes := make(map[string]Route)
// go through all routes found in the routing table and collapse them to optimal routes
for _, route := range routes {
routeKey := route.Service + "@" + route.Network
optimal, ok := bestRoutes[routeKey]
if !ok {
bestRoutes[routeKey] = route
continue
}
// if the current optimal route metric is higher than routing table route, replace it
if optimal.Metric > route.Metric {
bestRoutes[routeKey] = route
continue
}
// if the metrics are the same, prefer advertising your own route
if optimal.Metric == route.Metric {
if route.Router == r.options.Id {
bestRoutes[routeKey] = route
continue
}
}
}
log.Debugf("Router advertising %d best routes out of %d", len(bestRoutes), len(routes))
// build a list of events to advertise
events := make([]*Event, len(bestRoutes))
i := 0
for _, route := range bestRoutes {
event := &Event{
Type: evType,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
i++
}
return events, nil return events, nil
} }

View File

@ -18,6 +18,8 @@ type Options struct {
Network string Network string
// Registry is the local registry // Registry is the local registry
Registry registry.Registry Registry registry.Registry
// Advertise is the advertising strategy
Advertise Strategy
// Client for calling router // Client for calling router
Client client.Client Client client.Client
} }
@ -64,6 +66,13 @@ func Registry(r registry.Registry) Option {
} }
} }
// Strategy sets route advertising strategy
func Advertise(a Strategy) Option {
return func(o *Options) {
o.Advertise = a
}
}
// DefaultOptions returns router default options // DefaultOptions returns router default options
func DefaultOptions() Options { func DefaultOptions() Options {
return Options{ return Options{
@ -71,5 +80,6 @@ func DefaultOptions() Options {
Address: DefaultAddress, Address: DefaultAddress,
Network: DefaultNetwork, Network: DefaultNetwork,
Registry: registry.DefaultRegistry, Registry: registry.DefaultRegistry,
Advertise: AdvertiseBest,
} }
} }

View File

@ -139,6 +139,28 @@ type Advert struct {
Events []*Event Events []*Event
} }
// Strategy is route advertisement strategy
type Strategy int
const (
// AdvertiseAll advertises all routes to the network
AdvertiseAll Strategy = iota
// AdvertiseBest advertises optimal routes to the network
AdvertiseBest
)
// String returns human readable Strategy
func (s Strategy) String() string {
switch s {
case AdvertiseAll:
return "all"
case AdvertiseBest:
return "best"
default:
return "unknown"
}
}
// NewRouter creates new Router and returns it // NewRouter creates new Router and returns it
func NewRouter(opts ...Option) Router { func NewRouter(opts ...Option) Router {
return newRouter(opts...) return newRouter(opts...)

View File

@ -6,6 +6,7 @@ import (
"log" "log"
client "github.com/coreos/etcd/clientv3" client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store" "github.com/micro/go-micro/store"
) )
@ -15,7 +16,10 @@ type ekv struct {
kv client.KV kv client.KV
} }
func (e *ekv) Read(key string) (*store.Record, error) { func (e *ekv) Read(keys ...string) ([]*store.Record, error) {
var values []*mvccpb.KeyValue
for _, key := range keys {
keyval, err := e.kv.Get(context.Background(), key) keyval, err := e.kv.Get(context.Background(), key)
if err != nil { if err != nil {
return nil, err return nil, err
@ -25,20 +29,43 @@ func (e *ekv) Read(key string) (*store.Record, error) {
return nil, store.ErrNotFound return nil, store.ErrNotFound
} }
return &store.Record{ values = append(values, keyval.Kvs...)
Key: string(keyval.Kvs[0].Key), }
Value: keyval.Kvs[0].Value,
}, nil var records []*store.Record
for _, kv := range values {
records = append(records, &store.Record{
Key: string(kv.Key),
Value: kv.Value,
// TODO: implement expiry
})
}
return records, nil
} }
func (e *ekv) Delete(key string) error { func (e *ekv) Delete(keys ...string) error {
var gerr error
for _, key := range keys {
_, err := e.kv.Delete(context.Background(), key) _, err := e.kv.Delete(context.Background(), key)
return err if err != nil {
gerr = err
}
}
return gerr
} }
func (e *ekv) Write(record *store.Record) error { func (e *ekv) Write(records ...*store.Record) error {
var gerr error
for _, record := range records {
// TODO create lease to expire keys
_, err := e.kv.Put(context.Background(), record.Key, string(record.Value)) _, err := e.kv.Put(context.Background(), record.Key, string(record.Value))
return err if err != nil {
gerr = err
}
}
return gerr
} }
func (e *ekv) Sync() ([]*store.Record, error) { func (e *ekv) Sync() ([]*store.Record, error) {

View File

@ -48,10 +48,13 @@ func (m *memoryStore) Sync() ([]*store.Record, error) {
return values, nil return values, nil
} }
func (m *memoryStore) Read(key string) (*store.Record, error) { func (m *memoryStore) Read(keys ...string) ([]*store.Record, error) {
m.RLock() m.RLock()
defer m.RUnlock() defer m.RUnlock()
var records []*store.Record
for _, key := range keys {
v, ok := m.values[key] v, ok := m.values[key]
if !ok { if !ok {
return nil, store.ErrNotFound return nil, store.ErrNotFound
@ -71,28 +74,35 @@ func (m *memoryStore) Read(key string) (*store.Record, error) {
v.c = time.Now() v.c = time.Now()
} }
return v.r, nil records = append(records, v.r)
}
return records, nil
} }
func (m *memoryStore) Write(r *store.Record) error { func (m *memoryStore) Write(records ...*store.Record) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
for _, r := range records {
// set the record // set the record
m.values[r.Key] = &memoryRecord{ m.values[r.Key] = &memoryRecord{
r: r, r: r,
c: time.Now(), c: time.Now(),
} }
}
return nil return nil
} }
func (m *memoryStore) Delete(key string) error { func (m *memoryStore) Delete(keys ...string) error {
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
for _, key := range keys {
// delete the value // delete the value
delete(m.values, key) delete(m.values, key)
}
return nil return nil
} }

View File

@ -25,7 +25,7 @@ func TestReadRecordExpire(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if rrec.Expiry >= expire { if rrec[0].Expiry >= expire {
t.Fatal("expiry of read record is not changed") t.Fatal("expiry of read record is not changed")
} }

View File

@ -0,0 +1,89 @@
package handler
import (
"context"
"io"
"time"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto"
)
type Store struct {
Store store.Store
}
func (s *Store) Read(ctx context.Context, req *pb.ReadRequest, rsp *pb.ReadResponse) error {
vals, err := s.Store.Read(req.Keys...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
for _, val := range vals {
rsp.Records = append(rsp.Records, &pb.Record{
Key: val.Key,
Value: val.Value,
Expiry: int64(val.Expiry.Seconds()),
})
}
return nil
}
func (s *Store) Write(ctx context.Context, req *pb.WriteRequest, rsp *pb.WriteResponse) error {
var records []*store.Record
for _, record := range req.Records {
records = append(records, &store.Record{
Key: record.Key,
Value: record.Value,
Expiry: time.Duration(record.Expiry) * time.Second,
})
}
err := s.Store.Write(records...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}
func (s *Store) Delete(ctx context.Context, req *pb.DeleteRequest, rsp *pb.DeleteResponse) error {
err := s.Store.Delete(req.Keys...)
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}
func (s *Store) Sync(ctx context.Context, req *pb.SyncRequest, stream pb.Store_SyncStream) error {
var vals []*store.Record
var err error
if len(req.Key) > 0 {
vals, err = s.Store.Read(req.Key)
} else {
vals, err = s.Store.Sync()
}
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
rsp := new(pb.SyncResponse)
// TODO: batch sync
for _, val := range vals {
rsp.Records = append(rsp.Records, &pb.Record{
Key: val.Key,
Value: val.Value,
Expiry: int64(val.Expiry.Seconds()),
})
}
err = stream.Send(rsp)
if err == io.EOF {
return nil
}
if err != nil {
return errors.InternalServerError("go.micro.store", err.Error())
}
return nil
}

View File

@ -0,0 +1,207 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: micro/go-micro/store/service/proto/store.proto
package go_micro_store
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
import (
context "context"
client "github.com/micro/go-micro/client"
server "github.com/micro/go-micro/server"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ client.Option
var _ server.Option
// Client API for Store service
type StoreService interface {
Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error)
Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error)
Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error)
}
type storeService struct {
c client.Client
name string
}
func NewStoreService(name string, c client.Client) StoreService {
if c == nil {
c = client.NewClient()
}
if len(name) == 0 {
name = "go.micro.store"
}
return &storeService{
c: c,
name: name,
}
}
func (c *storeService) Read(ctx context.Context, in *ReadRequest, opts ...client.CallOption) (*ReadResponse, error) {
req := c.c.NewRequest(c.name, "Store.Read", in)
out := new(ReadResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeService) Write(ctx context.Context, in *WriteRequest, opts ...client.CallOption) (*WriteResponse, error) {
req := c.c.NewRequest(c.name, "Store.Write", in)
out := new(WriteResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeService) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) {
req := c.c.NewRequest(c.name, "Store.Delete", in)
out := new(DeleteResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeService) Sync(ctx context.Context, in *SyncRequest, opts ...client.CallOption) (Store_SyncService, error) {
req := c.c.NewRequest(c.name, "Store.Sync", &SyncRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
if err != nil {
return nil, err
}
if err := stream.Send(in); err != nil {
return nil, err
}
return &storeServiceSync{stream}, nil
}
type Store_SyncService interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Recv() (*SyncResponse, error)
}
type storeServiceSync struct {
stream client.Stream
}
func (x *storeServiceSync) Close() error {
return x.stream.Close()
}
func (x *storeServiceSync) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeServiceSync) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeServiceSync) Recv() (*SyncResponse, error) {
m := new(SyncResponse)
err := x.stream.Recv(m)
if err != nil {
return nil, err
}
return m, nil
}
// Server API for Store service
type StoreHandler interface {
Read(context.Context, *ReadRequest, *ReadResponse) error
Write(context.Context, *WriteRequest, *WriteResponse) error
Delete(context.Context, *DeleteRequest, *DeleteResponse) error
Sync(context.Context, *SyncRequest, Store_SyncStream) error
}
func RegisterStoreHandler(s server.Server, hdlr StoreHandler, opts ...server.HandlerOption) error {
type store interface {
Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error
Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error
Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error
Sync(ctx context.Context, stream server.Stream) error
}
type Store struct {
store
}
h := &storeHandler{hdlr}
return s.Handle(s.NewHandler(&Store{h}, opts...))
}
type storeHandler struct {
StoreHandler
}
func (h *storeHandler) Read(ctx context.Context, in *ReadRequest, out *ReadResponse) error {
return h.StoreHandler.Read(ctx, in, out)
}
func (h *storeHandler) Write(ctx context.Context, in *WriteRequest, out *WriteResponse) error {
return h.StoreHandler.Write(ctx, in, out)
}
func (h *storeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error {
return h.StoreHandler.Delete(ctx, in, out)
}
func (h *storeHandler) Sync(ctx context.Context, stream server.Stream) error {
m := new(SyncRequest)
if err := stream.Recv(m); err != nil {
return err
}
return h.StoreHandler.Sync(ctx, m, &storeSyncStream{stream})
}
type Store_SyncStream interface {
SendMsg(interface{}) error
RecvMsg(interface{}) error
Close() error
Send(*SyncResponse) error
}
type storeSyncStream struct {
stream server.Stream
}
func (x *storeSyncStream) Close() error {
return x.stream.Close()
}
func (x *storeSyncStream) SendMsg(m interface{}) error {
return x.stream.Send(m)
}
func (x *storeSyncStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m)
}
func (x *storeSyncStream) Send(m *SyncResponse) error {
return x.stream.Send(m)
}

View File

@ -0,0 +1,618 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: micro/go-micro/store/service/proto/store.proto
package go_micro_store
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type Record struct {
// key of the record
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// value in the record
Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
// timestamp in unix seconds
Expiry int64 `protobuf:"varint,3,opt,name=expiry,proto3" json:"expiry,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Record) Reset() { *m = Record{} }
func (m *Record) String() string { return proto.CompactTextString(m) }
func (*Record) ProtoMessage() {}
func (*Record) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{0}
}
func (m *Record) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Record.Unmarshal(m, b)
}
func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Record.Marshal(b, m, deterministic)
}
func (m *Record) XXX_Merge(src proto.Message) {
xxx_messageInfo_Record.Merge(m, src)
}
func (m *Record) XXX_Size() int {
return xxx_messageInfo_Record.Size(m)
}
func (m *Record) XXX_DiscardUnknown() {
xxx_messageInfo_Record.DiscardUnknown(m)
}
var xxx_messageInfo_Record proto.InternalMessageInfo
func (m *Record) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *Record) GetValue() []byte {
if m != nil {
return m.Value
}
return nil
}
func (m *Record) GetExpiry() int64 {
if m != nil {
return m.Expiry
}
return 0
}
type ReadRequest struct {
Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{1}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReadRequest.Unmarshal(m, b)
}
func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic)
}
func (m *ReadRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadRequest.Merge(m, src)
}
func (m *ReadRequest) XXX_Size() int {
return xxx_messageInfo_ReadRequest.Size(m)
}
func (m *ReadRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ReadRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ReadRequest proto.InternalMessageInfo
func (m *ReadRequest) GetKeys() []string {
if m != nil {
return m.Keys
}
return nil
}
type ReadResponse struct {
Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{2}
}
func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ReadResponse.Unmarshal(m, b)
}
func (m *ReadResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ReadResponse.Marshal(b, m, deterministic)
}
func (m *ReadResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadResponse.Merge(m, src)
}
func (m *ReadResponse) XXX_Size() int {
return xxx_messageInfo_ReadResponse.Size(m)
}
func (m *ReadResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ReadResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ReadResponse proto.InternalMessageInfo
func (m *ReadResponse) GetRecords() []*Record {
if m != nil {
return m.Records
}
return nil
}
type WriteRequest struct {
Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
func (m *WriteRequest) String() string { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage() {}
func (*WriteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{3}
}
func (m *WriteRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WriteRequest.Unmarshal(m, b)
}
func (m *WriteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_WriteRequest.Marshal(b, m, deterministic)
}
func (m *WriteRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_WriteRequest.Merge(m, src)
}
func (m *WriteRequest) XXX_Size() int {
return xxx_messageInfo_WriteRequest.Size(m)
}
func (m *WriteRequest) XXX_DiscardUnknown() {
xxx_messageInfo_WriteRequest.DiscardUnknown(m)
}
var xxx_messageInfo_WriteRequest proto.InternalMessageInfo
func (m *WriteRequest) GetRecords() []*Record {
if m != nil {
return m.Records
}
return nil
}
type WriteResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *WriteResponse) Reset() { *m = WriteResponse{} }
func (m *WriteResponse) String() string { return proto.CompactTextString(m) }
func (*WriteResponse) ProtoMessage() {}
func (*WriteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{4}
}
func (m *WriteResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_WriteResponse.Unmarshal(m, b)
}
func (m *WriteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_WriteResponse.Marshal(b, m, deterministic)
}
func (m *WriteResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_WriteResponse.Merge(m, src)
}
func (m *WriteResponse) XXX_Size() int {
return xxx_messageInfo_WriteResponse.Size(m)
}
func (m *WriteResponse) XXX_DiscardUnknown() {
xxx_messageInfo_WriteResponse.DiscardUnknown(m)
}
var xxx_messageInfo_WriteResponse proto.InternalMessageInfo
type DeleteRequest struct {
Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DeleteRequest) Reset() { *m = DeleteRequest{} }
func (m *DeleteRequest) String() string { return proto.CompactTextString(m) }
func (*DeleteRequest) ProtoMessage() {}
func (*DeleteRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{5}
}
func (m *DeleteRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeleteRequest.Unmarshal(m, b)
}
func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic)
}
func (m *DeleteRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_DeleteRequest.Merge(m, src)
}
func (m *DeleteRequest) XXX_Size() int {
return xxx_messageInfo_DeleteRequest.Size(m)
}
func (m *DeleteRequest) XXX_DiscardUnknown() {
xxx_messageInfo_DeleteRequest.DiscardUnknown(m)
}
var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo
func (m *DeleteRequest) GetKeys() []string {
if m != nil {
return m.Keys
}
return nil
}
type DeleteResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{6}
}
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeleteResponse.Unmarshal(m, b)
}
func (m *DeleteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DeleteResponse.Marshal(b, m, deterministic)
}
func (m *DeleteResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_DeleteResponse.Merge(m, src)
}
func (m *DeleteResponse) XXX_Size() int {
return xxx_messageInfo_DeleteResponse.Size(m)
}
func (m *DeleteResponse) XXX_DiscardUnknown() {
xxx_messageInfo_DeleteResponse.DiscardUnknown(m)
}
var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo
type SyncRequest struct {
// optional key
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SyncRequest) Reset() { *m = SyncRequest{} }
func (m *SyncRequest) String() string { return proto.CompactTextString(m) }
func (*SyncRequest) ProtoMessage() {}
func (*SyncRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{7}
}
func (m *SyncRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncRequest.Unmarshal(m, b)
}
func (m *SyncRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncRequest.Marshal(b, m, deterministic)
}
func (m *SyncRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncRequest.Merge(m, src)
}
func (m *SyncRequest) XXX_Size() int {
return xxx_messageInfo_SyncRequest.Size(m)
}
func (m *SyncRequest) XXX_DiscardUnknown() {
xxx_messageInfo_SyncRequest.DiscardUnknown(m)
}
var xxx_messageInfo_SyncRequest proto.InternalMessageInfo
func (m *SyncRequest) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
type SyncResponse struct {
Records []*Record `protobuf:"bytes,1,rep,name=records,proto3" json:"records,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SyncResponse) Reset() { *m = SyncResponse{} }
func (m *SyncResponse) String() string { return proto.CompactTextString(m) }
func (*SyncResponse) ProtoMessage() {}
func (*SyncResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_f84ccc98e143ed3e, []int{8}
}
func (m *SyncResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncResponse.Unmarshal(m, b)
}
func (m *SyncResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncResponse.Marshal(b, m, deterministic)
}
func (m *SyncResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncResponse.Merge(m, src)
}
func (m *SyncResponse) XXX_Size() int {
return xxx_messageInfo_SyncResponse.Size(m)
}
func (m *SyncResponse) XXX_DiscardUnknown() {
xxx_messageInfo_SyncResponse.DiscardUnknown(m)
}
var xxx_messageInfo_SyncResponse proto.InternalMessageInfo
func (m *SyncResponse) GetRecords() []*Record {
if m != nil {
return m.Records
}
return nil
}
func init() {
proto.RegisterType((*Record)(nil), "go.micro.store.Record")
proto.RegisterType((*ReadRequest)(nil), "go.micro.store.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "go.micro.store.ReadResponse")
proto.RegisterType((*WriteRequest)(nil), "go.micro.store.WriteRequest")
proto.RegisterType((*WriteResponse)(nil), "go.micro.store.WriteResponse")
proto.RegisterType((*DeleteRequest)(nil), "go.micro.store.DeleteRequest")
proto.RegisterType((*DeleteResponse)(nil), "go.micro.store.DeleteResponse")
proto.RegisterType((*SyncRequest)(nil), "go.micro.store.SyncRequest")
proto.RegisterType((*SyncResponse)(nil), "go.micro.store.SyncResponse")
}
func init() {
proto.RegisterFile("micro/go-micro/store/service/proto/store.proto", fileDescriptor_f84ccc98e143ed3e)
}
var fileDescriptor_f84ccc98e143ed3e = []byte{
// 333 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0xcb, 0x6e, 0xf2, 0x30,
0x10, 0x85, 0x09, 0x81, 0xfc, 0x62, 0xb8, 0xfc, 0x68, 0x54, 0xa1, 0x88, 0xde, 0xd2, 0x74, 0x93,
0x4d, 0x03, 0xa2, 0x2f, 0x50, 0xa9, 0x17, 0xb5, 0x5b, 0xb3, 0xe8, 0x9a, 0x86, 0x11, 0x8a, 0xa0,
0x98, 0x3a, 0x01, 0x35, 0x2f, 0xd4, 0xe7, 0xac, 0x6c, 0x27, 0x69, 0x90, 0x41, 0xaa, 0xba, 0x1b,
0x7b, 0xce, 0x1c, 0x9f, 0xf9, 0x64, 0x08, 0xdf, 0xe3, 0x48, 0xf0, 0xd1, 0x82, 0xdf, 0xe8, 0x22,
0x49, 0xb9, 0xa0, 0x51, 0x42, 0x62, 0x17, 0x47, 0x34, 0xda, 0x08, 0x9e, 0xe6, 0x77, 0xa1, 0xaa,
0xb1, 0xb7, 0xe0, 0x7a, 0x24, 0x54, 0xb7, 0xfe, 0x33, 0x38, 0x8c, 0x22, 0x2e, 0xe6, 0xd8, 0x07,
0x7b, 0x49, 0x99, 0x6b, 0x79, 0x56, 0xd0, 0x62, 0xb2, 0xc4, 0x13, 0x68, 0xee, 0x66, 0xab, 0x2d,
0xb9, 0x75, 0xcf, 0x0a, 0x3a, 0x4c, 0x1f, 0x70, 0x00, 0x0e, 0x7d, 0x6e, 0x62, 0x91, 0xb9, 0xb6,
0x67, 0x05, 0x36, 0xcb, 0x4f, 0xfe, 0x15, 0xb4, 0x19, 0xcd, 0xe6, 0x8c, 0x3e, 0xb6, 0x94, 0xa4,
0x88, 0xd0, 0x58, 0x52, 0x96, 0xb8, 0x96, 0x67, 0x07, 0x2d, 0xa6, 0x6a, 0xff, 0x0e, 0x3a, 0x5a,
0x92, 0x6c, 0xf8, 0x3a, 0x21, 0x1c, 0xc3, 0x3f, 0xa1, 0x1e, 0xd7, 0xb2, 0xf6, 0x64, 0x10, 0xee,
0xc7, 0x0b, 0x75, 0x36, 0x56, 0xc8, 0xa4, 0xc3, 0xab, 0x88, 0x53, 0x2a, 0x5e, 0xa9, 0x38, 0xd4,
0x7f, 0xe7, 0xf0, 0x1f, 0xba, 0xb9, 0x83, 0x0e, 0xe1, 0x5f, 0x43, 0xf7, 0x81, 0x56, 0xf4, 0xe3,
0x79, 0x28, 0x79, 0x1f, 0x7a, 0x85, 0x28, 0x1f, 0xbb, 0x84, 0xf6, 0x34, 0x5b, 0x47, 0xc5, 0x90,
0x41, 0x4f, 0x46, 0xd5, 0x82, 0xbf, 0x2e, 0x3b, 0xf9, 0xaa, 0x43, 0x73, 0x2a, 0x3b, 0x78, 0x0f,
0x0d, 0x09, 0x0e, 0x4f, 0xcd, 0x91, 0x92, 0xf8, 0xf0, 0xec, 0x70, 0x33, 0xcf, 0x5b, 0xc3, 0x27,
0x68, 0xaa, 0xcd, 0xd1, 0x10, 0x56, 0x91, 0x0e, 0xcf, 0x8f, 0x74, 0x4b, 0x9f, 0x17, 0x70, 0x34,
0x0b, 0x34, 0xa4, 0x7b, 0x20, 0x87, 0x17, 0xc7, 0xda, 0xa5, 0xd5, 0x23, 0x34, 0x24, 0x23, 0x73,
0xaf, 0x0a, 0x5a, 0x73, 0xaf, 0x2a, 0x56, 0xbf, 0x36, 0xb6, 0xde, 0x1c, 0xf5, 0xb7, 0x6f, 0xbf,
0x03, 0x00, 0x00, 0xff, 0xff, 0x30, 0xc8, 0x99, 0x52, 0x0d, 0x03, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// StoreClient is the client API for Store service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StoreClient interface {
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error)
}
type storeClient struct {
cc *grpc.ClientConn
}
func NewStoreClient(cc *grpc.ClientConn) StoreClient {
return &storeClient{cc}
}
func (c *storeClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
out := new(ReadResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Read", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
out := new(WriteResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Write", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) {
out := new(DeleteResponse)
err := c.cc.Invoke(ctx, "/go.micro.store.Store/Delete", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeClient) Sync(ctx context.Context, in *SyncRequest, opts ...grpc.CallOption) (Store_SyncClient, error) {
stream, err := c.cc.NewStream(ctx, &_Store_serviceDesc.Streams[0], "/go.micro.store.Store/Sync", opts...)
if err != nil {
return nil, err
}
x := &storeSyncClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Store_SyncClient interface {
Recv() (*SyncResponse, error)
grpc.ClientStream
}
type storeSyncClient struct {
grpc.ClientStream
}
func (x *storeSyncClient) Recv() (*SyncResponse, error) {
m := new(SyncResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// StoreServer is the server API for Store service.
type StoreServer interface {
Read(context.Context, *ReadRequest) (*ReadResponse, error)
Write(context.Context, *WriteRequest) (*WriteResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
Sync(*SyncRequest, Store_SyncServer) error
}
func RegisterStoreServer(s *grpc.Server, srv StoreServer) {
s.RegisterService(&_Store_serviceDesc, srv)
}
func _Store_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreServer).Read(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.store.Store/Read",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreServer).Read(ctx, req.(*ReadRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Store_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WriteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreServer).Write(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.store.Store/Write",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreServer).Write(ctx, req.(*WriteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Store_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.store.Store/Delete",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreServer).Delete(ctx, req.(*DeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Store_Sync_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SyncRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StoreServer).Sync(m, &storeSyncServer{stream})
}
type Store_SyncServer interface {
Send(*SyncResponse) error
grpc.ServerStream
}
type storeSyncServer struct {
grpc.ServerStream
}
func (x *storeSyncServer) Send(m *SyncResponse) error {
return x.ServerStream.SendMsg(m)
}
var _Store_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.store.Store",
HandlerType: (*StoreServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Read",
Handler: _Store_Read_Handler,
},
{
MethodName: "Write",
Handler: _Store_Write_Handler,
},
{
MethodName: "Delete",
Handler: _Store_Delete_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Sync",
Handler: _Store_Sync_Handler,
ServerStreams: true,
},
},
Metadata: "micro/go-micro/store/service/proto/store.proto",
}

View File

@ -0,0 +1,48 @@
syntax = "proto3";
package go.micro.store;
service Store {
rpc Read(ReadRequest) returns (ReadResponse) {};
rpc Write(WriteRequest) returns (WriteResponse) {};
rpc Delete(DeleteRequest) returns (DeleteResponse) {};
rpc Sync(SyncRequest) returns (stream SyncResponse) {};
}
message Record {
// key of the record
string key = 1;
// value in the record
bytes value = 2;
// timestamp in unix seconds
int64 expiry = 3;
}
message ReadRequest {
repeated string keys = 1;
}
message ReadResponse {
repeated Record records = 1;
}
message WriteRequest {
repeated Record records = 2;
}
message WriteResponse {}
message DeleteRequest {
repeated string keys = 1;
}
message DeleteResponse {}
message SyncRequest {
// optional key
string key = 1;
}
message SyncResponse {
repeated Record records = 1;
}

119
store/service/service.go Normal file
View File

@ -0,0 +1,119 @@
// Package service implements the store service interface
package service
import (
"context"
"io"
"time"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/store"
pb "github.com/micro/go-micro/store/service/proto"
)
type serviceStore struct {
options.Options
// Addresses of the nodes
Nodes []string
// store service client
Client pb.StoreService
}
// Sync all the known records
func (s *serviceStore) Sync() ([]*store.Record, error) {
stream, err := s.Client.Sync(context.Background(), &pb.SyncRequest{}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
defer stream.Close()
var records []*store.Record
for {
rsp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return records, err
}
for _, record := range rsp.Records {
records = append(records, &store.Record{
Key: record.Key,
Value: record.Value,
Expiry: time.Duration(record.Expiry) * time.Second,
})
}
}
return records, nil
}
// Read a record with key
func (s *serviceStore) Read(keys ...string) ([]*store.Record, error) {
rsp, err := s.Client.Read(context.Background(), &pb.ReadRequest{
Keys: keys,
}, client.WithAddress(s.Nodes...))
if err != nil {
return nil, err
}
var records []*store.Record
for _, val := range rsp.Records {
records = append(records, &store.Record{
Key: val.Key,
Value: val.Value,
Expiry: time.Duration(val.Expiry) * time.Second,
})
}
return records, nil
}
// Write a record
func (s *serviceStore) Write(recs ...*store.Record) error {
var records []*pb.Record
for _, record := range recs {
records = append(records, &pb.Record{
Key: record.Key,
Value: record.Value,
Expiry: int64(record.Expiry.Seconds()),
})
}
_, err := s.Client.Write(context.Background(), &pb.WriteRequest{
Records: records,
}, client.WithAddress(s.Nodes...))
return err
}
// Delete a record with key
func (s *serviceStore) Delete(keys ...string) error {
_, err := s.Client.Delete(context.Background(), &pb.DeleteRequest{
Keys: keys,
}, client.WithAddress(s.Nodes...))
return err
}
// NewStore returns a new store service implementation
func NewStore(opts ...options.Option) store.Store {
options := options.NewOptions(opts...)
var nodes []string
n, ok := options.Values().Get("store.nodes")
if ok {
nodes = n.([]string)
}
service := &serviceStore{
Options: options,
Nodes: nodes,
Client: pb.NewStoreService("go.micro.store", client.DefaultClient),
}
return service
}

View File

@ -19,11 +19,11 @@ type Store interface {
// Sync all the known records // Sync all the known records
Sync() ([]*Record, error) Sync() ([]*Record, error)
// Read a record with key // Read a record with key
Read(key string) (*Record, error) Read(keys ...string) ([]*Record, error)
// Write a record // Write a record
Write(r *Record) error Write(recs ...*Record) error
// Delete a record with key // Delete a record with key
Delete(key string) error Delete(keys ...string) error
} }
// Record represents a data record // Record represents a data record

View File

@ -39,8 +39,12 @@ func (m *syncMap) Read(key, val interface{}) error {
return err return err
} }
if len(kval) == 0 {
return store.ErrNotFound
}
// decode value // decode value
return json.Unmarshal(kval.Value, val) return json.Unmarshal(kval[0].Value, val)
} }
func (m *syncMap) Write(key, val interface{}) error { func (m *syncMap) Write(key, val interface{}) error {

View File

@ -919,14 +919,12 @@ func (t *tun) Close() error {
default: default:
close(t.closed) close(t.closed)
t.connected = false t.connected = false
}
// send a close message // send a close message
// we don't close the link // we don't close the link
// just the tunnel // just the tunnel
return t.close() return t.close()
}
return nil
} }
// Dial an address // Dial an address

View File

@ -98,7 +98,6 @@ func (l *link) Close() error {
return nil return nil
default: default:
close(l.closed) close(l.closed)
return nil
} }
return nil return nil

View File

@ -181,5 +181,4 @@ func (t *tunListener) Accept() (Session, error) {
} }
return c, nil return c, nil
} }
return nil, nil
} }

View File

@ -237,8 +237,6 @@ func (s *session) Send(m *transport.Message) error {
case <-s.closed: case <-s.closed:
return io.EOF return io.EOF
} }
return nil
} }
// Recv is used to receive a message // Recv is used to receive a message