139 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			139 lines
		
	
	
		
			2.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package ring provides a simple ring buffer for storing local data
 | 
						|
package ring // import "go.unistack.org/micro/v3/util/ring"
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"go.unistack.org/micro/v3/util/id"
 | 
						|
)
 | 
						|
 | 
						|
// Buffer is ring buffer
 | 
						|
type Buffer struct {
 | 
						|
	streams map[string]*Stream
 | 
						|
	vals    []*Entry
 | 
						|
	size    int
 | 
						|
	sync.RWMutex
 | 
						|
}
 | 
						|
 | 
						|
// Entry is ring buffer data entry
 | 
						|
type Entry struct {
 | 
						|
	Value     interface{}
 | 
						|
	Timestamp time.Time
 | 
						|
}
 | 
						|
 | 
						|
// Stream is used to stream the buffer
 | 
						|
type Stream struct {
 | 
						|
	// Buffered entries
 | 
						|
	Entries chan *Entry
 | 
						|
	// Stop channel
 | 
						|
	Stop chan bool
 | 
						|
	// ID of the stream
 | 
						|
	ID string
 | 
						|
}
 | 
						|
 | 
						|
// Put adds a new value to ring buffer
 | 
						|
func (b *Buffer) Put(v interface{}) {
 | 
						|
	b.Lock()
 | 
						|
	defer b.Unlock()
 | 
						|
 | 
						|
	// append to values
 | 
						|
	entry := &Entry{
 | 
						|
		Value:     v,
 | 
						|
		Timestamp: time.Now(),
 | 
						|
	}
 | 
						|
	b.vals = append(b.vals, entry)
 | 
						|
 | 
						|
	// trim if bigger than size required
 | 
						|
	if len(b.vals) > b.size {
 | 
						|
		b.vals = b.vals[1:]
 | 
						|
	}
 | 
						|
 | 
						|
	// send to every stream
 | 
						|
	for _, stream := range b.streams {
 | 
						|
		select {
 | 
						|
		case <-stream.Stop:
 | 
						|
			delete(b.streams, stream.ID)
 | 
						|
			close(stream.Entries)
 | 
						|
		case stream.Entries <- entry:
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Get returns the last n entries
 | 
						|
func (b *Buffer) Get(n int) []*Entry {
 | 
						|
	b.RLock()
 | 
						|
	defer b.RUnlock()
 | 
						|
 | 
						|
	// reset any invalid values
 | 
						|
	if n > len(b.vals) || n < 0 {
 | 
						|
		n = len(b.vals)
 | 
						|
	}
 | 
						|
 | 
						|
	// create a delta
 | 
						|
	delta := len(b.vals) - n
 | 
						|
 | 
						|
	// return the delta set
 | 
						|
	return b.vals[delta:]
 | 
						|
}
 | 
						|
 | 
						|
// Since returns the entries since a specific time
 | 
						|
func (b *Buffer) Since(t time.Time) []*Entry {
 | 
						|
	b.RLock()
 | 
						|
	defer b.RUnlock()
 | 
						|
 | 
						|
	// return all the values
 | 
						|
	if t.IsZero() {
 | 
						|
		return b.vals
 | 
						|
	}
 | 
						|
 | 
						|
	// if its in the future return nothing
 | 
						|
	if time.Since(t).Seconds() < 0.0 {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
 | 
						|
	for i, v := range b.vals {
 | 
						|
		// find the starting point
 | 
						|
		d := v.Timestamp.Sub(t)
 | 
						|
 | 
						|
		// return the values
 | 
						|
		if d.Seconds() > 0.0 {
 | 
						|
			return b.vals[i:]
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Stream logs from the buffer
 | 
						|
// Close the channel when you want to stop
 | 
						|
func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
 | 
						|
	b.Lock()
 | 
						|
	defer b.Unlock()
 | 
						|
 | 
						|
	entries := make(chan *Entry, 128)
 | 
						|
	id := id.Must()
 | 
						|
	stop := make(chan bool)
 | 
						|
 | 
						|
	b.streams[id] = &Stream{
 | 
						|
		ID:      id,
 | 
						|
		Entries: entries,
 | 
						|
		Stop:    stop,
 | 
						|
	}
 | 
						|
 | 
						|
	return entries, stop
 | 
						|
}
 | 
						|
 | 
						|
// Size returns the size of the ring buffer
 | 
						|
func (b *Buffer) Size() int {
 | 
						|
	return b.size
 | 
						|
}
 | 
						|
 | 
						|
// New returns a new buffer of the given size
 | 
						|
func New(i int) *Buffer {
 | 
						|
	return &Buffer{
 | 
						|
		size:    i,
 | 
						|
		streams: make(map[string]*Stream),
 | 
						|
	}
 | 
						|
}
 |