diff --git a/logger/micro.go b/logger/micro.go index 9106223f..7e9d8ae5 100644 --- a/logger/micro.go +++ b/logger/micro.go @@ -49,10 +49,18 @@ func (l *defaultLogger) V(level Level) bool { } func (l *defaultLogger) Fields(fields map[string]interface{}) Logger { - l.Lock() - l.opts.Fields = copyFields(fields) - l.Unlock() - return l + nl := &defaultLogger{opts: l.opts, enc: l.enc} + nl.opts.Fields = make(map[string]interface{}, len(l.opts.Fields)+len(fields)) + l.RLock() + for k, v := range l.opts.Fields { + nl.opts.Fields[k] = v + } + l.RUnlock() + + for k, v := range fields { + nl.opts.Fields[k] = v + } + return nl } func copyFields(src map[string]interface{}) map[string]interface{} { @@ -153,7 +161,9 @@ func (l *defaultLogger) Log(ctx context.Context, level Level, args ...interface{ } fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05") - fields["msg"] = fmt.Sprint(args...) + if len(args) > 0 { + fields["msg"] = fmt.Sprint(args...) + } l.RLock() _ = l.enc.Encode(fields) @@ -178,7 +188,7 @@ func (l *defaultLogger) Logf(ctx context.Context, level Level, msg string, args fields["timestamp"] = time.Now().Format("2006-01-02 15:04:05") if len(args) > 0 { fields["msg"] = fmt.Sprintf(msg, args...) - } else { + } else if msg != "" { fields["msg"] = msg } l.RLock() diff --git a/logger/wrapper/wrapper.go b/logger/wrapper/wrapper.go new file mode 100644 index 00000000..86411368 --- /dev/null +++ b/logger/wrapper/wrapper.go @@ -0,0 +1,330 @@ +// Package wrapper provides wrapper for Tracer +package wrapper + +import ( + "context" + + "github.com/unistack-org/micro/v3/client" + "github.com/unistack-org/micro/v3/logger" + "github.com/unistack-org/micro/v3/server" +) + +type lWrapper struct { + client.Client + serverHandler server.HandlerFunc + serverSubscriber server.SubscriberFunc + clientCallFunc client.CallFunc + opts Options +} + +type ClientCallObserver func(context.Context, client.Request, interface{}, []client.CallOption, error) []string +type ClientStreamObserver func(context.Context, client.Request, []client.CallOption, client.Stream, error) []string +type ClientPublishObserver func(context.Context, client.Message, []client.PublishOption, error) []string +type ClientCallFuncObserver func(context.Context, string, client.Request, interface{}, client.CallOptions, error) []string +type ServerHandlerObserver func(context.Context, server.Request, interface{}, error) []string +type ServerSubscriberObserver func(context.Context, server.Message, error) []string + +type Options struct { + Logger logger.Logger + Level logger.Level + Enabled bool + ClientCallObservers []ClientCallObserver + ClientStreamObservers []ClientStreamObserver + ClientPublishObservers []ClientPublishObserver + ClientCallFuncObservers []ClientCallFuncObserver + ServerHandlerObservers []ServerHandlerObserver + ServerSubscriberObservers []ServerSubscriberObserver +} + +type Option func(*Options) + +func NewOptions(opts ...Option) Options { + options := Options{ + Logger: logger.DefaultLogger, + Level: logger.TraceLevel, + ClientCallObservers: []ClientCallObserver{DefaultClientCallObserver}, + ClientStreamObservers: []ClientStreamObserver{DefaultClientStreamObserver}, + ClientPublishObservers: []ClientPublishObserver{DefaultClientPublishObserver}, + ClientCallFuncObservers: []ClientCallFuncObserver{DefaultClientCallFuncObserver}, + ServerHandlerObservers: []ServerHandlerObserver{DefaultServerHandlerObserver}, + ServerSubscriberObservers: []ServerSubscriberObserver{DefaultServerSubscriberObserver}, + } + + for _, o := range opts { + o(&options) + } + + return options +} + +func WithEnabled(b bool) Option { + return func(o *Options) { + o.Enabled = b + } +} + +func WithLevel(l logger.Level) Option { + return func(o *Options) { + o.Level = l + } +} + +func WithLogger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +func WithClientCallObservers(ob ...ClientCallObserver) Option { + return func(o *Options) { + o.ClientCallObservers = ob + } +} + +func WithClientStreamObservers(ob ...ClientStreamObserver) Option { + return func(o *Options) { + o.ClientStreamObservers = ob + } +} + +func WithClientPublishObservers(ob ...ClientPublishObserver) Option { + return func(o *Options) { + o.ClientPublishObservers = ob + } +} + +func WithClientCallFuncObservers(ob ...ClientCallFuncObserver) Option { + return func(o *Options) { + o.ClientCallFuncObservers = ob + } +} + +func WithServerHandlerObservers(ob ...ServerHandlerObserver) Option { + return func(o *Options) { + o.ServerHandlerObservers = ob + } +} + +func WithServerSubscriberObservers(ob ...ServerSubscriberObserver) Option { + return func(o *Options) { + o.ServerSubscriberObservers = ob + } +} + +func DefaultClientCallObserver(ctx context.Context, req client.Request, rsp interface{}, opts []client.CallOption, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels +} + +func DefaultClientStreamObserver(ctx context.Context, req client.Request, opts []client.CallOption, stream client.Stream, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels +} + +func DefaultClientPublishObserver(ctx context.Context, msg client.Message, opts []client.PublishOption, err error) []string { + labels := []string{"endpoint", msg.Topic()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels +} + +func DefaultServerHandlerObserver(ctx context.Context, req server.Request, rsp interface{}, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels +} + +func DefaultServerSubscriberObserver(ctx context.Context, msg server.Message, err error) []string { + labels := []string{"endpoint", msg.Topic()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels +} + +func DefaultClientCallFuncObserver(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions, err error) []string { + labels := []string{"service", req.Service(), "endpoint", req.Endpoint()} + if err != nil { + labels = append(labels, "error", err.Error()) + } + return labels +} + +func (l *lWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + err := l.Client.Call(ctx, req, rsp, opts...) + + if !l.opts.Enabled { + return err + } + + var labels []string + for _, o := range l.opts.ClientCallObservers { + labels = append(labels, o(ctx, req, rsp, opts, err)...) + } + fields := make(map[string]interface{}, int(len(labels)/2)) + for i := 0; i < len(labels); i += 2 { + fields[labels[i]] = labels[i+1] + } + l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level) + + return err +} + +func (l *lWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) { + stream, err := l.Client.Stream(ctx, req, opts...) + + if !l.opts.Enabled { + return stream, err + } + + var labels []string + for _, o := range l.opts.ClientStreamObservers { + labels = append(labels, o(ctx, req, opts, stream, err)...) + } + fields := make(map[string]interface{}, int(len(labels)/2)) + for i := 0; i < len(labels); i += 2 { + fields[labels[i]] = labels[i+1] + } + l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level) + + return stream, err +} + +func (l *lWrapper) Publish(ctx context.Context, msg client.Message, opts ...client.PublishOption) error { + err := l.Client.Publish(ctx, msg, opts...) + + if !l.opts.Enabled { + return err + } + + var labels []string + for _, o := range l.opts.ClientPublishObservers { + labels = append(labels, o(ctx, msg, opts, err)...) + } + fields := make(map[string]interface{}, int(len(labels)/2)) + for i := 0; i < len(labels); i += 2 { + fields[labels[i]] = labels[i+1] + } + l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level) + + return err +} + +func (l *lWrapper) ServerHandler(ctx context.Context, req server.Request, rsp interface{}) error { + err := l.serverHandler(ctx, req, rsp) + + if !l.opts.Enabled { + return err + } + + var labels []string + for _, o := range l.opts.ServerHandlerObservers { + labels = append(labels, o(ctx, req, rsp, err)...) + } + fields := make(map[string]interface{}, int(len(labels)/2)) + for i := 0; i < len(labels); i += 2 { + fields[labels[i]] = labels[i+1] + } + l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level) + + return err +} + +func (l *lWrapper) ServerSubscriber(ctx context.Context, msg server.Message) error { + err := l.serverSubscriber(ctx, msg) + + if !l.opts.Enabled { + return err + } + + var labels []string + for _, o := range l.opts.ServerSubscriberObservers { + labels = append(labels, o(ctx, msg, err)...) + } + fields := make(map[string]interface{}, int(len(labels)/2)) + for i := 0; i < len(labels); i += 2 { + fields[labels[i]] = labels[i+1] + } + l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level) + + return err +} + +// NewClientWrapper accepts an open tracing Trace and returns a Client Wrapper +func NewClientWrapper(opts ...Option) client.Wrapper { + return func(c client.Client) client.Client { + options := NewOptions() + for _, o := range opts { + o(&options) + } + return &lWrapper{opts: options, Client: c} + } +} + +// NewClientCallWrapper accepts an opentracing Tracer and returns a Call Wrapper +func NewClientCallWrapper(opts ...Option) client.CallWrapper { + return func(h client.CallFunc) client.CallFunc { + options := NewOptions() + for _, o := range opts { + o(&options) + } + + l := &lWrapper{opts: options, clientCallFunc: h} + return l.ClientCallFunc + } +} + +func (l *lWrapper) ClientCallFunc(ctx context.Context, addr string, req client.Request, rsp interface{}, opts client.CallOptions) error { + err := l.clientCallFunc(ctx, addr, req, rsp, opts) + + if !l.opts.Enabled { + return err + } + + var labels []string + for _, o := range l.opts.ClientCallFuncObservers { + labels = append(labels, o(ctx, addr, req, rsp, opts, err)...) + } + fields := make(map[string]interface{}, int(len(labels)/2)) + for i := 0; i < len(labels); i += 2 { + fields[labels[i]] = labels[i+1] + } + l.opts.Logger.Fields(fields).Log(ctx, l.opts.Level) + + return err +} + +// NewServerHandlerWrapper accepts an options and returns a Handler Wrapper +func NewServerHandlerWrapper(opts ...Option) server.HandlerWrapper { + return func(h server.HandlerFunc) server.HandlerFunc { + options := NewOptions() + for _, o := range opts { + o(&options) + } + + l := &lWrapper{opts: options, serverHandler: h} + return l.ServerHandler + } +} + +// NewServerSubscriberWrapper accepts an opentracing Tracer and returns a Subscriber Wrapper +func NewServerSubscriberWrapper(opts ...Option) server.SubscriberWrapper { + return func(h server.SubscriberFunc) server.SubscriberFunc { + options := NewOptions() + for _, o := range opts { + o(&options) + } + + l := &lWrapper{opts: options, serverSubscriber: h} + return l.ServerSubscriber + } +}