Merge pull request #1 from micro/master

merge
This commit is contained in:
Shu xian 2019-08-27 09:38:18 +08:00 committed by GitHub
commit 75e20b5bf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 894 additions and 336 deletions

View File

@ -31,37 +31,37 @@ var _ context.Context
var _ client.Option var _ client.Option
var _ server.Option var _ server.Option
// Client API for Micro service // Client API for Client service
type MicroService interface { type ClientService interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error)
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) Stream(ctx context.Context, opts ...client.CallOption) (Client_StreamService, error)
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error)
} }
type microService struct { type clientService struct {
c client.Client c client.Client
name string name string
} }
func NewMicroService(name string, c client.Client) MicroService { func NewClientService(name string, c client.Client) ClientService {
if c == nil { if c == nil {
c = client.NewClient() c = client.NewClient()
} }
if len(name) == 0 { if len(name) == 0 {
name = "go.micro.client" name = "go.micro.client"
} }
return &microService{ return &clientService{
c: c, c: c,
name: name, name: name,
} }
} }
func (c *microService) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) { func (c *clientService) Call(ctx context.Context, in *Request, opts ...client.CallOption) (*Response, error) {
req := c.c.NewRequest(c.name, "Micro.Call", in) req := c.c.NewRequest(c.name, "Client.Call", in)
out := new(Response) out := new(Response)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -70,16 +70,16 @@ func (c *microService) Call(ctx context.Context, in *Request, opts ...client.Cal
return out, nil return out, nil
} }
func (c *microService) Stream(ctx context.Context, opts ...client.CallOption) (Micro_StreamService, error) { func (c *clientService) Stream(ctx context.Context, opts ...client.CallOption) (Client_StreamService, error) {
req := c.c.NewRequest(c.name, "Micro.Stream", &Request{}) req := c.c.NewRequest(c.name, "Client.Stream", &Request{})
stream, err := c.c.Stream(ctx, req, opts...) stream, err := c.c.Stream(ctx, req, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &microServiceStream{stream}, nil return &clientServiceStream{stream}, nil
} }
type Micro_StreamService interface { type Client_StreamService interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
@ -87,27 +87,27 @@ type Micro_StreamService interface {
Recv() (*Response, error) Recv() (*Response, error)
} }
type microServiceStream struct { type clientServiceStream struct {
stream client.Stream stream client.Stream
} }
func (x *microServiceStream) Close() error { func (x *clientServiceStream) Close() error {
return x.stream.Close() return x.stream.Close()
} }
func (x *microServiceStream) SendMsg(m interface{}) error { func (x *clientServiceStream) SendMsg(m interface{}) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microServiceStream) RecvMsg(m interface{}) error { func (x *clientServiceStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *microServiceStream) Send(m *Request) error { func (x *clientServiceStream) Send(m *Request) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microServiceStream) Recv() (*Response, error) { func (x *clientServiceStream) Recv() (*Response, error) {
m := new(Response) m := new(Response)
err := x.stream.Recv(m) err := x.stream.Recv(m)
if err != nil { if err != nil {
@ -116,8 +116,8 @@ func (x *microServiceStream) Recv() (*Response, error) {
return m, nil return m, nil
} }
func (c *microService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) { func (c *clientService) Publish(ctx context.Context, in *Message, opts ...client.CallOption) (*Message, error) {
req := c.c.NewRequest(c.name, "Micro.Publish", in) req := c.c.NewRequest(c.name, "Client.Publish", in)
out := new(Message) out := new(Message)
err := c.c.Call(ctx, req, out, opts...) err := c.c.Call(ctx, req, out, opts...)
if err != nil { if err != nil {
@ -126,43 +126,43 @@ func (c *microService) Publish(ctx context.Context, in *Message, opts ...client.
return out, nil return out, nil
} }
// Server API for Micro service // Server API for Client service
type MicroHandler interface { type ClientHandler interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(context.Context, *Request, *Response) error Call(context.Context, *Request, *Response) error
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(context.Context, Micro_StreamStream) error Stream(context.Context, Client_StreamStream) error
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(context.Context, *Message, *Message) error Publish(context.Context, *Message, *Message) error
} }
func RegisterMicroHandler(s server.Server, hdlr MicroHandler, opts ...server.HandlerOption) error { func RegisterClientHandler(s server.Server, hdlr ClientHandler, opts ...server.HandlerOption) error {
type micro interface { type client interface {
Call(ctx context.Context, in *Request, out *Response) error Call(ctx context.Context, in *Request, out *Response) error
Stream(ctx context.Context, stream server.Stream) error Stream(ctx context.Context, stream server.Stream) error
Publish(ctx context.Context, in *Message, out *Message) error Publish(ctx context.Context, in *Message, out *Message) error
} }
type Micro struct { type Client struct {
micro client
} }
h := &microHandler{hdlr} h := &clientHandler{hdlr}
return s.Handle(s.NewHandler(&Micro{h}, opts...)) return s.Handle(s.NewHandler(&Client{h}, opts...))
} }
type microHandler struct { type clientHandler struct {
MicroHandler ClientHandler
} }
func (h *microHandler) Call(ctx context.Context, in *Request, out *Response) error { func (h *clientHandler) Call(ctx context.Context, in *Request, out *Response) error {
return h.MicroHandler.Call(ctx, in, out) return h.ClientHandler.Call(ctx, in, out)
} }
func (h *microHandler) Stream(ctx context.Context, stream server.Stream) error { func (h *clientHandler) Stream(ctx context.Context, stream server.Stream) error {
return h.MicroHandler.Stream(ctx, &microStreamStream{stream}) return h.ClientHandler.Stream(ctx, &clientStreamStream{stream})
} }
type Micro_StreamStream interface { type Client_StreamStream interface {
SendMsg(interface{}) error SendMsg(interface{}) error
RecvMsg(interface{}) error RecvMsg(interface{}) error
Close() error Close() error
@ -170,27 +170,27 @@ type Micro_StreamStream interface {
Recv() (*Request, error) Recv() (*Request, error)
} }
type microStreamStream struct { type clientStreamStream struct {
stream server.Stream stream server.Stream
} }
func (x *microStreamStream) Close() error { func (x *clientStreamStream) Close() error {
return x.stream.Close() return x.stream.Close()
} }
func (x *microStreamStream) SendMsg(m interface{}) error { func (x *clientStreamStream) SendMsg(m interface{}) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microStreamStream) RecvMsg(m interface{}) error { func (x *clientStreamStream) RecvMsg(m interface{}) error {
return x.stream.Recv(m) return x.stream.Recv(m)
} }
func (x *microStreamStream) Send(m *Response) error { func (x *clientStreamStream) Send(m *Response) error {
return x.stream.Send(m) return x.stream.Send(m)
} }
func (x *microStreamStream) Recv() (*Request, error) { func (x *clientStreamStream) Recv() (*Request, error) {
m := new(Request) m := new(Request)
if err := x.stream.Recv(m); err != nil { if err := x.stream.Recv(m); err != nil {
return nil, err return nil, err
@ -198,6 +198,6 @@ func (x *microStreamStream) Recv() (*Request, error) {
return m, nil return m, nil
} }
func (h *microHandler) Publish(ctx context.Context, in *Message, out *Message) error { func (h *clientHandler) Publish(ctx context.Context, in *Message, out *Message) error {
return h.MicroHandler.Publish(ctx, in, out) return h.ClientHandler.Publish(ctx, in, out)
} }

View File

@ -191,23 +191,23 @@ func init() {
var fileDescriptor_7d733ae29171347b = []byte{ var fileDescriptor_7d733ae29171347b = []byte{
// 270 bytes of a gzipped FileDescriptorProto // 270 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x3f, 0x4f, 0xc3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0x41, 0x4b, 0xc3, 0x40,
0x10, 0xc5, 0xeb, 0xfe, 0x4b, 0x39, 0x2a, 0x21, 0x9d, 0x18, 0x4c, 0x06, 0x54, 0x32, 0x65, 0xc1, 0x10, 0x85, 0xbb, 0x6d, 0x4c, 0xea, 0x58, 0x10, 0x06, 0x0f, 0x6b, 0x0e, 0x52, 0x73, 0xca, 0xc5,
0x45, 0x30, 0x23, 0x86, 0xce, 0x95, 0x50, 0x40, 0xac, 0x28, 0x71, 0x4f, 0xc1, 0x52, 0x6a, 0x9b, 0x54, 0xf4, 0x2c, 0x1e, 0x72, 0x16, 0x24, 0x8a, 0x57, 0x49, 0xb6, 0x43, 0x5c, 0x48, 0x77, 0xd7,
0xd8, 0xad, 0x94, 0xef, 0xc8, 0x87, 0x42, 0x38, 0x29, 0x45, 0xd0, 0x2e, 0x6c, 0xf7, 0xee, 0x67, 0xec, 0xb6, 0x90, 0x1f, 0xe9, 0x7f, 0x12, 0x36, 0xa9, 0x15, 0x6d, 0x2f, 0xbd, 0xcd, 0x9b, 0x6f,
0xbd, 0x3b, 0xbf, 0x83, 0x74, 0xad, 0x64, 0x6d, 0xe6, 0xa5, 0xb9, 0x6e, 0x0b, 0x59, 0x29, 0xd2, 0x79, 0x33, 0xfb, 0x06, 0xd2, 0x95, 0x14, 0xad, 0x5e, 0xd4, 0xfa, 0xa6, 0x2f, 0x44, 0x23, 0x49,
0x7e, 0x6e, 0x6b, 0xe3, 0x77, 0x42, 0x04, 0x81, 0x67, 0xa5, 0x11, 0xe1, 0x8d, 0x68, 0xdb, 0xc9, 0xb9, 0x85, 0x69, 0xb5, 0xdb, 0x8a, 0xcc, 0x0b, 0x3c, 0xaf, 0x75, 0xe6, 0xdf, 0x64, 0x7d, 0x3b,
0x16, 0xa2, 0x8c, 0xde, 0x37, 0xe4, 0x3c, 0x72, 0x88, 0x1c, 0xd5, 0x5b, 0x25, 0x89, 0xb3, 0x19, 0xd9, 0x40, 0x54, 0xd0, 0xe7, 0x9a, 0xac, 0x43, 0x0e, 0x91, 0xa5, 0x76, 0x23, 0x05, 0x71, 0x36,
0x4b, 0x4f, 0xb2, 0x9d, 0xc4, 0x18, 0x26, 0xa4, 0x57, 0xd6, 0x28, 0xed, 0x79, 0x3f, 0xa0, 0x6f, 0x67, 0xe9, 0x69, 0xb1, 0x95, 0x18, 0xc3, 0x94, 0xd4, 0xd2, 0x68, 0xa9, 0x1c, 0x1f, 0x7b, 0xf4,
0x8d, 0x57, 0x30, 0x95, 0x46, 0x7b, 0xd2, 0xfe, 0xd5, 0x37, 0x96, 0xf8, 0x20, 0xf0, 0xd3, 0xae, 0xa3, 0xf1, 0x1a, 0x66, 0x42, 0x2b, 0x47, 0xca, 0xbd, 0xbb, 0xce, 0x10, 0x9f, 0x78, 0x7e, 0x36,
0xf7, 0xdc, 0x58, 0x42, 0x84, 0x61, 0x61, 0x56, 0x0d, 0x1f, 0xce, 0x58, 0x3a, 0xcd, 0x42, 0x9d, 0xf4, 0x5e, 0x3b, 0x43, 0x88, 0x10, 0x54, 0x7a, 0xd9, 0xf1, 0x60, 0xce, 0xd2, 0x59, 0xe1, 0xeb,
0x5c, 0xc2, 0x24, 0x23, 0x67, 0x8d, 0x76, 0x7b, 0xce, 0x7e, 0xf0, 0x17, 0x88, 0x96, 0xe4, 0x5c, 0xe4, 0x0a, 0xa6, 0x05, 0x59, 0xa3, 0x95, 0xdd, 0x71, 0xf6, 0x8b, 0xbf, 0x41, 0xf4, 0x44, 0xd6,
0x5e, 0x12, 0x9e, 0xc3, 0xc8, 0x1b, 0xab, 0x64, 0xb7, 0x55, 0x2b, 0xfe, 0xcc, 0xed, 0x1f, 0x9f, 0x96, 0x35, 0xe1, 0x05, 0x9c, 0x38, 0x6d, 0xa4, 0x18, 0xb6, 0xea, 0xc5, 0xbf, 0xb9, 0xe3, 0xc3,
0x3b, 0xd8, 0xfb, 0xde, 0x7e, 0x30, 0x18, 0x2d, 0xbf, 0x02, 0xc0, 0x7b, 0x18, 0x2e, 0xf2, 0xaa, 0x73, 0x27, 0x3b, 0xdf, 0xbb, 0x2f, 0x06, 0x61, 0xee, 0xbf, 0x8e, 0x0f, 0x10, 0xe4, 0x65, 0xd3,
0x42, 0x2e, 0x7e, 0x65, 0x22, 0xba, 0x40, 0xe2, 0x8b, 0x03, 0xa4, 0x5d, 0x39, 0xe9, 0xe1, 0x02, 0x20, 0xcf, 0xfe, 0x84, 0x92, 0x0d, 0x89, 0xc4, 0x97, 0x7b, 0x48, 0xbf, 0x73, 0x32, 0xc2, 0x1c,
0xc6, 0x4f, 0xbe, 0xa6, 0x7c, 0xfd, 0x4f, 0x83, 0x94, 0xdd, 0x30, 0x7c, 0x80, 0xe8, 0x71, 0x53, 0xc2, 0x17, 0xd7, 0x52, 0xb9, 0x3a, 0xd2, 0x20, 0x65, 0xb7, 0x0c, 0x1f, 0x21, 0x7a, 0x5e, 0x57,
0x54, 0xca, 0xbd, 0x1d, 0x70, 0xe9, 0xfe, 0x1f, 0x1f, 0x25, 0x49, 0xaf, 0x18, 0x87, 0xb3, 0xde, 0x8d, 0xb4, 0x1f, 0x7b, 0x5c, 0x86, 0x00, 0xe2, 0x83, 0x24, 0x19, 0x55, 0xa1, 0xbf, 0xeb, 0xfd,
0x7d, 0x06, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x63, 0x94, 0x1a, 0x02, 0x02, 0x00, 0x00, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0a, 0x76, 0x1f, 0x51, 0x03, 0x02, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.
@ -218,59 +218,59 @@ var _ grpc.ClientConn
// is compatible with the grpc package it is being compiled against. // is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4 const _ = grpc.SupportPackageIsVersion4
// MicroClient is the client API for Micro service. // ClientClient is the client API for Client service.
// //
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type MicroClient interface { type ClientClient interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) Stream(ctx context.Context, opts ...grpc.CallOption) (Client_StreamClient, error)
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error)
} }
type microClient struct { type clientClient struct {
cc *grpc.ClientConn cc *grpc.ClientConn
} }
func NewMicroClient(cc *grpc.ClientConn) MicroClient { func NewClientClient(cc *grpc.ClientConn) ClientClient {
return &microClient{cc} return &clientClient{cc}
} }
func (c *microClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) { func (c *clientClient) Call(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response) out := new(Response)
err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Call", in, out, opts...) err := c.cc.Invoke(ctx, "/go.micro.client.Client/Call", in, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
func (c *microClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Micro_StreamClient, error) { func (c *clientClient) Stream(ctx context.Context, opts ...grpc.CallOption) (Client_StreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Micro_serviceDesc.Streams[0], "/go.micro.client.Micro/Stream", opts...) stream, err := c.cc.NewStream(ctx, &_Client_serviceDesc.Streams[0], "/go.micro.client.Client/Stream", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &microStreamClient{stream} x := &clientStreamClient{stream}
return x, nil return x, nil
} }
type Micro_StreamClient interface { type Client_StreamClient interface {
Send(*Request) error Send(*Request) error
Recv() (*Response, error) Recv() (*Response, error)
grpc.ClientStream grpc.ClientStream
} }
type microStreamClient struct { type clientStreamClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *microStreamClient) Send(m *Request) error { func (x *clientStreamClient) Send(m *Request) error {
return x.ClientStream.SendMsg(m) return x.ClientStream.SendMsg(m)
} }
func (x *microStreamClient) Recv() (*Response, error) { func (x *clientStreamClient) Recv() (*Response, error) {
m := new(Response) m := new(Response)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
@ -278,66 +278,66 @@ func (x *microStreamClient) Recv() (*Response, error) {
return m, nil return m, nil
} }
func (c *microClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) { func (c *clientClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) {
out := new(Message) out := new(Message)
err := c.cc.Invoke(ctx, "/go.micro.client.Micro/Publish", in, out, opts...) err := c.cc.Invoke(ctx, "/go.micro.client.Client/Publish", in, out, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
// MicroServer is the server API for Micro service. // ClientServer is the server API for Client service.
type MicroServer interface { type ClientServer interface {
// Call allows a single request to be made // Call allows a single request to be made
Call(context.Context, *Request) (*Response, error) Call(context.Context, *Request) (*Response, error)
// Stream is a bidirectional stream // Stream is a bidirectional stream
Stream(Micro_StreamServer) error Stream(Client_StreamServer) error
// Publish publishes a message and returns an empty Message // Publish publishes a message and returns an empty Message
Publish(context.Context, *Message) (*Message, error) Publish(context.Context, *Message) (*Message, error)
} }
func RegisterMicroServer(s *grpc.Server, srv MicroServer) { func RegisterClientServer(s *grpc.Server, srv ClientServer) {
s.RegisterService(&_Micro_serviceDesc, srv) s.RegisterService(&_Client_serviceDesc, srv)
} }
func _Micro_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Client_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Request) in := new(Request)
if err := dec(in); err != nil { if err := dec(in); err != nil {
return nil, err return nil, err
} }
if interceptor == nil { if interceptor == nil {
return srv.(MicroServer).Call(ctx, in) return srv.(ClientServer).Call(ctx, in)
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/go.micro.client.Micro/Call", FullMethod: "/go.micro.client.Client/Call",
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MicroServer).Call(ctx, req.(*Request)) return srv.(ClientServer).Call(ctx, req.(*Request))
} }
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _Micro_Stream_Handler(srv interface{}, stream grpc.ServerStream) error { func _Client_Stream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(MicroServer).Stream(&microStreamServer{stream}) return srv.(ClientServer).Stream(&clientStreamServer{stream})
} }
type Micro_StreamServer interface { type Client_StreamServer interface {
Send(*Response) error Send(*Response) error
Recv() (*Request, error) Recv() (*Request, error)
grpc.ServerStream grpc.ServerStream
} }
type microStreamServer struct { type clientStreamServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *microStreamServer) Send(m *Response) error { func (x *clientStreamServer) Send(m *Response) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
func (x *microStreamServer) Recv() (*Request, error) { func (x *clientStreamServer) Recv() (*Request, error) {
m := new(Request) m := new(Request)
if err := x.ServerStream.RecvMsg(m); err != nil { if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err return nil, err
@ -345,41 +345,41 @@ func (x *microStreamServer) Recv() (*Request, error) {
return m, nil return m, nil
} }
func _Micro_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _Client_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Message) in := new(Message)
if err := dec(in); err != nil { if err := dec(in); err != nil {
return nil, err return nil, err
} }
if interceptor == nil { if interceptor == nil {
return srv.(MicroServer).Publish(ctx, in) return srv.(ClientServer).Publish(ctx, in)
} }
info := &grpc.UnaryServerInfo{ info := &grpc.UnaryServerInfo{
Server: srv, Server: srv,
FullMethod: "/go.micro.client.Micro/Publish", FullMethod: "/go.micro.client.Client/Publish",
} }
handler := func(ctx context.Context, req interface{}) (interface{}, error) { handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MicroServer).Publish(ctx, req.(*Message)) return srv.(ClientServer).Publish(ctx, req.(*Message))
} }
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
var _Micro_serviceDesc = grpc.ServiceDesc{ var _Client_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.client.Micro", ServiceName: "go.micro.client.Client",
HandlerType: (*MicroServer)(nil), HandlerType: (*ClientServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{ {
MethodName: "Call", MethodName: "Call",
Handler: _Micro_Call_Handler, Handler: _Client_Call_Handler,
}, },
{ {
MethodName: "Publish", MethodName: "Publish",
Handler: _Micro_Publish_Handler, Handler: _Client_Publish_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
StreamName: "Stream", StreamName: "Stream",
Handler: _Micro_Stream_Handler, Handler: _Client_Stream_Handler,
ServerStreams: true, ServerStreams: true,
ClientStreams: true, ClientStreams: true,
}, },

View File

@ -2,8 +2,8 @@ syntax = "proto3";
package go.micro.client; package go.micro.client;
// Micro is the micro client interface // Client is the micro client interface
service Micro { service Client {
// Call allows a single request to be made // Call allows a single request to be made
rpc Call(Request) returns (Response) {}; rpc Call(Request) returns (Response) {};
// Stream is a bidirectional stream // Stream is a bidirectional stream

View File

@ -89,9 +89,22 @@ func (c *Codec) Write(m *codec.Message, b interface{}) error {
m.Header[":authority"] = m.Target m.Header[":authority"] = m.Target
m.Header["content-type"] = c.ContentType m.Header["content-type"] = c.ContentType
case codec.Response: case codec.Response:
m.Header["Trailer"] = "grpc-status, grpc-message" m.Header["Trailer"] = "grpc-status" //, grpc-message"
m.Header["content-type"] = c.ContentType
m.Header[":status"] = "200"
m.Header["grpc-status"] = "0" m.Header["grpc-status"] = "0"
m.Header["grpc-message"] = "" // m.Header["grpc-message"] = ""
case codec.Error:
m.Header["Trailer"] = "grpc-status, grpc-message"
// micro end of stream
if m.Error == "EOS" {
m.Header["grpc-status"] = "0"
} else {
m.Header["grpc-message"] = m.Error
m.Header["grpc-status"] = "13"
}
return nil
} }
// marshal content // marshal content

View File

@ -1,7 +1,6 @@
package config package config
import ( import (
"encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@ -9,7 +8,6 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/uuid"
"github.com/micro/go-micro/config/source/env" "github.com/micro/go-micro/config/source/env"
"github.com/micro/go-micro/config/source/file" "github.com/micro/go-micro/config/source/file"
) )
@ -122,57 +120,3 @@ func TestConfigMerge(t *testing.T) {
actualHost) actualHost)
} }
} }
func TestFileChange(t *testing.T) {
// create a temp file
fileName := uuid.New().String() + "testWatcher.json"
f, err := os.OpenFile("."+sep+fileName, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
t.Error(err)
}
defer f.Close()
defer os.Remove("." + sep + fileName)
// load the file
if err := Load(file.NewSource(
file.WithPath("." + sep + fileName),
)); err != nil {
t.Error(err)
}
// watch changes
watcher, err := Watch()
if err != nil {
t.Error(err)
}
changeTimes := 0
go func() {
for {
v, err := watcher.Next()
if err != nil {
t.Error(err)
return
}
changeTimes++
t.Logf("file change%s", string(v.Bytes()))
}
}()
content := map[int]string{}
// change the file
for i := 0; i < 5; i++ {
content[i] = time.Now().String()
bytes, _ := json.Marshal(content)
f.Truncate(0)
f.Seek(0, 0)
if _, err := f.Write(bytes); err != nil {
t.Error(err)
}
time.Sleep(time.Second)
}
if changeTimes != 4 {
t.Error(fmt.Errorf("watcher error: change times %d is not enough", changeTimes))
}
}

View File

@ -1,7 +1,6 @@
package consul package consul
import ( import (
"errors"
"time" "time"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
@ -80,7 +79,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
case cs := <-w.ch: case cs := <-w.ch:
return cs, nil return cs, nil
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
} }

View File

@ -86,8 +86,8 @@ func TestEnvvar_Prefixes(t *testing.T) {
} }
func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) { func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) {
source := NewSource(WithStrippedPrefix("GOMICRO_")) src := NewSource(WithStrippedPrefix("GOMICRO_"))
w, err := source.Watch() w, err := src.Watch()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -97,7 +97,7 @@ func TestEnvvar_WatchNextNoOpsUntilStop(t *testing.T) {
w.Stop() w.Stop()
}() }()
if _, err := w.Next(); err.Error() != "watcher stopped" { if _, err := w.Next(); err != source.ErrWatcherStopped {
t.Errorf("expected watcher stopped error, got %v", err) t.Errorf("expected watcher stopped error, got %v", err)
} }
} }

View File

@ -1,8 +1,6 @@
package env package env
import ( import (
"errors"
"github.com/micro/go-micro/config/source" "github.com/micro/go-micro/config/source"
) )
@ -13,7 +11,7 @@ type watcher struct {
func (w *watcher) Next() (*source.ChangeSet, error) { func (w *watcher) Next() (*source.ChangeSet, error) {
<-w.exit <-w.exit
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
func (w *watcher) Stop() error { func (w *watcher) Stop() error {

View File

@ -3,7 +3,6 @@
package file package file
import ( import (
"errors"
"os" "os"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
// is it closed? // is it closed?
select { select {
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
default: default:
} }
@ -59,7 +58,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
case err := <-w.fw.Errors: case err := <-w.fw.Errors:
return nil, err return nil, err
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
} }

View File

@ -3,7 +3,6 @@
package file package file
import ( import (
"errors"
"os" "os"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -36,7 +35,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
// is it closed? // is it closed?
select { select {
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
default: default:
} }
@ -63,7 +62,7 @@ func (w *watcher) Next() (*source.ChangeSet, error) {
case err := <-w.fw.Errors: case err := <-w.fw.Errors:
return nil, err return nil, err
case <-w.exit: case <-w.exit:
return nil, errors.New("watcher stopped") return nil, source.ErrWatcherStopped
} }
} }

View File

@ -2,9 +2,15 @@
package source package source
import ( import (
"errors"
"time" "time"
) )
var (
// ErrWatcherStopped is returned when source watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
)
// Source is the source from which config is loaded // Source is the source from which config is loaded
type Source interface { type Source interface {
Read() (*ChangeSet, error) Read() (*ChangeSet, error)

View File

@ -0,0 +1,33 @@
// Package static is a static resolver
package registry
import (
"github.com/micro/go-micro/network/resolver"
)
// Resolver returns a static list of nodes. In the event the node list
// is not present it will return the name of the network passed in.
type Resolver struct {
// A static list of nodes
Nodes []string
}
// Resolve returns the list of nodes
func (r *Resolver) Resolve(name string) ([]*resolver.Record, error) {
// if there are no nodes just return the name
if len(r.Nodes) == 0 {
return []*resolver.Record{
{Address: name},
}, nil
}
var records []*resolver.Record
for _, node := range r.Nodes {
records = append(records, &resolver.Record{
Address: node,
})
}
return records, nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/micro/go-micro/client/grpc" "github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
@ -61,6 +62,10 @@ func readLoop(r server.Request, s client.Stream) error {
} }
} }
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.grpc", "SendRequest is unsupported")
}
// ServeRequest honours the server.Proxy interface // ServeRequest honours the server.Proxy interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// set default client // set default client

View File

@ -10,6 +10,7 @@ import (
"net/url" "net/url"
"path" "path"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
@ -44,6 +45,10 @@ func getEndpoint(hdr map[string]string) string {
return "" return ""
} }
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.http", "SendRequest is unsupported")
}
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
if p.Endpoint == "" { if p.Endpoint == "" {

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"sort"
"strings" "strings"
"sync" "sync"
@ -12,6 +13,7 @@ import (
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/config/options" "github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy" "github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/router" "github.com/micro/go-micro/router"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
@ -26,9 +28,12 @@ type Proxy struct {
// Endpoint specifies the fixed service endpoint to call. // Endpoint specifies the fixed service endpoint to call.
Endpoint string Endpoint string
// The client to use for outbound requests // The client to use for outbound requests in the local network
Client client.Client Client client.Client
// Links are used for outbound requests not in the local network
Links map[string]client.Client
// The router for routes // The router for routes
Router router.Router Router router.Router
@ -76,7 +81,7 @@ func readLoop(r server.Request, s client.Stream) error {
} }
// toNodes returns a list of node addresses from given routes // toNodes returns a list of node addresses from given routes
func toNodes(routes map[uint64]router.Route) []string { func toNodes(routes []router.Route) []string {
var nodes []string var nodes []string
for _, node := range routes { for _, node := range routes {
address := node.Address address := node.Address
@ -88,15 +93,37 @@ func toNodes(routes map[uint64]router.Route) []string {
return nodes return nodes
} }
func (p *Proxy) getRoute(service string) ([]string, error) { func (p *Proxy) getLink(r router.Route) (client.Client, error) {
if r.Link == "local" || len(p.Links) == 0 {
return p.Client, nil
}
l, ok := p.Links[r.Link]
if !ok {
return nil, errors.InternalServerError("go.micro.proxy", "link not found")
}
return l, nil
}
func (p *Proxy) getRoute(service string) ([]router.Route, error) {
toSlice := func(r map[uint64]router.Route) []router.Route {
var routes []router.Route
for _, v := range r {
routes = append(routes, v)
}
// sort the routes in order of metric
sort.Slice(routes, func(i, j int) bool { return routes[i].Metric < routes[j].Metric })
return routes
}
// lookup the route cache first // lookup the route cache first
p.Lock() p.Lock()
routes, ok := p.Routes[service] routes, ok := p.Routes[service]
if ok { if ok {
p.Unlock() p.Unlock()
return toNodes(routes), nil return toSlice(routes), nil
} }
p.Routes[service] = make(map[uint64]router.Route)
p.Unlock() p.Unlock()
// lookup the routes in the router // lookup the routes in the router
@ -113,12 +140,16 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
// update the proxy cache // update the proxy cache
p.Lock() p.Lock()
for _, route := range results { for _, route := range results {
// create if does not exist
if _, ok := p.Routes[service]; !ok {
p.Routes[service] = make(map[uint64]router.Route)
}
p.Routes[service][route.Hash()] = route p.Routes[service][route.Hash()] = route
} }
routes = p.Routes[service] routes = p.Routes[service]
p.Unlock() p.Unlock()
return toNodes(routes), nil return toSlice(routes), nil
} }
// manageRouteCache applies action on a given route to Proxy route cache // manageRouteCache applies action on a given route to Proxy route cache
@ -171,12 +202,27 @@ func (p *Proxy) watchRoutes() {
} }
} }
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy", "SendRequest is unsupported")
}
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// service name // determine if its local routing
service := req.Service() var local bool
endpoint := req.Endpoint() // address to call
var addresses []string var addresses []string
// routes
var routes []router.Route
// service name to call
service := req.Service()
// endpoint to call
endpoint := req.Endpoint()
// are we network routing or local routing
if len(p.Links) == 0 {
local = true
}
// call a specific backend endpoint either by name or address // call a specific backend endpoint either by name or address
if len(p.Endpoint) > 0 { if len(p.Endpoint) > 0 {
@ -190,7 +236,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
return err return err
} }
// set the address // set the address
addresses = addr routes = addr
// set the name // set the name
service = p.Endpoint service = p.Endpoint
} }
@ -201,16 +247,66 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
if err != nil { if err != nil {
return err return err
} }
addresses = addr routes = addr
} }
// if the address is already set just serve it
// TODO: figure it out if we should know to pick a link
if len(addresses) > 0 {
// serve the normal way
return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, client.WithAddress(addresses...))
}
// there's no links e.g we're local routing then just serve it with addresses
if local {
var opts []client.CallOption var opts []client.CallOption
// set address if available // set address if available via routes or specific endpoint
if len(addresses) > 0 { if len(routes) > 0 {
addresses := toNodes(routes)
opts = append(opts, client.WithAddress(addresses...)) opts = append(opts, client.WithAddress(addresses...))
} }
// serve the normal way
return p.serveRequest(ctx, p.Client, service, endpoint, req, rsp, opts...)
}
var gerr error
// we're routing globally with multiple links
// so we need to pick a link per route
for _, route := range routes {
// pick the link or error out
link, err := p.getLink(route)
if err != nil {
// ok let's try again
gerr = err
continue
}
// set the address to call
addresses := toNodes([]router.Route{route})
// do the request with the link
gerr = p.serveRequest(ctx, link, service, endpoint, req, rsp, client.WithAddress(addresses...))
// return on no error since we succeeded
if gerr == nil {
return nil
}
// return where the context deadline was exceeded
if gerr == context.Canceled || gerr == context.DeadlineExceeded {
return err
}
// otherwise attempt to do it all over again
}
// if we got here something went really badly wrong
return gerr
}
func (p *Proxy) serveRequest(ctx context.Context, link client.Client, service, endpoint string, req server.Request, rsp server.Response, opts ...client.CallOption) error {
// read initial request // read initial request
body, err := req.Read() body, err := req.Read()
if err != nil { if err != nil {
@ -218,14 +314,14 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
// create new request with raw bytes body // create new request with raw bytes body
creq := p.Client.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType())) creq := link.NewRequest(service, endpoint, &bytes.Frame{body}, client.WithContentType(req.ContentType()))
// not a stream so make a client.Call request // not a stream so make a client.Call request
if !req.Stream() { if !req.Stream() {
crsp := new(bytes.Frame) crsp := new(bytes.Frame)
// make a call to the backend // make a call to the backend
if err := p.Client.Call(ctx, creq, crsp, opts...); err != nil { if err := link.Call(ctx, creq, crsp, opts...); err != nil {
return err return err
} }
@ -238,7 +334,7 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
} }
// create new stream // create new stream
stream, err := p.Client.Stream(ctx, creq, opts...) stream, err := link.Stream(ctx, creq, opts...)
if err != nil { if err != nil {
return err return err
} }
@ -300,6 +396,7 @@ func NewSingleHostProxy(endpoint string) *Proxy {
// NewProxy returns a new proxy which will route based on mucp headers // NewProxy returns a new proxy which will route based on mucp headers
func NewProxy(opts ...options.Option) proxy.Proxy { func NewProxy(opts ...options.Option) proxy.Proxy {
p := new(Proxy) p := new(Proxy)
p.Links = map[string]client.Client{}
p.Options = options.NewOptions(opts...) p.Options = options.NewOptions(opts...)
p.Options.Init(options.WithString("mucp")) p.Options.Init(options.WithString("mucp"))
@ -320,6 +417,12 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
p.Client = client.DefaultClient p.Client = client.DefaultClient
} }
// get client
links, ok := p.Options.Values().Get("proxy.links")
if ok {
p.Links = links.(map[string]client.Client)
}
// get router // get router
r, ok := p.Options.Values().Get("proxy.router") r, ok := p.Options.Values().Get("proxy.router")
if ok { if ok {

View File

@ -13,6 +13,8 @@ import (
// Proxy can be used as a proxy server for go-micro services // Proxy can be used as a proxy server for go-micro services
type Proxy interface { type Proxy interface {
options.Options options.Options
// SendRequest honours the client.Router interface
SendRequest(context.Context, client.Request, client.Response) error
// ServeRequest honours the server.Router interface // ServeRequest honours the server.Router interface
ServeRequest(context.Context, server.Request, server.Response) error ServeRequest(context.Context, server.Request, server.Response) error
} }
@ -35,3 +37,20 @@ func WithClient(c client.Client) options.Option {
func WithRouter(r router.Router) options.Option { func WithRouter(r router.Router) options.Option {
return options.WithValue("proxy.router", r) return options.WithValue("proxy.router", r)
} }
// WithLink sets a link for outbound requests
func WithLink(name string, c client.Client) options.Option {
return func(o *options.Values) error {
var links map[string]client.Client
v, ok := o.Get("proxy.links")
if ok {
links = v.(map[string]client.Client)
} else {
links = map[string]client.Client{}
}
links[name] = c
// save the links
o.Set("proxy.links", links)
return nil
}
}

View File

@ -181,10 +181,6 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
// watchRegistry watches registry and updates routing table based on the received events. // watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the registry watcher fails with error or if the routing table update fails. // It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *router) watchRegistry(w registry.Watcher) error { func (r *router) watchRegistry(w registry.Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
@ -193,6 +189,9 @@ func (r *router) watchRegistry(w registry.Watcher) error {
r.wg.Done() r.wg.Done()
}() }()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
@ -226,9 +225,6 @@ func (r *router) watchRegistry(w registry.Watcher) error {
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry. // It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error { func (r *router) watchTable(w Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
exit := make(chan bool) exit := make(chan bool)
defer func() { defer func() {
@ -237,6 +233,9 @@ func (r *router) watchTable(w Watcher) error {
r.wg.Done() r.wg.Done()
}() }()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() { go func() {
defer w.Stop() defer w.Stop()
@ -471,35 +470,51 @@ func (r *router) advertiseEvents() error {
} }
} }
// watchErrors watches router errors and takes appropriate actions // close closes exit channels
func (r *router) watchErrors() { func (r *router) close() {
var err error
select {
case <-r.exit:
case err = <-r.errChan:
}
r.Lock()
defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped {
// notify all goroutines to finish // notify all goroutines to finish
close(r.exit) close(r.exit)
// drain the advertise channel only if the router is advertising // drain the advertise channel only if advertising
if r.status.Code == Advertising { if r.status.Code == Advertising {
// drain the event channel // drain the event channel
for range r.eventChan { for range r.eventChan {
} }
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
delete(r.subscribers, id)
}
} }
// mark the router as Stopped and set its Error to nil // mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil} r.status = Status{Code: Stopped, Error: nil}
} }
// watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors() {
var err error
select {
case <-r.exit:
return
case err = <-r.errChan:
}
r.Lock()
defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped {
// close all the channels
r.close()
// set the status error
if err != nil { if err != nil {
r.status = Status{Code: Error, Error: err} r.status.Error = err
}
} }
} }
@ -508,6 +523,11 @@ func (r *router) Start() error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// only start if we're stopped
if r.status.Code != Stopped {
return nil
}
// add all local service routes into the routing table // add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil { if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil {
e := fmt.Errorf("failed adding registry routes: %s", err) e := fmt.Errorf("failed adding registry routes: %s", err)
@ -660,10 +680,12 @@ func (r *router) Process(a *Advert) error {
return nil return nil
} }
// Lookup routes in the routing table
func (r *router) Lookup(q Query) ([]Route, error) { func (r *router) Lookup(q Query) ([]Route, error) {
return r.table.Query(q) return r.table.Query(q)
} }
// Watch routes
func (r *router) Watch(opts ...WatchOption) (Watcher, error) { func (r *router) Watch(opts ...WatchOption) (Watcher, error) {
return r.table.Watch(opts...) return r.table.Watch(opts...)
} }
@ -682,31 +704,15 @@ func (r *router) Status() Status {
// Stop stops the router // Stop stops the router
func (r *router) Stop() error { func (r *router) Stop() error {
r.Lock() r.Lock()
// only close the channel if the router is running and/or advertising defer r.Unlock()
if r.status.Code == Running || r.status.Code == Advertising {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel only if advertising switch r.status.Code {
if r.status.Code == Advertising { case Stopped, Error:
// drain the event channel return r.status.Error
for range r.eventChan { case Running, Advertising:
// close all the channels
r.close()
} }
}
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
delete(r.subscribers, id)
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
r.Unlock()
// wait for all goroutines to finish // wait for all goroutines to finish
r.wg.Wait() r.wg.Wait()
@ -716,5 +722,5 @@ func (r *router) Stop() error {
// String prints debugging information about router // String prints debugging information about router
func (r *router) String() string { func (r *router) String() string {
return "default" return "memory"
} }

View File

@ -53,6 +53,8 @@ type grpcServer struct {
opts server.Options opts server.Options
handlers map[string]server.Handler handlers map[string]server.Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
// marks the serve as started
started bool
// used for first registration // used for first registration
registered bool registered bool
} }
@ -454,7 +456,10 @@ func (g *grpcServer) newCodec(contentType string) (codec.NewCodec, error) {
} }
func (g *grpcServer) Options() server.Options { func (g *grpcServer) Options() server.Options {
g.RLock()
opts := g.opts opts := g.opts
g.RUnlock()
return opts return opts
} }
@ -700,7 +705,14 @@ func (g *grpcServer) Deregister() error {
} }
func (g *grpcServer) Start() error { func (g *grpcServer) Start() error {
config := g.opts g.RLock()
if g.started {
g.RUnlock()
return nil
}
g.RUnlock()
config := g.Options()
// micro: config.Transport.Listen(config.Address) // micro: config.Transport.Listen(config.Address)
ts, err := net.Listen("tcp", config.Address) ts, err := net.Listen("tcp", config.Address)
@ -781,13 +793,34 @@ func (g *grpcServer) Start() error {
config.Broker.Disconnect() config.Broker.Disconnect()
}() }()
// mark the server as started
g.Lock()
g.started = true
g.Unlock()
return nil return nil
} }
func (g *grpcServer) Stop() error { func (g *grpcServer) Stop() error {
g.RLock()
if !g.started {
g.RUnlock()
return nil
}
g.RUnlock()
ch := make(chan error) ch := make(chan error)
g.exit <- ch g.exit <- ch
return <-ch
var err error
select {
case err = <-ch:
g.Lock()
g.started = false
g.Unlock()
}
return err
} }
func (g *grpcServer) String() string { func (g *grpcServer) String() string {

View File

@ -18,6 +18,7 @@ type rpcCodec struct {
socket transport.Socket socket transport.Socket
codec codec.Codec codec codec.Codec
first bool first bool
protocol string
req *transport.Message req *transport.Message
buf *readWriteCloser buf *readWriteCloser
@ -157,12 +158,27 @@ func newRpcCodec(req *transport.Message, socket transport.Socket, c codec.NewCod
rbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil),
wbuf: bytes.NewBuffer(nil), wbuf: bytes.NewBuffer(nil),
} }
r := &rpcCodec{ r := &rpcCodec{
buf: rwc, buf: rwc,
codec: c(rwc), codec: c(rwc),
req: req, req: req,
socket: socket, socket: socket,
protocol: "mucp",
} }
// if grpc pre-load the buffer
// TODO: remove this terrible hack
switch r.codec.String() {
case "grpc":
// set as first
r.first = true
// write the body
rwc.rbuf.Write(req.Body)
// set the protocol
r.protocol = "grpc"
}
return r return r
} }
@ -173,6 +189,8 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
Body: c.req.Body, Body: c.req.Body,
} }
// first message could be pre-loaded
if !c.first {
var tm transport.Message var tm transport.Message
// read off the socket // read off the socket
@ -194,6 +212,10 @@ func (c *rpcCodec) ReadHeader(r *codec.Message, t codec.MessageType) error {
// set req // set req
c.req = &tm c.req = &tm
}
// disable first
c.first = false
// set some internal things // set some internal things
getHeaders(&m) getHeaders(&m)
@ -293,5 +315,5 @@ func (c *rpcCodec) Close() error {
} }
func (c *rpcCodec) String() string { func (c *rpcCodec) String() string {
return "rpc" return c.protocol
} }

View File

@ -30,6 +30,8 @@ type rpcServer struct {
opts Options opts Options
handlers map[string]Handler handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber subscribers map[*subscriber][]broker.Subscriber
// marks the serve as started
started bool
// used for first registration // used for first registration
registered bool registered bool
// graceful exit // graceful exit
@ -61,20 +63,33 @@ func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response)
// ServeConn serves a single connection // ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) { func (s *rpcServer) ServeConn(sock transport.Socket) {
var wg sync.WaitGroup
var mtx sync.RWMutex
// streams are multiplexed on Micro-Stream or Micro-Id header
sockets := make(map[string]*socket.Socket)
defer func() { defer func() {
// close socket // wait till done
wg.Wait()
// close underlying socket
sock.Close() sock.Close()
// close the sockets
mtx.Lock()
for id, psock := range sockets {
psock.Close()
delete(sockets, id)
}
mtx.Unlock()
// recover any panics
if r := recover(); r != nil { if r := recover(); r != nil {
log.Log("panic recovered: ", r) log.Log("panic recovered: ", r)
log.Log(string(debug.Stack())) log.Log(string(debug.Stack()))
} }
}() }()
// multiplex the streams on a single socket by Micro-Stream
var mtx sync.RWMutex
sockets := make(map[string]*socket.Socket)
for { for {
var msg transport.Message var msg transport.Message
if err := sock.Recv(&msg); err != nil { if err := sock.Recv(&msg); err != nil {
@ -92,6 +107,9 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
id = msg.Header["Micro-Id"] id = msg.Header["Micro-Id"]
} }
// we're starting processing
wg.Add(1)
// add to wait group if "wait" is opt-in // add to wait group if "wait" is opt-in
if s.wg != nil { if s.wg != nil {
s.wg.Add(1) s.wg.Add(1)
@ -117,6 +135,8 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
s.wg.Done() s.wg.Done()
} }
wg.Done()
// continue to the next message // continue to the next message
continue continue
} }
@ -134,28 +154,6 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
sockets[id] = psock sockets[id] = psock
mtx.Unlock() mtx.Unlock()
// process the outbound messages from the socket
go func(id string, psock *socket.Socket) {
defer psock.Close()
for {
// get the message from our internal handler/stream
m := new(transport.Message)
if err := psock.Process(m); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
return
}
// send the message back over the socket
if err := sock.Send(m); err != nil {
return
}
}
}(id, psock)
// now walk the usual path // now walk the usual path
// we use this Timeout header to set a server deadline // we use this Timeout header to set a server deadline
@ -203,17 +201,23 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
}, },
Body: []byte(err.Error()), Body: []byte(err.Error()),
}) })
if s.wg != nil { if s.wg != nil {
s.wg.Done() s.wg.Done()
} }
wg.Done()
return return
} }
} }
rcodec := newRpcCodec(&msg, psock, cf) rcodec := newRpcCodec(&msg, psock, cf)
protocol := rcodec.String()
// check stream id // check stream id
var stream bool var stream bool
if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 { if v := getHeader("Micro-Stream", msg.Header); len(v) > 0 {
stream = true stream = true
} }
@ -257,8 +261,44 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
r = rpcRouter{handler} r = rpcRouter{handler}
} }
// wait for processing to exit
wg.Add(1)
// process the outbound messages from the socket
go func(id string, psock *socket.Socket) {
defer func() {
// TODO: don't hack this but if its grpc just break out of the stream
// We do this because the underlying connection is h2 and its a stream
switch protocol {
case "grpc":
sock.Close()
}
wg.Done()
}()
for {
// get the message from our internal handler/stream
m := new(transport.Message)
if err := psock.Process(m); err != nil {
// delete the socket
mtx.Lock()
delete(sockets, id)
mtx.Unlock()
return
}
// send the message back over the socket
if err := sock.Send(m); err != nil {
return
}
}
}(id, psock)
// serve the request in a go routine as this may be a stream // serve the request in a go routine as this may be a stream
go func(id string, psock *socket.Socket) { go func(id string, psock *socket.Socket) {
defer psock.Close()
// serve the actual request using the request router // serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil { if err := r.ServeRequest(ctx, request, response); err != nil {
// write an error response // write an error response
@ -278,16 +318,14 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
delete(sockets, id) delete(sockets, id)
mtx.Unlock() mtx.Unlock()
// once done serving signal we're done
if s.wg != nil {
s.wg.Done()
}
}(id, psock)
// signal we're done // signal we're done
if s.wg != nil { if s.wg != nil {
s.wg.Done() s.wg.Done()
} }
// done with this socket
wg.Done()
}(id, psock)
} }
} }
@ -587,6 +625,13 @@ func (s *rpcServer) Deregister() error {
} }
func (s *rpcServer) Start() error { func (s *rpcServer) Start() error {
s.RLock()
if s.started {
s.RUnlock()
return nil
}
s.RUnlock()
config := s.Options() config := s.Options()
// start listening on the transport // start listening on the transport
@ -711,13 +756,34 @@ func (s *rpcServer) Start() error {
s.Unlock() s.Unlock()
}() }()
// mark the server as started
s.Lock()
s.started = true
s.Unlock()
return nil return nil
} }
func (s *rpcServer) Stop() error { func (s *rpcServer) Stop() error {
s.RLock()
if !s.started {
s.RUnlock()
return nil
}
s.RUnlock()
ch := make(chan error) ch := make(chan error)
s.exit <- ch s.exit <- ch
return <-ch
var err error
select {
case err = <-ch:
s.Lock()
s.started = false
s.Unlock()
}
return err
} }
func (s *rpcServer) String() string { func (s *rpcServer) String() string {

View File

@ -3,9 +3,11 @@ package server
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
@ -116,8 +118,8 @@ type Option func(*Options)
var ( var (
DefaultAddress = ":0" DefaultAddress = ":0"
DefaultName = "server" DefaultName = "go.micro.server"
DefaultVersion = "latest" DefaultVersion = fmt.Sprintf("%d", time.Now().Unix())
DefaultId = uuid.New().String() DefaultId = uuid.New().String()
DefaultServer Server = newRpcServer() DefaultServer Server = newRpcServer()
DefaultRouter = newRpcRouter() DefaultRouter = newRpcRouter()

View File

@ -17,5 +17,5 @@ func NewService(opts ...micro.Option) micro.Service {
options = append(options, opts...) options = append(options, opts...)
return micro.NewService(opts...) return micro.NewService(options...)
} }

View File

@ -33,6 +33,8 @@ type httpTransportClient struct {
once sync.Once once sync.Once
sync.RWMutex sync.RWMutex
// request must be stored for response processing
r chan *http.Request r chan *http.Request
bl []*http.Request bl []*http.Request
buff *bufio.Reader buff *bufio.Reader
@ -48,10 +50,18 @@ type httpTransportSocket struct {
r *http.Request r *http.Request
rw *bufio.ReadWriter rw *bufio.ReadWriter
mtx sync.RWMutex
// the hijacked when using http 1
conn net.Conn conn net.Conn
// for the first request // for the first request
ch chan *http.Request ch chan *http.Request
// h2 things
buf *bufio.Reader
// indicate if socket is closed
closed chan bool
// local/remote ip // local/remote ip
local string local string
remote string remote string
@ -161,14 +171,13 @@ func (h *httpTransportClient) Recv(m *Message) error {
} }
func (h *httpTransportClient) Close() error { func (h *httpTransportClient) Close() error {
err := h.conn.Close()
h.once.Do(func() { h.once.Do(func() {
h.Lock() h.Lock()
h.buff.Reset(nil) h.buff.Reset(nil)
h.Unlock() h.Unlock()
close(h.r) close(h.r)
}) })
return err return h.conn.Close()
} }
func (h *httpTransportSocket) Local() string { func (h *httpTransportSocket) Local() string {
@ -232,14 +241,23 @@ func (h *httpTransportSocket) Recv(m *Message) error {
return nil return nil
} }
// only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// processing http2 request // processing http2 request
// read streaming body // read streaming body
// set max buffer size // set max buffer size
buf := make([]byte, 4*1024) // TODO: adjustable buffer size
buf := make([]byte, 4*1024*1024)
// read the request body // read the request body
n, err := h.r.Body.Read(buf) n, err := h.buf.Read(buf)
// not an eof error // not an eof error
if err != nil { if err != nil {
return err return err
@ -290,7 +308,13 @@ func (h *httpTransportSocket) Send(m *Message) error {
return rsp.Write(h.conn) return rsp.Write(h.conn)
} }
// http2 request // only process if the socket is open
select {
case <-h.closed:
return io.EOF
default:
// no op
}
// set headers // set headers
for k, v := range m.Header { for k, v := range m.Header {
@ -299,6 +323,10 @@ func (h *httpTransportSocket) Send(m *Message) error {
// write request // write request
_, err := h.w.Write(m.Body) _, err := h.w.Write(m.Body)
// flush the trailers
h.w.(http.Flusher).Flush()
return err return err
} }
@ -321,13 +349,29 @@ func (h *httpTransportSocket) error(m *Message) error {
return rsp.Write(h.conn) return rsp.Write(h.conn)
} }
return nil return nil
} }
func (h *httpTransportSocket) Close() error { func (h *httpTransportSocket) Close() error {
h.mtx.Lock()
defer h.mtx.Unlock()
select {
case <-h.closed:
return nil
default:
// close the channel
close(h.closed)
// close the buffer
h.r.Body.Close()
// close the connection
if h.r.ProtoMajor == 1 { if h.r.ProtoMajor == 1 {
return h.conn.Close() return h.conn.Close()
} }
}
return nil return nil
} }
@ -374,20 +418,29 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
con = conn con = conn
} }
// buffered reader
bufr := bufio.NewReader(r.Body)
// save the request // save the request
ch := make(chan *http.Request, 1) ch := make(chan *http.Request, 1)
ch <- r ch <- r
fn(&httpTransportSocket{ // create a new transport socket
sock := &httpTransportSocket{
ht: h.ht, ht: h.ht,
w: w, w: w,
r: r, r: r,
rw: buf, rw: buf,
buf: bufr,
ch: ch, ch: ch,
conn: con, conn: con,
local: h.Addr(), local: h.Addr(),
remote: r.RemoteAddr, remote: r.RemoteAddr,
}) closed: make(chan bool),
}
// execute the socket
fn(sock)
}) })
// get optional handlers // get optional handlers

