298 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			298 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package service
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/micro/go-micro/v2/client"
 | |
| 	"github.com/micro/go-micro/v2/runtime"
 | |
| 	pb "github.com/micro/go-micro/v2/runtime/service/proto"
 | |
| )
 | |
| 
 | |
| type svc struct {
 | |
| 	sync.RWMutex
 | |
| 	options runtime.Options
 | |
| 	runtime pb.RuntimeService
 | |
| }
 | |
| 
 | |
| // Init initializes runtime with given options
 | |
| func (s *svc) Init(opts ...runtime.Option) error {
 | |
| 	s.Lock()
 | |
| 	defer s.Unlock()
 | |
| 
 | |
| 	for _, o := range opts {
 | |
| 		o(&s.options)
 | |
| 	}
 | |
| 
 | |
| 	// reset the runtime as the client could have changed
 | |
| 	s.runtime = pb.NewRuntimeService(runtime.DefaultName, s.options.Client)
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Create registers a service in the runtime
 | |
| func (s *svc) Create(svc *runtime.Service, opts ...runtime.CreateOption) error {
 | |
| 	var options runtime.CreateOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 	if options.Context == nil {
 | |
| 		options.Context = context.Background()
 | |
| 	}
 | |
| 
 | |
| 	// set the default source from MICRO_RUNTIME_SOURCE
 | |
| 	if len(svc.Source) == 0 {
 | |
| 		svc.Source = s.options.Source
 | |
| 	}
 | |
| 
 | |
| 	// runtime service create request
 | |
| 	req := &pb.CreateRequest{
 | |
| 		Service: &pb.Service{
 | |
| 			Name:     svc.Name,
 | |
| 			Version:  svc.Version,
 | |
| 			Source:   svc.Source,
 | |
| 			Metadata: svc.Metadata,
 | |
| 		},
 | |
| 		Options: &pb.CreateOptions{
 | |
| 			Command:   options.Command,
 | |
| 			Args:      options.Args,
 | |
| 			Env:       options.Env,
 | |
| 			Type:      options.Type,
 | |
| 			Image:     options.Image,
 | |
| 			Namespace: options.Namespace,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	if _, err := s.runtime.Create(options.Context, req); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *svc) Logs(service *runtime.Service, opts ...runtime.LogsOption) (runtime.LogStream, error) {
 | |
| 	var options runtime.LogsOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 
 | |
| 	if options.Context == nil {
 | |
| 		options.Context = context.Background()
 | |
| 	}
 | |
| 
 | |
| 	ls, err := s.runtime.Logs(options.Context, &pb.LogsRequest{
 | |
| 		Service: service.Name,
 | |
| 		Stream:  options.Stream,
 | |
| 		Count:   options.Count,
 | |
| 		Options: &pb.LogsOptions{
 | |
| 			Namespace: options.Namespace,
 | |
| 		},
 | |
| 	})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	logStream := &serviceLogStream{
 | |
| 		service: service.Name,
 | |
| 		stream:  make(chan runtime.LogRecord),
 | |
| 		stop:    make(chan bool),
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			select {
 | |
| 			// @todo this never seems to return, investigate
 | |
| 			case <-ls.Context().Done():
 | |
| 				logStream.Stop()
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	go func() {
 | |
| 		for {
 | |
| 			select {
 | |
| 			// @todo this never seems to return, investigate
 | |
| 			case <-ls.Context().Done():
 | |
| 				return
 | |
| 			case _, ok := <-logStream.stream:
 | |
| 				if !ok {
 | |
| 					return
 | |
| 				}
 | |
| 			default:
 | |
| 				record := pb.LogRecord{}
 | |
| 				err := ls.RecvMsg(&record)
 | |
| 				if err != nil {
 | |
| 					logStream.Stop()
 | |
| 					return
 | |
| 				}
 | |
| 				logStream.stream <- runtime.LogRecord{
 | |
| 					Message:  record.GetMessage(),
 | |
| 					Metadata: record.GetMetadata(),
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 	return logStream, nil
 | |
| }
 | |
| 
 | |
| type serviceLogStream struct {
 | |
| 	service string
 | |
| 	stream  chan runtime.LogRecord
 | |
| 	sync.Mutex
 | |
| 	stop chan bool
 | |
| 	err  error
 | |
| }
 | |
| 
 | |
| func (l *serviceLogStream) Error() error {
 | |
| 	return l.err
 | |
| }
 | |
| 
 | |
| func (l *serviceLogStream) Chan() chan runtime.LogRecord {
 | |
| 	return l.stream
 | |
| }
 | |
| 
 | |
| func (l *serviceLogStream) Stop() error {
 | |
| 	l.Lock()
 | |
| 	defer l.Unlock()
 | |
| 	select {
 | |
| 	case <-l.stop:
 | |
| 		return nil
 | |
| 	default:
 | |
| 		close(l.stream)
 | |
| 		close(l.stop)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Read returns the service with the given name from the runtime
 | |
| func (s *svc) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
 | |
| 	var options runtime.ReadOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 	if options.Context == nil {
 | |
| 		options.Context = context.Background()
 | |
| 	}
 | |
| 
 | |
| 	// runtime service create request
 | |
| 	req := &pb.ReadRequest{
 | |
| 		Options: &pb.ReadOptions{
 | |
| 			Service:   options.Service,
 | |
| 			Version:   options.Version,
 | |
| 			Type:      options.Type,
 | |
| 			Namespace: options.Namespace,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	resp, err := s.runtime.Read(options.Context, req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	services := make([]*runtime.Service, 0, len(resp.Services))
 | |
| 	for _, service := range resp.Services {
 | |
| 		svc := &runtime.Service{
 | |
| 			Name:     service.Name,
 | |
| 			Version:  service.Version,
 | |
| 			Source:   service.Source,
 | |
| 			Metadata: service.Metadata,
 | |
| 		}
 | |
| 		services = append(services, svc)
 | |
| 	}
 | |
| 
 | |
| 	return services, nil
 | |
| }
 | |
| 
 | |
| // Update updates the running service
 | |
| func (s *svc) Update(svc *runtime.Service, opts ...runtime.UpdateOption) error {
 | |
| 	var options runtime.UpdateOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 	if options.Context == nil {
 | |
| 		options.Context = context.Background()
 | |
| 	}
 | |
| 
 | |
| 	// runtime service create request
 | |
| 	req := &pb.UpdateRequest{
 | |
| 		Service: &pb.Service{
 | |
| 			Name:     svc.Name,
 | |
| 			Version:  svc.Version,
 | |
| 			Source:   svc.Source,
 | |
| 			Metadata: svc.Metadata,
 | |
| 		},
 | |
| 		Options: &pb.UpdateOptions{
 | |
| 			Namespace: options.Namespace,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	if _, err := s.runtime.Update(options.Context, req); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Delete stops and removes the service from the runtime
 | |
| func (s *svc) Delete(svc *runtime.Service, opts ...runtime.DeleteOption) error {
 | |
| 	var options runtime.DeleteOptions
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 	if options.Context == nil {
 | |
| 		options.Context = context.Background()
 | |
| 	}
 | |
| 
 | |
| 	// runtime service create request
 | |
| 	req := &pb.DeleteRequest{
 | |
| 		Service: &pb.Service{
 | |
| 			Name:     svc.Name,
 | |
| 			Version:  svc.Version,
 | |
| 			Source:   svc.Source,
 | |
| 			Metadata: svc.Metadata,
 | |
| 		},
 | |
| 		Options: &pb.DeleteOptions{
 | |
| 			Namespace: options.Namespace,
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	if _, err := s.runtime.Delete(options.Context, req); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Start starts the runtime
 | |
| func (s *svc) Start() error {
 | |
| 	// NOTE: nothing to be done here
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Stop stops the runtime
 | |
| func (s *svc) Stop() error {
 | |
| 	// NOTE: nothing to be done here
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Returns the runtime service implementation
 | |
| func (s *svc) String() string {
 | |
| 	return "service"
 | |
| }
 | |
| 
 | |
| // NewRuntime creates new service runtime and returns it
 | |
| func NewRuntime(opts ...runtime.Option) runtime.Runtime {
 | |
| 	var options runtime.Options
 | |
| 
 | |
| 	for _, o := range opts {
 | |
| 		o(&options)
 | |
| 	}
 | |
| 	if options.Client == nil {
 | |
| 		options.Client = client.DefaultClient
 | |
| 	}
 | |
| 
 | |
| 	return &svc{
 | |
| 		options: options,
 | |
| 		runtime: pb.NewRuntimeService(runtime.DefaultName, options.Client),
 | |
| 	}
 | |
| }
 |