86 lines
1.5 KiB
Go
86 lines
1.5 KiB
Go
|
package buffer
|
||
|
|
||
|
import (
|
||
|
"io"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
var _ io.WriteCloser = (*DelayedBuffer)(nil)
|
||
|
|
||
|
// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached
|
||
|
type DelayedBuffer struct {
|
||
|
mu sync.Mutex
|
||
|
maxWait time.Duration
|
||
|
flushTime time.Time
|
||
|
buffer chan []byte
|
||
|
ticker *time.Ticker
|
||
|
w io.Writer
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer {
|
||
|
b := &DelayedBuffer{
|
||
|
buffer: make(chan []byte, size),
|
||
|
ticker: time.NewTicker(maxWait),
|
||
|
w: w,
|
||
|
flushTime: time.Now(),
|
||
|
maxWait: maxWait,
|
||
|
}
|
||
|
b.loop()
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
func (b *DelayedBuffer) loop() {
|
||
|
go func() {
|
||
|
for range b.ticker.C {
|
||
|
b.mu.Lock()
|
||
|
if time.Since(b.flushTime) > b.maxWait {
|
||
|
b.flush()
|
||
|
}
|
||
|
b.mu.Unlock()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (b *DelayedBuffer) flush() {
|
||
|
bufLen := len(b.buffer)
|
||
|
if bufLen > 0 {
|
||
|
tmp := make([][]byte, bufLen)
|
||
|
for i := 0; i < bufLen; i++ {
|
||
|
tmp[i] = <-b.buffer
|
||
|
}
|
||
|
for _, t := range tmp {
|
||
|
_, b.err = b.w.Write(t)
|
||
|
}
|
||
|
b.flushTime = time.Now()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *DelayedBuffer) Put(items ...[]byte) {
|
||
|
b.mu.Lock()
|
||
|
for _, item := range items {
|
||
|
select {
|
||
|
case b.buffer <- item:
|
||
|
default:
|
||
|
b.flush()
|
||
|
b.buffer <- item
|
||
|
}
|
||
|
}
|
||
|
b.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
func (b *DelayedBuffer) Close() error {
|
||
|
b.mu.Lock()
|
||
|
b.flush()
|
||
|
close(b.buffer)
|
||
|
b.ticker.Stop()
|
||
|
b.mu.Unlock()
|
||
|
return b.err
|
||
|
}
|
||
|
|
||
|
func (b *DelayedBuffer) Write(data []byte) (int, error) {
|
||
|
b.Put(data)
|
||
|
return len(data), b.err
|
||
|
}
|