diff --git a/runtime/default.go b/runtime/default.go index af9772d9..9ebf7a06 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -2,6 +2,7 @@ package runtime import ( "errors" + "io" "strings" "sync" "time" @@ -31,6 +32,9 @@ type service struct { closed chan bool err error + // output for logs + output io.Writer + // service to manage *Service // process creator @@ -81,9 +85,15 @@ func newService(s *Service, c CreateOptions) *service { Env: c.Env, Args: args, }, + output: c.Output, } } +func (s *service) streamOutput() { + go io.Copy(s.output, s.PID.Output) + go io.Copy(s.output, s.PID.Error) +} + func (s *service) Running() bool { s.RLock() defer s.RUnlock() @@ -103,7 +113,7 @@ func (s *service) Start() error { s.closed = make(chan bool) // TODO: pull source & build binary - + log.Debugf("Runtime service %s forking new process\n") p, err := s.Process.Fork(s.Exec) if err != nil { return err @@ -114,6 +124,10 @@ func (s *service) Start() error { // set to running s.running = true + if s.output != nil { + s.streamOutput() + } + // wait and watch go s.Wait() diff --git a/runtime/options.go b/runtime/options.go index 99ae40c5..64221c9e 100644 --- a/runtime/options.go +++ b/runtime/options.go @@ -1,5 +1,9 @@ package runtime +import ( + "io" +) + type CreateOption func(o *CreateOptions) type CreateOptions struct { @@ -7,9 +11,11 @@ type CreateOptions struct { Command []string // Environment to configure Env []string + // Log output + Output io.Writer } -// Command specifies the command to execute +// WithCommand specifies the command to execute func WithCommand(c string, args ...string) CreateOption { return func(o *CreateOptions) { // set command @@ -19,9 +25,16 @@ func WithCommand(c string, args ...string) CreateOption { } } -// Env sets the created service env +// WithEnv sets the created service env func WithEnv(env []string) CreateOption { return func(o *CreateOptions) { o.Env = env } } + +// WithOutput sets the arg output +func WithOutput(out io.Writer) CreateOption { + return func(o *CreateOptions) { + o.Output = out + } +} diff --git a/runtime/proto/runtime.micro.go b/runtime/proto/runtime.micro.go new file mode 100644 index 00000000..f494748f --- /dev/null +++ b/runtime/proto/runtime.micro.go @@ -0,0 +1,108 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: micro/go-micro/runtime/proto/runtime.proto + +package go_micro_runtime + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Runtime service + +type RuntimeService interface { + Create(ctx context.Context, in *CreateRequest, opts ...client.CallOption) (*CreateResponse, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) +} + +type runtimeService struct { + c client.Client + name string +} + +func NewRuntimeService(name string, c client.Client) RuntimeService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.runtime" + } + return &runtimeService{ + c: c, + name: name, + } +} + +func (c *runtimeService) Create(ctx context.Context, in *CreateRequest, opts ...client.CallOption) (*CreateResponse, error) { + req := c.c.NewRequest(c.name, "Runtime.Create", in) + out := new(CreateResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *runtimeService) Delete(ctx context.Context, in *DeleteRequest, opts ...client.CallOption) (*DeleteResponse, error) { + req := c.c.NewRequest(c.name, "Runtime.Delete", in) + out := new(DeleteResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Runtime service + +type RuntimeHandler interface { + Create(context.Context, *CreateRequest, *CreateResponse) error + Delete(context.Context, *DeleteRequest, *DeleteResponse) error +} + +func RegisterRuntimeHandler(s server.Server, hdlr RuntimeHandler, opts ...server.HandlerOption) error { + type runtime interface { + Create(ctx context.Context, in *CreateRequest, out *CreateResponse) error + Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error + } + type Runtime struct { + runtime + } + h := &runtimeHandler{hdlr} + return s.Handle(s.NewHandler(&Runtime{h}, opts...)) +} + +type runtimeHandler struct { + RuntimeHandler +} + +func (h *runtimeHandler) Create(ctx context.Context, in *CreateRequest, out *CreateResponse) error { + return h.RuntimeHandler.Create(ctx, in, out) +} + +func (h *runtimeHandler) Delete(ctx context.Context, in *DeleteRequest, out *DeleteResponse) error { + return h.RuntimeHandler.Delete(ctx, in, out) +} diff --git a/runtime/proto/runtime.pb.go b/runtime/proto/runtime.pb.go new file mode 100644 index 00000000..50f810ab --- /dev/null +++ b/runtime/proto/runtime.pb.go @@ -0,0 +1,366 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: micro/go-micro/runtime/proto/runtime.proto + +package go_micro_runtime + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Service struct { + // name of the service + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + // git url of the source + Source string `protobuf:"bytes,2,opt,name=source,proto3" json:"source,omitempty"` + // local path of the source + Path string `protobuf:"bytes,3,opt,name=path,proto3" json:"path,omitempty"` + // command to execute + Exec string `protobuf:"bytes,4,opt,name=exec,proto3" json:"exec,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Service) Reset() { *m = Service{} } +func (m *Service) String() string { return proto.CompactTextString(m) } +func (*Service) ProtoMessage() {} +func (*Service) Descriptor() ([]byte, []int) { + return fileDescriptor_efcf18966add108e, []int{0} +} + +func (m *Service) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Service.Unmarshal(m, b) +} +func (m *Service) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Service.Marshal(b, m, deterministic) +} +func (m *Service) XXX_Merge(src proto.Message) { + xxx_messageInfo_Service.Merge(m, src) +} +func (m *Service) XXX_Size() int { + return xxx_messageInfo_Service.Size(m) +} +func (m *Service) XXX_DiscardUnknown() { + xxx_messageInfo_Service.DiscardUnknown(m) +} + +var xxx_messageInfo_Service proto.InternalMessageInfo + +func (m *Service) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Service) GetSource() string { + if m != nil { + return m.Source + } + return "" +} + +func (m *Service) GetPath() string { + if m != nil { + return m.Path + } + return "" +} + +func (m *Service) GetExec() string { + if m != nil { + return m.Exec + } + return "" +} + +type CreateRequest struct { + Service *Service `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateRequest) Reset() { *m = CreateRequest{} } +func (m *CreateRequest) String() string { return proto.CompactTextString(m) } +func (*CreateRequest) ProtoMessage() {} +func (*CreateRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_efcf18966add108e, []int{1} +} + +func (m *CreateRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateRequest.Unmarshal(m, b) +} +func (m *CreateRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateRequest.Marshal(b, m, deterministic) +} +func (m *CreateRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateRequest.Merge(m, src) +} +func (m *CreateRequest) XXX_Size() int { + return xxx_messageInfo_CreateRequest.Size(m) +} +func (m *CreateRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CreateRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateRequest proto.InternalMessageInfo + +func (m *CreateRequest) GetService() *Service { + if m != nil { + return m.Service + } + return nil +} + +type CreateResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateResponse) Reset() { *m = CreateResponse{} } +func (m *CreateResponse) String() string { return proto.CompactTextString(m) } +func (*CreateResponse) ProtoMessage() {} +func (*CreateResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_efcf18966add108e, []int{2} +} + +func (m *CreateResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateResponse.Unmarshal(m, b) +} +func (m *CreateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateResponse.Marshal(b, m, deterministic) +} +func (m *CreateResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateResponse.Merge(m, src) +} +func (m *CreateResponse) XXX_Size() int { + return xxx_messageInfo_CreateResponse.Size(m) +} +func (m *CreateResponse) XXX_DiscardUnknown() { + xxx_messageInfo_CreateResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateResponse proto.InternalMessageInfo + +type DeleteRequest struct { + Service *Service `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeleteRequest) Reset() { *m = DeleteRequest{} } +func (m *DeleteRequest) String() string { return proto.CompactTextString(m) } +func (*DeleteRequest) ProtoMessage() {} +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_efcf18966add108e, []int{3} +} + +func (m *DeleteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeleteRequest.Unmarshal(m, b) +} +func (m *DeleteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeleteRequest.Marshal(b, m, deterministic) +} +func (m *DeleteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteRequest.Merge(m, src) +} +func (m *DeleteRequest) XXX_Size() int { + return xxx_messageInfo_DeleteRequest.Size(m) +} +func (m *DeleteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteRequest proto.InternalMessageInfo + +func (m *DeleteRequest) GetService() *Service { + if m != nil { + return m.Service + } + return nil +} + +type DeleteResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *DeleteResponse) Reset() { *m = DeleteResponse{} } +func (m *DeleteResponse) String() string { return proto.CompactTextString(m) } +func (*DeleteResponse) ProtoMessage() {} +func (*DeleteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_efcf18966add108e, []int{4} +} + +func (m *DeleteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_DeleteResponse.Unmarshal(m, b) +} +func (m *DeleteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_DeleteResponse.Marshal(b, m, deterministic) +} +func (m *DeleteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DeleteResponse.Merge(m, src) +} +func (m *DeleteResponse) XXX_Size() int { + return xxx_messageInfo_DeleteResponse.Size(m) +} +func (m *DeleteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_DeleteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*Service)(nil), "go.micro.runtime.Service") + proto.RegisterType((*CreateRequest)(nil), "go.micro.runtime.CreateRequest") + proto.RegisterType((*CreateResponse)(nil), "go.micro.runtime.CreateResponse") + proto.RegisterType((*DeleteRequest)(nil), "go.micro.runtime.DeleteRequest") + proto.RegisterType((*DeleteResponse)(nil), "go.micro.runtime.DeleteResponse") +} + +func init() { + proto.RegisterFile("micro/go-micro/runtime/proto/runtime.proto", fileDescriptor_efcf18966add108e) +} + +var fileDescriptor_efcf18966add108e = []byte{ + // 240 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x91, 0x41, 0x4b, 0xc4, 0x30, + 0x14, 0x84, 0xb7, 0xba, 0xb4, 0xf8, 0x44, 0x59, 0x72, 0x90, 0xe8, 0xc5, 0x25, 0x27, 0x11, 0xcc, + 0x42, 0xfb, 0x13, 0xec, 0xd5, 0x4b, 0x3c, 0x7b, 0xa8, 0xe1, 0x51, 0x0b, 0xb6, 0xa9, 0x49, 0x2a, + 0xfe, 0x23, 0xff, 0xa6, 0x24, 0x2f, 0x15, 0xaa, 0xf5, 0xe4, 0x6d, 0xde, 0x30, 0x7c, 0x33, 0x21, + 0x70, 0xdb, 0x77, 0xda, 0x9a, 0x43, 0x6b, 0xee, 0x48, 0xd8, 0x69, 0xf0, 0x5d, 0x8f, 0x87, 0xd1, + 0x1a, 0xff, 0x7d, 0xc9, 0x78, 0xb1, 0x5d, 0x6b, 0x64, 0x4c, 0xc9, 0xe4, 0x8b, 0x27, 0x28, 0x1e, + 0xd1, 0xbe, 0x77, 0x1a, 0x19, 0x83, 0xed, 0xd0, 0xf4, 0xc8, 0xb3, 0x7d, 0x76, 0x73, 0xa2, 0xa2, + 0x66, 0x17, 0x90, 0x3b, 0x33, 0x59, 0x8d, 0xfc, 0x28, 0xba, 0xe9, 0x0a, 0xd9, 0xb1, 0xf1, 0x2f, + 0xfc, 0x98, 0xb2, 0x41, 0x07, 0x0f, 0x3f, 0x50, 0xf3, 0x2d, 0x79, 0x41, 0x8b, 0x1a, 0xce, 0xee, + 0x2d, 0x36, 0x1e, 0x15, 0xbe, 0x4d, 0xe8, 0x3c, 0xab, 0xa0, 0x70, 0xd4, 0x17, 0x7b, 0x4e, 0xcb, + 0x4b, 0xf9, 0x73, 0x93, 0x4c, 0x83, 0xd4, 0x9c, 0x14, 0x3b, 0x38, 0x9f, 0x29, 0x6e, 0x34, 0x83, + 0xc3, 0xc0, 0xad, 0xf1, 0x15, 0xff, 0xcf, 0x9d, 0x29, 0xc4, 0x2d, 0x3f, 0x33, 0x28, 0x14, 0xc5, + 0xd9, 0x03, 0xe4, 0xd4, 0xca, 0xae, 0x7f, 0xb3, 0x16, 0xaf, 0xba, 0xda, 0xff, 0x1d, 0x48, 0x83, + 0x37, 0x01, 0x47, 0x65, 0x6b, 0xb8, 0xc5, 0x63, 0xd6, 0x70, 0xcb, 0x9d, 0x62, 0xf3, 0x9c, 0xc7, + 0x1f, 0xad, 0xbe, 0x02, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xb6, 0x5a, 0xe7, 0xff, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// RuntimeClient is the client API for Runtime service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type RuntimeClient interface { + Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) +} + +type runtimeClient struct { + cc *grpc.ClientConn +} + +func NewRuntimeClient(cc *grpc.ClientConn) RuntimeClient { + return &runtimeClient{cc} +} + +func (c *runtimeClient) Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) { + out := new(CreateResponse) + err := c.cc.Invoke(ctx, "/go.micro.runtime.Runtime/Create", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *runtimeClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) { + out := new(DeleteResponse) + err := c.cc.Invoke(ctx, "/go.micro.runtime.Runtime/Delete", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RuntimeServer is the server API for Runtime service. +type RuntimeServer interface { + Create(context.Context, *CreateRequest) (*CreateResponse, error) + Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) +} + +func RegisterRuntimeServer(s *grpc.Server, srv RuntimeServer) { + s.RegisterService(&_Runtime_serviceDesc, srv) +} + +func _Runtime_Create_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RuntimeServer).Create(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.runtime.Runtime/Create", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RuntimeServer).Create(ctx, req.(*CreateRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Runtime_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RuntimeServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.runtime.Runtime/Delete", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RuntimeServer).Delete(ctx, req.(*DeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Runtime_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.runtime.Runtime", + HandlerType: (*RuntimeServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Create", + Handler: _Runtime_Create_Handler, + }, + { + MethodName: "Delete", + Handler: _Runtime_Delete_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "micro/go-micro/runtime/proto/runtime.proto", +} diff --git a/runtime/proto/runtime.proto b/runtime/proto/runtime.proto new file mode 100644 index 00000000..6a3759e3 --- /dev/null +++ b/runtime/proto/runtime.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package go.micro.runtime; + +service Runtime { + rpc Create(CreateRequest) returns (CreateResponse) {}; + rpc Delete(DeleteRequest) returns (DeleteResponse) {}; +} + +message Service { + // name of the service + string name = 1; + // git url of the source + string source = 2; + // local path of the source + string path = 3; + // command to execute + string exec = 4; +} + +message CreateRequest { + Service service = 1; +} + +message CreateResponse {} + +message DeleteRequest { + Service service = 1; +} + +message DeleteResponse {}