From 1fa8c2456aea706f2c429d5420d43bd32c37efad Mon Sep 17 00:00:00 2001
From: Vasiliy Tolstov <v.tolstov@unistack.org>
Date: Sun, 29 Dec 2024 01:52:53 +0300
Subject: [PATCH] util/buffer: add DelayedBuffer

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
---
 util/buf/buf.go            | 27 ------------
 util/buffer/buffer.go      | 85 ++++++++++++++++++++++++++++++++++++++
 util/buffer/buffer_test.go | 22 ++++++++++
 3 files changed, 107 insertions(+), 27 deletions(-)
 delete mode 100644 util/buf/buf.go
 create mode 100644 util/buffer/buffer.go
 create mode 100644 util/buffer/buffer_test.go

diff --git a/util/buf/buf.go b/util/buf/buf.go
deleted file mode 100644
index e6b86214..00000000
--- a/util/buf/buf.go
+++ /dev/null
@@ -1,27 +0,0 @@
-package buf
-
-import (
-	"bytes"
-	"io"
-)
-
-var _ io.Closer = &Buffer{}
-
-// Buffer bytes.Buffer wrapper to satisfie io.Closer interface
-type Buffer struct {
-	*bytes.Buffer
-}
-
-// Close reset buffer contents
-func (b *Buffer) Close() error {
-	b.Buffer.Reset()
-	return nil
-}
-
-// New creates new buffer that satisfies Closer interface
-func New(b *bytes.Buffer) *Buffer {
-	if b == nil {
-		b = bytes.NewBuffer(nil)
-	}
-	return &Buffer{b}
-}
diff --git a/util/buffer/buffer.go b/util/buffer/buffer.go
new file mode 100644
index 00000000..8d93fc3b
--- /dev/null
+++ b/util/buffer/buffer.go
@@ -0,0 +1,85 @@
+package buffer
+
+import (
+	"io"
+	"sync"
+	"time"
+)
+
+var _ io.WriteCloser = (*DelayedBuffer)(nil)
+
+// DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached
+type DelayedBuffer struct {
+	mu        sync.Mutex
+	maxWait   time.Duration
+	flushTime time.Time
+	buffer    chan []byte
+	ticker    *time.Ticker
+	w         io.Writer
+	err       error
+}
+
+func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer {
+	b := &DelayedBuffer{
+		buffer:    make(chan []byte, size),
+		ticker:    time.NewTicker(maxWait),
+		w:         w,
+		flushTime: time.Now(),
+		maxWait:   maxWait,
+	}
+	b.loop()
+	return b
+}
+
+func (b *DelayedBuffer) loop() {
+	go func() {
+		for range b.ticker.C {
+			b.mu.Lock()
+			if time.Since(b.flushTime) > b.maxWait {
+				b.flush()
+			}
+			b.mu.Unlock()
+		}
+	}()
+}
+
+func (b *DelayedBuffer) flush() {
+	bufLen := len(b.buffer)
+	if bufLen > 0 {
+		tmp := make([][]byte, bufLen)
+		for i := 0; i < bufLen; i++ {
+			tmp[i] = <-b.buffer
+		}
+		for _, t := range tmp {
+			_, b.err = b.w.Write(t)
+		}
+		b.flushTime = time.Now()
+	}
+}
+
+func (b *DelayedBuffer) Put(items ...[]byte) {
+	b.mu.Lock()
+	for _, item := range items {
+		select {
+		case b.buffer <- item:
+		default:
+			b.flush()
+			b.buffer <- item
+		}
+	}
+	b.mu.Unlock()
+}
+
+func (b *DelayedBuffer) Close() error {
+	b.mu.Lock()
+	b.flush()
+	close(b.buffer)
+	b.ticker.Stop()
+	b.mu.Unlock()
+	return b.err
+}
+
+func (b *DelayedBuffer) Write(data []byte) (int, error) {
+	b.Put(data)
+	return len(data), b.err
+}
diff --git a/util/buffer/buffer_test.go b/util/buffer/buffer_test.go
new file mode 100644
index 00000000..8285a82d
--- /dev/null
+++ b/util/buffer/buffer_test.go
@@ -0,0 +1,22 @@
+package buffer
+
+import (
+	"bytes"
+	"testing"
+	"time"
+)
+
+func TestTimedBuffer(t *testing.T) {
+	buf := bytes.NewBuffer(nil)
+	b := NewDelayedBuffer(100, 300*time.Millisecond, buf)
+	for i := 0; i < 100; i++ {
+		_, _ = b.Write([]byte(`test`))
+	}
+	if buf.Len() != 0 {
+		t.Fatal("delayed write not worked")
+	}
+	time.Sleep(400 * time.Millisecond)
+	if buf.Len() == 0 {
+		t.Fatal("delayed write not worked")
+	}
+}