All checks were successful
		
		
	
	test / test (push) Successful in 42s
				
			## Pull Request template Please, go through these steps before clicking submit on this PR. 1. Give a descriptive title to your PR. 2. Provide a description of your changes. 3. Make sure you have some relevant tests. 4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable). **PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING** Reviewed-on: #369 Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru> Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
		
			
				
	
	
		
			140 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			140 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package ring provides a simple ring buffer for storing local data
 | |
| package ring
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.unistack.org/micro/v3/util/id"
 | |
| )
 | |
| 
 | |
| // Buffer is ring buffer
 | |
| type Buffer struct {
 | |
| 	streams map[string]*Stream
 | |
| 	vals    []*Entry
 | |
| 	size    int
 | |
| 
 | |
| 	sync.RWMutex
 | |
| }
 | |
| 
 | |
| // Entry is ring buffer data entry
 | |
| type Entry struct {
 | |
| 	Timestamp time.Time
 | |
| 	Value     interface{}
 | |
| }
 | |
| 
 | |
| // Stream is used to stream the buffer
 | |
| type Stream struct {
 | |
| 	// Buffered entries
 | |
| 	Entries chan *Entry
 | |
| 	// Stop channel
 | |
| 	Stop chan bool
 | |
| 	// ID of the stream
 | |
| 	ID string
 | |
| }
 | |
| 
 | |
| // Put adds a new value to ring buffer
 | |
| func (b *Buffer) Put(v interface{}) {
 | |
| 	b.Lock()
 | |
| 	defer b.Unlock()
 | |
| 
 | |
| 	// append to values
 | |
| 	entry := &Entry{
 | |
| 		Value:     v,
 | |
| 		Timestamp: time.Now(),
 | |
| 	}
 | |
| 	b.vals = append(b.vals, entry)
 | |
| 
 | |
| 	// trim if bigger than size required
 | |
| 	if len(b.vals) > b.size {
 | |
| 		b.vals = b.vals[1:]
 | |
| 	}
 | |
| 
 | |
| 	// send to every stream
 | |
| 	for _, stream := range b.streams {
 | |
| 		select {
 | |
| 		case <-stream.Stop:
 | |
| 			delete(b.streams, stream.ID)
 | |
| 			close(stream.Entries)
 | |
| 		case stream.Entries <- entry:
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Get returns the last n entries
 | |
| func (b *Buffer) Get(n int) []*Entry {
 | |
| 	b.RLock()
 | |
| 	defer b.RUnlock()
 | |
| 
 | |
| 	// reset any invalid values
 | |
| 	if n > len(b.vals) || n < 0 {
 | |
| 		n = len(b.vals)
 | |
| 	}
 | |
| 
 | |
| 	// create a delta
 | |
| 	delta := len(b.vals) - n
 | |
| 
 | |
| 	// return the delta set
 | |
| 	return b.vals[delta:]
 | |
| }
 | |
| 
 | |
| // Since returns the entries since a specific time
 | |
| func (b *Buffer) Since(t time.Time) []*Entry {
 | |
| 	b.RLock()
 | |
| 	defer b.RUnlock()
 | |
| 
 | |
| 	// return all the values
 | |
| 	if t.IsZero() {
 | |
| 		return b.vals
 | |
| 	}
 | |
| 
 | |
| 	// if its in the future return nothing
 | |
| 	if time.Since(t).Seconds() < 0.0 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	for i, v := range b.vals {
 | |
| 		// find the starting point
 | |
| 		d := v.Timestamp.Sub(t)
 | |
| 
 | |
| 		// return the values
 | |
| 		if d.Seconds() > 0.0 {
 | |
| 			return b.vals[i:]
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Stream logs from the buffer
 | |
| // Close the channel when you want to stop
 | |
| func (b *Buffer) Stream() (<-chan *Entry, chan bool) {
 | |
| 	b.Lock()
 | |
| 	defer b.Unlock()
 | |
| 
 | |
| 	entries := make(chan *Entry, 128)
 | |
| 	id := id.Must()
 | |
| 	stop := make(chan bool)
 | |
| 
 | |
| 	b.streams[id] = &Stream{
 | |
| 		ID:      id,
 | |
| 		Entries: entries,
 | |
| 		Stop:    stop,
 | |
| 	}
 | |
| 
 | |
| 	return entries, stop
 | |
| }
 | |
| 
 | |
| // Size returns the size of the ring buffer
 | |
| func (b *Buffer) Size() int {
 | |
| 	return b.size
 | |
| }
 | |
| 
 | |
| // New returns a new buffer of the given size
 | |
| func New(i int) *Buffer {
 | |
| 	return &Buffer{
 | |
| 		size:    i,
 | |
| 		streams: make(map[string]*Stream),
 | |
| 	}
 | |
| }
 |