213
tunnel/broker/broker.go Normal file
View File

@ -0,0 +1,213 @@
// Package broker is a tunnel broker
package broker
import (
"context"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/tunnel"
)
type tunBroker struct {
opts broker.Options
tunnel tunnel.Tunnel
}
type tunSubscriber struct {
topic string
handler broker.Handler
opts broker.SubscribeOptions
closed chan bool
listener tunnel.Listener
}
type tunEvent struct {
topic string
message *broker.Message
}
// used to access tunnel from options context
type tunnelKey struct{}
type tunnelAddr struct{}
func (t *tunBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *tunBroker) Options() broker.Options {
return t.opts
}
func (t *tunBroker) Address() string {
return t.tunnel.Address()
}
func (t *tunBroker) Connect() error {
return t.tunnel.Connect()
}
func (t *tunBroker) Disconnect() error {
return t.tunnel.Close()
}
func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
c, err := t.tunnel.Dial(topic)
if err != nil {
return err
}
defer c.Close()
return c.Send(&transport.Message{
Header: m.Header,
Body: m.Body,
})
}
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(topic)
if err != nil {
return nil, err
}
var options broker.SubscribeOptions
for _, o := range opts {
o(&options)
}
tunSub := &tunSubscriber{
topic: topic,
handler: h,
opts: options,
closed: make(chan bool),
listener: l,
}
// start processing
go tunSub.run()
return tunSub, nil
}
func (t *tunBroker) String() string {
return "tunnel"
}
func (t *tunSubscriber) run() {
for {
// accept a new connection
c, err := t.listener.Accept()
if err != nil {
select {
case <-t.closed:
return
default:
continue
}
}
// receive message
m := new(transport.Message)
if err := c.Recv(m); err != nil {
c.Close()
continue
}
// close the connection
c.Close()
// handle the message
go t.handler(&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header,
Body: m.Body,
},
})
}
}
func (t *tunSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
func (t *tunSubscriber) Topic() string {
return t.topic
}
func (t *tunSubscriber) Unsubscribe() error {
select {
case <-t.closed:
return nil
default:
close(t.closed)
return t.listener.Close()
}
}
func (t *tunEvent) Topic() string {
return t.topic
}
func (t *tunEvent) Message() *broker.Message {
return t.message
}
func (t *tunEvent) Ack() error {
return nil
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
if !ok {
t = tunnel.NewTunnel()
}
a, ok := options.Context.Value(tunnelAddr{}).(string)
if ok {
// initialise address
t.Init(tunnel.Address(a))
}
if len(options.Addrs) > 0 {
// initialise nodes
t.Init(tunnel.Nodes(options.Addrs...))
}
return &tunBroker{
opts: options,
tunnel: t,
}
}
// WithAddress sets the tunnel address
func WithAddress(a string) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
}
}
// WithTunnel sets the internal tunnel
func WithTunnel(t tunnel.Tunnel) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
}
}

