* Add metadata to store record * Add metadata to cockroach store * add metadata to store service implementation * fix breaking cache test * Test/fix cockroach metadata usage * fix store memory metadata bug
		
			
				
	
	
		
			259 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			259 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package service implements the store service interface
 | 
						|
package service
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"reflect"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/micro/go-micro/v2/client"
 | 
						|
	"github.com/micro/go-micro/v2/errors"
 | 
						|
	"github.com/micro/go-micro/v2/metadata"
 | 
						|
	"github.com/micro/go-micro/v2/store"
 | 
						|
	pb "github.com/micro/go-micro/v2/store/service/proto"
 | 
						|
)
 | 
						|
 | 
						|
type serviceStore struct {
 | 
						|
	options store.Options
 | 
						|
 | 
						|
	// The database to use
 | 
						|
	Database string
 | 
						|
 | 
						|
	// The table to use
 | 
						|
	Table string
 | 
						|
 | 
						|
	// Addresses of the nodes
 | 
						|
	Nodes []string
 | 
						|
 | 
						|
	// store service client
 | 
						|
	Client pb.StoreService
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceStore) Close() error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceStore) Init(opts ...store.Option) error {
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&s.options)
 | 
						|
	}
 | 
						|
	s.Database = s.options.Database
 | 
						|
	s.Table = s.options.Table
 | 
						|
	s.Nodes = s.options.Nodes
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceStore) Context() context.Context {
 | 
						|
	ctx := context.Background()
 | 
						|
	md := make(metadata.Metadata)
 | 
						|
	if len(s.Database) > 0 {
 | 
						|
		md["Micro-Database"] = s.Database
 | 
						|
	}
 | 
						|
 | 
						|
	if len(s.Table) > 0 {
 | 
						|
		md["Micro-Table"] = s.Table
 | 
						|
	}
 | 
						|
	return metadata.NewContext(ctx, md)
 | 
						|
}
 | 
						|
 | 
						|
// Sync all the known records
 | 
						|
