2019-12-17 18:38:03 +03:00
|
|
|
// Package ring provides a simple ring buffer for storing local data
|
2023-04-11 22:20:37 +03:00
|
|
|
package ring // import "go.unistack.org/micro/v4/util/ring"
|
2019-11-26 17:20:45 +03:00
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
2019-11-27 16:57:19 +03:00
|
|
|
"time"
|
2019-11-30 15:39:29 +03:00
|
|
|
|
2023-04-11 22:20:37 +03:00
|
|
|
"go.unistack.org/micro/v4/util/id"
|
2019-11-26 17:20:45 +03:00
|
|
|
)
|
|
|
|
|
2019-11-28 21:08:48 +03:00
|
|
|
// Buffer is ring buffer
|
2019-11-26 17:20:45 +03:00
|
|
|
type Buffer struct {
|
2022-03-30 15:37:02 +03:00
|
|
|
sync.RWMutex
|
2019-12-17 18:38:03 +03:00
|
|
|
streams map[string]*Stream
|
2021-03-06 19:45:13 +03:00
|
|
|
vals []*Entry
|
|
|
|
size int
|
2019-11-27 16:57:19 +03:00
|
|
|
}
|
|
|
|
|
2019-11-28 21:08:48 +03:00
|
|
|
// Entry is a single record stored in the ring buffer.
type Entry struct {
	// Timestamp is the time at which the value was put into the buffer.
	Timestamp time.Time
	// Value is the stored payload.
	Value interface{}
}
|
|
|
|
|
2019-12-17 18:38:03 +03:00
|
|
|
// Stream is used to stream the buffer
|
|
|
|
type Stream struct {
|
|
|
|
// Buffered entries
|
|
|
|
Entries chan *Entry
|
|
|
|
// Stop channel
|
2019-12-17 21:24:00 +03:00
|
|
|
Stop chan bool
|
2021-09-30 21:13:13 +03:00
|
|
|
// ID of the stream
|
|
|
|
ID string
|
2019-11-27 19:02:16 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// Put adds a new value to ring buffer
|
2019-11-26 17:20:45 +03:00
|
|
|
func (b *Buffer) Put(v interface{}) {
|
|
|
|
b.Lock()
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
|
|
// append to values
|
2019-11-30 15:39:29 +03:00
|
|
|
entry := &Entry{
|
2019-11-27 16:57:19 +03:00
|
|
|
Value: v,
|
|
|
|
Timestamp: time.Now(),
|
2019-11-30 15:39:29 +03:00
|
|
|
}
|
|
|
|
b.vals = append(b.vals, entry)
|
2019-11-26 17:20:45 +03:00
|
|
|
|
|
|
|
// trim if bigger than size required
|
|
|
|
if len(b.vals) > b.size {
|
|
|
|
b.vals = b.vals[1:]
|
|
|
|
}
|
2019-11-30 15:39:29 +03:00
|
|
|
|
2019-12-17 18:38:03 +03:00
|
|
|
// send to every stream
|
2019-11-30 15:39:29 +03:00
|
|
|
for _, stream := range b.streams {
|
|
|
|
select {
|
2019-12-17 18:38:03 +03:00
|
|
|
case <-stream.Stop:
|
2021-09-30 21:13:13 +03:00
|
|
|
delete(b.streams, stream.ID)
|
2019-12-17 18:38:03 +03:00
|
|
|
close(stream.Entries)
|
|
|
|
case stream.Entries <- entry:
|
2019-11-30 15:39:29 +03:00
|
|
|
}
|
|
|
|
}
|
2019-11-26 17:20:45 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// Get returns the last n entries
|
2019-11-27 16:57:19 +03:00
|
|
|
func (b *Buffer) Get(n int) []*Entry {
|
2019-11-28 21:08:48 +03:00
|
|
|
b.RLock()
|
|
|
|
defer b.RUnlock()
|
|
|
|
|
2019-11-26 17:20:45 +03:00
|
|
|
// reset any invalid values
|
2020-05-07 12:45:48 +03:00
|
|
|
if n > len(b.vals) || n < 0 {
|
|
|
|
n = len(b.vals)
|
2019-11-26 17:20:45 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// create a delta
|
2020-05-07 12:45:48 +03:00
|
|
|
delta := len(b.vals) - n
|
2019-11-26 17:20:45 +03:00
|
|
|
|
|
|
|
// return the delta set
|
|
|
|
return b.vals[delta:]
|
|
|
|
}
|
|
|
|
|
2021-02-14 16:16:01 +03:00
|
|
|
// Since returns the entries since a specific time
|
2019-11-27 16:57:19 +03:00
|
|
|
func (b *Buffer) Since(t time.Time) []*Entry {
|
|
|
|
b.RLock()
|
|
|
|
defer b.RUnlock()
|
|
|
|
|
|
|
|
// return all the values
|
|
|
|
if t.IsZero() {
|
|
|
|
return b.vals
|
|
|
|
}
|
|
|
|
|
|
|
|
// if its in the future return nothing
|
|
|
|
if time.Since(t).Seconds() < 0.0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, v := range b.vals {
|
|
|
|
// find the starting point
|
|
|
|
d := v.Timestamp.Sub(t)
|
|
|
|
|
|
|
|
// return the values
|
|
|
|
if d.Seconds() > 0.0 {
|
|
|
|
return b.vals[i:]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-11-30 15:39:29 +03:00
|
|
|
// Stream logs from the buffer
|
2019-12-17 18:38:03 +03:00
|
|
|
// Close the channel when you want to stop
|
|
|
|
func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
|
2019-11-30 15:39:29 +03:00
|
|
|
b.Lock()
|
|
|
|
defer b.Unlock()
|
|
|
|
|
|
|
|
entries := make(chan *Entry, 128)
|
2021-08-20 22:40:48 +03:00
|
|
|
id := id.Must()
|
2019-12-17 18:38:03 +03:00
|
|
|
stop := make(chan bool)
|
|
|
|
|
|
|
|
b.streams[id] = &Stream{
|
2021-09-30 21:13:13 +03:00
|
|
|
ID: id,
|
2019-12-17 18:38:03 +03:00
|
|
|
Entries: entries,
|
|
|
|
Stop: stop,
|
2019-11-30 15:39:29 +03:00
|
|
|
}
|
|
|
|
|
2019-12-17 18:38:03 +03:00
|
|
|
return entries, stop
|
2019-11-30 15:39:29 +03:00
|
|
|
}
|
|
|
|
|
2019-11-27 19:02:16 +03:00
|
|
|
// Size returns the size of the ring buffer
|
2019-11-26 17:20:45 +03:00
|
|
|
func (b *Buffer) Size() int {
|
|
|
|
return b.size
|
|
|
|
}
|
2019-12-17 18:38:03 +03:00
|
|
|
|
|
|
|
// New returns a new buffer of the given size
|
|
|
|
func New(i int) *Buffer {
|
|
|
|
return &Buffer{
|
|
|
|
size: i,
|
|
|
|
streams: make(map[string]*Stream),
|
|
|
|
}
|
|
|
|
}
|