From 653bd386cc3d3b7259b218d26feea80301c503ff Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Sun, 29 Dec 2024 01:52:53 +0300 Subject: [PATCH] util/buffer: add DelayedBuffer Signed-off-by: Vasiliy Tolstov --- util/buf/buf.go | 27 ------------ util/buffer/buffer.go | 85 ++++++++++++++++++++++++++++++++++++++ util/buffer/buffer_test.go | 22 ++++++++++ 3 files changed, 107 insertions(+), 27 deletions(-) delete mode 100644 util/buf/buf.go create mode 100644 util/buffer/buffer.go create mode 100644 util/buffer/buffer_test.go diff --git a/util/buf/buf.go b/util/buf/buf.go deleted file mode 100644 index e6b86214..00000000 --- a/util/buf/buf.go +++ /dev/null @@ -1,27 +0,0 @@ -package buf - -import ( - "bytes" - "io" -) - -var _ io.Closer = &Buffer{} - -// Buffer bytes.Buffer wrapper to satisfie io.Closer interface -type Buffer struct { - *bytes.Buffer -} - -// Close reset buffer contents -func (b *Buffer) Close() error { - b.Buffer.Reset() - return nil -} - -// New creates new buffer that satisfies Closer interface -func New(b *bytes.Buffer) *Buffer { - if b == nil { - b = bytes.NewBuffer(nil) - } - return &Buffer{b} -} diff --git a/util/buffer/buffer.go b/util/buffer/buffer.go new file mode 100644 index 00000000..8d93fc3b --- /dev/null +++ b/util/buffer/buffer.go @@ -0,0 +1,85 @@ +package buffer + +import ( + "io" + "sync" + "time" +) + +var _ io.WriteCloser = (*DelayedBuffer)(nil) + +// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached +type DelayedBuffer struct { + mu sync.Mutex + maxWait time.Duration + flushTime time.Time + buffer chan []byte + ticker *time.Ticker + w io.Writer + err error +} + +func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer { + b := &DelayedBuffer{ + buffer: make(chan []byte, size), + ticker: time.NewTicker(maxWait), + w: w, + flushTime: time.Now(), + maxWait: maxWait, + } + b.loop() + return b +} + +func (b *DelayedBuffer) loop() { + go func() { + for range b.ticker.C { + b.mu.Lock() + if time.Since(b.flushTime) > b.maxWait { + b.flush() + } + b.mu.Unlock() + } + }() +} + +func (b *DelayedBuffer) flush() { + bufLen := len(b.buffer) + if bufLen > 0 { + tmp := make([][]byte, bufLen) + for i := 0; i < bufLen; i++ { + tmp[i] = <-b.buffer + } + for _, t := range tmp { + _, b.err = b.w.Write(t) + } + b.flushTime = time.Now() + } +} + +func (b *DelayedBuffer) Put(items ...[]byte) { + b.mu.Lock() + for _, item := range items { + select { + case b.buffer <- item: + default: + b.flush() + b.buffer <- item + } + } + b.mu.Unlock() +} + +func (b *DelayedBuffer) Close() error { + b.mu.Lock() + b.flush() + close(b.buffer) + b.ticker.Stop() + b.mu.Unlock() + return b.err +} + +func (b *DelayedBuffer) Write(data []byte) (int, error) { + b.Put(data) + return len(data), b.err +} diff --git a/util/buffer/buffer_test.go b/util/buffer/buffer_test.go new file mode 100644 index 00000000..8285a82d --- /dev/null +++ b/util/buffer/buffer_test.go @@ -0,0 +1,22 @@ +package buffer + +import ( + "bytes" + "testing" + "time" +) + +func TestTimedBuffer(t *testing.T) { + buf := bytes.NewBuffer(nil) + b := NewDelayedBuffer(100, 300*time.Millisecond, buf) + for i := 0; i < 100; i++ { + _, _ = b.Write([]byte(`test`)) + } + if buf.Len() != 0 { + t.Fatal("delayed write not worked") + } + time.Sleep(400 * time.Millisecond) + if buf.Len() == 0 { + t.Fatal("delayed write not worked") + } +}