86 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			86 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package buffer
 | 
						|
 | 
						|
import (
 | 
						|
	"io"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
var _ io.WriteCloser = (*DelayedBuffer)(nil)
 | 
						|
 | 
						|
// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached
 | 
						|
type DelayedBuffer struct {
 | 
						|
	mu        sync.Mutex
 | 
						|
	maxWait   time.Duration
 | 
						|
	flushTime time.Time
 | 
						|
	buffer    chan []byte
 | 
						|
	ticker    *time.Ticker
 | 
						|
	w         io.Writer
 | 
						|
	err       error
 | 
						|
}
 | 
						|
 | 
						|
func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer {
 | 
						|
	b := &DelayedBuffer{
 | 
						|
		buffer:    make(chan []byte, size),
 | 
						|
		ticker:    time.NewTicker(maxWait),
 | 
						|
		w:         w,
 | 
						|
		flushTime: time.Now(),
 | 
						|
		maxWait:   maxWait,
 | 
						|
	}
 | 
						|
	b.loop()
 | 
						|
	return b
 | 
						|
}
 | 
						|
 | 
						|
func (b *DelayedBuffer) loop() {
 | 
						|
	go func() {
 | 
						|
		for range b.ticker.C {
 | 
						|
			b.mu.Lock()
 | 
						|
			if time.Since(b.flushTime) > b.maxWait {
 | 
						|
				b.flush()
 | 
						|
			}
 | 
						|
			b.mu.Unlock()
 | 
						|
		}
 | 
						|
	}()
 | 
						|
}
 | 
						|
 | 
						|
func (b *DelayedBuffer) flush() {
 | 
						|
	bufLen := len(b.buffer)
 | 
						|
	if bufLen > 0 {
 | 
						|
		tmp := make([][]byte, bufLen)
 | 
						|
		for i := 0; i < bufLen; i++ {
 | 
						|
			tmp[i] = <-b.buffer
 | 
						|
		}
 | 
						|
		for _, t := range tmp {
 | 
						|
			_, b.err = b.w.Write(t)
 | 
						|
		}
 | 
						|
		b.flushTime = time.Now()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (b *DelayedBuffer) Put(items ...[]byte) {
 | 
						|
	b.mu.Lock()
 | 
						|
	for _, item := range items {
 | 
						|
		select {
 | 
						|
		case b.buffer <- item:
 | 
						|
		default:
 | 
						|
			b.flush()
 | 
						|
			b.buffer <- item
 | 
						|
		}
 | 
						|
	}
 | 
						|
	b.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
func (b *DelayedBuffer) Close() error {
 | 
						|
	b.mu.Lock()
 | 
						|
	b.flush()
 | 
						|
	close(b.buffer)
 | 
						|
	b.ticker.Stop()
 | 
						|
	b.mu.Unlock()
 | 
						|
	return b.err
 | 
						|
}
 | 
						|
 | 
						|
func (b *DelayedBuffer) Write(data []byte) (int, error) {
 | 
						|
	b.Put(data)
 | 
						|
	return len(data), b.err
 | 
						|
}
 |