diff --git a/debug/log/os.go b/debug/log/os.go index 458c4bdc..60f47d05 100644 --- a/debug/log/os.go +++ b/debug/log/os.go @@ -3,17 +3,117 @@ package log import ( "bufio" "encoding/json" + "io" "os" + "sync" "time" + + "github.com/google/uuid" ) // Should stream from OS -type osLog struct{} +type osLog struct { + sync.RWMutex + subs map[string]*osStream +} type osStream struct { - stream chan Record - scanner *bufio.Reader - stop chan bool + stream chan Record + stop chan bool +} + +// watch io stream +func (o *osLog) run() { + // save outputs + stdout := *os.Stdout + stderr := *os.Stderr + + // new os pipe + r, w := io.Pipe() + + // create new iopipes + r1, w1, _ := os.Pipe() + r2, w2, _ := os.Pipe() + + // create tea readers + tee1 := io.TeeReader(r1, &stdout) + tee2 := io.TeeReader(r2, &stderr) + + // start copying + go io.Copy(w, tee1) + go io.Copy(w, tee2) + + // set default go log output + //log.SetOutput(w2) + + // replace os stdout and os stderr + *os.Stdout = *w1 + *os.Stderr = *w2 + + // this should short circuit everything + defer func() { + // reset stdout and stderr + *os.Stdout = stdout + *os.Stderr = stderr + //log.SetOutput(stderr) + + // close all the outputs + r.Close() + r1.Close() + r2.Close() + w.Close() + w1.Close() + w2.Close() + }() + + // read from standard error + scanner := bufio.NewReader(r) + + for { + // read the line + line, err := scanner.ReadString('\n') + if err != nil { + return + } + // check if the line exists + if len(line) == 0 { + continue + } + // parse the record + var r Record + if line[0] == '{' { + json.Unmarshal([]byte(line), &r) + } else { + r = Record{ + Timestamp: time.Now(), + Value: line, + Metadata: make(map[string]string), + } + } + + o.Lock() + + // bail if there's no subscribers + if len(o.subs) == 0 { + o.Unlock() + return + } + + // check subs and send to stream + for id, sub := range o.subs { + // send to stream + select { + case <-sub.stop: + delete(o.subs, id) + case sub.stream <- r: + // send to stream + default: + // do not block + } + } + + o.Unlock() + } } // Read reads log entries from the logger @@ -30,52 +130,24 @@ func (o *osLog) Write(r Record) error { // Stream log records func (o *osLog) Stream() (Stream, error) { - // read from standard error - scanner := bufio.NewReader(os.Stderr) - stream := make(chan Record, 128) - stop := make(chan bool) + o.Lock() + defer o.Unlock() - go func() { - for { - select { - case <-stop: - return - default: - // read the line - line, err := scanner.ReadString('\n') - if err != nil { - return - } - // check if the line exists - if len(line) == 0 { - continue - } - // parse the record - var r Record - if line[0] == '{' { - json.Unmarshal([]byte(line), &r) - } else { - r = Record{ - Timestamp: time.Now(), - Value: line, - Metadata: make(map[string]string), - } - } - // send to stream - select { - case <-stop: - return - case stream <- r: - } - } - } - }() + // start stream watcher + if len(o.subs) == 0 { + go o.run() + } - return &osStream{ - stream: stream, - scanner: scanner, - stop: stop, - }, nil + // create stream + st := &osStream{ + stream: make(chan Record, 128), + stop: make(chan bool), + } + + // save stream + o.subs[uuid.New().String()] = st + + return st, nil } func (o *osStream) Chan() <-chan Record { @@ -93,5 +165,7 @@ func (o *osStream) Stop() error { } func NewLog(opts ...Option) Log { - return &osLog{} + return &osLog{ + subs: make(map[string]*osStream), + } } diff --git a/go.mod b/go.mod index 5e442a33..498f4f3f 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/fsouza/go-dockerclient v1.6.0 github.com/ghodss/yaml v1.0.0 github.com/go-acme/lego/v3 v3.1.0 - github.com/go-log/log v0.1.0 + github.com/go-log/log v0.2.0 github.com/go-playground/locales v0.13.0 // indirect github.com/golang/protobuf v1.3.2 github.com/google/uuid v1.1.1 diff --git a/go.sum b/go.sum index 8a137787..469bfba7 100644 --- a/go.sum +++ b/go.sum @@ -129,6 +129,8 @@ github.com/go-ini/ini v1.44.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-log/log v0.1.0 h1:wudGTNsiGzrD5ZjgIkVZ517ugi2XRe9Q/xRCzwEO4/U= github.com/go-log/log v0.1.0/go.mod h1:4mBwpdRMFLiuXZDCwU2lKQFsoSCo72j3HqBK9d81N2M= +github.com/go-log/log v0.2.0 h1:z8i91GBudxD5L3RmF0KVpetCbcGWAV7q1Tw1eRwQM9Q= +github.com/go-log/log v0.2.0/go.mod h1:xzCnwajcues/6w7lne3yK2QU7DBPW7kqbgPGG5AF65U= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM=