diff --git a/flow/default.go b/flow/default.go index 8eee3bda..64b70ea8 100644 --- a/flow/default.go +++ b/flow/default.go @@ -190,7 +190,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio steps, err := w.getSteps(options.Start, options.Reverse) if err != nil { if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusPending.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + w.opts.Logger.Error(w.opts.Context, "store write error", "error", werr.Error()) } return "", err } @@ -214,7 +214,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio done := make(chan struct{}) if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusRunning.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + w.opts.Logger.Error(w.opts.Context, "store write error", "error", werr.Error()) return eid, werr } for idx := range steps { @@ -239,7 +239,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio return } if w.opts.Logger.V(logger.TraceLevel) { - w.opts.Logger.Tracef(nctx, "will be executed %v", steps[idx][nidx]) + w.opts.Logger.Trace(nctx, fmt.Sprintf("step will be executed %v", steps[idx][nidx])) } cstep := steps[idx][nidx] // nolint: nestif @@ -259,21 +259,21 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio if serr != nil { step.SetStatus(StatusFailure) if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) } if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) } cherr <- serr return } if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) cherr <- werr return } if werr := stepStore.Write(ctx, step.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) cherr <- werr return } @@ -292,16 +292,16 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio if serr != nil { cstep.SetStatus(StatusFailure) if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", serr); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) } if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil && w.opts.Logger.V(logger.ErrorLevel) { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) } cherr <- serr return } if werr := stepStore.Write(ctx, cstep.ID()+w.opts.Store.Options().Separator+"rsp", rsp); werr != nil { - w.opts.Logger.Errorf(ctx, "store write error: %v", werr) + w.opts.Logger.Error(ctx, "store write error", "error", werr.Error()) cherr <- werr return } @@ -319,7 +319,7 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio return eid, nil } - logger.Tracef(ctx, "wait for finish or error") + w.opts.Logger.Trace(ctx, "wait for finish or error") select { case <-nctx.Done(): err = nctx.Err() @@ -335,15 +335,15 @@ func (w *microWorkflow) Execute(ctx context.Context, req *Message, opts ...optio switch { case nctx.Err() != nil: if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusAborted.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + w.opts.Logger.Error(w.opts.Context, "store write error", "error", werr.Error()) } case err == nil: if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusSuccess.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + w.opts.Logger.Error(w.opts.Context, "store write error", "error", werr.Error()) } case err != nil: if werr := workflowStore.Write(w.opts.Context, "status", &codec.Frame{Data: []byte(StatusFailure.String())}); werr != nil { - w.opts.Logger.Errorf(w.opts.Context, "store error: %v", werr) + w.opts.Logger.Error(w.opts.Context, "store write error", "error", werr.Error()) } } diff --git a/go.mod b/go.mod index 56644783..80403212 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( github.com/google/uuid v1.3.1 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.3.0 golang.org/x/sys v0.12.0 google.golang.org/grpc v1.58.2 diff --git a/go.sum b/go.sum index d27f068a..1bc873a5 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,6 @@ github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaR github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5 h1:G/FZtUu7a6NTWl3KUHMV9jkLAh/Rvtf03NWMHaEDl+E= github.com/silas/dag v0.0.0-20220518035006-a7e85ada93c5/go.mod h1:7RTUFBdIRC9nZ7/3RyRNH1bdqIShrDejd1YbLwgPS+I= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= diff --git a/logger/wrapper/wrapper.go b/logger/wrapper/wrapper.go deleted file mode 100644 index eb7cda00..00000000 --- a/logger/wrapper/wrapper.go +++ /dev/null @@ -1,298 +0,0 @@ -// Package wrapper provides wrapper for Logger -package wrapper - -import ( - "context" - "fmt" - - "go.unistack.org/micro/v4/client" - "go.unistack.org/micro/v4/logger" - "go.unistack.org/micro/v4/options" - "go.unistack.org/micro/v4/server" -) - -var ( - // DefaultClientCallObserver called by wrapper in client Call - DefaultClientCallObserver = func(ctx context.Context, req client.Request, rsp interface{}, opts []options.Option, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels - } - - // DefaultClientStreamObserver called by wrapper in client Stream - DefaultClientStreamObserver = func(ctx context.Context, req client.Request, opts []options.Option, stream client.Stream, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels - } - - // DefaultServerHandlerObserver called by wrapper in server Handler - DefaultServerHandlerObserver = func(ctx context.Context, req server.Request, rsp interface{}, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels - } - - // DefaultClientCallFuncObserver called by wrapper in client CallFunc - DefaultClientCallFuncObserver = func(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string { - labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} - if err != nil { - labels = append(labels, "error", err.Error()) - } - return labels - } - - // DefaultSkipEndpoints wrapper not called for this endpoints - DefaultSkipEndpoints = []string{"Meter.Metrics", "Health.Live", "Health.Ready", "Health.Version"} -) - -type lWrapper struct { - client.Client - serverHandler server.HandlerFunc - clientCallFunc client.CallFunc - opts Options -} - -type ( - // ClientCallObserver func signature - ClientCallObserver func(context.Context, client.Request, interface{}, []options.Option, error) []string - // ClientStreamObserver func signature - ClientStreamObserver func(context.Context, client.Request, []options.Option, client.Stream, error) []string - // ClientCallFuncObserver func signature - ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string - // ServerHandlerObserver func signature - ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string -) - -// Options struct for wrapper -type Options struct { - // Logger that used for log - Logger logger.Logger - // ServerHandlerObservers funcs - ServerHandlerObservers []ServerHandlerObserver - // ClientCallObservers funcs - ClientCallObservers []ClientCallObserver - // ClientStreamObservers funcs - ClientStreamObservers []ClientStreamObserver - // ClientCallFuncObservers funcs - ClientCallFuncObservers []ClientCallFuncObserver - // SkipEndpoints - SkipEndpoints []string - // Level for logger - Level logger.Level - // Enabled flag - Enabled bool -} - -// Option func signature -type Option func(*Options) - -// NewOptions creates Options from Option slice -func NewOptions(opts ...Option) Options { - options := Options{ - Logger: logger.DefaultLogger, - Level: logger.TraceLevel, - ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver}, - ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver}, - ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, - ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, - SkipEndpoints: DefaultSkipEndpoints, - } - - for _, o := range opts { - o(&options) - } - - return options -} - -// WithEnabled enable/diable flag -func WithEnabled(b bool) Option { - return func(o *Options) { - o.Enabled = b - } -} - -// WithLevel log level -func WithLevel(l logger.Level) Option { - return func(o *Options) { - o.Level = l - } -} - -// WithLogger logger -func WithLogger(l logger.Logger) Option { - return func(o *Options) { - o.Logger = l - } -} - -// WithClientCallObservers funcs -func WithClientCallObservers(ob ...ClientCallObserver) Option { - return func(o *Options) { - o.ClientCallObservers = ob - } -} - -// WithClientStreamObservers funcs -func WithClientStreamObservers(ob ...ClientStreamObserver) Option { - return func(o *Options) { - o.ClientStreamObservers = ob - } -} - -// WithClientCallFuncObservers funcs -func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option { - return func(o *Options) { - o.ClientCallFuncObservers = ob - } -} - -// WithServerHandlerObservers funcs -func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { - return func(o *Options) { - o.ServerHandlerObservers = ob - } -} - -// SkipEndpoins -func SkipEndpoints(eps ...string) Option { - return func(o *Options) { - o.SkipEndpoints = append(o.SkipEndpoints, eps...) - } -} - -func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...options.Option) error { - err := l.Client.Call(ctx, req, rsp, opts...) - - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range l.opts.SkipEndpoints { - if ep == endpoint { - return err - } - } - - if !l.opts.Enabled { - return err - } - - var labels []string - for _, o := range l.opts.ClientCallObservers { - labels = append(labels, o(ctx, req, rsp, opts, err)...) - } - l.opts.Logger.Fields(labels).Log(ctx, l.opts.Level) - - return err -} - -func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...options.Option) (client.Stream, error) { - stream, err := l.Client.Stream(ctx, req, opts...) - - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range l.opts.SkipEndpoints { - if ep == endpoint { - return stream, err - } - } - - if !l.opts.Enabled { - return stream, err - } - - var labels []string - for _, o := range l.opts.ClientStreamObservers { - labels = append(labels, o(ctx, req, opts, stream, err)...) - } - l.opts.Logger.Fields(labels).Log(ctx, l.opts.Level) - - return stream, err -} - -func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { - err := l.serverHandler(ctx, req, rsp) - - endpoint := req.Endpoint() - for _, ep := range l.opts.SkipEndpoints { - if ep == endpoint { - return err - } - } - - if !l.opts.Enabled { - return err - } - - var labels []string - for _, o := range l.opts.ServerHandlerObservers { - labels = append(labels, o(ctx, req, rsp, err)...) - } - l.opts.Logger.Fields(labels).Log(ctx, l.opts.Level) - - return err -} - -// NewClientWrapper accepts an open options and returns a Client Wrapper -func NewClientWrapper(opts ...Option) client.Wrapper { - return func(c client.Client) client.Client { - options := NewOptions() - for _, o := range opts { - o(&options) - } - return &lWrapper{opts: options, Client: c} - } -} - -// NewClientCallWrapper accepts an options and returns a Call Wrapper -func NewClientCallWrapper(opts ...Option) client.CallWrapper { - return func(h client.CallFunc) client.CallFunc { - options := NewOptions() - for _, o := range opts { - o(&options) - } - - l := &lWrapper{opts: options, clientCallFunc: h} - return l.ClientCallFunc - } -} - -func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { - err := l.clientCallFunc(ctx, addr, req, rsp, opts) - - endpoint := fmt.Sprintf("%s.%s", req.Service(), req.Endpoint()) - for _, ep := range l.opts.SkipEndpoints { - if ep == endpoint { - return err - } - } - - if !l.opts.Enabled { - return err - } - - var labels []string - for _, o := range l.opts.ClientCallFuncObservers { - labels = append(labels, o(ctx, addr, req, rsp, opts, err)...) - } - l.opts.Logger.Fields(labels).Log(ctx, l.opts.Level) - - return err -} - -// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper -func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { - return func(h server.HandlerFunc) server.HandlerFunc { - options := NewOptions() - for _, o := range opts { - o(&options) - } - - l := &lWrapper{opts: options, serverHandler: h} - return l.ServerHandler - } -}