meter: write metrics inside broker implementation #123
							
								
								
									
										2
									
								
								kgo.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								kgo.go
									
									
									
									
									
								
							@@ -55,7 +55,6 @@ type Broker struct {
 | 
				
			|||||||
	c         *kgo.Client
 | 
						c         *kgo.Client
 | 
				
			||||||
	kopts     []kgo.Opt
 | 
						kopts     []kgo.Opt
 | 
				
			||||||
	connected bool
 | 
						connected bool
 | 
				
			||||||
	init      bool
 | 
					 | 
				
			||||||
	sync.RWMutex
 | 
						sync.RWMutex
 | 
				
			||||||
	opts broker.Options
 | 
						opts broker.Options
 | 
				
			||||||
	subs []*subscriber
 | 
						subs []*subscriber
 | 
				
			||||||
@@ -351,7 +350,6 @@ func (k *Broker) String() string {
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func NewBroker(opts ...broker.Option) *Broker {
 | 
					func NewBroker(opts ...broker.Option) *Broker {
 | 
				
			||||||
	rand.Seed(time.Now().Unix())
 | 
					 | 
				
			||||||
	options := broker.NewOptions(opts...)
 | 
						options := broker.NewOptions(opts...)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	kaddrs := options.Addrs
 | 
						kaddrs := options.Addrs
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -3,6 +3,7 @@ package kgo
 | 
				
			|||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/twmb/franz-go/pkg/kgo"
 | 
						"github.com/twmb/franz-go/pkg/kgo"
 | 
				
			||||||
	"go.unistack.org/micro/v4/broker"
 | 
						"go.unistack.org/micro/v4/broker"
 | 
				
			||||||
@@ -163,6 +164,8 @@ func (pc *consumer) consume() {
 | 
				
			|||||||
			return
 | 
								return
 | 
				
			||||||
		case p := <-pc.recs:
 | 
							case p := <-pc.recs:
 | 
				
			||||||
			for _, record := range p.Records {
 | 
								for _, record := range p.Records {
 | 
				
			||||||
 | 
									ts := time.Now()
 | 
				
			||||||
 | 
									pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Inc()
 | 
				
			||||||
				p := eventPool.Get().(*event)
 | 
									p := eventPool.Get().(*event)
 | 
				
			||||||
				p.msg.Header = nil
 | 
									p.msg.Header = nil
 | 
				
			||||||
				p.msg.Body = nil
 | 
									p.msg.Body = nil
 | 
				
			||||||
@@ -179,30 +182,45 @@ func (pc *consumer) consume() {
 | 
				
			|||||||
					p.msg.Body = record.Value
 | 
										p.msg.Body = record.Value
 | 
				
			||||||
				} else {
 | 
									} else {
 | 
				
			||||||
					if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
 | 
										if err := pc.kopts.Codec.Unmarshal(record.Value, p.msg); err != nil {
 | 
				
			||||||
 | 
											pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
 | 
				
			||||||
						p.err = err
 | 
											p.err = err
 | 
				
			||||||
						p.msg.Body = record.Value
 | 
											p.msg.Body = record.Value
 | 
				
			||||||
						if eh != nil {
 | 
											if eh != nil {
 | 
				
			||||||
							_ = eh(p)
 | 
												_ = eh(p)
 | 
				
			||||||
 | 
												pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
 | 
				
			||||||
							if p.ack {
 | 
												if p.ack {
 | 
				
			||||||
								pc.c.MarkCommitRecords(record)
 | 
													pc.c.MarkCommitRecords(record)
 | 
				
			||||||
							} else {
 | 
												} else {
 | 
				
			||||||
								eventPool.Put(p)
 | 
													eventPool.Put(p)
 | 
				
			||||||
								pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
 | 
													pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] ErrLostMessage wtf?")
 | 
				
			||||||
								return
 | 
													return
 | 
				
			||||||
							}
 | 
												}
 | 
				
			||||||
							eventPool.Put(p)
 | 
												eventPool.Put(p)
 | 
				
			||||||
 | 
												te := time.Since(ts)
 | 
				
			||||||
 | 
												pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
 | 
				
			||||||
 | 
												pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
 | 
				
			||||||
							continue
 | 
												continue
 | 
				
			||||||
						} else {
 | 
											} else {
 | 
				
			||||||
							if pc.kopts.Logger.V(logger.ErrorLevel) {
 | 
												if pc.kopts.Logger.V(logger.ErrorLevel) {
 | 
				
			||||||
								pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
 | 
													pc.kopts.Logger.Errorf(pc.kopts.Context, "[kgo]: failed to unmarshal: %v", err)
 | 
				
			||||||
							}
 | 
												}
 | 
				
			||||||
						}
 | 
											}
 | 
				
			||||||
 | 
											te := time.Since(ts)
 | 
				
			||||||
 | 
											pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
 | 
				
			||||||
 | 
											pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
 | 
				
			||||||
 | 
											pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
 | 
				
			||||||
						eventPool.Put(p)
 | 
											eventPool.Put(p)
 | 
				
			||||||
						pc.kopts.Logger.Infof(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
 | 
											pc.kopts.Logger.Fatalf(pc.kopts.Context, "[kgo] Unmarshal err not handled wtf?")
 | 
				
			||||||
						return
 | 
											return
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				err := pc.handler(p)
 | 
									err := pc.handler(p)
 | 
				
			||||||
 | 
									if err == nil {
 | 
				
			||||||
 | 
										pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "success").Inc()
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										pc.kopts.Meter.Counter(broker.SubscribeMessageTotal, "endpoint", record.Topic, "status", "failure").Inc()
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									pc.kopts.Meter.Counter(broker.SubscribeMessageInflight, "endpoint", record.Topic).Dec()
 | 
				
			||||||
				if err == nil && pc.opts.AutoAck {
 | 
									if err == nil && pc.opts.AutoAck {
 | 
				
			||||||
					p.ack = true
 | 
										p.ack = true
 | 
				
			||||||
				} else if err != nil {
 | 
									} else if err != nil {
 | 
				
			||||||
@@ -215,6 +233,9 @@ func (pc *consumer) consume() {
 | 
				
			|||||||
						}
 | 
											}
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
									te := time.Since(ts)
 | 
				
			||||||
 | 
									pc.kopts.Meter.Summary(broker.SubscribeMessageLatencyMicroseconds, "endpoint", record.Topic).Update(te.Seconds())
 | 
				
			||||||
 | 
									pc.kopts.Meter.Histogram(broker.SubscribeMessageDurationSeconds, "endpoint", record.Topic).Update(te.Seconds())
 | 
				
			||||||
				if p.ack {
 | 
									if p.ack {
 | 
				
			||||||
					eventPool.Put(p)
 | 
										eventPool.Put(p)
 | 
				
			||||||
					pc.c.MarkCommitRecords(record)
 | 
										pc.c.MarkCommitRecords(record)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user