Compare commits
15 Commits
Author | SHA1 | Date | |
---|---|---|---|
650d167313 | |||
c6ba2a91e6 | |||
7ece08896f | |||
|
57f6f23294 | ||
09e6fa2fed | |||
10a09a5c6f | |||
b4e5d9462a | |||
96aa0b6906 | |||
f54658830d | |||
1e43122660 | |||
42800fa247 | |||
5b9c810653 | |||
c3def24bf4 | |||
0d1ef31764 | |||
d49afa230f |
@@ -96,8 +96,8 @@ type CallOptions struct {
|
|||||||
RequestTimeout time.Duration
|
RequestTimeout time.Duration
|
||||||
// DialTimeout dial timeout
|
// DialTimeout dial timeout
|
||||||
DialTimeout time.Duration
|
DialTimeout time.Duration
|
||||||
// AuthToken flag
|
// AuthToken string
|
||||||
AuthToken bool
|
AuthToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Context pass context to client
|
// Context pass context to client
|
||||||
@@ -463,9 +463,9 @@ func WithDialTimeout(d time.Duration) CallOption {
|
|||||||
|
|
||||||
// WithAuthToken is a CallOption which overrides the
|
// WithAuthToken is a CallOption which overrides the
|
||||||
// authorization header with the services own auth token
|
// authorization header with the services own auth token
|
||||||
func WithAuthToken() CallOption {
|
func WithAuthToken(t string) CallOption {
|
||||||
return func(o *CallOptions) {
|
return func(o *CallOptions) {
|
||||||
o.AuthToken = true
|
o.AuthToken = t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
395
flow/default.go
395
flow/default.go
@@ -3,12 +3,16 @@ package flow
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/silas/dag"
|
"github.com/silas/dag"
|
||||||
"github.com/unistack-org/micro/v3/client"
|
"github.com/unistack-org/micro/v3/client"
|
||||||
"github.com/unistack-org/micro/v3/codec"
|
"github.com/unistack-org/micro/v3/codec"
|
||||||
|
"github.com/unistack-org/micro/v3/logger"
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
|
"github.com/unistack-org/micro/v3/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
type microFlow struct {
|
type microFlow struct {
|
||||||
@@ -20,27 +24,147 @@ type microWorkflow struct {
|
|||||||
g *dag.AcyclicGraph
|
g *dag.AcyclicGraph
|
||||||
init bool
|
init bool
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
opts Options
|
opts Options
|
||||||
steps map[string]Step
|
steps map[string]Step
|
||||||
|
status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) ID() string {
|
func (w *microWorkflow) ID() string {
|
||||||
return w.id
|
return w.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) Steps() [][]Step {
|
func (w *microWorkflow) Steps() ([][]Step, error) {
|
||||||
|
return w.getSteps("", false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *microWorkflow) Status() Status {
|
||||||
|
return w.status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *microWorkflow) AppendSteps(steps ...Step) error {
|
||||||
|
w.Lock()
|
||||||
|
|
||||||
|
for _, s := range steps {
|
||||||
|
w.steps[s.String()] = s
|
||||||
|
w.g.Add(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dst := range steps {
|
||||||
|
for _, req := range dst.Requires() {
|
||||||
|
src, ok := w.steps[req]
|
||||||
|
if !ok {
|
||||||
|
return ErrStepNotExists
|
||||||
|
}
|
||||||
|
w.g.Connect(dag.BasicEdge(src, dst))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.g.Validate(); err != nil {
|
||||||
|
w.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.g.TransitiveReduction()
|
||||||
|
|
||||||
|
w.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error {
|
func (w *microWorkflow) RemoveSteps(steps ...Step) error {
|
||||||
|
// TODO: handle case when some step requires or required by removed step
|
||||||
|
|
||||||
|
w.Lock()
|
||||||
|
|
||||||
|
for _, s := range steps {
|
||||||
|
delete(w.steps, s.String())
|
||||||
|
w.g.Remove(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dst := range steps {
|
||||||
|
for _, req := range dst.Requires() {
|
||||||
|
src, ok := w.steps[req]
|
||||||
|
if !ok {
|
||||||
|
return ErrStepNotExists
|
||||||
|
}
|
||||||
|
w.g.Connect(dag.BasicEdge(src, dst))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := w.g.Validate(); err != nil {
|
||||||
|
w.Unlock()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.g.TransitiveReduction()
|
||||||
|
|
||||||
|
w.Unlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error {
|
func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
|
||||||
return nil
|
var steps [][]Step
|
||||||
|
var root dag.Vertex
|
||||||
|
var err error
|
||||||
|
|
||||||
|
fn := func(n dag.Vertex, idx int) error {
|
||||||
|
if idx == 0 {
|
||||||
|
steps = make([][]Step, 1)
|
||||||
|
steps[0] = make([]Step, 0, 1)
|
||||||
|
} else if idx >= len(steps) {
|
||||||
|
tsteps := make([][]Step, idx+1)
|
||||||
|
copy(tsteps, steps)
|
||||||
|
steps = tsteps
|
||||||
|
steps[idx] = make([]Step, 0, 1)
|
||||||
|
}
|
||||||
|
steps[idx] = append(steps[idx], n.(Step))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if start != "" {
|
||||||
|
var ok bool
|
||||||
|
w.RLock()
|
||||||
|
root, ok = w.steps[start]
|
||||||
|
w.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return nil, ErrStepNotExists
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
root, err = w.g.Root()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if reverse {
|
||||||
|
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||||
|
} else {
|
||||||
|
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return steps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) {
|
func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
|
||||||
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
|
||||||
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
|
||||||
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
|
||||||
w.Lock()
|
w.Lock()
|
||||||
if !w.init {
|
if !w.init {
|
||||||
if err := w.g.Validate(); err != nil {
|
if err := w.g.Validate(); err != nil {
|
||||||
@@ -56,74 +180,176 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
eid := uid.String()
|
||||||
|
|
||||||
|
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
||||||
|
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
||||||
|
|
||||||
options := NewExecuteOptions(opts...)
|
options := NewExecuteOptions(opts...)
|
||||||
var steps [][]Step
|
|
||||||
fn := func(n dag.Vertex, idx int) error {
|
|
||||||
if idx == 0 {
|
|
||||||
steps = make([][]Step, 1)
|
|
||||||
steps[0] = make([]Step, 0, 1)
|
|
||||||
} else if idx >= len(steps) {
|
|
||||||
tsteps := make([][]Step, idx+1)
|
|
||||||
copy(tsteps, steps)
|
|
||||||
steps = tsteps
|
|
||||||
steps[idx] = make([]Step, 0, 1)
|
|
||||||
}
|
|
||||||
steps[idx] = append(steps[idx], n.(Step))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var root dag.Vertex
|
steps, err := w.getSteps(options.Start, options.Reverse)
|
||||||
if options.Start != "" {
|
|
||||||
var ok bool
|
|
||||||
w.RLock()
|
|
||||||
root, ok = w.steps[options.Start]
|
|
||||||
w.RUnlock()
|
|
||||||
if !ok {
|
|
||||||
return "", ErrStepNotExists
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
root, err = w.g.Root()
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if options.Reverse {
|
|
||||||
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
|
|
||||||
} else {
|
|
||||||
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
|
}
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
cherr := make(chan error, 1)
|
cherr := make(chan error, 1)
|
||||||
defer close(cherr)
|
chstatus := make(chan Status, 1)
|
||||||
|
|
||||||
nctx, cancel := context.WithCancel(ctx)
|
nctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
nopts := make([]ExecuteOption, 0, len(opts)+5)
|
||||||
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
|
|
||||||
|
nopts = append(nopts,
|
||||||
|
ExecuteClient(w.opts.Client),
|
||||||
|
ExecuteTracer(w.opts.Tracer),
|
||||||
|
ExecuteLogger(w.opts.Logger),
|
||||||
|
ExecuteMeter(w.opts.Meter),
|
||||||
|
)
|
||||||
|
nopts = append(nopts, opts...)
|
||||||
|
done := make(chan struct{})
|
||||||
|
|
||||||
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
|
return eid, werr
|
||||||
|
}
|
||||||
|
for idx := range steps {
|
||||||
|
for nidx := range steps[idx] {
|
||||||
|
cstep := steps[idx][nidx]
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||||
|
return eid, werr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for idx := range steps {
|
for idx := range steps {
|
||||||
wg.Add(len(steps[idx]))
|
|
||||||
for nidx := range steps[idx] {
|
for nidx := range steps[idx] {
|
||||||
go func(step Step) {
|
wStatus := &codec.Frame{}
|
||||||
defer wg.Done()
|
if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
|
||||||
if err = step.Execute(nctx, req, nopts...); err != nil {
|
cherr <- werr
|
||||||
cherr <- err
|
return
|
||||||
cancel()
|
}
|
||||||
|
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
|
||||||
|
chstatus <- status
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if w.opts.Logger.V(logger.TraceLevel) {
|
||||||
|
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
|
||||||
|
}
|
||||||
|
cstep := steps[idx][nidx]
|
||||||
|
if len(cstep.Requires()) == 0 {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(step Step) {
|
||||||
|
defer wg.Done()
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rsp, serr := step.Execute(nctx, req, nopts...)
|
||||||
|
if serr != nil {
|
||||||
|
step.SetStatus(StatusFailure)
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
}
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
}
|
||||||
|
cherr <- serr
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(cstep)
|
||||||
|
wg.Wait()
|
||||||
|
} else {
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}(steps[idx][nidx])
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
||||||
|
if serr != nil {
|
||||||
|
cstep.SetStatus(StatusFailure)
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
}
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
}
|
||||||
|
cherr <- serr
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||||
|
cherr <- werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
}
|
}
|
||||||
cherr <- nil
|
close(done)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = <-cherr
|
if options.Async {
|
||||||
|
return eid, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Tracef(ctx, "wait for finish or error")
|
||||||
|
select {
|
||||||
|
case <-nctx.Done():
|
||||||
|
err = nctx.Err()
|
||||||
|
case cerr := <-cherr:
|
||||||
|
err = cerr
|
||||||
|
case <-done:
|
||||||
|
close(cherr)
|
||||||
|
case <-chstatus:
|
||||||
|
close(chstatus)
|
||||||
|
return uid.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case nctx.Err() != nil:
|
||||||
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
case err == nil:
|
||||||
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
case err != nil:
|
||||||
|
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
|
||||||
|
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
return uid.String(), err
|
return uid.String(), err
|
||||||
}
|
}
|
||||||
@@ -207,6 +433,17 @@ type microCallStep struct {
|
|||||||
opts StepOptions
|
opts StepOptions
|
||||||
service string
|
service string
|
||||||
method string
|
method string
|
||||||
|
rsp *Message
|
||||||
|
req *Message
|
||||||
|
status Status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microCallStep) Request() *Message {
|
||||||
|
return s.req
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microCallStep) Response() *Message {
|
||||||
|
return s.rsp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *microCallStep) ID() string {
|
func (s *microCallStep) ID() string {
|
||||||
@@ -247,23 +484,47 @@ func (s *microCallStep) Hashcode() interface{} {
|
|||||||
return s.String()
|
return s.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
|
func (s *microCallStep) GetStatus() Status {
|
||||||
|
return s.status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microCallStep) SetStatus(status Status) {
|
||||||
|
s.status = status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
|
||||||
options := NewExecuteOptions(opts...)
|
options := NewExecuteOptions(opts...)
|
||||||
if options.Client == nil {
|
if options.Client == nil {
|
||||||
return fmt.Errorf("client not set")
|
return nil, ErrMissingClient
|
||||||
}
|
}
|
||||||
rsp := &codec.Frame{}
|
rsp := &codec.Frame{}
|
||||||
copts := []client.CallOption{client.WithRetries(0)}
|
copts := []client.CallOption{client.WithRetries(0)}
|
||||||
if options.Timeout > 0 {
|
if options.Timeout > 0 {
|
||||||
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
|
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
|
||||||
}
|
}
|
||||||
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp)
|
nctx := metadata.NewOutgoingContext(ctx, req.Header)
|
||||||
return err
|
err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
md, _ := metadata.FromOutgoingContext(nctx)
|
||||||
|
return &Message{Header: md, Body: rsp.Data}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
type microPublishStep struct {
|
type microPublishStep struct {
|
||||||
opts StepOptions
|
opts StepOptions
|
||||||
topic string
|
topic string
|
||||||
|
req *Message
|
||||||
|
rsp *Message
|
||||||
|
status Status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microPublishStep) Request() *Message {
|
||||||
|
return s.req
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microPublishStep) Response() *Message {
|
||||||
|
return s.rsp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *microPublishStep) ID() string {
|
func (s *microPublishStep) ID() string {
|
||||||
@@ -304,13 +565,21 @@ func (s *microPublishStep) Hashcode() interface{} {
|
|||||||
return s.String()
|
return s.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error {
|
func (s *microPublishStep) GetStatus() Status {
|
||||||
return nil
|
return s.status
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCallStep(service string, method string, opts ...StepOption) Step {
|
func (s *microPublishStep) SetStatus(status Status) {
|
||||||
|
s.status = status
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCallStep(service string, name string, method string, opts ...StepOption) Step {
|
||||||
options := NewStepOptions(opts...)
|
options := NewStepOptions(opts...)
|
||||||
return µCallStep{service: service, method: method, opts: options}
|
return µCallStep{service: service, method: name + "." + method, opts: options}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPublishStep(topic string, opts ...StepOption) Step {
|
func NewPublishStep(topic string, opts ...StepOption) Step {
|
||||||
|
105
flow/flow.go
105
flow/flow.go
@@ -4,12 +4,43 @@ package flow
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"github.com/unistack-org/micro/v3/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrStepNotExists = errors.New("step not exists")
|
ErrStepNotExists = errors.New("step not exists")
|
||||||
|
ErrMissingClient = errors.New("client not set")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// RawMessage is a raw encoded JSON value.
|
||||||
|
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
|
||||||
|
type RawMessage []byte
|
||||||
|
|
||||||
|
// MarshalJSON returns m as the JSON encoding of m.
|
||||||
|
func (m *RawMessage) MarshalJSON() ([]byte, error) {
|
||||||
|
if m == nil {
|
||||||
|
return []byte("null"), nil
|
||||||
|
}
|
||||||
|
return *m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalJSON sets *m to a copy of data.
|
||||||
|
func (m *RawMessage) UnmarshalJSON(data []byte) error {
|
||||||
|
if m == nil {
|
||||||
|
return errors.New("RawMessage UnmarshalJSON on nil pointer")
|
||||||
|
}
|
||||||
|
*m = append((*m)[0:0], data...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
Header metadata.Metadata
|
||||||
|
Body RawMessage
|
||||||
|
}
|
||||||
|
|
||||||
// Step represents dedicated workflow step
|
// Step represents dedicated workflow step
|
||||||
type Step interface {
|
type Step interface {
|
||||||
// ID returns step id
|
// ID returns step id
|
||||||
@@ -17,7 +48,7 @@ type Step interface {
|
|||||||
// Endpoint returns rpc endpoint service_name.service_method or broker topic
|
// Endpoint returns rpc endpoint service_name.service_method or broker topic
|
||||||
Endpoint() string
|
Endpoint() string
|
||||||
// Execute step run
|
// Execute step run
|
||||||
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error
|
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
|
||||||
// Requires returns dependent steps
|
// Requires returns dependent steps
|
||||||
Requires() []string
|
Requires() []string
|
||||||
// Options returns step options
|
// Options returns step options
|
||||||
@@ -26,20 +57,70 @@ type Step interface {
|
|||||||
Require(steps ...Step) error
|
Require(steps ...Step) error
|
||||||
// String
|
// String
|
||||||
String() string
|
String() string
|
||||||
|
// GetStatus returns step status
|
||||||
|
GetStatus() Status
|
||||||
|
// SetStatus sets the step status
|
||||||
|
SetStatus(Status)
|
||||||
|
// Request returns step request message
|
||||||
|
Request() *Message
|
||||||
|
// Response returns step response message
|
||||||
|
Response() *Message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Status int
|
||||||
|
|
||||||
|
func (status Status) String() string {
|
||||||
|
return StatusString[status]
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
StatusPending Status = iota
|
||||||
|
StatusRunning
|
||||||
|
StatusFailure
|
||||||
|
StatusSuccess
|
||||||
|
StatusAborted
|
||||||
|
StatusSuspend
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
StatusString = map[Status]string{
|
||||||
|
StatusPending: "StatusPending",
|
||||||
|
StatusRunning: "StatusRunning",
|
||||||
|
StatusFailure: "StatusFailure",
|
||||||
|
StatusSuccess: "StatusSuccess",
|
||||||
|
StatusAborted: "StatusAborted",
|
||||||
|
StatusSuspend: "StatusSuspend",
|
||||||
|
}
|
||||||
|
StringStatus = map[string]Status{
|
||||||
|
"StatusPending": StatusPending,
|
||||||
|
"StatusRunning": StatusRunning,
|
||||||
|
"StatusFailure": StatusFailure,
|
||||||
|
"StatusSuccess": StatusSuccess,
|
||||||
|
"StatusAborted": StatusAborted,
|
||||||
|
"StatusSuspend": StatusSuspend,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// Workflow contains all steps to execute
|
// Workflow contains all steps to execute
|
||||||
type Workflow interface {
|
type Workflow interface {
|
||||||
// ID returns id of the workflow
|
// ID returns id of the workflow
|
||||||
ID() string
|
ID() string
|
||||||
// Steps returns steps slice where parallel steps returned on the same level
|
|
||||||
Steps() [][]Step
|
|
||||||
// Execute workflow with args, return execution id and error
|
// Execute workflow with args, return execution id and error
|
||||||
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error)
|
Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
|
||||||
// RemoveSteps remove steps from workflow
|
// RemoveSteps remove steps from workflow
|
||||||
RemoveSteps(ctx context.Context, steps ...Step) error
|
RemoveSteps(steps ...Step) error
|
||||||
// AppendSteps append steps to workflow
|
// AppendSteps append steps to workflow
|
||||||
AppendSteps(ctx context.Context, steps ...Step) error
|
AppendSteps(steps ...Step) error
|
||||||
|
// Status returns workflow status
|
||||||
|
Status() Status
|
||||||
|
// Steps returns steps slice where parallel steps returned on the same level
|
||||||
|
Steps() ([][]Step, error)
|
||||||
|
// Suspend suspends execution
|
||||||
|
Suspend(ctx context.Context, eid string) error
|
||||||
|
// Resume resumes execution
|
||||||
|
Resume(ctx context.Context, eid string) error
|
||||||
|
// Abort abort execution
|
||||||
|
Abort(ctx context.Context, eid string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flow the base interface to interact with workflows
|
// Flow the base interface to interact with workflows
|
||||||
@@ -57,3 +138,15 @@ type Flow interface {
|
|||||||
// WorkflowList lists all workflows
|
// WorkflowList lists all workflows
|
||||||
WorkflowList(ctx context.Context) ([]Workflow, error)
|
WorkflowList(ctx context.Context) ([]Workflow, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
flowMu sync.Mutex
|
||||||
|
atomicSteps atomic.Value
|
||||||
|
)
|
||||||
|
|
||||||
|
func RegisterStep(step Step) {
|
||||||
|
flowMu.Lock()
|
||||||
|
steps, _ := atomicSteps.Load().([]Step)
|
||||||
|
atomicSteps.Store(append(steps, step))
|
||||||
|
flowMu.Unlock()
|
||||||
|
}
|
||||||
|
@@ -116,8 +116,6 @@ type ExecuteOptions struct {
|
|||||||
Logger logger.Logger
|
Logger logger.Logger
|
||||||
// Meter holds the meter
|
// Meter holds the meter
|
||||||
Meter meter.Meter
|
Meter meter.Meter
|
||||||
// Store used for intermediate results
|
|
||||||
Store store.Store
|
|
||||||
// Context can be used to abort execution or pass additional opts
|
// Context can be used to abort execution or pass additional opts
|
||||||
Context context.Context
|
Context context.Context
|
||||||
// Start step
|
// Start step
|
||||||
@@ -126,6 +124,8 @@ type ExecuteOptions struct {
|
|||||||
Reverse bool
|
Reverse bool
|
||||||
// Timeout for execution
|
// Timeout for execution
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
// Async enables async execution
|
||||||
|
Async bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type ExecuteOption func(*ExecuteOptions)
|
type ExecuteOption func(*ExecuteOptions)
|
||||||
@@ -154,12 +154,6 @@ func ExecuteMeter(m meter.Meter) ExecuteOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ExecuteStore(s store.Store) ExecuteOption {
|
|
||||||
return func(o *ExecuteOptions) {
|
|
||||||
o.Store = s
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func ExecuteContext(ctx context.Context) ExecuteOption {
|
func ExecuteContext(ctx context.Context) ExecuteOption {
|
||||||
return func(o *ExecuteOptions) {
|
return func(o *ExecuteOptions) {
|
||||||
o.Context = ctx
|
o.Context = ctx
|
||||||
@@ -178,8 +172,20 @@ func ExecuteTimeout(td time.Duration) ExecuteOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ExecuteAsync(b bool) ExecuteOption {
|
||||||
|
return func(o *ExecuteOptions) {
|
||||||
|
o.Async = b
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
|
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
|
||||||
options := ExecuteOptions{}
|
options := ExecuteOptions{
|
||||||
|
Client: client.DefaultClient,
|
||||||
|
Logger: logger.DefaultLogger,
|
||||||
|
Tracer: tracer.DefaultTracer,
|
||||||
|
Meter: meter.DefaultMeter,
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
@@ -196,7 +202,9 @@ type StepOptions struct {
|
|||||||
type StepOption func(*StepOptions)
|
type StepOption func(*StepOptions)
|
||||||
|
|
||||||
func NewStepOptions(opts ...StepOption) StepOptions {
|
func NewStepOptions(opts ...StepOption) StepOptions {
|
||||||
options := StepOptions{Context: context.Background()}
|
options := StepOptions{
|
||||||
|
Context: context.Background(),
|
||||||
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
}
|
}
|
||||||
|
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.16
|
|||||||
require (
|
require (
|
||||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||||
github.com/ef-ds/deque v1.0.4
|
github.com/ef-ds/deque v1.0.4
|
||||||
github.com/google/uuid v1.2.0
|
github.com/google/uuid v1.3.0
|
||||||
github.com/imdario/mergo v0.3.12
|
github.com/imdario/mergo v0.3.12
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
|
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34
|
||||||
|
4
go.sum
4
go.sum
@@ -2,8 +2,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
|
|||||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
|
||||||
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
|
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
|
||||||
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
|
||||||
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
|
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||||
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
|
||||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||||
|
@@ -8,7 +8,8 @@ import (
|
|||||||
|
|
||||||
func TestLogger(t *testing.T) {
|
func TestLogger(t *testing.T) {
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
l := NewLogger(WithLevel(TraceLevel))
|
buf := bytes.NewBuffer(nil)
|
||||||
|
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
|
||||||
if err := l.Init(); err != nil {
|
if err := l.Init(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -16,6 +17,18 @@ func TestLogger(t *testing.T) {
|
|||||||
l.Warn(ctx, "warn_msg1")
|
l.Warn(ctx, "warn_msg1")
|
||||||
l.Fields(map[string]interface{}{"error": "test"}).Info(ctx, "error message")
|
l.Fields(map[string]interface{}{"error": "test"}).Info(ctx, "error message")
|
||||||
l.Warn(ctx, "first", " ", "second")
|
l.Warn(ctx, "first", " ", "second")
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"level":"trace","msg":"trace_msg1"`)) {
|
||||||
|
t.Fatalf("logger error, buf %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"warn","msg":"warn_msg1"`)) {
|
||||||
|
t.Fatalf("logger error, buf %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"error":"test","level":"info","msg":"error message"`)) {
|
||||||
|
t.Fatalf("logger error, buf %s", buf.Bytes())
|
||||||
|
}
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`"level":"warn","msg":"first second"`)) {
|
||||||
|
t.Fatalf("logger error, buf %s", buf.Bytes())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLoggerWrapper(t *testing.T) {
|
func TestLoggerWrapper(t *testing.T) {
|
||||||
@@ -35,3 +48,21 @@ func TestLoggerWrapper(t *testing.T) {
|
|||||||
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
|
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestOmitLoggerWrapper(t *testing.T) {
|
||||||
|
ctx := context.TODO()
|
||||||
|
buf := bytes.NewBuffer(nil)
|
||||||
|
l := NewOmitLogger(NewLogger(WithLevel(TraceLevel), WithOutput(buf)))
|
||||||
|
if err := l.Init(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
type secret struct {
|
||||||
|
Name string
|
||||||
|
Passw string `logger:"omit"`
|
||||||
|
}
|
||||||
|
s := &secret{Name: "name", Passw: "secret"}
|
||||||
|
l.Errorf(ctx, "test %#+v", s)
|
||||||
|
if !bytes.Contains(buf.Bytes(), []byte(`logger.secret{Name:\"name\", Passw:\"\"}"`)) {
|
||||||
|
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -16,10 +16,96 @@ type LogfFunc func(ctx context.Context, level Level, msg string, args ...interfa
|
|||||||
type Wrapper interface {
|
type Wrapper interface {
|
||||||
// Log logs message with needed level
|
// Log logs message with needed level
|
||||||
Log(LogFunc) LogFunc
|
Log(LogFunc) LogFunc
|
||||||
// Log(ctx context.Context, level Level, args ...interface{})
|
|
||||||
// Logf logs message with needed level
|
// Logf logs message with needed level
|
||||||
Logf(LogfFunc) LogfFunc
|
Logf(LogfFunc) LogfFunc
|
||||||
//Logf(ctx context.Context, level Level, msg string, args ...interface{})
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ Logger = &OmitLogger{}
|
||||||
|
)
|
||||||
|
|
||||||
|
type OmitLogger struct {
|
||||||
|
l Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOmitLogger(l Logger) Logger {
|
||||||
|
return &OmitLogger{l: l}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Init(opts ...Option) error {
|
||||||
|
return w.l.Init(append(opts, WrapLogger(NewOmitWrapper()))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) V(level Level) bool {
|
||||||
|
return w.l.V(level)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Options() Options {
|
||||||
|
return w.l.Options()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Fields(fields map[string]interface{}) Logger {
|
||||||
|
return w.l.Fields(fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Info(ctx context.Context, args ...interface{}) {
|
||||||
|
w.l.Info(ctx, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Trace(ctx context.Context, args ...interface{}) {
|
||||||
|
w.l.Trace(ctx, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Debug(ctx context.Context, args ...interface{}) {
|
||||||
|
w.l.Debug(ctx, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Warn(ctx context.Context, args ...interface{}) {
|
||||||
|
w.l.Warn(ctx, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Error(ctx context.Context, args ...interface{}) {
|
||||||
|
w.l.Error(ctx, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Fatal(ctx context.Context, args ...interface{}) {
|
||||||
|
w.l.Fatal(ctx, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Infof(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
w.l.Infof(ctx, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Tracef(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
w.l.Tracef(ctx, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Debugf(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
w.l.Debugf(ctx, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Warnf(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
w.l.Warnf(ctx, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Errorf(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
w.l.Errorf(ctx, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Fatalf(ctx context.Context, msg string, args ...interface{}) {
|
||||||
|
w.l.Fatalf(ctx, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Log(ctx context.Context, level Level, args ...interface{}) {
|
||||||
|
w.l.Log(ctx, level, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) Logf(ctx context.Context, level Level, msg string, args ...interface{}) {
|
||||||
|
w.l.Logf(ctx, level, msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *OmitLogger) String() string {
|
||||||
|
return w.l.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
type OmitWrapper struct{}
|
type OmitWrapper struct{}
|
||||||
|
@@ -3,8 +3,9 @@ package meter
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
"reflect"
|
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -77,36 +78,60 @@ type Summary interface {
|
|||||||
UpdateDuration(time.Time)
|
UpdateDuration(time.Time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sort labels alphabeticaly by label name
|
||||||
type byKey []string
|
type byKey []string
|
||||||
|
|
||||||
func (k byKey) Len() int { return len(k) / 2 }
|
func (k byKey) Len() int { return len(k) / 2 }
|
||||||
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
|
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
|
||||||
func (k byKey) Swap(i, j int) {
|
func (k byKey) Swap(i, j int) {
|
||||||
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1]
|
k[i*2], k[j*2] = k[j*2], k[i*2]
|
||||||
|
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
|
||||||
}
|
}
|
||||||
|
|
||||||
func Sort(slice *[]string) {
|
// BuildLables used to sort labels and delete duplicates.
|
||||||
bk := byKey(*slice)
|
// Last value wins in case of duplicate label keys.
|
||||||
if bk.Len() <= 1 {
|
func BuildLabels(labels ...string) []string {
|
||||||
return
|
if len(labels)%2 == 1 {
|
||||||
|
labels = labels[:len(labels)-1]
|
||||||
}
|
}
|
||||||
sort.Sort(bk)
|
sort.Sort(byKey(labels))
|
||||||
v := reflect.ValueOf(slice).Elem()
|
return labels
|
||||||
cnt := 0
|
}
|
||||||
key := 0
|
|
||||||
val := 1
|
// BuildName used to combine metric with labels.
|
||||||
for key < v.Len() {
|
// If labels count is odd, drop last element
|
||||||
if len(bk) > key+2 && bk[key] == bk[key+2] {
|
func BuildName(name string, labels ...string) string {
|
||||||
key += 2
|
if len(labels)%2 == 1 {
|
||||||
val += 2
|
labels = labels[:len(labels)-1]
|
||||||
continue
|
}
|
||||||
}
|
|
||||||
v.Index(cnt).Set(v.Index(key))
|
sort.Sort(byKey(labels))
|
||||||
cnt++
|
|
||||||
v.Index(cnt).Set(v.Index(val))
|
idx := 0
|
||||||
cnt++
|
for {
|
||||||
key += 2
|
if labels[idx] == labels[idx+2] {
|
||||||
val += 2
|
copy(labels[idx:], labels[idx+2:])
|
||||||
}
|
labels = labels[:len(labels)-2]
|
||||||
v.SetLen(cnt)
|
} else {
|
||||||
|
idx += 2
|
||||||
|
}
|
||||||
|
if idx+2 >= len(labels) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var b strings.Builder
|
||||||
|
_, _ = b.WriteString(name)
|
||||||
|
_, _ = b.WriteRune('{')
|
||||||
|
for idx := 0; idx < len(labels); idx += 2 {
|
||||||
|
if idx > 0 {
|
||||||
|
_, _ = b.WriteRune(',')
|
||||||
|
}
|
||||||
|
_, _ = b.WriteString(labels[idx])
|
||||||
|
_, _ = b.WriteString(`=`)
|
||||||
|
_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
|
||||||
|
}
|
||||||
|
_, _ = b.WriteRune('}')
|
||||||
|
|
||||||
|
return b.String()
|
||||||
}
|
}
|
||||||
|
@@ -14,11 +14,53 @@ func TestNoopMeter(t *testing.T) {
|
|||||||
cnt.Inc()
|
cnt.Inc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLabelsSort(t *testing.T) {
|
func testEq(a, b []string) bool {
|
||||||
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"}
|
if len(a) != len(b) {
|
||||||
Sort(&ls)
|
return false
|
||||||
|
}
|
||||||
|
for i := range a {
|
||||||
|
if a[i] != b[i] {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
if ls[0] != "broker" || ls[1] != "broker2" {
|
func TestBuildLabels(t *testing.T) {
|
||||||
t.Fatalf("sort error: %v", ls)
|
type testData struct {
|
||||||
|
src []string
|
||||||
|
dst []string
|
||||||
|
}
|
||||||
|
|
||||||
|
data := []testData{
|
||||||
|
testData{
|
||||||
|
src: []string{"zerolabel", "value3", "firstlabel", "value2"},
|
||||||
|
dst: []string{"firstlabel", "value2", "zerolabel", "value3"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, d := range data {
|
||||||
|
if !testEq(d.dst, BuildLabels(d.src...)) {
|
||||||
|
t.Fatalf("slices not properly sorted: %v %v", d.dst, d.src)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBuildName(t *testing.T) {
|
||||||
|
data := map[string][]string{
|
||||||
|
`my_metric{firstlabel="value2",zerolabel="value3"}`: []string{
|
||||||
|
"my_metric",
|
||||||
|
"zerolabel", "value3", "firstlabel", "value2",
|
||||||
|
},
|
||||||
|
`my_metric{broker="broker2",register="mdns",server="tcp"}`: []string{
|
||||||
|
"my_metric",
|
||||||
|
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for e, d := range data {
|
||||||
|
if x := BuildName(d[0], d[1:]...); x != e {
|
||||||
|
t.Fatalf("expect: %s, result: %s", e, x)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -15,8 +15,8 @@ import (
|
|||||||
var DefaultServer Server = NewServer()
|
var DefaultServer Server = NewServer()
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// DefaultAddress will be used if no address passed
|
// DefaultAddress will be used if no address passed, use secure localhost
|
||||||
DefaultAddress = ":0"
|
DefaultAddress = "127.0.0.1:0"
|
||||||
// DefaultName will be used if no name passed
|
// DefaultName will be used if no name passed
|
||||||
DefaultName = "server"
|
DefaultName = "server"
|
||||||
// DefaultVersion will be used if no version passed
|
// DefaultVersion will be used if no version passed
|
||||||
|
@@ -36,16 +36,6 @@ func (m *memoryStore) key(prefix, key string) string {
|
|||||||
return filepath.Join(prefix, key)
|
return filepath.Join(prefix, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) prefix(database, table string) string {
|
|
||||||
if len(database) == 0 {
|
|
||||||
database = m.opts.Database
|
|
||||||
}
|
|
||||||
if len(table) == 0 {
|
|
||||||
table = m.opts.Table
|
|
||||||
}
|
|
||||||
return filepath.Join(database, table)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *memoryStore) exists(prefix, key string) error {
|
func (m *memoryStore) exists(prefix, key string) error {
|
||||||
key = m.key(prefix, key)
|
key = m.key(prefix, key)
|
||||||
|
|
||||||
@@ -80,15 +70,17 @@ func (m *memoryStore) delete(prefix, key string) {
|
|||||||
|
|
||||||
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
||||||
allItems := m.store.Items()
|
allItems := m.store.Items()
|
||||||
allKeys := make([]string, len(allItems))
|
allKeys := make([]string, 0, len(allItems))
|
||||||
i := 0
|
|
||||||
|
|
||||||
for k := range allItems {
|
for k := range allItems {
|
||||||
if !strings.HasPrefix(k, prefix+"/") {
|
if !strings.HasPrefix(k, prefix) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
allKeys[i] = strings.TrimPrefix(k, prefix+"/")
|
k = strings.TrimPrefix(k, prefix)
|
||||||
i++
|
if k[0] == '/' {
|
||||||
|
k = k[1:]
|
||||||
|
}
|
||||||
|
allKeys = append(allKeys, k)
|
||||||
}
|
}
|
||||||
|
|
||||||
if limit != 0 || offset != 0 {
|
if limit != 0 || offset != 0 {
|
||||||
@@ -107,7 +99,6 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string {
|
|||||||
}
|
}
|
||||||
return allKeys[offset:end]
|
return allKeys[offset:end]
|
||||||
}
|
}
|
||||||
|
|
||||||
return allKeys
|
return allKeys
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,37 +118,48 @@ func (m *memoryStore) Name() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||||
prefix := m.prefix(m.opts.Database, m.opts.Table)
|
options := NewExistsOptions(opts...)
|
||||||
return m.exists(prefix, key)
|
if options.Namespace == "" {
|
||||||
|
options.Namespace = m.opts.Namespace
|
||||||
|
}
|
||||||
|
return m.exists(options.Namespace, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||||
readOpts := NewReadOptions(opts...)
|
options := NewReadOptions(opts...)
|
||||||
prefix := m.prefix(readOpts.Database, readOpts.Table)
|
if options.Namespace == "" {
|
||||||
return m.get(prefix, key, val)
|
options.Namespace = m.opts.Namespace
|
||||||
|
}
|
||||||
|
return m.get(options.Namespace, key, val)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||||
writeOpts := NewWriteOptions(opts...)
|
options := NewWriteOptions(opts...)
|
||||||
|
if options.Namespace == "" {
|
||||||
|
options.Namespace = m.opts.Namespace
|
||||||
|
}
|
||||||
|
if options.TTL == 0 {
|
||||||
|
options.TTL = cache.NoExpiration
|
||||||
|
}
|
||||||
|
|
||||||
prefix := m.prefix(writeOpts.Database, writeOpts.Table)
|
key = m.key(options.Namespace, key)
|
||||||
|
|
||||||
key = m.key(prefix, key)
|
|
||||||
|
|
||||||
buf, err := m.opts.Codec.Marshal(val)
|
buf, err := m.opts.Codec.Marshal(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
m.store.Set(key, buf, writeOpts.TTL)
|
m.store.Set(key, buf, options.TTL)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||||
deleteOptions := NewDeleteOptions(opts...)
|
options := NewDeleteOptions(opts...)
|
||||||
|
if options.Namespace == "" {
|
||||||
|
options.Namespace = m.opts.Namespace
|
||||||
|
}
|
||||||
|
|
||||||
prefix := m.prefix(deleteOptions.Database, deleteOptions.Table)
|
m.delete(options.Namespace, key)
|
||||||
m.delete(prefix, key)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -166,25 +168,27 @@ func (m *memoryStore) Options() Options {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||||
listOptions := NewListOptions(opts...)
|
options := NewListOptions(opts...)
|
||||||
|
if options.Namespace == "" {
|
||||||
|
options.Namespace = m.opts.Namespace
|
||||||
|
}
|
||||||
|
|
||||||
prefix := m.prefix(listOptions.Database, listOptions.Table)
|
keys := m.list(options.Namespace, options.Limit, options.Offset)
|
||||||
keys := m.list(prefix, listOptions.Limit, listOptions.Offset)
|
|
||||||
|
|
||||||
if len(listOptions.Prefix) > 0 {
|
if len(options.Prefix) > 0 {
|
||||||
var prefixKeys []string
|
var prefixKeys []string
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
if strings.HasPrefix(k, listOptions.Prefix) {
|
if strings.HasPrefix(k, options.Prefix) {
|
||||||
prefixKeys = append(prefixKeys, k)
|
prefixKeys = append(prefixKeys, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
keys = prefixKeys
|
keys = prefixKeys
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(listOptions.Suffix) > 0 {
|
if len(options.Suffix) > 0 {
|
||||||
var suffixKeys []string
|
var suffixKeys []string
|
||||||
for _, k := range keys {
|
for _, k := range keys {
|
||||||
if strings.HasSuffix(k, listOptions.Suffix) {
|
if strings.HasSuffix(k, options.Suffix) {
|
||||||
suffixKeys = append(suffixKeys, k)
|
suffixKeys = append(suffixKeys, k)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -9,11 +9,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestMemoryReInit(t *testing.T) {
|
func TestMemoryReInit(t *testing.T) {
|
||||||
s := store.NewStore(store.Table("aaa"))
|
s := store.NewStore(store.Namespace("aaa"))
|
||||||
if err := s.Init(store.Table("")); err != nil {
|
if err := s.Init(store.Namespace("")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(s.Options().Table) > 0 {
|
if len(s.Options().Namespace) > 0 {
|
||||||
t.Error("Init didn't reinitialise the store")
|
t.Error("Init didn't reinitialise the store")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -28,7 +28,7 @@ func TestMemoryBasic(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryPrefix(t *testing.T) {
|
func TestMemoryPrefix(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := store.NewStore()
|
||||||
if err := s.Init(store.Table("some-prefix")); err != nil {
|
if err := s.Init(store.Namespace("some-prefix")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
basictest(s, t)
|
basictest(s, t)
|
||||||
@@ -36,7 +36,7 @@ func TestMemoryPrefix(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryNamespace(t *testing.T) {
|
func TestMemoryNamespace(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := store.NewStore()
|
||||||
if err := s.Init(store.Database("some-namespace")); err != nil {
|
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
basictest(s, t)
|
basictest(s, t)
|
||||||
@@ -44,7 +44,7 @@ func TestMemoryNamespace(t *testing.T) {
|
|||||||
|
|
||||||
func TestMemoryNamespacePrefix(t *testing.T) {
|
func TestMemoryNamespacePrefix(t *testing.T) {
|
||||||
s := store.NewStore()
|
s := store.NewStore()
|
||||||
if err := s.Init(store.Table("some-prefix"), store.Database("some-namespace")); err != nil {
|
if err := s.Init(store.Namespace("some-namespace")); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
basictest(s, t)
|
basictest(s, t)
|
||||||
|
261
store/options.go
261
store/options.go
@@ -28,13 +28,12 @@ type Options struct {
|
|||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
// Name specifies store name
|
// Name specifies store name
|
||||||
Name string
|
Name string
|
||||||
// Database specifies store database
|
// Namespace of the records
|
||||||
Database string
|
Namespace string
|
||||||
// Table specifies store table
|
// Addrs contains store address
|
||||||
Table string
|
Addrs []string
|
||||||
// Nodes contains store address
|
//Wrappers store wrapper that called before actual functions
|
||||||
// TODO: replace with Addrs
|
//Wrappers []Wrapper
|
||||||
Nodes []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOptions creates options struct
|
// NewOptions creates options struct
|
||||||
@@ -90,13 +89,20 @@ func Meter(m meter.Meter) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Name the name
|
// Name the name of the store
|
||||||
func Name(n string) Option {
|
func Name(n string) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Name = n
|
o.Name = n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Namespace sets namespace of the store
|
||||||
|
func Namespace(ns string) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Namespace = ns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Tracer sets the tracer
|
// Tracer sets the tracer
|
||||||
func Tracer(t tracer.Tracer) Option {
|
func Tracer(t tracer.Tracer) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
@@ -104,27 +110,21 @@ func Tracer(t tracer.Tracer) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nodes contains the addresses or other connection information of the backing storage.
|
// Addrs contains the addresses or other connection information of the backing storage.
|
||||||
// For example, an etcd implementation would contain the nodes of the cluster.
|
// For example, an etcd implementation would contain the nodes of the cluster.
|
||||||
// A SQL implementation could contain one or more connection strings.
|
// A SQL implementation could contain one or more connection strings.
|
||||||
func Nodes(a ...string) Option {
|
func Addrs(addrs ...string) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
o.Nodes = a
|
o.Addrs = addrs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Database allows multiple isolated stores to be kept in one backend, if supported.
|
// ReadOptions configures an individual Read operation
|
||||||
func Database(db string) Option {
|
type ReadOptions struct {
|
||||||
return func(o *Options) {
|
// Context holds external options
|
||||||
o.Database = db
|
Context context.Context
|
||||||
}
|
// Namespace holds namespace
|
||||||
}
|
Namespace string
|
||||||
|
|
||||||
// Table is analag for a table in database backends or a key prefix in KV backends
|
|
||||||
func Table(t string) Option {
|
|
||||||
return func(o *Options) {
|
|
||||||
o.Table = t
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReadOptions fills ReadOptions struct with opts slice
|
// NewReadOptions fills ReadOptions struct with opts slice
|
||||||
@@ -136,29 +136,35 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadOptions configures an individual Read operation
|
|
||||||
type ReadOptions struct {
|
|
||||||
// Context holds external options
|
|
||||||
Context context.Context
|
|
||||||
// Database holds the database name
|
|
||||||
Database string
|
|
||||||
// Table holds table name
|
|
||||||
Table string
|
|
||||||
// Namespace holds namespace
|
|
||||||
Namespace string
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadOption sets values in ReadOptions
|
// ReadOption sets values in ReadOptions
|
||||||
type ReadOption func(r *ReadOptions)
|
type ReadOption func(r *ReadOptions)
|
||||||
|
|
||||||
// ReadFrom the database and table
|
// ReadContext pass context.Context to ReadOptions
|
||||||
func ReadFrom(database, table string) ReadOption {
|
func ReadContext(ctx context.Context) ReadOption {
|
||||||
return func(r *ReadOptions) {
|
return func(o *ReadOptions) {
|
||||||
r.Database = database
|
o.Context = ctx
|
||||||
r.Table = table
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadNamespace pass namespace to ReadOptions
|
||||||
|
func ReadNamespace(ns string) ReadOption {
|
||||||
|
return func(o *ReadOptions) {
|
||||||
|
o.Namespace = ns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteOptions configures an individual Write operation
|
||||||
|
type WriteOptions struct {
|
||||||
|
// Context holds external options
|
||||||
|
Context context.Context
|
||||||
|
// Metadata contains additional metadata
|
||||||
|
Metadata metadata.Metadata
|
||||||
|
// Namespace holds namespace
|
||||||
|
Namespace string
|
||||||
|
// TTL specifies key TTL
|
||||||
|
TTL time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
// NewWriteOptions fills WriteOptions struct with opts slice
|
// NewWriteOptions fills WriteOptions struct with opts slice
|
||||||
func NewWriteOptions(opts ...WriteOption) WriteOptions {
|
func NewWriteOptions(opts ...WriteOption) WriteOptions {
|
||||||
options := WriteOptions{}
|
options := WriteOptions{}
|
||||||
@@ -168,47 +174,45 @@ func NewWriteOptions(opts ...WriteOption) WriteOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteOptions configures an individual Write operation
|
|
||||||
type WriteOptions struct {
|
|
||||||
// Context holds external options
|
|
||||||
Context context.Context
|
|
||||||
// Metadata contains additional metadata
|
|
||||||
Metadata metadata.Metadata
|
|
||||||
// Database holds database name
|
|
||||||
Database string
|
|
||||||
// Table holds table name
|
|
||||||
Table string
|
|
||||||
// Namespace holds namespace
|
|
||||||
Namespace string
|
|
||||||
// TTL specifies key TTL
|
|
||||||
TTL time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteOption sets values in WriteOptions
|
// WriteOption sets values in WriteOptions
|
||||||
type WriteOption func(w *WriteOptions)
|
type WriteOption func(w *WriteOptions)
|
||||||
|
|
||||||
// WriteTo the database and table
|
// WriteContext pass context.Context to wirte options
|
||||||
func WriteTo(database, table string) WriteOption {
|
func WriteContext(ctx context.Context) WriteOption {
|
||||||
return func(w *WriteOptions) {
|
return func(o *WriteOptions) {
|
||||||
w.Database = database
|
o.Context = ctx
|
||||||
w.Table = table
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WriteTTL is the time the record expires
|
|
||||||
func WriteTTL(d time.Duration) WriteOption {
|
|
||||||
return func(w *WriteOptions) {
|
|
||||||
w.TTL = d
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteMetadata add metadata.Metadata
|
// WriteMetadata add metadata.Metadata
|
||||||
func WriteMetadata(md metadata.Metadata) WriteOption {
|
func WriteMetadata(md metadata.Metadata) WriteOption {
|
||||||
return func(w *WriteOptions) {
|
return func(o *WriteOptions) {
|
||||||
w.Metadata = metadata.Copy(md)
|
o.Metadata = metadata.Copy(md)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteTTL is the time the record expires
|
||||||
|
func WriteTTL(d time.Duration) WriteOption {
|
||||||
|
return func(o *WriteOptions) {
|
||||||
|
o.TTL = d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteNamespace pass namespace to write options
|
||||||
|
func WriteNamespace(ns string) WriteOption {
|
||||||
|
return func(o *WriteOptions) {
|
||||||
|
o.Namespace = ns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteOptions configures an individual Delete operation
|
||||||
|
type DeleteOptions struct {
|
||||||
|
// Context holds external options
|
||||||
|
Context context.Context
|
||||||
|
// Namespace holds namespace
|
||||||
|
Namespace string
|
||||||
|
}
|
||||||
|
|
||||||
// NewDeleteOptions fills DeleteOptions struct with opts slice
|
// NewDeleteOptions fills DeleteOptions struct with opts slice
|
||||||
func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
|
func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
|
||||||
options := DeleteOptions{}
|
options := DeleteOptions{}
|
||||||
@@ -218,29 +222,33 @@ func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteOptions configures an individual Delete operation
|
|
||||||
type DeleteOptions struct {
|
|
||||||
// Context holds external options
|
|
||||||
Context context.Context
|
|
||||||
// Database holds database name
|
|
||||||
Database string
|
|
||||||
// Table holds table name
|
|
||||||
Table string
|
|
||||||
// Namespace holds namespace
|
|
||||||
Namespace string
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteOption sets values in DeleteOptions
|
// DeleteOption sets values in DeleteOptions
|
||||||
type DeleteOption func(d *DeleteOptions)
|
type DeleteOption func(d *DeleteOptions)
|
||||||
|
|
||||||
// DeleteFrom the database and table
|
// DeleteContext pass context.Context to delete options
|
||||||
func DeleteFrom(database, table string) DeleteOption {
|
func DeleteContext(ctx context.Context) DeleteOption {
|
||||||
return func(d *DeleteOptions) {
|
return func(o *DeleteOptions) {
|
||||||
d.Database = database
|
o.Context = ctx
|
||||||
d.Table = table
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteNamespace pass namespace to delete options
|
||||||
|
func DeleteNamespace(ns string) DeleteOption {
|
||||||
|
return func(o *DeleteOptions) {
|
||||||
|
o.Namespace = ns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListOptions configures an individual List operation
|
||||||
|
type ListOptions struct {
|
||||||
|
Context context.Context
|
||||||
|
Prefix string
|
||||||
|
Suffix string
|
||||||
|
Namespace string
|
||||||
|
Limit uint
|
||||||
|
Offset uint
|
||||||
|
}
|
||||||
|
|
||||||
// NewListOptions fills ListOptions struct with opts slice
|
// NewListOptions fills ListOptions struct with opts slice
|
||||||
func NewListOptions(opts ...ListOption) ListOptions {
|
func NewListOptions(opts ...ListOption) ListOptions {
|
||||||
options := ListOptions{}
|
options := ListOptions{}
|
||||||
@@ -250,59 +258,50 @@ func NewListOptions(opts ...ListOption) ListOptions {
|
|||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListOptions configures an individual List operation
|
|
||||||
type ListOptions struct {
|
|
||||||
Context context.Context
|
|
||||||
Database string
|
|
||||||
Prefix string
|
|
||||||
Suffix string
|
|
||||||
Namespace string
|
|
||||||
Table string
|
|
||||||
Limit uint
|
|
||||||
Offset uint
|
|
||||||
}
|
|
||||||
|
|
||||||
// ListOption sets values in ListOptions
|
// ListOption sets values in ListOptions
|
||||||
type ListOption func(l *ListOptions)
|
type ListOption func(l *ListOptions)
|
||||||
|
|
||||||
// ListFrom the database and table
|
// ListContext pass context.Context to list options
|
||||||
func ListFrom(database, table string) ListOption {
|
func ListContext(ctx context.Context) ListOption {
|
||||||
return func(l *ListOptions) {
|
return func(o *ListOptions) {
|
||||||
l.Database = database
|
o.Context = ctx
|
||||||
l.Table = table
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListPrefix returns all keys that are prefixed with key
|
// ListPrefix returns all keys that are prefixed with key
|
||||||
func ListPrefix(p string) ListOption {
|
func ListPrefix(s string) ListOption {
|
||||||
return func(l *ListOptions) {
|
return func(o *ListOptions) {
|
||||||
l.Prefix = p
|
o.Prefix = s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListSuffix returns all keys that end with key
|
// ListSuffix returns all keys that end with key
|
||||||
func ListSuffix(s string) ListOption {
|
func ListSuffix(s string) ListOption {
|
||||||
return func(l *ListOptions) {
|
return func(o *ListOptions) {
|
||||||
l.Suffix = s
|
o.Suffix = s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListLimit limits the number of returned keys to l
|
// ListLimit limits the number of returned keys
|
||||||
func ListLimit(l uint) ListOption {
|
func ListLimit(n uint) ListOption {
|
||||||
return func(lo *ListOptions) {
|
return func(o *ListOptions) {
|
||||||
lo.Limit = l
|
o.Limit = n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListOffset starts returning responses from o. Use in conjunction with Limit for pagination.
|
// ListOffset use with Limit for pagination
|
||||||
func ListOffset(o uint) ListOption {
|
func ListOffset(n uint) ListOption {
|
||||||
return func(l *ListOptions) {
|
return func(o *ListOptions) {
|
||||||
l.Offset = o
|
o.Offset = n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExistsOption specifies Exists call options
|
// ListNamespace pass namespace to list options
|
||||||
type ExistsOption func(*ExistsOptions)
|
func ListNamespace(ns string) ListOption {
|
||||||
|
return func(o *ListOptions) {
|
||||||
|
o.Namespace = ns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ExistsOptions holds options for Exists method
|
// ExistsOptions holds options for Exists method
|
||||||
type ExistsOptions struct {
|
type ExistsOptions struct {
|
||||||
@@ -312,6 +311,9 @@ type ExistsOptions struct {
|
|||||||
Namespace string
|
Namespace string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExistsOption specifies Exists call options
|
||||||
|
type ExistsOption func(*ExistsOptions)
|
||||||
|
|
||||||
// NewExistsOptions helper for Exists method
|
// NewExistsOptions helper for Exists method
|
||||||
func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
|
func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
|
||||||
options := ExistsOptions{
|
options := ExistsOptions{
|
||||||
@@ -322,3 +324,24 @@ func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
|
|||||||
}
|
}
|
||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExistsContext pass context.Context to exist options
|
||||||
|
func ExistsContext(ctx context.Context) ExistsOption {
|
||||||
|
return func(o *ExistsOptions) {
|
||||||
|
o.Context = ctx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ExistsNamespace pass namespace to exist options
|
||||||
|
func ExistsNamespace(ns string) ExistsOption {
|
||||||
|
return func(o *ExistsOptions) {
|
||||||
|
o.Namespace = ns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WrapStore adds a store Wrapper to a list of options passed into the store
|
||||||
|
//func WrapStore(w Wrapper) Option {
|
||||||
|
// return func(o *Options) {
|
||||||
|
// o.Wrappers = append(o.Wrappers, w)
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
84
store/wrapper.go
Normal file
84
store/wrapper.go
Normal file
@@ -0,0 +1,84 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LogfFunc function used for Logf method
|
||||||
|
//type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
|
||||||
|
|
||||||
|
//type Wrapper interface {
|
||||||
|
// Logf logs message with needed level
|
||||||
|
//Logf(LogfFunc) LogfFunc
|
||||||
|
//}
|
||||||
|
|
||||||
|
type NamespaceStore struct {
|
||||||
|
s Store
|
||||||
|
ns string
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
_ Store = &NamespaceStore{}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewNamespaceStore(s Store, ns string) Store {
|
||||||
|
return &NamespaceStore{s: s, ns: ns}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Init(opts ...Option) error {
|
||||||
|
return w.s.Init(opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Connect(ctx context.Context) error {
|
||||||
|
return w.s.Connect(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Disconnect(ctx context.Context) error {
|
||||||
|
return w.s.Disconnect(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
|
||||||
|
return w.s.Read(ctx, key, val, append(opts, ReadNamespace(w.ns))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
|
||||||
|
return w.s.Write(ctx, key, val, append(opts, WriteNamespace(w.ns))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
|
||||||
|
return w.s.Delete(ctx, key, append(opts, DeleteNamespace(w.ns))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
|
||||||
|
return w.s.Exists(ctx, key, append(opts, ExistsNamespace(w.ns))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
|
||||||
|
return w.s.List(ctx, append(opts, ListNamespace(w.ns))...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Options() Options {
|
||||||
|
return w.s.Options()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) Name() string {
|
||||||
|
return w.s.Name()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *NamespaceStore) String() string {
|
||||||
|
return w.s.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
//type NamespaceWrapper struct{}
|
||||||
|
|
||||||
|
//func NewNamespaceWrapper() Wrapper {
|
||||||
|
// return &NamespaceWrapper{}
|
||||||
|
//}
|
||||||
|
|
||||||
|
/*
|
||||||
|
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
|
||||||
|
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
|
||||||
|
fn(ctx, level, msg, getArgs(args)...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
@@ -102,7 +102,7 @@ type parser struct {
|
|||||||
// topLevelSegments is the target of this parser.
|
// topLevelSegments is the target of this parser.
|
||||||
func (p *parser) topLevelSegments() ([]segment, error) {
|
func (p *parser) topLevelSegments() ([]segment, error) {
|
||||||
if logger.V(logger.TraceLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Debug(context.TODO(), "Parsing %q", p.tokens)
|
logger.Trace(context.TODO(), "Parsing %q", p.tokens)
|
||||||
}
|
}
|
||||||
segs, err := p.segments()
|
segs, err := p.segments()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@@ -63,16 +63,16 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
}
|
}
|
||||||
|
|
||||||
if version != 1 {
|
if version != 1 {
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Debug(context.TODO(), "unsupported version: %d", version)
|
logger.Trace(context.TODO(), "unsupported version: %d", version)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
|
|
||||||
l := len(ops)
|
l := len(ops)
|
||||||
if l%2 != 0 {
|
if l%2 != 0 {
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Debug(context.TODO(), "odd number of ops codes: %d", l)
|
logger.Trace(context.TODO(), "odd number of ops codes: %d", l)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
@@ -141,13 +141,13 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
|
|||||||
vars = append(vars, v)
|
vars = append(vars, v)
|
||||||
stack--
|
stack--
|
||||||
if stack < 0 {
|
if stack < 0 {
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Trace(context.TODO(), "stack underflow")
|
logger.Trace(context.TODO(), "stack underflow")
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
if logger.V(logger.DebugLevel) {
|
if logger.V(logger.TraceLevel) {
|
||||||
logger.Trace(context.TODO(), "invalid opcode: %d", op.code)
|
logger.Trace(context.TODO(), "invalid opcode: %d", op.code)
|
||||||
}
|
}
|
||||||
return Pattern{}, ErrInvalidPattern
|
return Pattern{}, ErrInvalidPattern
|
||||||
|
Reference in New Issue
Block a user