minor improvements #149
@ -3,7 +3,6 @@ package flow
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/silas/dag"
|
"github.com/silas/dag"
|
||||||
@ -150,17 +149,17 @@ func (w *microWorkflow) getSteps(start string, reverse bool) ([][]Step, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
|
func (w *microWorkflow) Abort(ctx context.Context, id string) error {
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
|
||||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusAborted.String())})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
|
func (w *microWorkflow) Suspend(ctx context.Context, id string) error {
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
|
||||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusSuspend.String())})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *microWorkflow) Resume(ctx context.Context, id string) error {
|
func (w *microWorkflow) Resume(ctx context.Context, id string) error {
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", id))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+id)
|
||||||
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
|
return workflowStore.Write(ctx, "status", &codec.Frame{Data: []byte(StatusRunning.String())})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,8 +180,8 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
stepStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("steps", eid))
|
stepStore := store.NewNamespaceStore(w.opts.Store, "steps"+w.opts.Store.Options().Separator+eid)
|
||||||
workflowStore := store.NewNamespaceStore(w.opts.Store, filepath.Join("workflows", eid))
|
workflowStore := store.NewNamespaceStore(w.opts.Store, "workflows"+w.opts.Store.Options().Separator+eid)
|
||||||
|
|
||||||
options := NewExecuteOptions(opts...)
|
options := NewExecuteOptions(opts...)
|
||||||
|
|
||||||
@ -219,7 +218,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
for idx := range steps {
|
for idx := range steps {
|
||||||
for nidx := range steps[idx] {
|
for nidx := range steps[idx] {
|
||||||
cstep := steps[idx][nidx]
|
cstep := steps[idx][nidx]
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil {
|
||||||
return eid, werr
|
return eid, werr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -246,32 +245,32 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(step Step) {
|
go func(step Step) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "req"), req); werr != nil {
|
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rsp, serr := step.Execute(nctx, req, nopts...)
|
rsp, serr := step.Execute(nctx, req, nopts...)
|
||||||
if serr != nil {
|
if serr != nil {
|
||||||
step.SetStatus(StatusFailure)
|
step.SetStatus(StatusFailure)
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
}
|
}
|
||||||
cherr <- serr
|
cherr <- serr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "rsp"), rsp); werr != nil {
|
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(step.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
@ -279,32 +278,32 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...Execu
|
|||||||
}(cstep)
|
}(cstep)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
} else {
|
} else {
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "req"), req); werr != nil {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"req", req); werr != nil {
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil {
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
rsp, serr := cstep.Execute(nctx, req, nopts...)
|
||||||
if serr != nil {
|
if serr != nil {
|
||||||
cstep.SetStatus(StatusFailure)
|
cstep.SetStatus(StatusFailure)
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
}
|
}
|
||||||
cherr <- serr
|
cherr <- serr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "rsp"), rsp); werr != nil {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil {
|
||||||
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
w.opts.Logger.Errorf(ctx, "store write error: %v", werr)
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if werr := stepStore.Write(ctx, filepath.Join(cstep.ID(), "status"), &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil {
|
||||||
cherr <- werr
|
cherr <- werr
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -49,12 +49,12 @@ func (p *profiler) Start() error {
|
|||||||
// create exit channel
|
// create exit channel
|
||||||
p.exit = make(chan bool)
|
p.exit = make(chan bool)
|
||||||
|
|
||||||
cpuFile := filepath.Join("/tmp", "cpu.pprof")
|
cpuFile := filepath.Join(string(os.PathSeparator)+"tmp", "cpu.pprof")
|
||||||
memFile := filepath.Join("/tmp", "mem.pprof")
|
memFile := filepath.Join(string(os.PathSeparator)+"tmp", "mem.pprof")
|
||||||
|
|
||||||
if len(p.opts.Name) > 0 {
|
if len(p.opts.Name) > 0 {
|
||||||
cpuFile = filepath.Join("/tmp", p.opts.Name+".cpu.pprof")
|
cpuFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".cpu.pprof")
|
||||||
memFile = filepath.Join("/tmp", p.opts.Name+".mem.pprof")
|
memFile = filepath.Join(string(os.PathSeparator)+"tmp", p.opts.Name+".mem.pprof")
|
||||||
}
|
}
|
||||||
|
|
||||||
f1, err := os.Create(cpuFile)
|
f1, err := os.Create(cpuFile)
|
||||||
|
@ -164,7 +164,7 @@ func (m *memory) Register(ctx context.Context, s *Service, opts ...RegisterOptio
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata := make(map[string]string)
|
metadata := make(map[string]string, len(n.Metadata))
|
||||||
|
|
||||||
// make copy of metadata
|
// make copy of metadata
|
||||||
for k, v := range n.Metadata {
|
for k, v := range n.Metadata {
|
||||||
|
@ -2,7 +2,6 @@ package store
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"path/filepath"
|
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -33,12 +32,11 @@ type memoryStore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) key(prefix, key string) string {
|
func (m *memoryStore) key(prefix, key string) string {
|
||||||
return filepath.Join(prefix, key)
|
return prefix + m.opts.Separator + key
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *memoryStore) exists(prefix, key string) error {
|
func (m *memoryStore) exists(prefix, key string) error {
|
||||||
key = m.key(prefix, key)
|
key = m.key(prefix, key)
|
||||||
|
|
||||||
_, found := m.store.Get(key)
|
_, found := m.store.Get(key)
|
||||||
if !found {
|
if !found {
|
||||||
return ErrNotFound
|
return ErrNotFound
|
||||||
|
@ -30,6 +30,8 @@ type Options struct {
|
|||||||
Name string
|
Name string
|
||||||
// Namespace of the records
|
// Namespace of the records
|
||||||
Namespace string
|
Namespace string
|
||||||
|
// Separator used as key parts separator
|
||||||
|
Separator string
|
||||||
// Addrs contains store address
|
// Addrs contains store address
|
||||||
Addrs []string
|
Addrs []string
|
||||||
// Wrappers store wrapper that called before actual functions
|
// Wrappers store wrapper that called before actual functions
|
||||||
@ -46,6 +48,7 @@ func NewOptions(opts ...Option) Options {
|
|||||||
Codec: codec.DefaultCodec,
|
Codec: codec.DefaultCodec,
|
||||||
Tracer: tracer.DefaultTracer,
|
Tracer: tracer.DefaultTracer,
|
||||||
Meter: meter.DefaultMeter,
|
Meter: meter.DefaultMeter,
|
||||||
|
Separator: DefaultSeparator,
|
||||||
}
|
}
|
||||||
for _, o := range opts {
|
for _, o := range opts {
|
||||||
o(&options)
|
o(&options)
|
||||||
@ -98,6 +101,13 @@ func Name(n string) Option {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Separator the value used as key parts separator
|
||||||
|
func Separator(s string) Option {
|
||||||
|
return func(o *Options) {
|
||||||
|
o.Separator = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Namespace sets namespace of the store
|
// Namespace sets namespace of the store
|
||||||
func Namespace(ns string) Option {
|
func Namespace(ns string) Option {
|
||||||
return func(o *Options) {
|
return func(o *Options) {
|
||||||
|
@ -13,6 +13,8 @@ var (
|
|||||||
ErrInvalidKey = errors.New("invalid key")
|
ErrInvalidKey = errors.New("invalid key")
|
||||||
// DefaultStore is the global default store
|
// DefaultStore is the global default store
|
||||||
DefaultStore = NewStore()
|
DefaultStore = NewStore()
|
||||||
|
// DefaultSeparator is the gloabal default key parts separator
|
||||||
|
DefaultSeparator = "/"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Store is a data storage interface
|
// Store is a data storage interface
|
||||||
|
84
util/text/text.go
Normal file
84
util/text/text.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
package text
|
||||||
|
|
||||||
|
|
||||||
|
func DetectEncoding(text string) map[string]int {
|
||||||
|
charsets := map[string]int{
|
||||||
|
"UTF-8": 0,
|
||||||
|
"CP1251": 0,
|
||||||
|
"KOI8-R": 0,
|
||||||
|
"IBM866": 0,
|
||||||
|
"ISO-8859-5": 0,
|
||||||
|
"MAC": 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(text) == 0 {
|
||||||
|
return charsets
|
||||||
|
}
|
||||||
|
|
||||||
|
utflower := 7
|
||||||
|
utfupper := 5
|
||||||
|
lowercase := 3
|
||||||
|
uppercase := 1
|
||||||
|
last_simb := 0
|
||||||
|
|
||||||
|
for a := 0; a < len(text); a++ {
|
||||||
|
char := int(text[a])
|
||||||
|
|
||||||
|
// non-russian characters
|
||||||
|
if char < 128 || char > 256 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// UTF-8
|
||||||
|
if (last_simb == 208) && ((char > 143 && char < 176) || char == 129) {
|
||||||
|
charsets["UTF-8"] += (utfupper * 2)
|
||||||
|
}
|
||||||
|
if ((last_simb == 208) && ((char > 175 && char < 192) || char == 145)) || (last_simb == 209 && char > 127 && char < 144) {
|
||||||
|
charsets["UTF-8"] += (utflower * 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CP1251
|
||||||
|
if (char > 223 && char < 256) || char == 184 {
|
||||||
|
charsets["CP1251"] += lowercase
|
||||||
|
}
|
||||||
|
if (char > 191 && char < 224) || char == 168 {
|
||||||
|
charsets["CP1251"] += uppercase
|
||||||
|
}
|
||||||
|
|
||||||
|
// KOI8-R
|
||||||
|
if (char > 191 && char < 224) || char == 163 {
|
||||||
|
charsets["KOI8-R"] += lowercase
|
||||||
|
}
|
||||||
|
if (char > 222 && char < 256) || char == 179 {
|
||||||
|
charsets["KOI8-R"] += uppercase
|
||||||
|
}
|
||||||
|
|
||||||
|
// IBM866
|
||||||
|
if (char > 159 && char < 176) || (char > 223 && char < 241) {
|
||||||
|
charsets["IBM866"] += lowercase
|
||||||
|
}
|
||||||
|
if (char > 127 && char < 160) || char == 241 {
|
||||||
|
charsets["IBM866"] += uppercase
|
||||||
|
}
|
||||||
|
|
||||||
|
// ISO-8859-5
|
||||||
|
if (char > 207 && char < 240) || char == 161 {
|
||||||
|
charsets["ISO-8859-5"] += lowercase
|
||||||
|
}
|
||||||
|
if (char > 175 && char < 208) || char == 241 {
|
||||||
|
charsets["ISO-8859-5"] += uppercase
|
||||||
|
}
|
||||||
|
|
||||||
|
// MAC
|
||||||
|
if char > 221 && char < 255 {
|
||||||
|
charsets["MAC"] += lowercase
|
||||||
|
}
|
||||||
|
if char > 127 && char < 160 {
|
||||||
|
charsets["MAC"] += uppercase
|
||||||
|
}
|
||||||
|
|
||||||
|
last_simb = char
|
||||||
|
}
|
||||||
|
|
||||||
|
return charsets
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user