minor improvements #149

Merged
vtolstov merged 1 commits from improvements into v3 2022-11-07 14:56:24 +03:00
7 changed files with 125 additions and 32 deletions

View File

@ -3,7 +3,6 @@ package flow
import (
"context"
"fmt"
"path/filepath"
"sync"
"github.com/silas/dag"
@ -150,17 +149,17 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
}
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
}
func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
}
func (w *microWorkflow) Resume(ctx context.Context, id string) error {
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
}
@ -181,8 +180,8 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
return "", err
}
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid)
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid)
options := NewExecuteOptions(opts...)
@ -219,7 +218,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
for idx := range steps {
for nidx := range steps[idx] {
cstep := steps[idx][nidx]
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
return eid, werr
}
}
@ -246,32 +245,32 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
wg.Add(1)
go func(step Step) {
defer wg.Done()
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := step.Execute(nctx, req, nopts...)
if serr != nil {
step.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
@ -279,32 +278,32 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
}(cstep)
wg.Wait()
} else {
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
cherr <- werr
return
}
rsp, serr := cstep.Execute(nctx, req, nopts...)
if serr != nil {
cstep.SetStatus(StatusFailure)
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
}
cherr <- serr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
cherr <- werr
return
}
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
cherr <- werr
return
}

View File

@ -49,12 +49,12 @@ func (p *profiler) Start() error {
// create exit channel
p.exit = make(chan bool)
cpuFile := filepath.Join("/tmp", "cpu.pprof")
memFile := filepath.Join("/tmp", "mem.pprof")
cpuFile := filepath.Join(string(os.PathSeparator)+"tmp", "cpu.pprof")
memFile := filepath.Join(string(os.PathSeparator)+"tmp", "mem.pprof")
if len(p.opts.Name) > 0 {
cpuFile = filepath.Join("/tmp", p.opts.Name+".cpu.pprof")
memFile = filepath.Join("/tmp", p.opts.Name+".mem.pprof")
cpuFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".cpu.pprof")
memFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".mem.pprof")
}
f1, err := os.Create(cpuFile)

View File

@ -164,7 +164,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
continue
}
metadata := make(map[string]string)
metadata := make(map[string]string, len(n.Metadata))
// make copy of metadata
for k, v := range n.Metadata {

View File

@ -2,7 +2,6 @@ package store
import (
"context"
"path/filepath"
"sort"
"strings"
"time"
@ -33,12 +32,11 @@ type memoryStore struct {
}
func (m *memoryStore) key(prefix, key string) string {
return filepath.Join(prefix, key)
return prefix + m.opts.Separator + key
}
func (m *memoryStore) exists(prefix, key string) error {
key = m.key(prefix, key)
_, found := m.store.Get(key)
if !found {
return ErrNotFound

View File

@ -30,6 +30,8 @@ type Options struct {
Name string
// Namespace of the records
Namespace string
// Separator used as key parts separator
Separator string
// Addrs contains store address
Addrs []string
// Wrappers store wrapper that called before actual functions
@ -46,6 +48,7 @@ func NewOptions(opts ...Option) Options {
Codec: codec.DefaultCodec,
Tracer: tracer.DefaultTracer,
Meter: meter.DefaultMeter,
Separator: DefaultSeparator,
}
for _, o := range opts {
o(&options)
@ -98,6 +101,13 @@ func Name(n string) Option {
}
}
// Separator the value used as key parts separator
func Separator(s string) Option {
return func(o *Options) {
o.Separator = s
}
}
// Namespace sets namespace of the store
func Namespace(ns string) Option {
return func(o *Options) {

View File

@ -13,6 +13,8 @@ var (
ErrInvalidKey = errors.New("invalid key")
// DefaultStore is the global default store
DefaultStore = NewStore()
// DefaultSeparator is the gloabal default key parts separator
DefaultSeparator = "/"
)
// Store is a data storage interface

84
util/text/text.go Normal file
View File

@ -0,0 +1,84 @@
package text
func DetectEncoding(text string) map[string]int {
charsets := map[string]int{
"UTF-8": 0,
"CP1251": 0,
"KOI8-R": 0,
"IBM866": 0,
"ISO-8859-5": 0,
"MAC": 0,
}
if len(text) == 0 {
return charsets
}
utflower := 7
utfupper := 5
lowercase := 3
uppercase := 1
last_simb := 0
for a := 0; a < len(text); a++ {
char := int(text[a])
// non-russian characters
if char < 128 || char > 256 {
continue
}
// UTF-8
if (last_simb == 208) && ((char > 143 && char < 176) || char == 129) {
charsets["UTF-8"] += (utfupper * 2)
}
if ((last_simb == 208) && ((char > 175 && char < 192) || char == 145)) || (last_simb == 209 && char > 127 && char < 144) {
charsets["UTF-8"] += (utflower * 2)
}
// CP1251
if (char > 223 && char < 256) || char == 184 {
charsets["CP1251"] += lowercase
}
if (char > 191 && char < 224) || char == 168 {
charsets["CP1251"] += uppercase
}
// KOI8-R
if (char > 191 && char < 224) || char == 163 {
charsets["KOI8-R"] += lowercase
}
if (char > 222 && char < 256) || char == 179 {
charsets["KOI8-R"] += uppercase
}
// IBM866
if (char > 159 && char < 176) || (char > 223 && char < 241) {
charsets["IBM866"] += lowercase
}
if (char > 127 && char < 160) || char == 241 {
charsets["IBM866"] += uppercase
}
// ISO-8859-5
if (char > 207 && char < 240) || char == 161 {
charsets["ISO-8859-5"] += lowercase
}
if (char > 175 && char < 208) || char == 241 {
charsets["ISO-8859-5"] += uppercase
}
// MAC
if char > 221 && char < 255 {
charsets["MAC"] += lowercase
}
if char > 127 && char < 160 {
charsets["MAC"] += uppercase
}
last_simb = char
}
return charsets
}