161 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package service implements the store service interface
 | |
| package service
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"io"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/micro/go-micro/client"
 | |
| 	"github.com/micro/go-micro/metadata"
 | |
| 	"github.com/micro/go-micro/store"
 | |
| 	pb "github.com/micro/go-micro/store/service/proto"
 | |
| )
 | |
| 
 | |
| type serviceStore struct {
 | |
| 	options store.Options
 | |
| 
 | |
| 	// Namespace to use
 | |
| 	Namespace string
 | |
| 
 | |
| 	// Addresses of the nodes
 | |
| 	Nodes []string
 | |
| 
 | |
| 	// Prefix to use
 | |
| 	Prefix string
 | |
| 
 | |
| 	// store service client
 | |
| 	Client pb.StoreService
 | |
| }
 | |
| 
 | |
| func (s *serviceStore) Init(opts ...store.Option) error {
 | |
| 	for _, o := range opts {
 | |
| 		o(&s.options)
 | |
| 	}
 | |
| 	s.Namespace = s.options.Namespace
 | |
| 	s.Prefix = s.options.Prefix
 | |
| 	s.Nodes = s.options.Nodes
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *serviceStore) Context() context.Context {
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	md := make(metadata.Metadata)
 | |
| 
 | |
| 	if len(s.Namespace) > 0 {
 | |
| 		md["Micro-Namespace"] = s.Namespace
 | |
| 	}
 | |
| 
 | |
| 	if len(s.Prefix) > 0 {
 | |
| 		md["Micro-Prefix"] = s.Prefix
 | |
| 	}
 | |
| 
 | |
| 	return metadata.NewContext(ctx, md)
 | |
| }
 | |
| 
 | |
| // Sync all the known records
 | |
| func (s *serviceStore) List() ([]*store.Record, error) {
 | |
| 	stream, err := s.Client.List(s.Context(), &pb.ListRequest{}, client.WithAddress(s.Nodes...))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer stream.Close()
 | |
| 
 | |
| 	var records []*store.Record
 | |
| 
 | |
| 	for {
 | |
| 		rsp, err := stream.Recv()
 | |
| 		if err == io.EOF {
 | |
| 			break
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return records, err
 | |
| 		}
 | |
| 
 | |
| 		for _, record := range rsp.Records {
 | |
| 			records = append(records, &store.Record{
 | |
| 				Key:    record.Key,
 | |
| 				Value:  record.Value,
 | |
| 				Expiry: time.Duration(record.Expiry) * time.Second,
 | |
| 			})
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return records, nil
 | |
| }
 | |
| 
 | |
| // Read a record with key
 | |
| func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
 | |
| 	var options store.ReadOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{
 | |
| 		Key: key,
 | |
| 		Options: &pb.ReadOptions{
 | |
| 			Prefix: options.Prefix,
 | |
| 		},
 | |
| 	}, client.WithAddress(s.Nodes...))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	records := make([]*store.Record, 0, len(rsp.Records))
 | |
| 
 | |
| 	for _, val := range rsp.Records {
 | |
| 		records = append(records, &store.Record{
 | |
| 			Key:    val.Key,
 | |
| 			Value:  val.Value,
 | |
| 			Expiry: time.Duration(val.Expiry) * time.Second,
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	return records, nil
 | |
| }
 | |
| 
 | |
| // Write a record
 | |
| func (s *serviceStore) Write(record *store.Record) error {
 | |
| 	_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
 | |
| 		Record: &pb.Record{
 | |
| 			Key:    record.Key,
 | |
| 			Value:  record.Value,
 | |
| 			Expiry: int64(record.Expiry.Seconds()),
 | |
| 		},
 | |
| 	}, client.WithAddress(s.Nodes...))
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Delete a record with key
 | |
| func (s *serviceStore) Delete(key string) error {
 | |
| 	_, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
 | |
| 		Key: key,
 | |
| 	}, client.WithAddress(s.Nodes...))
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (s *serviceStore) String() string {
 | |
| 	return "service"
 | |
| }
 | |
| 
 | |
| // NewStore returns a new store service implementation
 | |
| func NewStore(opts ...store.Option) store.Store {
 | |
| 	var options store.Options
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	service := &serviceStore{
 | |
| 		options:   options,
 | |
| 		Namespace: options.Namespace,
 | |
| 		Prefix:    options.Prefix,
 | |
| 		Nodes:     options.Nodes,
 | |
| 		Client:    pb.NewStoreService("go.micro.store", client.DefaultClient),
 | |
| 	}
 | |
| 
 | |
| 	return service
 | |
| }
 |