micro/util/ring/buffer.go

139 lines
2.4 KiB
Go
Raw Permalink Normal View History

2019-12-17 18:38:03 +03:00
// Package ring provides a simple ring buffer for storing local data
package ring // import "go.unistack.org/micro/v3/util/ring"
import (
"sync"
2019-11-27 16:57:19 +03:00
"time"
"go.unistack.org/micro/v3/util/id"
)
// Buffer is ring buffer
type Buffer struct {
sync.RWMutex
2019-12-17 18:38:03 +03:00
streams map[string]*Stream
vals []*Entry
size int
2019-11-27 16:57:19 +03:00
}
// Entry is ring buffer data entry
2019-11-27 16:57:19 +03:00
type Entry struct {
Timestamp time.Time
Value interface{}
}
2019-12-17 18:38:03 +03:00
// Stream is used to stream the buffer
type Stream struct {
// Buffered entries
Entries chan *Entry
// Stop channel
2019-12-17 21:24:00 +03:00
Stop chan bool
// ID of the stream
ID string
2019-11-27 19:02:16 +03:00
}
// Put adds a new value to ring buffer
func (b *Buffer) Put(v interface{}) {
b.Lock()
defer b.Unlock()
// append to values
entry := &Entry{
2019-11-27 16:57:19 +03:00
Value: v,
Timestamp: time.Now(),
}
b.vals = append(b.vals, entry)
// trim if bigger than size required
if len(b.vals) > b.size {
b.vals = b.vals[1:]
}
2019-12-17 18:38:03 +03:00
// send to every stream
for _, stream := range b.streams {
select {
2019-12-17 18:38:03 +03:00
case <-stream.Stop:
delete(b.streams, stream.ID)
2019-12-17 18:38:03 +03:00
close(stream.Entries)
case stream.Entries <- entry:
}
}
}
// Get returns the last n entries
2019-11-27 16:57:19 +03:00
func (b *Buffer) Get(n int) []*Entry {
b.RLock()
defer b.RUnlock()
// reset any invalid values
2020-05-07 12:45:48 +03:00
if n > len(b.vals) || n < 0 {
n = len(b.vals)
}
// create a delta
2020-05-07 12:45:48 +03:00
delta := len(b.vals) - n
// return the delta set
return b.vals[delta:]
}
// Since returns the entries since a specific time
2019-11-27 16:57:19 +03:00
func (b *Buffer) Since(t time.Time) []*Entry {
b.RLock()
defer b.RUnlock()
// return all the values
if t.IsZero() {
return b.vals
}
// if its in the future return nothing
if time.Since(t).Seconds() < 0.0 {
return nil
}
for i, v := range b.vals {
// find the starting point
d := v.Timestamp.Sub(t)
// return the values
if d.Seconds() > 0.0 {
return b.vals[i:]
}
}
return nil
}
// Stream logs from the buffer
2019-12-17 18:38:03 +03:00
// Close the channel when you want to stop
func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
b.Lock()
defer b.Unlock()
entries := make(chan *Entry, 128)
id := id.Must()
2019-12-17 18:38:03 +03:00
stop := make(chan bool)
b.streams[id] = &Stream{
ID: id,
2019-12-17 18:38:03 +03:00
Entries: entries,
Stop: stop,
}
2019-12-17 18:38:03 +03:00
return entries, stop
}
2019-11-27 19:02:16 +03:00
// Size returns the size of the ring buffer
func (b *Buffer) Size() int {
return b.size
}
2019-12-17 18:38:03 +03:00
// New returns a new buffer of the given size
func New(i int) *Buffer {
return &Buffer{
size: i,
streams: make(map[string]*Stream),
}
}