From 4de346920f60cb92e46121d648e8b647607032b5 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 3 Oct 2019 16:19:02 +0100 Subject: [PATCH 1/8] Add broker service implementation --- broker/service/handler/handler.go | 57 +++++ broker/service/proto/broker.micro.go | 173 +++++++++++++ broker/service/proto/broker.pb.go | 364 +++++++++++++++++++++++++++ broker/service/proto/broker.proto | 25 ++ broker/service/service.go | 102 ++++++++ broker/service/subscriber.go | 79 ++++++ 6 files changed, 800 insertions(+) create mode 100644 broker/service/handler/handler.go create mode 100644 broker/service/proto/broker.micro.go create mode 100644 broker/service/proto/broker.pb.go create mode 100644 broker/service/proto/broker.proto create mode 100644 broker/service/service.go create mode 100644 broker/service/subscriber.go diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go new file mode 100644 index 00000000..6e78905f --- /dev/null +++ b/broker/service/handler/handler.go @@ -0,0 +1,57 @@ +package handler + +import ( + "context" + + "github.com/micro/go-micro/broker" + pb "github.com/micro/go-micro/broker/service/proto" + "github.com/micro/go-micro/errors" +) + +type Broker struct { + Broker broker.Broker +} + +func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error { + err := b.Broker.Publish(req.Topic, &broker.Message{ + Header: req.Message.Header, + Body: req.Message.Body, + }) + if err != nil { + return errors.InternalServerError("go.micro.broker", err.Error()) + } + return nil +} + +func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Broker_SubscribeStream) error { + errChan := make(chan error, 1) + + // message handler to stream back messages from broker + handler := func(p broker.Event) error { + if err := stream.Send(&pb.Message{ + Header: p.Message().Header, + Body: p.Message().Body, + }); err != nil { + select { + case errChan <- err: + return err + default: + return err + } + } + return nil + } + + sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue)) + if err != nil { + return errors.InternalServerError("go.micro.broker", err.Error()) + } + defer sub.Unsubscribe() + + select { + case <-ctx.Done(): + return nil + case <-errChan: + return err + } +} diff --git a/broker/service/proto/broker.micro.go b/broker/service/proto/broker.micro.go new file mode 100644 index 00000000..577132c5 --- /dev/null +++ b/broker/service/proto/broker.micro.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: github.com/micro/go-micro/broker/proto/broker.proto + +package go_micro_broker + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Broker service + +type BrokerService interface { + Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*Empty, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Broker_SubscribeService, error) +} + +type brokerService struct { + c client.Client + name string +} + +func NewBrokerService(name string, c client.Client) BrokerService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.broker" + } + return &brokerService{ + c: c, + name: name, + } +} + +func (c *brokerService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*Empty, error) { + req := c.c.NewRequest(c.name, "Broker.Publish", in) + out := new(Empty) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *brokerService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Broker_SubscribeService, error) { + req := c.c.NewRequest(c.name, "Broker.Subscribe", &SubscribeRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &brokerServiceSubscribe{stream}, nil +} + +type Broker_SubscribeService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*Message, error) +} + +type brokerServiceSubscribe struct { + stream client.Stream +} + +func (x *brokerServiceSubscribe) Close() error { + return x.stream.Close() +} + +func (x *brokerServiceSubscribe) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *brokerServiceSubscribe) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *brokerServiceSubscribe) Recv() (*Message, error) { + m := new(Message) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Broker service + +type BrokerHandler interface { + Publish(context.Context, *PublishRequest, *Empty) error + Subscribe(context.Context, *SubscribeRequest, Broker_SubscribeStream) error +} + +func RegisterBrokerHandler(s server.Server, hdlr BrokerHandler, opts ...server.HandlerOption) error { + type broker interface { + Publish(ctx context.Context, in *PublishRequest, out *Empty) error + Subscribe(ctx context.Context, stream server.Stream) error + } + type Broker struct { + broker + } + h := &brokerHandler{hdlr} + return s.Handle(s.NewHandler(&Broker{h}, opts...)) +} + +type brokerHandler struct { + BrokerHandler +} + +func (h *brokerHandler) Publish(ctx context.Context, in *PublishRequest, out *Empty) error { + return h.BrokerHandler.Publish(ctx, in, out) +} + +func (h *brokerHandler) Subscribe(ctx context.Context, stream server.Stream) error { + m := new(SubscribeRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.BrokerHandler.Subscribe(ctx, m, &brokerSubscribeStream{stream}) +} + +type Broker_SubscribeStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error +} + +type brokerSubscribeStream struct { + stream server.Stream +} + +func (x *brokerSubscribeStream) Close() error { + return x.stream.Close() +} + +func (x *brokerSubscribeStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *brokerSubscribeStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *brokerSubscribeStream) Send(m *Message) error { + return x.stream.Send(m) +} diff --git a/broker/service/proto/broker.pb.go b/broker/service/proto/broker.pb.go new file mode 100644 index 00000000..b54a5480 --- /dev/null +++ b/broker/service/proto/broker.pb.go @@ -0,0 +1,364 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: github.com/micro/go-micro/broker/proto/broker.proto + +package go_micro_broker + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Empty struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} +func (*Empty) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{0} +} + +func (m *Empty) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Empty.Unmarshal(m, b) +} +func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Empty.Marshal(b, m, deterministic) +} +func (m *Empty) XXX_Merge(src proto.Message) { + xxx_messageInfo_Empty.Merge(m, src) +} +func (m *Empty) XXX_Size() int { + return xxx_messageInfo_Empty.Size(m) +} +func (m *Empty) XXX_DiscardUnknown() { + xxx_messageInfo_Empty.DiscardUnknown(m) +} + +var xxx_messageInfo_Empty proto.InternalMessageInfo + +type PublishRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Message *Message `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (m *PublishRequest) String() string { return proto.CompactTextString(m) } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{1} +} + +func (m *PublishRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PublishRequest.Unmarshal(m, b) +} +func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) +} +func (m *PublishRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PublishRequest.Merge(m, src) +} +func (m *PublishRequest) XXX_Size() int { + return xxx_messageInfo_PublishRequest.Size(m) +} +func (m *PublishRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PublishRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PublishRequest proto.InternalMessageInfo + +func (m *PublishRequest) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *PublishRequest) GetMessage() *Message { + if m != nil { + return m.Message + } + return nil +} + +type SubscribeRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Queue string `protobuf:"bytes,2,opt,name=queue,proto3" json:"queue,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } +func (m *SubscribeRequest) String() string { return proto.CompactTextString(m) } +func (*SubscribeRequest) ProtoMessage() {} +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{2} +} + +func (m *SubscribeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SubscribeRequest.Unmarshal(m, b) +} +func (m *SubscribeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SubscribeRequest.Marshal(b, m, deterministic) +} +func (m *SubscribeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscribeRequest.Merge(m, src) +} +func (m *SubscribeRequest) XXX_Size() int { + return xxx_messageInfo_SubscribeRequest.Size(m) +} +func (m *SubscribeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SubscribeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscribeRequest proto.InternalMessageInfo + +func (m *SubscribeRequest) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *SubscribeRequest) GetQueue() string { + if m != nil { + return m.Queue + } + return "" +} + +type Message struct { + Header map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{3} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetHeader() map[string]string { + if m != nil { + return m.Header + } + return nil +} + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +func init() { + proto.RegisterType((*Empty)(nil), "go.micro.broker.Empty") + proto.RegisterType((*PublishRequest)(nil), "go.micro.broker.PublishRequest") + proto.RegisterType((*SubscribeRequest)(nil), "go.micro.broker.SubscribeRequest") + proto.RegisterType((*Message)(nil), "go.micro.broker.Message") + proto.RegisterMapType((map[string]string)(nil), "go.micro.broker.Message.HeaderEntry") +} + +func init() { + proto.RegisterFile("github.com/micro/go-micro/broker/proto/broker.proto", fileDescriptor_5edf81766900dd99) +} + +var fileDescriptor_5edf81766900dd99 = []byte{ + // 309 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcf, 0x4a, 0xf3, 0x40, + 0x14, 0xc5, 0x3b, 0xed, 0xd7, 0x86, 0xde, 0x7e, 0x68, 0x19, 0x8a, 0x84, 0x6e, 0x8c, 0xc1, 0x45, + 0x36, 0x4e, 0x24, 0xdd, 0xa8, 0x88, 0x0b, 0xb1, 0xe0, 0x42, 0x41, 0xc6, 0x9d, 0xbb, 0x4c, 0x3a, + 0x24, 0xa1, 0x8d, 0x93, 0x4e, 0x66, 0x84, 0xbc, 0x88, 0x2b, 0x1f, 0x56, 0x3a, 0x93, 0xfa, 0xa7, + 0xa1, 0xee, 0xee, 0x49, 0x7e, 0x73, 0xee, 0xe1, 0x5c, 0x98, 0xa5, 0xb9, 0xca, 0x34, 0x23, 0x89, + 0x28, 0xc2, 0x22, 0x4f, 0xa4, 0x08, 0x53, 0x71, 0x66, 0x07, 0x26, 0xc5, 0x92, 0xcb, 0xb0, 0x94, + 0x42, 0x6d, 0x05, 0x31, 0x02, 0x1f, 0xa6, 0x82, 0x18, 0x86, 0xd8, 0xcf, 0xbe, 0x03, 0xfd, 0x79, + 0x51, 0xaa, 0xda, 0x7f, 0x81, 0x83, 0x27, 0xcd, 0x56, 0x79, 0x95, 0x51, 0xbe, 0xd6, 0xbc, 0x52, + 0x78, 0x02, 0x7d, 0x25, 0xca, 0x3c, 0x71, 0x91, 0x87, 0x82, 0x21, 0xb5, 0x02, 0x47, 0xe0, 0x14, + 0xbc, 0xaa, 0xe2, 0x94, 0xbb, 0x5d, 0x0f, 0x05, 0xa3, 0xc8, 0x25, 0x3b, 0x9e, 0xe4, 0xd1, 0xfe, + 0xa7, 0x5b, 0xd0, 0xbf, 0x81, 0xf1, 0xb3, 0x66, 0x55, 0x22, 0x73, 0xc6, 0xff, 0x76, 0x9f, 0x40, + 0x7f, 0xad, 0xb9, 0xb6, 0xde, 0x43, 0x6a, 0x85, 0xff, 0x8e, 0xc0, 0x69, 0x4c, 0xf1, 0x35, 0x0c, + 0x32, 0x1e, 0x2f, 0xb8, 0x74, 0x91, 0xd7, 0x0b, 0x46, 0xd1, 0xe9, 0xbe, 0xf5, 0xe4, 0xde, 0x60, + 0xf3, 0x57, 0x25, 0x6b, 0xda, 0xbc, 0xc1, 0x18, 0xfe, 0x31, 0xb1, 0xa8, 0x8d, 0xfd, 0x7f, 0x6a, + 0xe6, 0xe9, 0x25, 0x8c, 0x7e, 0xa0, 0x78, 0x0c, 0xbd, 0x25, 0xaf, 0x9b, 0x58, 0x9b, 0x71, 0x13, + 0xea, 0x2d, 0x5e, 0x7d, 0x87, 0x32, 0xe2, 0xaa, 0x7b, 0x81, 0xa2, 0x0f, 0x04, 0x83, 0x5b, 0xb3, + 0x15, 0xdf, 0x81, 0xd3, 0xf4, 0x87, 0x8f, 0x5b, 0x91, 0x7e, 0x37, 0x3b, 0x3d, 0x6a, 0x01, 0xf6, + 0x06, 0x1d, 0xfc, 0x00, 0xc3, 0xaf, 0xa6, 0xf0, 0x49, 0x0b, 0xdb, 0x6d, 0x71, 0xba, 0xb7, 0x7c, + 0xbf, 0x73, 0x8e, 0xd8, 0xc0, 0x1c, 0x7d, 0xf6, 0x19, 0x00, 0x00, 0xff, 0xff, 0x25, 0x38, 0xfa, + 0x02, 0x2b, 0x02, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// BrokerClient is the client API for Broker service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type BrokerClient interface { + Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*Empty, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Broker_SubscribeClient, error) +} + +type brokerClient struct { + cc *grpc.ClientConn +} + +func NewBrokerClient(cc *grpc.ClientConn) BrokerClient { + return &brokerClient{cc} +} + +func (c *brokerClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, "/go.micro.broker.Broker/Publish", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *brokerClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Broker_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &_Broker_serviceDesc.Streams[0], "/go.micro.broker.Broker/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &brokerSubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Broker_SubscribeClient interface { + Recv() (*Message, error) + grpc.ClientStream +} + +type brokerSubscribeClient struct { + grpc.ClientStream +} + +func (x *brokerSubscribeClient) Recv() (*Message, error) { + m := new(Message) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BrokerServer is the server API for Broker service. +type BrokerServer interface { + Publish(context.Context, *PublishRequest) (*Empty, error) + Subscribe(*SubscribeRequest, Broker_SubscribeServer) error +} + +func RegisterBrokerServer(s *grpc.Server, srv BrokerServer) { + s.RegisterService(&_Broker_serviceDesc, srv) +} + +func _Broker_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BrokerServer).Publish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.broker.Broker/Publish", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BrokerServer).Publish(ctx, req.(*PublishRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Broker_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(BrokerServer).Subscribe(m, &brokerSubscribeServer{stream}) +} + +type Broker_SubscribeServer interface { + Send(*Message) error + grpc.ServerStream +} + +type brokerSubscribeServer struct { + grpc.ServerStream +} + +func (x *brokerSubscribeServer) Send(m *Message) error { + return x.ServerStream.SendMsg(m) +} + +var _Broker_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.broker.Broker", + HandlerType: (*BrokerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Publish", + Handler: _Broker_Publish_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Subscribe", + Handler: _Broker_Subscribe_Handler, + ServerStreams: true, + }, + }, + Metadata: "github.com/micro/go-micro/broker/proto/broker.proto", +} diff --git a/broker/service/proto/broker.proto b/broker/service/proto/broker.proto new file mode 100644 index 00000000..3fc877e6 --- /dev/null +++ b/broker/service/proto/broker.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package go.micro.broker; + +service Broker { + rpc Publish(PublishRequest) returns (Empty) {}; + rpc Subscribe(SubscribeRequest) returns (stream Message) {}; +} + +message Empty {} + +message PublishRequest { + string topic = 1; + Message message = 2; +} + +message SubscribeRequest { + string topic = 1; + string queue = 2; +} + +message Message { + map header = 1; + bytes body = 2; +} diff --git a/broker/service/service.go b/broker/service/service.go new file mode 100644 index 00000000..63c09624 --- /dev/null +++ b/broker/service/service.go @@ -0,0 +1,102 @@ +// Package service provides the broker service client +package service + +import ( + "context" + + "github.com/micro/go-micro/broker" + pb "github.com/micro/go-micro/broker/service/proto" + "github.com/micro/go-micro/client" +) + +type serviceBroker struct { + Addrs []string + Client pb.BrokerService + options broker.Options +} + +var ( + DefaultName = "go.micro.broker" +) + +func (b *serviceBroker) Address() string { + return b.Addrs[0] +} + +func (b *serviceBroker) Connect() error { + return nil +} + +func (b *serviceBroker) Disconnect() error { + return nil +} + +func (b *serviceBroker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&b.options) + } + return nil +} + +func (b *serviceBroker) Options() broker.Options { + return b.options +} + +func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + _, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ + Topic: topic, + Message: &pb.Message{ + Header: msg.Header, + Body: msg.Body, + }, + }, client.WithAddress(b.Addrs...)) + return err +} + +func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + var options broker.SubscribeOptions + for _, o := range opts { + o(&options) + } + stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: topic, + Queue: options.Queue, + }, client.WithAddress(b.Addrs...)) + if err != nil { + return nil, err + } + + sub := &serviceSub{ + topic: topic, + queue: options.Queue, + handler: handler, + stream: stream, + closed: make(chan bool), + options: options, + } + go sub.run() + + return sub, nil +} + +func (b *serviceBroker) String() string { + return "service" +} + +func NewBroker(opts ...broker.Option) broker.Broker { + var options broker.Options + for _, o := range opts { + o(&options) + } + + addrs := options.Addrs + if len(addrs) == 0 { + addrs = []string{"127.0.0.1:8001"} + } + + return &serviceBroker{ + Addrs: addrs, + Client: pb.NewBrokerService(DefaultName, client.DefaultClient), + options: options, + } +} diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go new file mode 100644 index 00000000..09a942c8 --- /dev/null +++ b/broker/service/subscriber.go @@ -0,0 +1,79 @@ +package service + +import ( + "github.com/micro/go-micro/broker" + pb "github.com/micro/go-micro/broker/service/proto" +) + +type serviceSub struct { + topic string + queue string + handler broker.Handler + stream pb.Broker_SubscribeService + closed chan bool + options broker.SubscribeOptions +} + +type serviceEvent struct { + topic string + message *broker.Message +} + +func (s *serviceEvent) Topic() string { + return s.topic +} + +func (s *serviceEvent) Message() *broker.Message { + return s.message +} + +func (s *serviceEvent) Ack() error { + return nil +} + +func (s *serviceSub) run() { + exit := make(chan bool) + go func() { + select { + case <-exit: + return + case <-s.closed: + s.stream.Close() + } + }() + + for { + // TODO: do not fail silently + msg, err := s.stream.Recv() + if err != nil { + close(exit) + return + } + // TODO: handle error + s.handler(&serviceEvent{ + topic: s.topic, + message: &broker.Message{ + Header: msg.Header, + Body: msg.Body, + }, + }) + } +} + +func (s *serviceSub) Options() broker.SubscribeOptions { + return s.options +} + +func (s *serviceSub) Topic() string { + return s.topic +} + +func (s *serviceSub) Unsubscribe() error { + select { + case <-s.closed: + return nil + default: + close(s.closed) + } + return nil +} From b80654bf7e81c13b4434659c238ca8200e78702d Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 3 Oct 2019 16:22:26 +0100 Subject: [PATCH 2/8] Add broker service to config/cmd --- config/cmd/cmd.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/config/cmd/cmd.go b/config/cmd/cmd.go index cfb8b313..6cb6835b 100644 --- a/config/cmd/cmd.go +++ b/config/cmd/cmd.go @@ -23,6 +23,7 @@ import ( "github.com/micro/go-micro/broker/http" "github.com/micro/go-micro/broker/memory" "github.com/micro/go-micro/broker/nats" + brokerSrv "github.com/micro/go-micro/broker/service" // registries "github.com/micro/go-micro/registry" @@ -179,9 +180,11 @@ var ( } DefaultBrokers = map[string]func(...broker.Option) broker.Broker{ - "http": http.NewBroker, - "memory": memory.NewBroker, - "nats": nats.NewBroker, + "go.micro.broker": brokerSrv.NewBroker, + "service": brokerSrv.NewBroker, + "http": http.NewBroker, + "memory": memory.NewBroker, + "nats": nats.NewBroker, } DefaultClients = map[string]func(...client.Option) client.Client{ From 05eacd74c85179d8f03871827f5467d52744d8fb Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Thu, 3 Oct 2019 17:30:37 +0100 Subject: [PATCH 3/8] Add logging for broker handler --- broker/service/handler/handler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go index 6e78905f..3d59dc2e 100644 --- a/broker/service/handler/handler.go +++ b/broker/service/handler/handler.go @@ -3,6 +3,7 @@ package handler import ( "context" + "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" "github.com/micro/go-micro/errors" @@ -13,6 +14,7 @@ type Broker struct { } func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error { + log.Debugf("Publishing message to %s topic", req.Topic) err := b.Broker.Publish(req.Topic, &broker.Message{ Header: req.Message.Header, Body: req.Message.Body, @@ -42,6 +44,7 @@ func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream return nil } + log.Debugf("Subscribing to %s topic", req.Topic) sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue)) if err != nil { return errors.InternalServerError("go.micro.broker", err.Error()) From c4b6d0f3a891b462a88e87c1bd0953e95a731587 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 4 Oct 2019 16:29:56 +0100 Subject: [PATCH 4/8] fix major deadlock in registry cache --- registry/cache/cache.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/registry/cache/cache.go b/registry/cache/cache.go index 64ec6843..46460f7d 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -100,7 +100,7 @@ func (c *cache) quit() bool { func (c *cache) del(service string) { // don't blow away cache in error state - if err := c.getStatus(); err != nil { + if err := c.status; err != nil { return } // otherwise delete entries @@ -116,13 +116,14 @@ func (c *cache) get(service string) ([]*registry.Service, error) { services := c.cache[service] // get cache ttl ttl := c.ttls[service] + // make a copy + cp := registry.Copy(services) + + // unlock the read lock + c.RUnlock() // got services && within ttl so return cache - if c.isValid(services, ttl) { - // make a copy - cp := registry.Copy(services) - // unlock the read - c.RUnlock() + if c.isValid(cp, ttl) { // return servics return cp, nil } @@ -136,8 +137,9 @@ func (c *cache) get(service string) ([]*registry.Service, error) { if len(cached) > 0 { // set the error status c.setStatus(err) + // return the stale cache - return registry.Copy(cached), nil + return cached, nil } // otherwise return error return nil, err @@ -161,9 +163,6 @@ func (c *cache) get(service string) ([]*registry.Service, error) { go c.run(service) } - // unlock the read lock - c.RUnlock() - // get and return services return get(service, services) } From 04320d69ff124fbad0e1a1afb0ef238834b89c4b Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 4 Oct 2019 16:30:03 +0100 Subject: [PATCH 5/8] Fix and comment broker service --- broker/http_broker.go | 15 ++++++++++----- broker/service/handler/handler.go | 10 ++++++++-- broker/service/service.go | 32 ++++++++++++++++++++++++++++++- broker/service/subscriber.go | 30 +++++++++++++++++++++++++---- 4 files changed, 75 insertions(+), 12 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index 5b362c14..4a22fe12 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -324,15 +324,21 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { p := &httpEvent{m: m, t: topic} id := req.Form.Get("id") + var subs []Handler + h.RLock() for _, subscriber := range h.subscribers[topic] { - if id == subscriber.id { - // sub is sync; crufty rate limiting - // so we don't hose the cpu - subscriber.fn(p) + if id != subscriber.id { + continue } + subs = append(subs, subscriber.fn) } h.RUnlock() + + // execute the handler + for _, fn := range subs { + fn(p) + } } func (h *httpBroker) Address() string { @@ -420,7 +426,6 @@ func (h *httpBroker) Connect() error { } func (h *httpBroker) Disconnect() error { - h.RLock() if !h.running { h.RUnlock() diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go index 3d59dc2e..e0b5c3d4 100644 --- a/broker/service/handler/handler.go +++ b/broker/service/handler/handler.go @@ -19,6 +19,7 @@ func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Em Header: req.Message.Header, Body: req.Message.Body, }) + log.Debugf("Published message to %s topic", req.Topic) if err != nil { return errors.InternalServerError("go.micro.broker", err.Error()) } @@ -49,12 +50,17 @@ func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream if err != nil { return errors.InternalServerError("go.micro.broker", err.Error()) } - defer sub.Unsubscribe() + defer func() { + log.Debugf("Unsubscribing from topic %s", req.Topic) + sub.Unsubscribe() + }() select { case <-ctx.Done(): + log.Debugf("Context done for subscription to topic %s", req.Topic) return nil - case <-errChan: + case err := <-errChan: + log.Debugf("Subscription error for topic %s: %v", req.Topic, err) return err } } diff --git a/broker/service/service.go b/broker/service/service.go index 63c09624..58942fe3 100644 --- a/broker/service/service.go +++ b/broker/service/service.go @@ -3,7 +3,9 @@ package service import ( "context" + "time" + "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" "github.com/micro/go-micro/client" @@ -43,6 +45,7 @@ func (b *serviceBroker) Options() broker.Options { } func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + log.Debugf("Publishing to topic %s broker %v", topic, b.Addrs) _, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ Topic: topic, Message: &pb.Message{ @@ -58,6 +61,7 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... for _, o := range opts { o(&options) } + log.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs) stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, @@ -74,7 +78,33 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... closed: make(chan bool), options: options, } - go sub.run() + + go func() { + for { + select { + case <-sub.closed: + log.Debugf("Unsubscribed from topic %s", topic) + return + default: + // run the subscriber + log.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue) + if err := sub.run(); err != nil { + log.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs) + stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: topic, + Queue: options.Queue, + }, client.WithAddress(b.Addrs...)) + if err != nil { + log.Debugf("Failed to resubscribe to topic %s: %v", topic, err) + time.Sleep(time.Second) + continue + } + // new stream + sub.stream = stream + } + } + } + }() return sub, nil } diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go index 09a942c8..02511196 100644 --- a/broker/service/subscriber.go +++ b/broker/service/subscriber.go @@ -1,6 +1,7 @@ package service import ( + "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" ) @@ -31,24 +32,45 @@ func (s *serviceEvent) Ack() error { return nil } -func (s *serviceSub) run() { +func (s *serviceSub) isClosed() bool { + select { + case <-s.closed: + return true + default: + return false + } +} + +func (s *serviceSub) run() error { exit := make(chan bool) go func() { select { case <-exit: - return case <-s.closed: - s.stream.Close() } + + // close the stream + s.stream.Close() }() for { // TODO: do not fail silently msg, err := s.stream.Recv() if err != nil { + log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err) + + // close the exit channel close(exit) - return + + // don't return an error if we unsubscribed + if s.isClosed() { + return nil + } + + // return stream error + return err } + // TODO: handle error s.handler(&serviceEvent{ topic: s.topic, From e36960612a353c454c4727c5a7a9badb3357e30a Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 4 Oct 2019 16:40:16 +0100 Subject: [PATCH 6/8] go fmt --- broker/service/handler/handler.go | 2 +- broker/service/service.go | 2 +- broker/service/subscriber.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go index e0b5c3d4..3c96fd0d 100644 --- a/broker/service/handler/handler.go +++ b/broker/service/handler/handler.go @@ -3,10 +3,10 @@ package handler import ( "context" - "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/util/log" ) type Broker struct { diff --git a/broker/service/service.go b/broker/service/service.go index 58942fe3..b2fb1bd1 100644 --- a/broker/service/service.go +++ b/broker/service/service.go @@ -5,10 +5,10 @@ import ( "context" "time" - "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" "github.com/micro/go-micro/client" + "github.com/micro/go-micro/util/log" ) type serviceBroker struct { diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go index 02511196..085b578a 100644 --- a/broker/service/subscriber.go +++ b/broker/service/subscriber.go @@ -1,9 +1,9 @@ package service import ( - "github.com/micro/go-micro/util/log" "github.com/micro/go-micro/broker" pb "github.com/micro/go-micro/broker/service/proto" + "github.com/micro/go-micro/util/log" ) type serviceSub struct { From cfb846ee7ef2af8b583b6fcc062d023b73801c2c Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 4 Oct 2019 16:40:21 +0100 Subject: [PATCH 7/8] Fix race in cache --- registry/cache/cache.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/registry/cache/cache.go b/registry/cache/cache.go index 46460f7d..a17960e5 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -119,12 +119,10 @@ func (c *cache) get(service string) ([]*registry.Service, error) { // make a copy cp := registry.Copy(services) - // unlock the read lock - c.RUnlock() - // got services && within ttl so return cache if c.isValid(cp, ttl) { - // return servics + c.RUnlock() + // return services return cp, nil } @@ -163,8 +161,11 @@ func (c *cache) get(service string) ([]*registry.Service, error) { go c.run(service) } + // unlock the read lock + c.RUnlock() + // get and return services - return get(service, services) + return get(service, cp) } func (c *cache) set(service string, services []*registry.Service) { From 86984a8a8a2e37d6ae4de6bd3d6b0422361d9cf2 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Fri, 4 Oct 2019 16:44:21 +0100 Subject: [PATCH 8/8] Extend the stream timeout --- broker/service/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/broker/service/service.go b/broker/service/service.go index b2fb1bd1..fff207fd 100644 --- a/broker/service/service.go +++ b/broker/service/service.go @@ -65,7 +65,7 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, - }, client.WithAddress(b.Addrs...)) + }, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) if err != nil { return nil, err } @@ -93,7 +93,7 @@ func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ... stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ Topic: topic, Queue: options.Queue, - }, client.WithAddress(b.Addrs...)) + }, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) if err != nil { log.Debugf("Failed to resubscribe to topic %s: %v", topic, err) time.Sleep(time.Second)