Client Cache
This commit is contained in:
		| @@ -261,7 +261,7 @@ func (s *svc) listRules(filters ...string) []*pb.Rule { | |||||||
|  |  | ||||||
| // loadRules retrieves the rules from the auth service | // loadRules retrieves the rules from the auth service | ||||||
| func (s *svc) loadRules() { | func (s *svc) loadRules() { | ||||||
| 	rsp, err := s.rule.List(context.TODO(), &pb.ListRequest{}) | 	rsp, err := s.rule.List(context.TODO(), &pb.ListRequest{}, client.WithCache(time.Minute)) | ||||||
| 	s.Lock() | 	s.Lock() | ||||||
| 	defer s.Unlock() | 	defer s.Unlock() | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										64
									
								
								client/cache.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										64
									
								
								client/cache.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,64 @@ | |||||||
|  | package client | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"crypto/sha1" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/v2/metadata" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // NewCache returns an initialised cache. | ||||||
|  | // TODO: Setup a go routine to expire records in the cache. | ||||||
|  | func NewCache() *Cache { | ||||||
|  | 	return &Cache{ | ||||||
|  | 		values: make(map[string]interface{}), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Cache for responses | ||||||
|  | type Cache struct { | ||||||
|  | 	values map[string]interface{} | ||||||
|  | 	mutex  sync.Mutex | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Get a response from the cache | ||||||
|  | func (c *Cache) Get(ctx context.Context, req *Request) interface{} { | ||||||
|  | 	md, _ := metadata.FromContext(ctx) | ||||||
|  | 	ck := cacheKey{req, md} | ||||||
|  |  | ||||||
|  | 	c.mutex.Lock() | ||||||
|  | 	defer c.mutex.Unlock() | ||||||
|  |  | ||||||
|  | 	if val, ok := c.values[ck.Hash()]; ok { | ||||||
|  | 		return val | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Set a response in the cache | ||||||
|  | func (c *Cache) Set(ctx context.Context, req *Request, rsp interface{}, expiry time.Duration) { | ||||||
|  | 	md, _ := metadata.FromContext(ctx) | ||||||
|  | 	ck := cacheKey{req, md} | ||||||
|  |  | ||||||
|  | 	c.mutex.Lock() | ||||||
|  | 	c.values[ck.Hash()] = rsp | ||||||
|  | 	defer c.mutex.Unlock() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type cacheKey struct { | ||||||
|  | 	Request  *Request | ||||||
|  | 	Metadata metadata.Metadata | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Source: https://gobyexample.com/sha1-hashes | ||||||
|  | func (k *cacheKey) Hash() string { | ||||||
|  | 	bytes, _ := json.Marshal(k) | ||||||
|  | 	h := sha1.New() | ||||||
|  | 	h.Write(bytes) | ||||||
|  | 	return fmt.Sprintf("%x", h.Sum(nil)) | ||||||
|  | } | ||||||
| @@ -29,6 +29,9 @@ type Options struct { | |||||||
| 	PoolSize int | 	PoolSize int | ||||||
| 	PoolTTL  time.Duration | 	PoolTTL  time.Duration | ||||||
|  |  | ||||||
|  | 	// Response cache | ||||||
|  | 	Cache *Cache | ||||||
|  |  | ||||||
| 	// Middleware for client | 	// Middleware for client | ||||||
| 	Wrappers []Wrapper | 	Wrappers []Wrapper | ||||||
|  |  | ||||||
| @@ -59,6 +62,8 @@ type CallOptions struct { | |||||||
| 	StreamTimeout time.Duration | 	StreamTimeout time.Duration | ||||||
| 	// Use the services own auth token | 	// Use the services own auth token | ||||||
| 	ServiceToken bool | 	ServiceToken bool | ||||||
|  | 	// Duration to cache the response for | ||||||
|  | 	CacheExpiry time.Duration | ||||||
|  |  | ||||||
| 	// Middleware for low level call func | 	// Middleware for low level call func | ||||||
| 	CallWrappers []CallWrapper | 	CallWrappers []CallWrapper | ||||||
| @@ -91,6 +96,7 @@ type RequestOptions struct { | |||||||
|  |  | ||||||
| func NewOptions(options ...Option) Options { | func NewOptions(options ...Option) Options { | ||||||
| 	opts := Options{ | 	opts := Options{ | ||||||
|  | 		Cache:       NewCache(), | ||||||
| 		Context:     context.Background(), | 		Context:     context.Background(), | ||||||
| 		ContentType: DefaultContentType, | 		ContentType: DefaultContentType, | ||||||
| 		Codecs:      make(map[string]codec.NewCodec), | 		Codecs:      make(map[string]codec.NewCodec), | ||||||
| @@ -324,6 +330,14 @@ func WithServiceToken() CallOption { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // WithCache is a CallOption which sets the duration the response | ||||||
|  | // shoull be cached for | ||||||
|  | func WithCache(c time.Duration) CallOption { | ||||||
|  | 	return func(o *CallOptions) { | ||||||
|  | 		o.CacheExpiry = c | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| func WithMessageContentType(ct string) MessageOption { | func WithMessageContentType(ct string) MessageOption { | ||||||
| 	return func(o *MessageOptions) { | 	return func(o *MessageOptions) { | ||||||
| 		o.ContentType = ct | 		o.ContentType = ct | ||||||
|   | |||||||
| @@ -471,9 +471,12 @@ func (c *cmd) Before(ctx *cli.Context) error { | |||||||
| 	var serverOpts []server.Option | 	var serverOpts []server.Option | ||||||
| 	var clientOpts []client.Option | 	var clientOpts []client.Option | ||||||
|  |  | ||||||
| 	// setup a client to use when calling the runtime | 	// setup a client to use when calling the runtime. It is important the auth client is wrapped | ||||||
|  | 	// after the cache client since the wrappers are applied in reverse order and the cache will use | ||||||
|  | 	// some of the headers set by the auth client. | ||||||
| 	authFn := func() auth.Auth { return *c.opts.Auth } | 	authFn := func() auth.Auth { return *c.opts.Auth } | ||||||
| 	microClient := wrapper.AuthClient(authFn, grpc.NewClient()) | 	microClient := wrapper.CacheClient(grpc.NewClient()) | ||||||
|  | 	microClient = wrapper.AuthClient(authFn, microClient) | ||||||
|  |  | ||||||
| 	// Set the store | 	// Set the store | ||||||
| 	if name := ctx.String("store"); len(name) > 0 { | 	if name := ctx.String("store"); len(name) > 0 { | ||||||
|   | |||||||
| @@ -42,6 +42,7 @@ func newService(opts ...Option) Service { | |||||||
| 	// wrap client to inject From-Service header on any calls | 	// wrap client to inject From-Service header on any calls | ||||||
| 	options.Client = wrapper.FromService(serviceName, options.Client) | 	options.Client = wrapper.FromService(serviceName, options.Client) | ||||||
| 	options.Client = wrapper.TraceCall(serviceName, trace.DefaultTracer, options.Client) | 	options.Client = wrapper.TraceCall(serviceName, trace.DefaultTracer, options.Client) | ||||||
|  | 	options.Client = wrapper.CacheClient(options.Client) | ||||||
| 	options.Client = wrapper.AuthClient(authFn, options.Client) | 	options.Client = wrapper.AuthClient(authFn, options.Client) | ||||||
|  |  | ||||||
| 	// wrap the server to provide handler stats | 	// wrap the server to provide handler stats | ||||||
|   | |||||||
| @@ -227,3 +227,48 @@ func AuthHandler(fn func() auth.Auth) server.HandlerWrapper { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | type cacheWrapper struct { | ||||||
|  | 	client.Client | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Call executes the request. If the CacheExpiry option was set, the response will be cached using | ||||||
|  | // a hash of the metadata and request as the key. | ||||||
|  | func (c *cacheWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { | ||||||
|  | 	// parse the options | ||||||
|  | 	var options client.CallOptions | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// if the client doesn't have a cacbe setup don't continue | ||||||
|  | 	cache := c.Options().Cache | ||||||
|  | 	if cache == nil { | ||||||
|  | 		return c.Client.Call(ctx, req, rsp, opts...) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// if the cache expiry is not set, execute the call without the cache | ||||||
|  | 	if options.CacheExpiry == 0 { | ||||||
|  | 		return c.Client.Call(ctx, req, rsp, opts...) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// check to see if there is a response | ||||||
|  | 	if cRsp := cache.Get(ctx, &req); cRsp != nil { | ||||||
|  | 		rsp = cRsp | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// don't cache the result if there was an error | ||||||
|  | 	if err := c.Client.Call(ctx, req, rsp, opts...); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// set the result in the cache | ||||||
|  | 	cache.Set(ctx, &req, rsp, options.CacheExpiry) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // CacheClient wraps requests with the cache wrapper | ||||||
|  | func CacheClient(c client.Client) client.Client { | ||||||
|  | 	return &cacheWrapper{c} | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user