diff --git a/runtime/default.go b/runtime/default.go index 93c69739..5b85a3a2 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -251,6 +251,18 @@ func (r *runtime) Create(s *Service, opts ...CreateOption) error { return nil } +// exists returns whether the given file or directory exists +func exists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return true, err +} + // @todo: Getting existing lines is not supported yet. // The reason for this is because it's hard to calculate line offset // as opposed to character offset. @@ -265,18 +277,41 @@ func (r *runtime) Logs(s *Service, options ...LogsOption) (LogStream, error) { stream: make(chan LogRecord), stop: make(chan bool), } - t, err := tail.TailFile(logFile(s.Name), tail.Config{Follow: true, Location: &tail.SeekInfo{ - Whence: 2, - Offset: 0, + + fpath := logFile(s.Name) + if ex, err := exists(fpath); err != nil { + return nil, err + } else if !ex { + return nil, fmt.Errorf("Log file %v does not exists", fpath) + } + + whence := 2 + // Multiply by length of an average line of log in bytes + offset := -1 * lopts.Count * 200 + + t, err := tail.TailFile(logFile(s.Name), tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{ + Whence: whence, + Offset: int64(offset), }, Logger: tail.DiscardingLogger}) if err != nil { return nil, err } + ret.tail = t go func() { - for line := range t.Lines { - ret.stream <- LogRecord{Message: line.Text} + for { + select { + case line, ok := <-t.Lines: + if !ok { + ret.Stop() + return + } + ret.stream <- LogRecord{Message: line.Text} + case <-ret.stop: + return + } } + }() return ret, nil } @@ -301,16 +336,17 @@ func (l *logStream) Error() error { func (l *logStream) Stop() error { l.Lock() defer l.Unlock() - // @todo seems like this is causing a hangup - //err := l.tail.Stop() - //if err != nil { - // return err - //} + select { case <-l.stop: return nil default: close(l.stop) + close(l.stream) + err := l.tail.Stop() + if err != nil { + return err + } } return nil } diff --git a/runtime/service/service.go b/runtime/service/service.go index 06cfc2ad..f2c5c95a 100644 --- a/runtime/service/service.go +++ b/runtime/service/service.go @@ -7,7 +7,6 @@ import ( "github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/runtime" pb "github.com/micro/go-micro/v2/runtime/service/proto" - "github.com/micro/go-micro/v2/util/log" ) type svc struct { @@ -72,14 +71,15 @@ func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtim for _, o := range opts { o(&options) } + if options.Context == nil { options.Context = context.Background() } ls, err := s.runtime.Logs(options.Context, &pb.LogsRequest{ Service: service.Name, - Stream: true, - Count: 10, // @todo pass in actual options + Stream: options.Stream, + Count: options.Count, }) if err != nil { return nil, err @@ -89,14 +89,39 @@ func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtim stream: make(chan runtime.LogRecord), stop: make(chan bool), } + go func() { for { - record := runtime.LogRecord{} - err := ls.RecvMsg(&record) - if err != nil { - log.Error(err) + select { + // @todo this never seems to return, investigate + case <-ls.Context().Done(): + logStream.Stop() + } + } + }() + + go func() { + for { + select { + // @todo this never seems to return, investigate + case <-ls.Context().Done(): + return + case _, ok := <-logStream.stream: + if !ok { + return + } + default: + record := pb.LogRecord{} + err := ls.RecvMsg(&record) + if err != nil { + logStream.Stop() + return + } + logStream.stream <- runtime.LogRecord{ + Message: record.GetMessage(), + Metadata: record.GetMetadata(), + } } - logStream.stream <- record } }() return logStream, nil @@ -125,6 +150,7 @@ func (l *serviceLogStream) Stop() error { case <-l.stop: return nil default: + close(l.stream) close(l.stop) } return nil