fix the os logger

This commit is contained in:
Asim Aslam
2019-12-18 15:06:25 +00:00
parent 7c21a1b92a
commit 5d7254e79a
3 changed files with 126 additions and 50 deletions

View File

@@ -3,17 +3,117 @@ package log
import (
"bufio"
"encoding/json"
"io"
"os"
"sync"
"time"
"github.com/google/uuid"
)
// Should stream from OS
type osLog struct{}
type osLog struct {
sync.RWMutex
subs map[string]*osStream
}
type osStream struct {
stream chan Record
scanner *bufio.Reader
stop chan bool
stream chan Record
stop chan bool
}
// watch io stream
func (o *osLog) run() {
// save outputs
stdout := *os.Stdout
stderr := *os.Stderr
// new os pipe
r, w := io.Pipe()
// create new iopipes
r1, w1, _ := os.Pipe()
r2, w2, _ := os.Pipe()
// create tea readers
tee1 := io.TeeReader(r1, &stdout)
tee2 := io.TeeReader(r2, &stderr)
// start copying
go io.Copy(w, tee1)
go io.Copy(w, tee2)
// set default go log output
//log.SetOutput(w2)
// replace os stdout and os stderr
*os.Stdout = *w1
*os.Stderr = *w2
// this should short circuit everything
defer func() {
// reset stdout and stderr
*os.Stdout = stdout
*os.Stderr = stderr
//log.SetOutput(stderr)
// close all the outputs
r.Close()
r1.Close()
r2.Close()
w.Close()
w1.Close()
w2.Close()
}()
// read from standard error
scanner := bufio.NewReader(r)
for {
// read the line
line, err := scanner.ReadString('\n')
if err != nil {
return
}
// check if the line exists
if len(line) == 0 {
continue
}
// parse the record
var r Record
if line[0] == '{' {
json.Unmarshal([]byte(line), &r)
} else {
r = Record{
Timestamp: time.Now(),
Value: line,
Metadata: make(map[string]string),
}
}
o.Lock()
// bail if there's no subscribers
if len(o.subs) == 0 {
o.Unlock()
return
}
// check subs and send to stream
for id, sub := range o.subs {
// send to stream
select {
case <-sub.stop:
delete(o.subs, id)
case sub.stream <- r:
// send to stream
default:
// do not block
}
}
o.Unlock()
}
}
// Read reads log entries from the logger
@@ -30,52 +130,24 @@ func (o *osLog) Write(r Record) error {
// Stream log records
func (o *osLog) Stream() (Stream, error) {
// read from standard error
scanner := bufio.NewReader(os.Stderr)
stream := make(chan Record, 128)
stop := make(chan bool)
o.Lock()
defer o.Unlock()
go func() {
for {
select {
case <-stop:
return
default:
// read the line
line, err := scanner.ReadString('\n')
if err != nil {
return
}
// check if the line exists
if len(line) == 0 {
continue
}
// parse the record
var r Record
if line[0] == '{' {
json.Unmarshal([]byte(line), &r)
} else {
r = Record{
Timestamp: time.Now(),
Value: line,
Metadata: make(map[string]string),
}
}
// send to stream
select {
case <-stop:
return
case stream <- r:
}
}
}
}()
// start stream watcher
if len(o.subs) == 0 {
go o.run()
}
return &osStream{
stream: stream,
scanner: scanner,
stop: stop,
}, nil
// create stream
st := &osStream{
stream: make(chan Record, 128),
stop: make(chan bool),
}
// save stream
o.subs[uuid.New().String()] = st
return st, nil
}
func (o *osStream) Chan() <-chan Record {
@@ -93,5 +165,7 @@ func (o *osStream) Stop() error {
}
func NewLog(opts ...Option) Log {
return &osLog{}
return &osLog{
subs: make(map[string]*osStream),
}
}