Micro log fixes (#1570)

This commit is contained in:
Janos Dobronszki 2020-04-28 09:49:39 +02:00 committed by GitHub
parent 25c82245b1
commit 8148e0a0f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 18 deletions

View File

@ -251,6 +251,18 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error {
return nil return nil
} }
// exists returns whether the given file or directory exists
func exists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// @todo: Getting existing lines is not supported yet. // @todo: Getting existing lines is not supported yet.
// The reason for this is because it's hard to calculate line offset // The reason for this is because it's hard to calculate line offset
// as opposed to character offset. // as opposed to character offset.
@ -265,18 +277,41 @@ func (r *runtime) Logs(s *Service, options ...LogsOption) (LogStream, error) {
stream: make(chan LogRecord), stream: make(chan LogRecord),
stop: make(chan bool), stop: make(chan bool),
} }
t, err := tail.TailFile(logFile(s.Name), tail.Config{Follow: true, Location: &tail.SeekInfo{
Whence: 2, fpath := logFile(s.Name)
Offset: 0, if ex, err := exists(fpath); err != nil {
return nil, err
} else if !ex {
return nil, fmt.Errorf("Log file %v does not exists", fpath)
}
whence := 2
// Multiply by length of an average line of log in bytes
offset := -1 * lopts.Count * 200
t, err := tail.TailFile(logFile(s.Name), tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{
Whence: whence,
Offset: int64(offset),
}, Logger: tail.DiscardingLogger}) }, Logger: tail.DiscardingLogger})
if err != nil { if err != nil {
return nil, err return nil, err
} }
ret.tail = t ret.tail = t
go func() { go func() {
for line := range t.Lines { for {
ret.stream <- LogRecord{Message: line.Text} select {
case line, ok := <-t.Lines:
if !ok {
ret.Stop()
return
}
ret.stream <- LogRecord{Message: line.Text}
case <-ret.stop:
return
}
} }
}() }()
return ret, nil return ret, nil
} }
@ -301,16 +336,17 @@ func (l *logStream) Error() error {
func (l *logStream) Stop() error { func (l *logStream) Stop() error {
l.Lock() l.Lock()
defer l.Unlock() defer l.Unlock()
// @todo seems like this is causing a hangup
//err := l.tail.Stop()
//if err != nil {
// return err
//}
select { select {
case <-l.stop: case <-l.stop:
return nil return nil
default: default:
close(l.stop) close(l.stop)
close(l.stream)
err := l.tail.Stop()
if err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -7,7 +7,6 @@ import (
"github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/runtime" "github.com/micro/go-micro/v2/runtime"
pb "github.com/micro/go-micro/v2/runtime/service/proto" pb "github.com/micro/go-micro/v2/runtime/service/proto"
"github.com/micro/go-micro/v2/util/log"
) )
type svc struct { type svc struct {
@ -72,14 +71,15 @@ func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtim
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }
if options.Context == nil { if options.Context == nil {
options.Context = context.Background() options.Context = context.Background()
} }
ls, err := s.runtime.Logs(options.Context, &pb.LogsRequest{ ls, err := s.runtime.Logs(options.Context, &pb.LogsRequest{
Service: service.Name, Service: service.Name,
Stream: true, Stream: options.Stream,
Count: 10, // @todo pass in actual options Count: options.Count,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -89,14 +89,39 @@ func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtim
stream: make(chan runtime.LogRecord), stream: make(chan runtime.LogRecord),
stop: make(chan bool), stop: make(chan bool),
} }
go func() { go func() {
for { for {
record := runtime.LogRecord{} select {
err := ls.RecvMsg(&record) // @todo this never seems to return, investigate
if err != nil { case <-ls.Context().Done():
log.Error(err) logStream.Stop()
}
}
}()
go func() {
for {
select {
// @todo this never seems to return, investigate
case <-ls.Context().Done():
return
case _, ok := <-logStream.stream:
if !ok {
return
}
default:
record := pb.LogRecord{}
err := ls.RecvMsg(&record)
if err != nil {
logStream.Stop()
return
}
logStream.stream <- runtime.LogRecord{
Message: record.GetMessage(),
Metadata: record.GetMetadata(),
}
} }
logStream.stream <- record
} }
}() }()
return logStream, nil return logStream, nil
@ -125,6 +150,7 @@ func (l *serviceLogStream) Stop() error {
case <-l.stop: case <-l.stop:
return nil return nil
default: default:
close(l.stream)
close(l.stop) close(l.stop)
} }
return nil return nil