201 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			201 lines
		
	
	
		
			3.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package log
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"encoding/json"
 | 
						|
	"io"
 | 
						|
	"os"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/google/uuid"
 | 
						|
	"github.com/micro/go-micro/util/ring"
 | 
						|
)
 | 
						|
 | 
						|
// Should stream from OS
 | 
						|
type osLog struct {
 | 
						|
	format FormatFunc
 | 
						|
	once   sync.Once
 | 
						|
 | 
						|
	sync.RWMutex
 | 
						|
	buffer *ring.Buffer
 | 
						|
	subs   map[string]*osStream
 | 
						|
}
 | 
						|
 | 
						|
type osStream struct {
 | 
						|
	stream chan Record
 | 
						|
	stop   chan bool
 | 
						|
}
 | 
						|
 | 
						|
// watch io stream
 | 
						|
func (o *osLog) run() {
 | 
						|
	// save outputs
 | 
						|
	stdout := *os.Stdout
 | 
						|
	stderr := *os.Stderr
 | 
						|
 | 
						|
	// new os pipe
 | 
						|
	r, w := io.Pipe()
 | 
						|
 | 
						|
	// create new iopipes
 | 
						|
	r1, w1, _ := os.Pipe()
 | 
						|
	r2, w2, _ := os.Pipe()
 | 
						|
 | 
						|
	// create tea readers
 | 
						|
	tee1 := io.TeeReader(r1, &stdout)
 | 
						|
	tee2 := io.TeeReader(r2, &stderr)
 | 
						|
 | 
						|
	// start copying
 | 
						|
	go io.Copy(w, tee1)
 | 
						|
	go io.Copy(w, tee2)
 | 
						|
 | 
						|
	// set default go log output
 | 
						|
	//log.SetOutput(w2)
 | 
						|
 | 
						|
	// replace os stdout and os stderr
 | 
						|
	*os.Stdout = *w1
 | 
						|
	*os.Stderr = *w2
 | 
						|
 | 
						|
	// this should short circuit everything
 | 
						|
	defer func() {
 | 
						|
		// reset stdout and stderr
 | 
						|
		*os.Stdout = stdout
 | 
						|
		*os.Stderr = stderr
 | 
						|
		//log.SetOutput(stderr)
 | 
						|
 | 
						|
		// close all the outputs
 | 
						|
		r.Close()
 | 
						|
		r1.Close()
 | 
						|
		r2.Close()
 | 
						|
		w.Close()
 | 
						|
		w1.Close()
 | 
						|
		w2.Close()
 | 
						|
	}()
 | 
						|
 | 
						|
	// read from standard error
 | 
						|
	scanner := bufio.NewReader(r)
 | 
						|
 | 
						|
	for {
 | 
						|
		// read the line
 | 
						|
		line, err := scanner.ReadString('\n')
 | 
						|
		if err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		// check if the line exists
 | 
						|
		if len(line) == 0 {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		// parse the record
 | 
						|
		var r Record
 | 
						|
		if line[0] == '{' {
 | 
						|
			json.Unmarshal([]byte(line), &r)
 | 
						|
		} else {
 | 
						|
			r = Record{
 | 
						|
				Timestamp: time.Now(),
 | 
						|
				Message:   strings.TrimSuffix(line, "\n"),
 | 
						|
				Metadata:  make(map[string]string),
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		o.Lock()
 | 
						|
 | 
						|
		// write to the buffer
 | 
						|
		o.buffer.Put(r)
 | 
						|
 | 
						|
		// check subs and send to stream
 | 
						|
		for id, sub := range o.subs {
 | 
						|
			// send to stream
 | 
						|
			select {
 | 
						|
			case <-sub.stop:
 | 
						|
				delete(o.subs, id)
 | 
						|
			case sub.stream <- r:
 | 
						|
				// send to stream
 | 
						|
			default:
 | 
						|
				// do not block
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		o.Unlock()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Read reads log entries from the logger
 | 
						|
func (o *osLog) Read(...ReadOption) ([]Record, error) {
 | 
						|
	o.once.Do(func() {
 | 
						|
		go o.run()
 | 
						|
	})
 | 
						|
 | 
						|
	var records []Record
 | 
						|
 | 
						|
	// read the last 100 records
 | 
						|
	for _, v := range o.buffer.Get(100) {
 | 
						|
		records = append(records, v.Value.(Record))
 | 
						|
	}
 | 
						|
 | 
						|
	return records, nil
 | 
						|
}
 | 
						|
 | 
						|
// Write writes records to log
 | 
						|
func (o *osLog) Write(r Record) error {
 | 
						|
	o.once.Do(func() {
 | 
						|
		go o.run()
 | 
						|
	})
 | 
						|
 | 
						|
	// generate output
 | 
						|
	out := o.format(r) + "\n"
 | 
						|
	_, err := os.Stderr.Write([]byte(out))
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Stream log records
 | 
						|
func (o *osLog) Stream() (Stream, error) {
 | 
						|
	o.once.Do(func() {
 | 
						|
		go o.run()
 | 
						|
	})
 | 
						|
 | 
						|
	o.Lock()
 | 
						|
	defer o.Unlock()
 | 
						|
 | 
						|
	// create stream
 | 
						|
	st := &osStream{
 | 
						|
		stream: make(chan Record, 128),
 | 
						|
		stop:   make(chan bool),
 | 
						|
	}
 | 
						|
 | 
						|
	// save stream
 | 
						|
	o.subs[uuid.New().String()] = st
 | 
						|
 | 
						|
	return st, nil
 | 
						|
}
 | 
						|
 | 
						|
func (o *osStream) Chan() <-chan Record {
 | 
						|
	return o.stream
 | 
						|
}
 | 
						|
 | 
						|
func (o *osStream) Stop() error {
 | 
						|
	select {
 | 
						|
	case <-o.stop:
 | 
						|
		return nil
 | 
						|
	default:
 | 
						|
		close(o.stop)
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func NewLog(opts ...Option) Log {
 | 
						|
	options := Options{
 | 
						|
		Format: DefaultFormat,
 | 
						|
	}
 | 
						|
	for _, o := range opts {
 | 
						|
		o(&options)
 | 
						|
	}
 | 
						|
 | 
						|
	l := &osLog{
 | 
						|
		format: options.Format,
 | 
						|
		buffer: ring.New(1024),
 | 
						|
		subs:   make(map[string]*osStream),
 | 
						|
	}
 | 
						|
 | 
						|
	return l
 | 
						|
}
 |