func (s *serviceStore) List(opts ...store.ListOption) ([]string, error) {
 | 
						|
	options := store.ListOptions{
 | 
						|
		Database: s.Database,
 | 
						|
		Table:    s.Table,
 | 
						|
	}
 | 
						|
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	listOpts := &pb.ListOptions{
 | 
						|
		Database: options.Database,
 | 
						|
		Table:    options.Table,
 | 
						|
		Prefix:   options.Prefix,
 | 
						|
		Suffix:   options.Suffix,
 | 
						|
		Limit:    uint64(options.Limit),
 | 
						|
		Offset:   uint64(options.Offset),
 | 
						|
	}
 | 
						|
 | 
						|
	stream, err := s.Client.List(s.Context(), &pb.ListRequest{Options: listOpts}, client.WithAddress(s.Nodes...))
 | 
						|
	if err != nil && errors.Equal(err, errors.NotFound("", "")) {
 | 
						|
		return nil, store.ErrNotFound
 | 
						|
	} else if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	defer stream.Close()
 | 
						|
 | 
						|
	var keys []string
 | 
						|
 | 
						|
	for {
 | 
						|
		rsp, err := stream.Recv()
 | 
						|
		if err == io.EOF {
 | 
						|
			break
 | 
						|
		}
 | 
						|
		if err != nil {
 | 
						|
			return keys, err
 | 
						|
		}
 | 
						|
 | 
						|
		for _, key := range rsp.Keys {
 | 
						|
			keys = append(keys, key)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return keys, nil
 | 
						|
}
 | 
						|
 | 
						|
// Read a record with key
 | 
						|
func (s *serviceStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
 | 
						|
	options := store.ReadOptions{
 | 
						|
		Database: s.Database,
 | 
						|
		Table:    s.Table,
 | 
						|
	}
 | 
						|
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	readOpts := &pb.ReadOptions{
 | 
						|
		Database: options.Database,
 | 
						|
		Table:    options.Table,
 | 
						|
		Prefix:   options.Prefix,
 | 
						|
		Suffix:   options.Suffix,
 | 
						|
		Limit:    uint64(options.Limit),
 | 
						|
		Offset:   uint64(options.Offset),
 | 
						|
	}
 | 
						|
 | 
						|
	rsp, err := s.Client.Read(s.Context(), &pb.ReadRequest{
 | 
						|
		Key:     key,
 | 
						|
		Options: readOpts,
 | 
						|
	}, client.WithAddress(s.Nodes...))
 | 
						|
	if err != nil && errors.Equal(err, errors.NotFound("", "")) {
 | 
						|
		return nil, store.ErrNotFound
 | 
						|
	} else if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	records := make([]*store.Record, 0, len(rsp.Records))
 | 
						|
 | 
						|
	for _, val := range rsp.Records {
 | 
						|
		metadata := make(map[string]interface{})
 | 
						|
 | 
						|
		for k, v := range val.Metadata {
 | 
						|
			switch v.Type {
 | 
						|
			// TODO: parse all types
 | 
						|
			default:
 | 
						|
				metadata[k] = v
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		records = append(records, &store.Record{
 | 
						|
			Key:      val.Key,
 | 
						|
			Value:    val.Value,
 | 
						|
			Expiry:   time.Duration(val.Expiry) * time.Second,
 | 
						|
			Metadata: metadata,
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	return records, nil
 | 
						|
}
 | 
						|
 | 
						|
// Write a record
 | 
						|
func (s *serviceStore) Write(record *store.Record, opts ...store.WriteOption) error {
 | 
						|
	options := store.WriteOptions{
 | 
						|
		Database: s.Database,
 | 
						|
		Table:    s.Table,
 | 
						|
	}
 | 
						|
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	writeOpts := &pb.WriteOptions{
 | 
						|
		Database: options.Database,
 | 
						|
		Table:    options.Table,
 | 
						|
	}
 | 
						|
 | 
						|
	metadata := make(map[string]*pb.Field)
 | 
						|
 | 
						|
	for k, v := range record.Metadata {
 | 
						|
		metadata[k] = &pb.Field{
 | 
						|
			Type:  reflect.TypeOf(v).String(),
 | 
						|
			Value: fmt.Sprintf("%v", v),
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	_, err := s.Client.Write(s.Context(), &pb.WriteRequest{
 | 
						|
		Record: &pb.Record{
 | 
						|
			Key:      record.Key,
 | 
						|
			Value:    record.Value,
 | 
						|
			Expiry:   int64(record.Expiry.Seconds()),
 | 
						|
			Metadata: metadata,
 | 
						|
		},
 | 
						|
		Options: writeOpts}, client.WithAddress(s.Nodes...))
 | 
						|
	if err != nil && errors.Equal(err, errors.NotFound("", "")) {
 | 
						|
		return store.ErrNotFound
 | 
						|
	}
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Delete a record with key
 | 
						|
func (s *serviceStore) Delete(key string, opts ...store.DeleteOption) error {
 | 
						|
	options := store.DeleteOptions{
 | 
						|
		Database: s.Database,
 | 
						|
		Table:    s.Table,
 | 
						|
	}
 | 
						|
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	deleteOpts := &pb.DeleteOptions{
 | 
						|
		Database: options.Database,
 | 
						|
		Table:    options.Table,
 | 
						|
	}
 | 
						|
 | 
						|
	_, err := s.Client.Delete(s.Context(), &pb.DeleteRequest{
 | 
						|
		Key:     key,
 | 
						|
		Options: deleteOpts,
 | 
						|
	}, client.WithAddress(s.Nodes...))
 | 
						|
	if err != nil && errors.Equal(err, errors.NotFound("", "")) {
 | 
						|
		return store.ErrNotFound
 | 
						|
	}
 | 
						|
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceStore) String() string {
 | 
						|
	return "service"
 | 
						|
}
 | 
						|
 | 
						|
func (s *serviceStore) Options() store.Options {
 | 
						|
	return s.options
 | 
						|
}
 | 
						|
 | 
						|
// NewStore returns a new store service implementation
 | 
						|
func NewStore(opts ...store.Option) store.Store {
 | 
						|
	var options store.Options
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	if options.Client == nil {
 | 
						|
		options.Client = client.DefaultClient
 | 
						|
	}
 | 
						|
 | 
						|
	service := &serviceStore{
 | 
						|
		options:  options,
 | 
						|
		Database: options.Database,
 | 
						|
		Table:    options.Table,
 | 
						|
		Nodes:    options.Nodes,
 | 
						|
		Client:   pb.NewStoreService("go.micro.store", options.Client),
 | 
						|
	}
 | 
						|
 | 
						|
	return service
 | 
						|
}
 |