Compare commits

...

15 Commits

Author SHA1 Message Date
650d167313 meter: add BuildLabels func that sorts and deletes duplicates
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 14:10:20 +03:00
c6ba2a91e6 meter: BuildName func to combine metric name with labels into string
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-21 12:39:59 +03:00
7ece08896f server: use 127.0.0.1:0 if no address provided
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-17 01:57:39 +06:00
dependabot[bot]
57f6f23294 build(deps): bump github.com/google/uuid from 1.2.0 to 1.3.0 (#53)
Bumps [github.com/google/uuid](https://github.com/google/uuid) from 1.2.0 to 1.3.0.
- [Release notes](https://github.com/google/uuid/releases)
- [Commits](https://github.com/google/uuid/compare/v1.2.0...v1.3.0)

---
updated-dependencies:
- dependency-name: github.com/google/uuid
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2021-07-16 00:27:56 +03:00
09e6fa2fed flow: implement new methods, add Async ExecutionOption
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-16 00:17:16 +03:00
10a09a5c6f flow: improve store
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 22:56:34 +03:00
b4e5d9462a util/router: move some messages to Trace level
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 22:56:34 +03:00
96aa0b6906 store/memory: fix List
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 22:53:12 +03:00
f54658830d store/memory: fixup
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 12:11:55 +03:00
1e43122660 store/memory: small fixups for flow usage
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-15 11:59:35 +03:00
42800fa247 flow: improve steps handling
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-14 17:12:54 +03:00
5b9c810653 logger: add compile time test for interface compat
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-14 17:12:09 +03:00
c3def24bf4 store: add Wrappers support, create Namespace wrapper
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-14 17:11:37 +03:00
0d1ef31764 client: change AuthToken option signature
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-09 10:47:40 +03:00
d49afa230f logger: add omit logger
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2021-07-05 23:04:20 +03:00
17 changed files with 954 additions and 289 deletions

View File

@@ -96,8 +96,8 @@ type CallOptions struct {
RequestTimeout time.Duration RequestTimeout time.Duration
// DialTimeout dial timeout // DialTimeout dial timeout
DialTimeout time.Duration DialTimeout time.Duration
// AuthToken flag // AuthToken string
AuthToken bool AuthToken string
} }
// Context pass context to client // Context pass context to client
@@ -463,9 +463,9 @@ func WithDialTimeout(d time.Duration) CallOption {
// WithAuthToken is a CallOption which overrides the // WithAuthToken is a CallOption which overrides the
// authorization header with the services own auth token // authorization header with the services own auth token
func WithAuthToken() CallOption { func WithAuthToken(t string) CallOption {
return func(o *CallOptions) { return func(o *CallOptions) {
o.AuthToken = true o.AuthToken = t
} }
} }

View File

@@ -3,12 +3,16 @@ package flow
import ( import (
"context" "context"
"fmt" "fmt"
"path/filepath"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/silas/dag" "github.com/silas/dag"
"github.com/unistack-org/micro/v3/client" "github.com/unistack-org/micro/v3/client"
"github.com/unistack-org/micro/v3/codec" "github.com/unistack-org/micro/v3/codec"
"github.com/unistack-org/micro/v3/logger"
"github.com/unistack-org/micro/v3/metadata"
"github.com/unistack-org/micro/v3/store"
) )
type microFlow struct { type microFlow struct {
@@ -20,27 +24,147 @@ type microWorkflow struct {
g *dag.AcyclicGraph g *dag.AcyclicGraph
init bool init bool
sync.RWMutex sync.RWMutex
opts Options opts Options
steps map[string]Step steps map[string]Step
status Status
} }
func (w *microWorkflow) ID() string { func (w *microWorkflow) ID() string {
return w.id return w.id
} }
func (w *microWorkflow) Steps() [][]Step { func (w *microWorkflow) Steps() ([][]Step, error) {
return w.getSteps("", false)
}
func (w *microWorkflow) Status() Status {
return w.status
}
func (w *microWorkflow) AppendSteps(steps ...Step) error {
w.Lock()
for _, s := range steps {
w.steps[s.String()] = s
w.g.Add(s)
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil return nil
} }
func (w *microWorkflow) AppendSteps(ctx context.Context, steps ...Step) error { func (w *microWorkflow) RemoveSteps(steps ...Step) error {
// TODO: handle case when some step requires or required by removed step
w.Lock()
for _, s := range steps {
delete(w.steps, s.String())
w.g.Remove(s)
}
for _, dst := range steps {
for _, req := range dst.Requires() {
src, ok := w.steps[req]
if !ok {
return ErrStepNotExists
}
w.g.Connect(dag.BasicEdge(src, dst))
}
}
if err := w.g.Validate(); err != nil {
w.Unlock()
return err
}
w.g.TransitiveReduction()
w.Unlock()
return nil return nil
} }
func (w *microWorkflow) RemoveSteps(ctx context.Context, steps ...Step) error { func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
return nil var steps [][]Step
var root dag.Vertex
var err error
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
if start != "" {
var ok bool
w.RLock()
root, ok = w.steps[start]
w.RUnlock()
if !ok {
return nil, ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return nil, err
}
}
if reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil {
return nil, err
}
return steps, nil
} }
func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) { func (w *microWorkflow) Abort(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}
func (w *microWorkflow) Suspend(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}
func (w *microWorkflow) Resume(ctx context.Context, eid string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}
func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) {
w.Lock() w.Lock()
if !w.init { if !w.init {
if err := w.g.Validate(); err != nil { if err := w.g.Validate(); err != nil {
@@ -56,74 +180,176 @@ func (w *microWorkflow) Execute(ctx context.Context, req interface{}, opts ...Ex
if err != nil { if err != nil {
return "", err return "", err
} }
eid := uid.String()
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
options := NewExecuteOptions(opts...) options := NewExecuteOptions(opts...)
var steps [][]Step
fn := func(n dag.Vertex, idx int) error {
if idx == 0 {
steps = make([][]Step, 1)
steps[0] = make([]Step, 0, 1)
} else if idx >= len(steps) {
tsteps := make([][]Step, idx+1)
copy(tsteps, steps)
steps = tsteps
steps[idx] = make([]Step, 0, 1)
}
steps[idx] = append(steps[idx], n.(Step))
return nil
}
var root dag.Vertex steps, err := w.getSteps(options.Start, options.Reverse)
if options.Start != "" {
var ok bool
w.RLock()
root, ok = w.steps[options.Start]
w.RUnlock()
if !ok {
return "", ErrStepNotExists
}
} else {
root, err = w.g.Root()
if err != nil {
return "", err
}
}
if options.Reverse {
err = w.g.SortedReverseDepthFirstWalk([]dag.Vertex{root}, fn)
} else {
err = w.g.SortedDepthFirstWalk([]dag.Vertex{root}, fn)
}
if err != nil { if err != nil {
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
return "", err return "", err
} }
var wg sync.WaitGroup var wg sync.WaitGroup
cherr := make(chan error, 1) cherr := make(chan error, 1)
defer close(cherr) chstatus := make(chan Status, 1)
nctx, cancel := context.WithCancel(ctx) nctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
nopts := make([]ExecuteOption, 0, len(opts)+5) nopts := make([]ExecuteOption, 0, len(opts)+5)
nopts = append(nopts, ExecuteClient(w.opts.Client), ExecuteTracer(w.opts.Tracer), ExecuteLogger(w.opts.Logger), ExecuteMeter(w.opts.Meter), ExecuteStore(w.opts.Store))
nopts = append(nopts,
ExecuteClient(w.opts.Client),
ExecuteTracer(w.opts.Tracer),
ExecuteLogger(w.opts.Logger),
ExecuteMeter(w.opts.Meter),
)
nopts = append(nopts, opts...)
done := make(chan struct{})
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
return eid, werr
}
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr
}
}
}
go func() { go func() {
for idx := range steps { for idx := range steps {
wg.Add(len(steps[idx]))
for nidx := range steps[idx] { for nidx := range steps[idx] {
go func(step Step) { wStatus := &codec.Frame{}
defer wg.Done() if werr := workflowStore.Read(w.opts.Context, "status", wStatus); werr != nil {
if err = step.Execute(nctx, req, nopts...); err != nil { cherr <- werr
cherr <- err return
cancel() }
if status := StringStatus[string(wStatus.Data)]; status != StatusRunning {
chstatus <- status
return
}
if w.opts.Logger.V(logger.TraceLevel) {
w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx])
}
cstep := steps[idx][nidx]
if len(cstep.Requires()) == 0 {
wg.Add(1)
go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
}
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
cherr <- werr
return
} }
}(steps[idx][nidx]) if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}
}
}
} }
wg.Wait()
} }
cherr <- nil close(done)
}() }()
err = <-cherr if options.Async {
return eid, nil
}
logger.Tracef(ctx, "wait for finish or error")
select {
case <-nctx.Done():
err = nctx.Err()
case cerr := <-cherr:
err = cerr
case <-done:
close(cherr)
case <-chstatus:
close(chstatus)
return uid.String(), nil
}
switch {
case nctx.Err() != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err == nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
case err != nil:
if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil {
w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr)
}
break
}
return uid.String(), err return uid.String(), err
} }
@@ -207,6 +433,17 @@ type microCallStep struct {
opts StepOptions opts StepOptions
service string service string
method string method string
rsp *Message
req *Message
status Status
}
func (s *microCallStep) Request() *Message {
return s.req
}
func (s *microCallStep) Response() *Message {
return s.rsp
} }
func (s *microCallStep) ID() string { func (s *microCallStep) ID() string {
@@ -247,23 +484,47 @@ func (s *microCallStep) Hashcode() interface{} {
return s.String() return s.String()
} }
func (s *microCallStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { func (s *microCallStep) GetStatus() Status {
return s.status
}
func (s *microCallStep) SetStatus(status Status) {
s.status = status
}
func (s *microCallStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
options := NewExecuteOptions(opts...) options := NewExecuteOptions(opts...)
if options.Client == nil { if options.Client == nil {
return fmt.Errorf("client not set") return nil, ErrMissingClient
} }
rsp := &codec.Frame{} rsp := &codec.Frame{}
copts := []client.CallOption{client.WithRetries(0)} copts := []client.CallOption{client.WithRetries(0)}
if options.Timeout > 0 { if options.Timeout > 0 {
copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout)) copts = append(copts, client.WithRequestTimeout(options.Timeout), client.WithDialTimeout(options.Timeout))
} }
err := options.Client.Call(ctx, options.Client.NewRequest(s.service, s.method, req), rsp) nctx := metadata.NewOutgoingContext(ctx, req.Header)
return err err := options.Client.Call(nctx, options.Client.NewRequest(s.service, s.method, &codec.Frame{Data: req.Body}), rsp)
if err != nil {
return nil, err
}
md, _ := metadata.FromOutgoingContext(nctx)
return &Message{Header: md, Body: rsp.Data}, err
} }
type microPublishStep struct { type microPublishStep struct {
opts StepOptions opts StepOptions
topic string topic string
req *Message
rsp *Message
status Status
}
func (s *microPublishStep) Request() *Message {
return s.req
}
func (s *microPublishStep) Response() *Message {
return s.rsp
} }
func (s *microPublishStep) ID() string { func (s *microPublishStep) ID() string {
@@ -304,13 +565,21 @@ func (s *microPublishStep) Hashcode() interface{} {
return s.String() return s.String()
} }
func (s *microPublishStep) Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error { func (s *microPublishStep) GetStatus() Status {
return nil return s.status
} }
func NewCallStep(service string, method string, opts ...StepOption) Step { func (s *microPublishStep) SetStatus(status Status) {
s.status = status
}
func (s *microPublishStep) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) {
return nil, nil
}
func NewCallStep(service string, name string, method string, opts ...StepOption) Step {
options := NewStepOptions(opts...) options := NewStepOptions(opts...)
return &microCallStep{service: service, method: method, opts: options} return &microCallStep{service: service, method: name + "." + method, opts: options}
} }
func NewPublishStep(topic string, opts ...StepOption) Step { func NewPublishStep(topic string, opts ...StepOption) Step {

View File

@@ -4,12 +4,43 @@ package flow
import ( import (
"context" "context"
"errors" "errors"
"sync"
"sync/atomic"
"github.com/unistack-org/micro/v3/metadata"
) )
var ( var (
ErrStepNotExists = errors.New("step not exists") ErrStepNotExists = errors.New("step not exists")
ErrMissingClient = errors.New("client not set")
) )
// RawMessage is a raw encoded JSON value.
// It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a encoding.
type RawMessage []byte
// MarshalJSON returns m as the JSON encoding of m.
func (m *RawMessage) MarshalJSON() ([]byte, error) {
if m == nil {
return []byte("null"), nil
}
return *m, nil
}
// UnmarshalJSON sets *m to a copy of data.
func (m *RawMessage) UnmarshalJSON(data []byte) error {
if m == nil {
return errors.New("RawMessage UnmarshalJSON on nil pointer")
}
*m = append((*m)[0:0], data...)
return nil
}
type Message struct {
Header metadata.Metadata
Body RawMessage
}
// Step represents dedicated workflow step // Step represents dedicated workflow step
type Step interface { type Step interface {
// ID returns step id // ID returns step id
@@ -17,7 +48,7 @@ type Step interface {
// Endpoint returns rpc endpoint service_name.service_method or broker topic // Endpoint returns rpc endpoint service_name.service_method or broker topic
Endpoint() string Endpoint() string
// Execute step run // Execute step run
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) error Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
// Requires returns dependent steps // Requires returns dependent steps
Requires() []string Requires() []string
// Options returns step options // Options returns step options
@@ -26,20 +57,70 @@ type Step interface {
Require(steps ...Step) error Require(steps ...Step) error
// String // String
String() string String() string
// GetStatus returns step status
GetStatus() Status
// SetStatus sets the step status
SetStatus(Status)
// Request returns step request message
Request() *Message
// Response returns step response message
Response() *Message
} }
type Status int
func (status Status) String() string {
return StatusString[status]
}
const (
StatusPending Status = iota
StatusRunning
StatusFailure
StatusSuccess
StatusAborted
StatusSuspend
)
var (
StatusString = map[Status]string{
StatusPending: "StatusPending",
StatusRunning: "StatusRunning",
StatusFailure: "StatusFailure",
StatusSuccess: "StatusSuccess",
StatusAborted: "StatusAborted",
StatusSuspend: "StatusSuspend",
}
StringStatus = map[string]Status{
"StatusPending": StatusPending,
"StatusRunning": StatusRunning,
"StatusFailure": StatusFailure,
"StatusSuccess": StatusSuccess,
"StatusAborted": StatusAborted,
"StatusSuspend": StatusSuspend,
}
)
// Workflow contains all steps to execute // Workflow contains all steps to execute
type Workflow interface { type Workflow interface {
// ID returns id of the workflow // ID returns id of the workflow
ID() string ID() string
// Steps returns steps slice where parallel steps returned on the same level
Steps() [][]Step
// Execute workflow with args, return execution id and error // Execute workflow with args, return execution id and error
Execute(ctx context.Context, req interface{}, opts ...ExecuteOption) (string, error) Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
// RemoveSteps remove steps from workflow // RemoveSteps remove steps from workflow
RemoveSteps(ctx context.Context, steps ...Step) error RemoveSteps(steps ...Step) error
// AppendSteps append steps to workflow // AppendSteps append steps to workflow
AppendSteps(ctx context.Context, steps ...Step) error AppendSteps(steps ...Step) error
// Status returns workflow status
Status() Status
// Steps returns steps slice where parallel steps returned on the same level
Steps() ([][]Step, error)
// Suspend suspends execution
Suspend(ctx context.Context, eid string) error
// Resume resumes execution
Resume(ctx context.Context, eid string) error
// Abort abort execution
Abort(ctx context.Context, eid string) error
} }
// Flow the base interface to interact with workflows // Flow the base interface to interact with workflows
@@ -57,3 +138,15 @@ type Flow interface {
// WorkflowList lists all workflows // WorkflowList lists all workflows
WorkflowList(ctx context.Context) ([]Workflow, error) WorkflowList(ctx context.Context) ([]Workflow, error)
} }
var (
flowMu sync.Mutex
atomicSteps atomic.Value
)
func RegisterStep(step Step) {
flowMu.Lock()
steps, _ := atomicSteps.Load().([]Step)
atomicSteps.Store(append(steps, step))
flowMu.Unlock()
}

View File

@@ -116,8 +116,6 @@ type ExecuteOptions struct {
Logger logger.Logger Logger logger.Logger
// Meter holds the meter // Meter holds the meter
Meter meter.Meter Meter meter.Meter
// Store used for intermediate results
Store store.Store
// Context can be used to abort execution or pass additional opts // Context can be used to abort execution or pass additional opts
Context context.Context Context context.Context
// Start step // Start step
@@ -126,6 +124,8 @@ type ExecuteOptions struct {
Reverse bool Reverse bool
// Timeout for execution // Timeout for execution
Timeout time.Duration Timeout time.Duration
// Async enables async execution
Async bool
} }
type ExecuteOption func(*ExecuteOptions) type ExecuteOption func(*ExecuteOptions)
@@ -154,12 +154,6 @@ func ExecuteMeter(m meter.Meter) ExecuteOption {
} }
} }
func ExecuteStore(s store.Store) ExecuteOption {
return func(o *ExecuteOptions) {
o.Store = s
}
}
func ExecuteContext(ctx context.Context) ExecuteOption { func ExecuteContext(ctx context.Context) ExecuteOption {
return func(o *ExecuteOptions) { return func(o *ExecuteOptions) {
o.Context = ctx o.Context = ctx
@@ -178,8 +172,20 @@ func ExecuteTimeout(td time.Duration) ExecuteOption {
} }
} }
func ExecuteAsync(b bool) ExecuteOption {
return func(o *ExecuteOptions) {
o.Async = b
}
}
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions { func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions {
options := ExecuteOptions{} options := ExecuteOptions{
Client: client.DefaultClient,
Logger: logger.DefaultLogger,
Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter,
Context: context.Background(),
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
@@ -196,7 +202,9 @@ type StepOptions struct {
type StepOption func(*StepOptions) type StepOption func(*StepOptions)
func NewStepOptions(opts ...StepOption) StepOptions { func NewStepOptions(opts ...StepOption) StepOptions {
options := StepOptions{Context: context.Background()} options := StepOptions{
Context: context.Background(),
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.16
require ( require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/ef-ds/deque v1.0.4 github.com/ef-ds/deque v1.0.4
github.com/google/uuid v1.2.0 github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.12 github.com/imdario/mergo v0.3.12
github.com/patrickmn/go-cache v2.1.0+incompatible github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/silas/dag v0.0.0-20210121180416-41cf55125c34 github.com/silas/dag v0.0.0-20210121180416-41cf55125c34

4
go.sum
View File

@@ -2,8 +2,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumC
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI= github.com/ef-ds/deque v1.0.4 h1:iFAZNmveMT9WERAkqLJ+oaABF9AcVQ5AjXem/hroniI=
github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg= github.com/ef-ds/deque v1.0.4/go.mod h1:gXDnTC3yqvBcHbq2lcExjtAcVrOnJCbMcZXmuj8Z4tg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=

View File

@@ -8,7 +8,8 @@ import (
func TestLogger(t *testing.T) { func TestLogger(t *testing.T) {
ctx := context.TODO() ctx := context.TODO()
l := NewLogger(WithLevel(TraceLevel)) buf := bytes.NewBuffer(nil)
l := NewLogger(WithLevel(TraceLevel), WithOutput(buf))
if err := l.Init(); err != nil { if err := l.Init(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -16,6 +17,18 @@ func TestLogger(t *testing.T) {
l.Warn(ctx, "warn_msg1") l.Warn(ctx, "warn_msg1")
l.Fields(map[string]interface{}{"error": "test"}).Info(ctx, "error message") l.Fields(map[string]interface{}{"error": "test"}).Info(ctx, "error message")
l.Warn(ctx, "first", " ", "second") l.Warn(ctx, "first", " ", "second")
if !bytes.Contains(buf.Bytes(), []byte(`"level":"trace","msg":"trace_msg1"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"warn","msg":"warn_msg1"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"error":"test","level":"info","msg":"error message"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
if !bytes.Contains(buf.Bytes(), []byte(`"level":"warn","msg":"first second"`)) {
t.Fatalf("logger error, buf %s", buf.Bytes())
}
} }
func TestLoggerWrapper(t *testing.T) { func TestLoggerWrapper(t *testing.T) {
@@ -35,3 +48,21 @@ func TestLoggerWrapper(t *testing.T) {
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes()) t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
} }
} }
func TestOmitLoggerWrapper(t *testing.T) {
ctx := context.TODO()
buf := bytes.NewBuffer(nil)
l := NewOmitLogger(NewLogger(WithLevel(TraceLevel), WithOutput(buf)))
if err := l.Init(); err != nil {
t.Fatal(err)
}
type secret struct {
Name string
Passw string `logger:"omit"`
}
s := &secret{Name: "name", Passw: "secret"}
l.Errorf(ctx, "test %#+v", s)
if !bytes.Contains(buf.Bytes(), []byte(`logger.secret{Name:\"name\", Passw:\"\"}"`)) {
t.Fatalf("omit not works, struct: %v, output: %s", s, buf.Bytes())
}
}

View File

@@ -16,10 +16,96 @@ type LogfFunc func(ctx context.Context, level Level, msg string, args ...interfa
type Wrapper interface { type Wrapper interface {
// Log logs message with needed level // Log logs message with needed level
Log(LogFunc) LogFunc Log(LogFunc) LogFunc
// Log(ctx context.Context, level Level, args ...interface{})
// Logf logs message with needed level // Logf logs message with needed level
Logf(LogfFunc) LogfFunc Logf(LogfFunc) LogfFunc
//Logf(ctx context.Context, level Level, msg string, args ...interface{}) }
var (
_ Logger = &OmitLogger{}
)
type OmitLogger struct {
l Logger
}
func NewOmitLogger(l Logger) Logger {
return &OmitLogger{l: l}
}
func (w *OmitLogger) Init(opts ...Option) error {
return w.l.Init(append(opts, WrapLogger(NewOmitWrapper()))...)
}
func (w *OmitLogger) V(level Level) bool {
return w.l.V(level)
}
func (w *OmitLogger) Options() Options {
return w.l.Options()
}
func (w *OmitLogger) Fields(fields map[string]interface{}) Logger {
return w.l.Fields(fields)
}
func (w *OmitLogger) Info(ctx context.Context, args ...interface{}) {
w.l.Info(ctx, args...)
}
func (w *OmitLogger) Trace(ctx context.Context, args ...interface{}) {
w.l.Trace(ctx, args...)
}
func (w *OmitLogger) Debug(ctx context.Context, args ...interface{}) {
w.l.Debug(ctx, args...)
}
func (w *OmitLogger) Warn(ctx context.Context, args ...interface{}) {
w.l.Warn(ctx, args...)
}
func (w *OmitLogger) Error(ctx context.Context, args ...interface{}) {
w.l.Error(ctx, args...)
}
func (w *OmitLogger) Fatal(ctx context.Context, args ...interface{}) {
w.l.Fatal(ctx, args...)
}
func (w *OmitLogger) Infof(ctx context.Context, msg string, args ...interface{}) {
w.l.Infof(ctx, msg, args...)
}
func (w *OmitLogger) Tracef(ctx context.Context, msg string, args ...interface{}) {
w.l.Tracef(ctx, msg, args...)
}
func (w *OmitLogger) Debugf(ctx context.Context, msg string, args ...interface{}) {
w.l.Debugf(ctx, msg, args...)
}
func (w *OmitLogger) Warnf(ctx context.Context, msg string, args ...interface{}) {
w.l.Warnf(ctx, msg, args...)
}
func (w *OmitLogger) Errorf(ctx context.Context, msg string, args ...interface{}) {
w.l.Errorf(ctx, msg, args...)
}
func (w *OmitLogger) Fatalf(ctx context.Context, msg string, args ...interface{}) {
w.l.Fatalf(ctx, msg, args...)
}
func (w *OmitLogger) Log(ctx context.Context, level Level, args ...interface{}) {
w.l.Log(ctx, level, args...)
}
func (w *OmitLogger) Logf(ctx context.Context, level Level, msg string, args ...interface{}) {
w.l.Logf(ctx, level, msg, args...)
}
func (w *OmitLogger) String() string {
return w.l.String()
} }
type OmitWrapper struct{} type OmitWrapper struct{}

View File

@@ -3,8 +3,9 @@ package meter
import ( import (
"io" "io"
"reflect"
"sort" "sort"
"strconv"
"strings"
"time" "time"
) )
@@ -77,36 +78,60 @@ type Summary interface {
UpdateDuration(time.Time) UpdateDuration(time.Time)
} }
// sort labels alphabeticaly by label name
type byKey []string type byKey []string
func (k byKey) Len() int { return len(k) / 2 } func (k byKey) Len() int { return len(k) / 2 }
func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] } func (k byKey) Less(i, j int) bool { return k[i*2] < k[j*2] }
func (k byKey) Swap(i, j int) { func (k byKey) Swap(i, j int) {
k[i*2], k[i*2+1], k[j*2], k[j*2+1] = k[j*2], k[j*2+1], k[i*2], k[i*2+1] k[i*2], k[j*2] = k[j*2], k[i*2]
k[i*2+1], k[j*2+1] = k[j*2+1], k[i*2+1]
} }
func Sort(slice *[]string) { // BuildLables used to sort labels and delete duplicates.
bk := byKey(*slice) // Last value wins in case of duplicate label keys.
if bk.Len() <= 1 { func BuildLabels(labels ...string) []string {
return if len(labels)%2 == 1 {
labels = labels[:len(labels)-1]
} }
sort.Sort(bk) sort.Sort(byKey(labels))
v := reflect.ValueOf(slice).Elem() return labels
cnt := 0 }
key := 0
val := 1 // BuildName used to combine metric with labels.
for key < v.Len() { // If labels count is odd, drop last element
if len(bk) > key+2 && bk[key] == bk[key+2] { func BuildName(name string, labels ...string) string {
key += 2 if len(labels)%2 == 1 {
val += 2 labels = labels[:len(labels)-1]
continue }
}
v.Index(cnt).Set(v.Index(key)) sort.Sort(byKey(labels))
cnt++
v.Index(cnt).Set(v.Index(val)) idx := 0
cnt++ for {
key += 2 if labels[idx] == labels[idx+2] {
val += 2 copy(labels[idx:], labels[idx+2:])
} labels = labels[:len(labels)-2]
v.SetLen(cnt) } else {
idx += 2
}
if idx+2 >= len(labels) {
break
}
}
var b strings.Builder
_, _ = b.WriteString(name)
_, _ = b.WriteRune('{')
for idx := 0; idx < len(labels); idx += 2 {
if idx > 0 {
_, _ = b.WriteRune(',')
}
_, _ = b.WriteString(labels[idx])
_, _ = b.WriteString(`=`)
_, _ = b.WriteString(strconv.Quote(labels[idx+1]))
}
_, _ = b.WriteRune('}')
return b.String()
} }

View File

@@ -14,11 +14,53 @@ func TestNoopMeter(t *testing.T) {
cnt.Inc() cnt.Inc()
} }
func TestLabelsSort(t *testing.T) { func testEq(a, b []string) bool {
ls := []string{"server", "http", "register", "mdns", "broker", "broker1", "broker", "broker2", "server", "tcp"} if len(a) != len(b) {
Sort(&ls) return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
if ls[0] != "broker" || ls[1] != "broker2" { func TestBuildLabels(t *testing.T) {
t.Fatalf("sort error: %v", ls) type testData struct {
src []string
dst []string
}
data := []testData{
testData{
src: []string{"zerolabel", "value3", "firstlabel", "value2"},
dst: []string{"firstlabel", "value2", "zerolabel", "value3"},
},
}
for _, d := range data {
if !testEq(d.dst, BuildLabels(d.src...)) {
t.Fatalf("slices not properly sorted: %v %v", d.dst, d.src)
}
}
}
func TestBuildName(t *testing.T) {
data := map[string][]string{
`my_metric{firstlabel="value2",zerolabel="value3"}`: []string{
"my_metric",
"zerolabel", "value3", "firstlabel", "value2",
},
`my_metric{broker="broker2",register="mdns",server="tcp"}`: []string{
"my_metric",
"broker", "broker1", "broker", "broker2", "server", "http", "server", "tcp", "register", "mdns",
},
}
for e, d := range data {
if x := BuildName(d[0], d[1:]...); x != e {
t.Fatalf("expect: %s, result: %s", e, x)
}
} }
} }

View File

@@ -15,8 +15,8 @@ import (
var DefaultServer Server = NewServer() var DefaultServer Server = NewServer()
var ( var (
// DefaultAddress will be used if no address passed // DefaultAddress will be used if no address passed, use secure localhost
DefaultAddress = ":0" DefaultAddress = "127.0.0.1:0"
// DefaultName will be used if no name passed // DefaultName will be used if no name passed
DefaultName = "server" DefaultName = "server"
// DefaultVersion will be used if no version passed // DefaultVersion will be used if no version passed

View File

@@ -36,16 +36,6 @@ func (m *memoryStore) key(prefix, key string) string {
return filepath.Join(prefix, key) return filepath.Join(prefix, key)
} }
func (m *memoryStore) prefix(database, table string) string {
if len(database) == 0 {
database = m.opts.Database
}
if len(table) == 0 {
table = m.opts.Table
}
return filepath.Join(database, table)
}
func (m *memoryStore) exists(prefix, key string) error { func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key) key = m.key(prefix, key)
@@ -80,15 +70,17 @@ func (m *memoryStore) delete(prefix, key string) {
func (m *memoryStore) list(prefix string, limit, offset uint) []string { func (m *memoryStore) list(prefix string, limit, offset uint) []string {
allItems := m.store.Items() allItems := m.store.Items()
allKeys := make([]string, len(allItems)) allKeys := make([]string, 0, len(allItems))
i := 0
for k := range allItems { for k := range allItems {
if !strings.HasPrefix(k, prefix+"/") { if !strings.HasPrefix(k, prefix) {
continue continue
} }
allKeys[i] = strings.TrimPrefix(k, prefix+"/") k = strings.TrimPrefix(k, prefix)
i++ if k[0] == '/' {
k = k[1:]
}
allKeys = append(allKeys, k)
} }
if limit != 0 || offset != 0 { if limit != 0 || offset != 0 {
@@ -107,7 +99,6 @@ func (m *memoryStore) list(prefix string, limit, offset uint) []string {
} }
return allKeys[offset:end] return allKeys[offset:end]
} }
return allKeys return allKeys
} }
@@ -127,37 +118,48 @@ func (m *memoryStore) Name() string {
} }
func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error { func (m *memoryStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
prefix := m.prefix(m.opts.Database, m.opts.Table) options := NewExistsOptions(opts...)
return m.exists(prefix, key) if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
return m.exists(options.Namespace, key)
} }
func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error { func (m *memoryStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
readOpts := NewReadOptions(opts...) options := NewReadOptions(opts...)
prefix := m.prefix(readOpts.Database, readOpts.Table) if options.Namespace == "" {
return m.get(prefix, key, val) options.Namespace = m.opts.Namespace
}
return m.get(options.Namespace, key, val)
} }
func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error { func (m *memoryStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
writeOpts := NewWriteOptions(opts...) options := NewWriteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
if options.TTL == 0 {
options.TTL = cache.NoExpiration
}
prefix := m.prefix(writeOpts.Database, writeOpts.Table) key = m.key(options.Namespace, key)
key = m.key(prefix, key)
buf, err := m.opts.Codec.Marshal(val) buf, err := m.opts.Codec.Marshal(val)
if err != nil { if err != nil {
return err return err
} }
m.store.Set(key, buf, writeOpts.TTL) m.store.Set(key, buf, options.TTL)
return nil return nil
} }
func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error { func (m *memoryStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
deleteOptions := NewDeleteOptions(opts...) options := NewDeleteOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
prefix := m.prefix(deleteOptions.Database, deleteOptions.Table) m.delete(options.Namespace, key)
m.delete(prefix, key)
return nil return nil
} }
@@ -166,25 +168,27 @@ func (m *memoryStore) Options() Options {
} }
func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) { func (m *memoryStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
listOptions := NewListOptions(opts...) options := NewListOptions(opts...)
if options.Namespace == "" {
options.Namespace = m.opts.Namespace
}
prefix := m.prefix(listOptions.Database, listOptions.Table) keys := m.list(options.Namespace, options.Limit, options.Offset)
keys := m.list(prefix, listOptions.Limit, listOptions.Offset)
if len(listOptions.Prefix) > 0 { if len(options.Prefix) > 0 {
var prefixKeys []string var prefixKeys []string
for _, k := range keys { for _, k := range keys {
if strings.HasPrefix(k, listOptions.Prefix) { if strings.HasPrefix(k, options.Prefix) {
prefixKeys = append(prefixKeys, k) prefixKeys = append(prefixKeys, k)
} }
} }
keys = prefixKeys keys = prefixKeys
} }
if len(listOptions.Suffix) > 0 { if len(options.Suffix) > 0 {
var suffixKeys []string var suffixKeys []string
for _, k := range keys { for _, k := range keys {
if strings.HasSuffix(k, listOptions.Suffix) { if strings.HasSuffix(k, options.Suffix) {
suffixKeys = append(suffixKeys, k) suffixKeys = append(suffixKeys, k)
} }
} }

View File

@@ -9,11 +9,11 @@ import (
) )
func TestMemoryReInit(t *testing.T) { func TestMemoryReInit(t *testing.T) {
s := store.NewStore(store.Table("aaa")) s := store.NewStore(store.Namespace("aaa"))
if err := s.Init(store.Table("")); err != nil { if err := s.Init(store.Namespace("")); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if len(s.Options().Table) > 0 { if len(s.Options().Namespace) > 0 {
t.Error("Init didn't reinitialise the store") t.Error("Init didn't reinitialise the store")
} }
} }
@@ -28,7 +28,7 @@ func TestMemoryBasic(t *testing.T) {
func TestMemoryPrefix(t *testing.T) { func TestMemoryPrefix(t *testing.T) {
s := store.NewStore() s := store.NewStore()
if err := s.Init(store.Table("some-prefix")); err != nil { if err := s.Init(store.Namespace("some-prefix")); err != nil {
t.Fatal(err) t.Fatal(err)
} }
basictest(s, t) basictest(s, t)
@@ -36,7 +36,7 @@ func TestMemoryPrefix(t *testing.T) {
func TestMemoryNamespace(t *testing.T) { func TestMemoryNamespace(t *testing.T) {
s := store.NewStore() s := store.NewStore()
if err := s.Init(store.Database("some-namespace")); err != nil { if err := s.Init(store.Namespace("some-namespace")); err != nil {
t.Fatal(err) t.Fatal(err)
} }
basictest(s, t) basictest(s, t)
@@ -44,7 +44,7 @@ func TestMemoryNamespace(t *testing.T) {
func TestMemoryNamespacePrefix(t *testing.T) { func TestMemoryNamespacePrefix(t *testing.T) {
s := store.NewStore() s := store.NewStore()
if err := s.Init(store.Table("some-prefix"), store.Database("some-namespace")); err != nil { if err := s.Init(store.Namespace("some-namespace")); err != nil {
t.Fatal(err) t.Fatal(err)
} }
basictest(s, t) basictest(s, t)

View File

@@ -28,13 +28,12 @@ type Options struct {
TLSConfig *tls.Config TLSConfig *tls.Config
// Name specifies store name // Name specifies store name
Name string Name string
// Database specifies store database // Namespace of the records
Database string Namespace string
// Table specifies store table // Addrs contains store address
Table string Addrs []string
// Nodes contains store address //Wrappers store wrapper that called before actual functions
// TODO: replace with Addrs //Wrappers []Wrapper
Nodes []string
} }
// NewOptions creates options struct // NewOptions creates options struct
@@ -90,13 +89,20 @@ func Meter(m meter.Meter) Option {
} }
} }
// Name the name // Name the name of the store
func Name(n string) Option { func Name(n string) Option {
return func(o *Options) { return func(o *Options) {
o.Name = n o.Name = n
} }
} }
// Namespace sets namespace of the store
func Namespace(ns string) Option {
return func(o *Options) {
o.Namespace = ns
}
}
// Tracer sets the tracer // Tracer sets the tracer
func Tracer(t tracer.Tracer) Option { func Tracer(t tracer.Tracer) Option {
return func(o *Options) { return func(o *Options) {
@@ -104,27 +110,21 @@ func Tracer(t tracer.Tracer) Option {
} }
} }
// Nodes contains the addresses or other connection information of the backing storage. // Addrs contains the addresses or other connection information of the backing storage.
// For example, an etcd implementation would contain the nodes of the cluster. // For example, an etcd implementation would contain the nodes of the cluster.
// A SQL implementation could contain one or more connection strings. // A SQL implementation could contain one or more connection strings.
func Nodes(a ...string) Option { func Addrs(addrs ...string) Option {
return func(o *Options) { return func(o *Options) {
o.Nodes = a o.Addrs = addrs
} }
} }
// Database allows multiple isolated stores to be kept in one backend, if supported. // ReadOptions configures an individual Read operation
func Database(db string) Option { type ReadOptions struct {
return func(o *Options) { // Context holds external options
o.Database = db Context context.Context
} // Namespace holds namespace
} Namespace string
// Table is analag for a table in database backends or a key prefix in KV backends
func Table(t string) Option {
return func(o *Options) {
o.Table = t
}
} }
// NewReadOptions fills ReadOptions struct with opts slice // NewReadOptions fills ReadOptions struct with opts slice
@@ -136,29 +136,35 @@ func NewReadOptions(opts ...ReadOption) ReadOptions {
return options return options
} }
// ReadOptions configures an individual Read operation
type ReadOptions struct {
// Context holds external options
Context context.Context
// Database holds the database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
}
// ReadOption sets values in ReadOptions // ReadOption sets values in ReadOptions
type ReadOption func(r *ReadOptions) type ReadOption func(r *ReadOptions)
// ReadFrom the database and table // ReadContext pass context.Context to ReadOptions
func ReadFrom(database, table string) ReadOption { func ReadContext(ctx context.Context) ReadOption {
return func(r *ReadOptions) { return func(o *ReadOptions) {
r.Database = database o.Context = ctx
r.Table = table
} }
} }
// ReadNamespace pass namespace to ReadOptions
func ReadNamespace(ns string) ReadOption {
return func(o *ReadOptions) {
o.Namespace = ns
}
}
// WriteOptions configures an individual Write operation
type WriteOptions struct {
// Context holds external options
Context context.Context
// Metadata contains additional metadata
Metadata metadata.Metadata
// Namespace holds namespace
Namespace string
// TTL specifies key TTL
TTL time.Duration
}
// NewWriteOptions fills WriteOptions struct with opts slice // NewWriteOptions fills WriteOptions struct with opts slice
func NewWriteOptions(opts ...WriteOption) WriteOptions { func NewWriteOptions(opts ...WriteOption) WriteOptions {
options := WriteOptions{} options := WriteOptions{}
@@ -168,47 +174,45 @@ func NewWriteOptions(opts ...WriteOption) WriteOptions {
return options return options
} }
// WriteOptions configures an individual Write operation
type WriteOptions struct {
// Context holds external options
Context context.Context
// Metadata contains additional metadata
Metadata metadata.Metadata
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
// TTL specifies key TTL
TTL time.Duration
}
// WriteOption sets values in WriteOptions // WriteOption sets values in WriteOptions
type WriteOption func(w *WriteOptions) type WriteOption func(w *WriteOptions)
// WriteTo the database and table // WriteContext pass context.Context to wirte options
func WriteTo(database, table string) WriteOption { func WriteContext(ctx context.Context) WriteOption {
return func(w *WriteOptions) { return func(o *WriteOptions) {
w.Database = database o.Context = ctx
w.Table = table
}
}
// WriteTTL is the time the record expires
func WriteTTL(d time.Duration) WriteOption {
return func(w *WriteOptions) {
w.TTL = d
} }
} }
// WriteMetadata add metadata.Metadata // WriteMetadata add metadata.Metadata
func WriteMetadata(md metadata.Metadata) WriteOption { func WriteMetadata(md metadata.Metadata) WriteOption {
return func(w *WriteOptions) { return func(o *WriteOptions) {
w.Metadata = metadata.Copy(md) o.Metadata = metadata.Copy(md)
} }
} }
// WriteTTL is the time the record expires
func WriteTTL(d time.Duration) WriteOption {
return func(o *WriteOptions) {
o.TTL = d
}
}
// WriteNamespace pass namespace to write options
func WriteNamespace(ns string) WriteOption {
return func(o *WriteOptions) {
o.Namespace = ns
}
}
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
// Context holds external options
Context context.Context
// Namespace holds namespace
Namespace string
}
// NewDeleteOptions fills DeleteOptions struct with opts slice // NewDeleteOptions fills DeleteOptions struct with opts slice
func NewDeleteOptions(opts ...DeleteOption) DeleteOptions { func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
options := DeleteOptions{} options := DeleteOptions{}
@@ -218,29 +222,33 @@ func NewDeleteOptions(opts ...DeleteOption) DeleteOptions {
return options return options
} }
// DeleteOptions configures an individual Delete operation
type DeleteOptions struct {
// Context holds external options
Context context.Context
// Database holds database name
Database string
// Table holds table name
Table string
// Namespace holds namespace
Namespace string
}
// DeleteOption sets values in DeleteOptions // DeleteOption sets values in DeleteOptions
type DeleteOption func(d *DeleteOptions) type DeleteOption func(d *DeleteOptions)
// DeleteFrom the database and table // DeleteContext pass context.Context to delete options
func DeleteFrom(database, table string) DeleteOption { func DeleteContext(ctx context.Context) DeleteOption {
return func(d *DeleteOptions) { return func(o *DeleteOptions) {
d.Database = database o.Context = ctx
d.Table = table
} }
} }
// DeleteNamespace pass namespace to delete options
func DeleteNamespace(ns string) DeleteOption {
return func(o *DeleteOptions) {
o.Namespace = ns
}
}
// ListOptions configures an individual List operation
type ListOptions struct {
Context context.Context
Prefix string
Suffix string
Namespace string
Limit uint
Offset uint
}
// NewListOptions fills ListOptions struct with opts slice // NewListOptions fills ListOptions struct with opts slice
func NewListOptions(opts ...ListOption) ListOptions { func NewListOptions(opts ...ListOption) ListOptions {
options := ListOptions{} options := ListOptions{}
@@ -250,59 +258,50 @@ func NewListOptions(opts ...ListOption) ListOptions {
return options return options
} }
// ListOptions configures an individual List operation
type ListOptions struct {
Context context.Context
Database string
Prefix string
Suffix string
Namespace string
Table string
Limit uint
Offset uint
}
// ListOption sets values in ListOptions // ListOption sets values in ListOptions
type ListOption func(l *ListOptions) type ListOption func(l *ListOptions)
// ListFrom the database and table // ListContext pass context.Context to list options
func ListFrom(database, table string) ListOption { func ListContext(ctx context.Context) ListOption {
return func(l *ListOptions) { return func(o *ListOptions) {
l.Database = database o.Context = ctx
l.Table = table
} }
} }
// ListPrefix returns all keys that are prefixed with key // ListPrefix returns all keys that are prefixed with key
func ListPrefix(p string) ListOption { func ListPrefix(s string) ListOption {
return func(l *ListOptions) { return func(o *ListOptions) {
l.Prefix = p o.Prefix = s
} }
} }
// ListSuffix returns all keys that end with key // ListSuffix returns all keys that end with key
func ListSuffix(s string) ListOption { func ListSuffix(s string) ListOption {
return func(l *ListOptions) { return func(o *ListOptions) {
l.Suffix = s o.Suffix = s
} }
} }
// ListLimit limits the number of returned keys to l // ListLimit limits the number of returned keys
func ListLimit(l uint) ListOption { func ListLimit(n uint) ListOption {
return func(lo *ListOptions) { return func(o *ListOptions) {
lo.Limit = l o.Limit = n
} }
} }
// ListOffset starts returning responses from o. Use in conjunction with Limit for pagination. // ListOffset use with Limit for pagination
func ListOffset(o uint) ListOption { func ListOffset(n uint) ListOption {
return func(l *ListOptions) { return func(o *ListOptions) {
l.Offset = o o.Offset = n
} }
} }
// ExistsOption specifies Exists call options // ListNamespace pass namespace to list options
type ExistsOption func(*ExistsOptions) func ListNamespace(ns string) ListOption {
return func(o *ListOptions) {
o.Namespace = ns
}
}
// ExistsOptions holds options for Exists method // ExistsOptions holds options for Exists method
type ExistsOptions struct { type ExistsOptions struct {
@@ -312,6 +311,9 @@ type ExistsOptions struct {
Namespace string Namespace string
} }
// ExistsOption specifies Exists call options
type ExistsOption func(*ExistsOptions)
// NewExistsOptions helper for Exists method // NewExistsOptions helper for Exists method
func NewExistsOptions(opts ...ExistsOption) ExistsOptions { func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
options := ExistsOptions{ options := ExistsOptions{
@@ -322,3 +324,24 @@ func NewExistsOptions(opts ...ExistsOption) ExistsOptions {
} }
return options return options
} }
// ExistsContext pass context.Context to exist options
func ExistsContext(ctx context.Context) ExistsOption {
return func(o *ExistsOptions) {
o.Context = ctx
}
}
// ExistsNamespace pass namespace to exist options
func ExistsNamespace(ns string) ExistsOption {
return func(o *ExistsOptions) {
o.Namespace = ns
}
}
// WrapStore adds a store Wrapper to a list of options passed into the store
//func WrapStore(w Wrapper) Option {
// return func(o *Options) {
// o.Wrappers = append(o.Wrappers, w)
// }
//}

84
store/wrapper.go Normal file
View File

@@ -0,0 +1,84 @@
package store
import (
"context"
)
// LogfFunc function used for Logf method
//type LogfFunc func(ctx context.Context, level Level, msg string, args ...interface{})
//type Wrapper interface {
// Logf logs message with needed level
//Logf(LogfFunc) LogfFunc
//}
type NamespaceStore struct {
s Store
ns string
}
var (
_ Store = &NamespaceStore{}
)
func NewNamespaceStore(s Store, ns string) Store {
return &NamespaceStore{s: s, ns: ns}
}
func (w *NamespaceStore) Init(opts ...Option) error {
return w.s.Init(opts...)
}
func (w *NamespaceStore) Connect(ctx context.Context) error {
return w.s.Connect(ctx)
}
func (w *NamespaceStore) Disconnect(ctx context.Context) error {
return w.s.Disconnect(ctx)
}
func (w *NamespaceStore) Read(ctx context.Context, key string, val interface{}, opts ...ReadOption) error {
return w.s.Read(ctx, key, val, append(opts, ReadNamespace(w.ns))...)
}
func (w *NamespaceStore) Write(ctx context.Context, key string, val interface{}, opts ...WriteOption) error {
return w.s.Write(ctx, key, val, append(opts, WriteNamespace(w.ns))...)
}
func (w *NamespaceStore) Delete(ctx context.Context, key string, opts ...DeleteOption) error {
return w.s.Delete(ctx, key, append(opts, DeleteNamespace(w.ns))...)
}
func (w *NamespaceStore) Exists(ctx context.Context, key string, opts ...ExistsOption) error {
return w.s.Exists(ctx, key, append(opts, ExistsNamespace(w.ns))...)
}
func (w *NamespaceStore) List(ctx context.Context, opts ...ListOption) ([]string, error) {
return w.s.List(ctx, append(opts, ListNamespace(w.ns))...)
}
func (w *NamespaceStore) Options() Options {
return w.s.Options()
}
func (w *NamespaceStore) Name() string {
return w.s.Name()
}
func (w *NamespaceStore) String() string {
return w.s.String()
}
//type NamespaceWrapper struct{}
//func NewNamespaceWrapper() Wrapper {
// return &NamespaceWrapper{}
//}
/*
func (w *OmitWrapper) Logf(fn LogfFunc) LogfFunc {
return func(ctx context.Context, level Level, msg string, args ...interface{}) {
fn(ctx, level, msg, getArgs(args)...)
}
}
*/

View File

@@ -102,7 +102,7 @@ type parser struct {
// topLevelSegments is the target of this parser. // topLevelSegments is the target of this parser.
func (p *parser) topLevelSegments() ([]segment, error) { func (p *parser) topLevelSegments() ([]segment, error) {
if logger.V(logger.TraceLevel) { if logger.V(logger.TraceLevel) {
logger.Debug(context.TODO(), "Parsing %q", p.tokens) logger.Trace(context.TODO(), "Parsing %q", p.tokens)
} }
segs, err := p.segments() segs, err := p.segments()
if err != nil { if err != nil {

View File

@@ -63,16 +63,16 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
} }
if version != 1 { if version != 1 {
if logger.V(logger.DebugLevel) { if logger.V(logger.TraceLevel) {
logger.Debug(context.TODO(), "unsupported version: %d", version) logger.Trace(context.TODO(), "unsupported version: %d", version)
} }
return Pattern{}, ErrInvalidPattern return Pattern{}, ErrInvalidPattern
} }
l := len(ops) l := len(ops)
if l%2 != 0 { if l%2 != 0 {
if logger.V(logger.DebugLevel) { if logger.V(logger.TraceLevel) {
logger.Debug(context.TODO(), "odd number of ops codes: %d", l) logger.Trace(context.TODO(), "odd number of ops codes: %d", l)
} }
return Pattern{}, ErrInvalidPattern return Pattern{}, ErrInvalidPattern
} }
@@ -141,13 +141,13 @@ func NewPattern(version int, ops []int, pool []string, verb string, opts ...Patt
vars = append(vars, v) vars = append(vars, v)
stack-- stack--
if stack < 0 { if stack < 0 {
if logger.V(logger.DebugLevel) { if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "stack underflow") logger.Trace(context.TODO(), "stack underflow")
} }
return Pattern{}, ErrInvalidPattern return Pattern{}, ErrInvalidPattern
} }
default: default:
if logger.V(logger.DebugLevel) { if logger.V(logger.TraceLevel) {
logger.Trace(context.TODO(), "invalid opcode: %d", op.code) logger.Trace(context.TODO(), "invalid opcode: %d", op.code)
} }
return Pattern{}, ErrInvalidPattern return Pattern{}, ErrInvalidPattern