minor improvements #149

Merged
vtolstov merged 1 commits from improvements into v3 2022-11-07 14:56:24 +03:00
7 changed files with 125 additions and 32 deletions

View File

@ -3,7 +3,6 @@ package flow
import ( import (
"context" "context"
"fmt" "fmt"
"path/filepath"
"sync" "sync"
"github.com/silas/dag" "github.com/silas/dag"
@ -150,17 +149,17 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
} }
func (w *microWorkflow) Abort(ctx context.Context, id string) error { func (w *microWorkflow) Abort(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())}) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
} }
func (w *microWorkflow) Suspend(ctx context.Context, id string) error { func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())}) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
} }
func (w *microWorkflow) Resume(ctx context.Context, id string) error { func (w *microWorkflow) Resume(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id)) workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())}) return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
} }
@ -181,8 +180,8 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
return "", err return "", err
} }
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid)) stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid)
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid)) workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid)
options := NewExecuteOptions(opts...) options := NewExecuteOptions(opts...)
@ -219,7 +218,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
for idx := range steps { for idx := range steps {
for nidx := range steps[idx] { for nidx := range steps[idx] {
cstep := steps[idx][nidx] cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr return eid, werr
} }
} }
@ -246,32 +245,32 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
wg.Add(1) wg.Add(1)
go func(step Step) { go func(step Step) {
defer wg.Done() defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil { if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr cherr <- werr
return return
} }
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr cherr <- werr
return return
} }
rsp, serr := step.Execute(nctx, req, nopts...) rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil { if serr != nil {
step.SetStatus(StatusFailure) step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
} }
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
} }
cherr <- serr cherr <- serr
return return
} }
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil { if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr cherr <- werr
return return
} }
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr cherr <- werr
return return
@ -279,32 +278,32 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
}(cstep) }(cstep)
wg.Wait() wg.Wait()
} else { } else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr cherr <- werr
return return
} }
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr cherr <- werr
return return
} }
rsp, serr := cstep.Execute(nctx, req, nopts...) rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil { if serr != nil {
cstep.SetStatus(StatusFailure) cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
} }
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
} }
cherr <- serr cherr <- serr
return return
} }
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr) w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr cherr <- werr
return return
} }
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr cherr <- werr
return return
} }

View File

@ -49,12 +49,12 @@ func (p *profiler) Start() error {
// create exit channel // create exit channel
p.exit = make(chan bool) p.exit = make(chan bool)
cpuFile := filepath.Join("/tmp", "cpu.pprof") cpuFile := filepath.Join(string(os.PathSeparator)+"tmp", "cpu.pprof")
memFile := filepath.Join("/tmp", "mem.pprof") memFile := filepath.Join(string(os.PathSeparator)+"tmp", "mem.pprof")
if len(p.opts.Name) > 0 { if len(p.opts.Name) > 0 {
cpuFile = filepath.Join("/tmp", p.opts.Name+".cpu.pprof") cpuFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".cpu.pprof")
memFile = filepath.Join("/tmp", p.opts.Name+".mem.pprof") memFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".mem.pprof")
} }
f1, err := os.Create(cpuFile) f1, err := os.Create(cpuFile)

View File

@ -164,7 +164,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
continue continue
} }
metadata := make(map[string]string) metadata := make(map[string]string, len(n.Metadata))
// make copy of metadata // make copy of metadata
for k, v := range n.Metadata { for k, v := range n.Metadata {

View File

@ -2,7 +2,6 @@ package store
import ( import (
"context" "context"
"path/filepath"
"sort" "sort"
"strings" "strings"
"time" "time"
@ -33,12 +32,11 @@ type memoryStore struct {
} }
func (m *memoryStore) key(prefix, key string) string { func (m *memoryStore) key(prefix, key string) string {
return filepath.Join(prefix, key) return prefix + m.opts.Separator + key
} }
func (m *memoryStore) exists(prefix, key string) error { func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key) key = m.key(prefix, key)
_, found := m.store.Get(key) _, found := m.store.Get(key)
if !found { if !found {
return ErrNotFound return ErrNotFound

View File

@ -30,6 +30,8 @@ type Options struct {
Name string Name string
// Namespace of the records // Namespace of the records
Namespace string Namespace string
// Separator used as key parts separator
Separator string
// Addrs contains store address // Addrs contains store address
Addrs []string Addrs []string
// Wrappers store wrapper that called before actual functions // Wrappers store wrapper that called before actual functions
@ -46,6 +48,7 @@ func NewOptions(opts ...Option) Options {
Codec: codec.DefaultCodec, Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer, Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter, Meter: meter.DefaultMeter,
Separator: DefaultSeparator,
} }
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
@ -98,6 +101,13 @@ func Name(n string) Option {
} }
} }
// Separator the value used as key parts separator
func Separator(s string) Option {
return func(o *Options) {
o.Separator = s
}
}
// Namespace sets namespace of the store // Namespace sets namespace of the store
func Namespace(ns string) Option { func Namespace(ns string) Option {
return func(o *Options) { return func(o *Options) {

View File

@ -13,6 +13,8 @@ var (
ErrInvalidKey = errors.New("invalid key") ErrInvalidKey = errors.New("invalid key")
// DefaultStore is the global default store // DefaultStore is the global default store
DefaultStore = NewStore() DefaultStore = NewStore()
// DefaultSeparator is the gloabal default key parts separator
DefaultSeparator = "/"
) )
// Store is a data storage interface // Store is a data storage interface

84
util/text/text.go Normal file
View File

@ -0,0 +1,84 @@
package text
func DetectEncoding(text string) map[string]int {
charsets := map[string]int{
"UTF-8": 0,
"CP1251": 0,
"KOI8-R": 0,
"IBM866": 0,
"ISO-8859-5": 0,
"MAC": 0,
}
if len(text) == 0 {
return charsets
}
utflower := 7
utfupper := 5
lowercase := 3
uppercase := 1
last_simb := 0
for a := 0; a < len(text); a++ {
char := int(text[a])
// non-russian characters
if char < 128 || char > 256 {
continue
}
// UTF-8
if (last_simb == 208) && ((char > 143 && char < 176) || char == 129) {
charsets["UTF-8"] += (utfupper * 2)
}
if ((last_simb == 208) && ((char > 175 && char < 192) || char == 145)) || (last_simb == 209 && char > 127 && char < 144) {
charsets["UTF-8"] += (utflower * 2)
}
// CP1251
if (char > 223 && char < 256) || char == 184 {
charsets["CP1251"] += lowercase
}
if (char > 191 && char < 224) || char == 168 {
charsets["CP1251"] += uppercase
}
// KOI8-R
if (char > 191 && char < 224) || char == 163 {
charsets["KOI8-R"] += lowercase
}
if (char > 222 && char < 256) || char == 179 {
charsets["KOI8-R"] += uppercase
}
// IBM866
if (char > 159 && char < 176) || (char > 223 && char < 241) {
charsets["IBM866"] += lowercase
}
if (char > 127 && char < 160) || char == 241 {
charsets["IBM866"] += uppercase
}
// ISO-8859-5
if (char > 207 && char < 240) || char == 161 {
charsets["ISO-8859-5"] += lowercase
}
if (char > 175 && char < 208) || char == 241 {
charsets["ISO-8859-5"] += uppercase
}
// MAC
if char > 221 && char < 255 {
charsets["MAC"] += lowercase
}
if char > 127 && char < 160 {
charsets["MAC"] += uppercase
}
last_simb = char
}
return charsets
}