util/buffer: add DelayedBuffer
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
		| @@ -1,27 +0,0 @@ | ||||
| package buf | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"io" | ||||
| ) | ||||
|  | ||||
| var _ io.Closer = &Buffer{} | ||||
|  | ||||
| // Buffer bytes.Buffer wrapper to satisfie io.Closer interface | ||||
| type Buffer struct { | ||||
| 	*bytes.Buffer | ||||
| } | ||||
|  | ||||
| // Close reset buffer contents | ||||
| func (b *Buffer) Close() error { | ||||
| 	b.Buffer.Reset() | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // New creates new buffer that satisfies Closer interface | ||||
| func New(b *bytes.Buffer) *Buffer { | ||||
| 	if b == nil { | ||||
| 		b = bytes.NewBuffer(nil) | ||||
| 	} | ||||
| 	return &Buffer{b} | ||||
| } | ||||
							
								
								
									
										85
									
								
								util/buffer/buffer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										85
									
								
								util/buffer/buffer.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,85 @@ | ||||
| package buffer | ||||
|  | ||||
| import ( | ||||
| 	"io" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| var _ io.WriteCloser = (*DelayedBuffer)(nil) | ||||
|  | ||||
| // DelayedBuffer is the buffer that holds items until either the buffer filled or a specified time limit is reached | ||||
| type DelayedBuffer struct { | ||||
| 	mu        sync.Mutex | ||||
| 	maxWait   time.Duration | ||||
| 	flushTime time.Time | ||||
| 	buffer    chan []byte | ||||
| 	ticker    *time.Ticker | ||||
| 	w         io.Writer | ||||
| 	err       error | ||||
| } | ||||
|  | ||||
| func NewDelayedBuffer(size int, maxWait time.Duration, w io.Writer) *DelayedBuffer { | ||||
| 	b := &DelayedBuffer{ | ||||
| 		buffer:    make(chan []byte, size), | ||||
| 		ticker:    time.NewTicker(maxWait), | ||||
| 		w:         w, | ||||
| 		flushTime: time.Now(), | ||||
| 		maxWait:   maxWait, | ||||
| 	} | ||||
| 	b.loop() | ||||
| 	return b | ||||
| } | ||||
|  | ||||
| func (b *DelayedBuffer) loop() { | ||||
| 	go func() { | ||||
| 		for range b.ticker.C { | ||||
| 			b.mu.Lock() | ||||
| 			if time.Since(b.flushTime) > b.maxWait { | ||||
| 				b.flush() | ||||
| 			} | ||||
| 			b.mu.Unlock() | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (b *DelayedBuffer) flush() { | ||||
| 	bufLen := len(b.buffer) | ||||
| 	if bufLen > 0 { | ||||
| 		tmp := make([][]byte, bufLen) | ||||
| 		for i := 0; i < bufLen; i++ { | ||||
| 			tmp[i] = <-b.buffer | ||||
| 		} | ||||
| 		for _, t := range tmp { | ||||
| 			_, b.err = b.w.Write(t) | ||||
| 		} | ||||
| 		b.flushTime = time.Now() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (b *DelayedBuffer) Put(items ...[]byte) { | ||||
| 	b.mu.Lock() | ||||
| 	for _, item := range items { | ||||
| 		select { | ||||
| 		case b.buffer <- item: | ||||
| 		default: | ||||
| 			b.flush() | ||||
| 			b.buffer <- item | ||||
| 		} | ||||
| 	} | ||||
| 	b.mu.Unlock() | ||||
| } | ||||
|  | ||||
| func (b *DelayedBuffer) Close() error { | ||||
| 	b.mu.Lock() | ||||
| 	b.flush() | ||||
| 	close(b.buffer) | ||||
| 	b.ticker.Stop() | ||||
| 	b.mu.Unlock() | ||||
| 	return b.err | ||||
| } | ||||
|  | ||||
| func (b *DelayedBuffer) Write(data []byte) (int, error) { | ||||
| 	b.Put(data) | ||||
| 	return len(data), b.err | ||||
| } | ||||
							
								
								
									
										22
									
								
								util/buffer/buffer_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										22
									
								
								util/buffer/buffer_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,22 @@ | ||||
| package buffer | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func TestTimedBuffer(t *testing.T) { | ||||
| 	buf := bytes.NewBuffer(nil) | ||||
| 	b := NewDelayedBuffer(100, 300*time.Millisecond, buf) | ||||
| 	for i := 0; i < 100; i++ { | ||||
| 		_, _ = b.Write([]byte(`test`)) | ||||
| 	} | ||||
| 	if buf.Len() != 0 { | ||||
| 		t.Fatal("delayed write not worked") | ||||
| 	} | ||||
| 	time.Sleep(400 * time.Millisecond) | ||||
| 	if buf.Len() == 0 { | ||||
| 		t.Fatal("delayed write not worked") | ||||
| 	} | ||||
| } | ||||
		Reference in New Issue
	
	Block a user