View File

@ -73,6 +73,8 @@ func newTunnel(opts ...Option) *tun {
// Init initializes tunnel options // Init initializes tunnel options
func (t *tun) Init(opts ...Option) error { func (t *tun) Init(opts ...Option) error {
t.Lock()
defer t.Unlock()
for _, o := range opts { for _, o := range opts {
o(&t.options) o(&t.options)
} }
@ -170,6 +172,9 @@ func (t *tun) process() {
newMsg.Header[k] = v newMsg.Header[k] = v
} }
// set message head
newMsg.Header["Micro-Tunnel"] = "message"
// set the tunnel id on the outgoing message // set the tunnel id on the outgoing message
newMsg.Header["Micro-Tunnel-Id"] = msg.id newMsg.Header["Micro-Tunnel-Id"] = msg.id
@ -227,8 +232,12 @@ func (t *tun) listen(link *link) {
// are we connecting to ourselves? // are we connecting to ourselves?
if token == t.token { if token == t.token {
t.Lock()
link.loopback = true link.loopback = true
t.Unlock()
} }
// nothing more to do
continue continue
case "close": case "close":
log.Debugf("Tunnel link %s closing connection", link.Remote()) log.Debugf("Tunnel link %s closing connection", link.Remote())
@ -237,10 +246,21 @@ func (t *tun) listen(link *link) {
continue continue
case "keepalive": case "keepalive":
log.Debugf("Tunnel link %s received keepalive", link.Remote()) log.Debugf("Tunnel link %s received keepalive", link.Remote())
t.Lock()
link.lastKeepAlive = time.Now() link.lastKeepAlive = time.Now()
t.Unlock()
continue
case "message":
// process message
log.Debugf("Received %+v from %s", msg, link.Remote())
default:
// blackhole it
continue continue
} }
// strip message header
delete(msg.Header, "Micro-Tunnel")
// the tunnel id // the tunnel id
id := msg.Header["Micro-Tunnel-Id"] id := msg.Header["Micro-Tunnel-Id"]
delete(msg.Header, "Micro-Tunnel-Id") delete(msg.Header, "Micro-Tunnel-Id")
@ -249,6 +269,9 @@ func (t *tun) listen(link *link) {
session := msg.Header["Micro-Tunnel-Session"] session := msg.Header["Micro-Tunnel-Session"]
delete(msg.Header, "Micro-Tunnel-Session") delete(msg.Header, "Micro-Tunnel-Session")
// strip token header
delete(msg.Header, "Micro-Tunnel-Token")
// if the session id is blank there's nothing we can do // if the session id is blank there's nothing we can do
// TODO: check this is the case, is there any reason // TODO: check this is the case, is there any reason
// why we'd have a blank session? Is the tunnel // why we'd have a blank session? Is the tunnel
@ -260,8 +283,6 @@ func (t *tun) listen(link *link) {
var s *socket var s *socket
var exists bool var exists bool
log.Debugf("Received %+v from %s", msg, link.Remote())
switch { switch {
case link.loopback: case link.loopback:
s, exists = t.getSocket(id, "listener") s, exists = t.getSocket(id, "listener")
@ -506,6 +527,17 @@ func (t *tun) close() error {
return t.listener.Close() return t.listener.Close()
} }
func (t *tun) Address() string {
t.RLock()
defer t.RUnlock()
if !t.connected {
return t.options.Address
}
return t.listener.Addr()
}
// Close the tunnel // Close the tunnel
func (t *tun) Close() error { func (t *tun) Close() error {
t.Lock() t.Lock()
@ -591,3 +623,7 @@ func (t *tun) Listen(addr string) (Listener, error) {
// return the listener // return the listener
return tl, nil return tl, nil
} }
func (t *tun) String() string {
return "mucp"
}

View File

@ -8,7 +8,7 @@ import (
var ( var (
// DefaultAddress is default tunnel bind address // DefaultAddress is default tunnel bind address
DefaultAddress = ":9096" DefaultAddress = ":0"
) )
type Option func(*Options) type Option func(*Options)

View File

@ -11,6 +11,8 @@ import (
// the address being requested. // the address being requested.
type Tunnel interface { type Tunnel interface {
Init(opts ...Option) error Init(opts ...Option) error
// Address the tunnel is listening on
Address() string
// Connect connects the tunnel // Connect connects the tunnel
Connect() error Connect() error
// Close closes the tunnel // Close closes the tunnel
@ -19,6 +21,8 @@ type Tunnel interface {
Dial(addr string) (Conn, error) Dial(addr string) (Conn, error)
// Accept connections // Accept connections
Listen(addr string) (Listener, error) Listen(addr string) (Listener, error)
// Name of the tunnel implementation
String() string
} }
// The listener provides similar constructs to the transport.Listener // The listener provides similar constructs to the transport.Listener

View File

@ -10,6 +10,8 @@ import (
// testAccept will accept connections on the transport, create a new link and tunnel on top // testAccept will accept connections on the transport, create a new link and tunnel on top
func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) { func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
defer wg.Done()
// listen on some virtual address // listen on some virtual address
tl, err := tun.Listen("test-tunnel") tl, err := tun.Listen("test-tunnel")
if err != nil { if err != nil {
@ -43,7 +45,8 @@ func testAccept(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
t.Fatal(err) t.Fatal(err)
} }
wg.Done() wait <- true
return return
} }
} }
@ -79,6 +82,8 @@ func testSend(t *testing.T, tun Tunnel, wait chan bool, wg *sync.WaitGroup) {
t.Fatal(err) t.Fatal(err)
} }
<-wait
if v := mr.Header["test"]; v != "accept" { if v := mr.Header["test"]; v != "accept" {
t.Fatalf("Message not received from accepted side. Received: %s", v) t.Fatalf("Message not received from accepted side. Received: %s", v)
} }
@ -140,6 +145,8 @@ func TestLoopbackTunnel(t *testing.T) {
} }
defer tun.Close() defer tun.Close()
time.Sleep(500 * time.Millisecond)
wait := make(chan bool) wait := make(chan bool)
var wg sync.WaitGroup var wg sync.WaitGroup

View File

@ -32,10 +32,10 @@ func (s *Socket) SetRemote(r string) {
// Accept passes a message to the socket which will be processed by the call to Recv // Accept passes a message to the socket which will be processed by the call to Recv
func (s *Socket) Accept(m *transport.Message) error { func (s *Socket) Accept(m *transport.Message) error {
select { select {
case <-s.closed:
return io.EOF
case s.recv <- m: case s.recv <- m:
return nil return nil
case <-s.closed:
return io.EOF
} }
return nil return nil
} }
@ -43,10 +43,17 @@ func (s *Socket) Accept(m *transport.Message) error {
// Process takes the next message off the send queue created by a call to Send // Process takes the next message off the send queue created by a call to Send
func (s *Socket) Process(m *transport.Message) error { func (s *Socket) Process(m *transport.Message) error {
select { select {
case <-s.closed:
return io.EOF
case msg := <-s.send: case msg := <-s.send:
*m = *msg *m = *msg
case <-s.closed:
// see if we need to drain
select {
case msg := <-s.send:
*m = *msg
return nil
default:
return io.EOF
}
} }
return nil return nil
} }
@ -60,13 +67,6 @@ func (s *Socket) Local() string {
} }
func (s *Socket) Send(m *transport.Message) error { func (s *Socket) Send(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// make copy // make copy
msg := &transport.Message{ msg := &transport.Message{
Header: make(map[string]string), Header: make(map[string]string),
@ -92,13 +92,6 @@ func (s *Socket) Send(m *transport.Message) error {
} }
func (s *Socket) Recv(m *transport.Message) error { func (s *Socket) Recv(m *transport.Message) error {
select {
case <-s.closed:
return io.EOF
default:
// no op
}
// receive a message // receive a message
select { select {
case msg := <-s.recv: case msg := <-s.recv: