diff --git a/broker/service/handler/handler.go b/broker/service/handler/handler.go new file mode 100644 index 00000000..6e78905f --- /dev/null +++ b/broker/service/handler/handler.go @@ -0,0 +1,57 @@ +package handler + +import ( + "context" + + "github.com/micro/go-micro/broker" + pb "github.com/micro/go-micro/broker/service/proto" + "github.com/micro/go-micro/errors" +) + +type Broker struct { + Broker broker.Broker +} + +func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error { + err := b.Broker.Publish(req.Topic, &broker.Message{ + Header: req.Message.Header, + Body: req.Message.Body, + }) + if err != nil { + return errors.InternalServerError("go.micro.broker", err.Error()) + } + return nil +} + +func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Broker_SubscribeStream) error { + errChan := make(chan error, 1) + + // message handler to stream back messages from broker + handler := func(p broker.Event) error { + if err := stream.Send(&pb.Message{ + Header: p.Message().Header, + Body: p.Message().Body, + }); err != nil { + select { + case errChan <- err: + return err + default: + return err + } + } + return nil + } + + sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue)) + if err != nil { + return errors.InternalServerError("go.micro.broker", err.Error()) + } + defer sub.Unsubscribe() + + select { + case <-ctx.Done(): + return nil + case <-errChan: + return err + } +} diff --git a/broker/service/proto/broker.micro.go b/broker/service/proto/broker.micro.go new file mode 100644 index 00000000..577132c5 --- /dev/null +++ b/broker/service/proto/broker.micro.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: github.com/micro/go-micro/broker/proto/broker.proto + +package go_micro_broker + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/client" + server "github.com/micro/go-micro/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Broker service + +type BrokerService interface { + Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*Empty, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Broker_SubscribeService, error) +} + +type brokerService struct { + c client.Client + name string +} + +func NewBrokerService(name string, c client.Client) BrokerService { + if c == nil { + c = client.NewClient() + } + if len(name) == 0 { + name = "go.micro.broker" + } + return &brokerService{ + c: c, + name: name, + } +} + +func (c *brokerService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*Empty, error) { + req := c.c.NewRequest(c.name, "Broker.Publish", in) + out := new(Empty) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *brokerService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Broker_SubscribeService, error) { + req := c.c.NewRequest(c.name, "Broker.Subscribe", &SubscribeRequest{}) + stream, err := c.c.Stream(ctx, req, opts...) + if err != nil { + return nil, err + } + if err := stream.Send(in); err != nil { + return nil, err + } + return &brokerServiceSubscribe{stream}, nil +} + +type Broker_SubscribeService interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Recv() (*Message, error) +} + +type brokerServiceSubscribe struct { + stream client.Stream +} + +func (x *brokerServiceSubscribe) Close() error { + return x.stream.Close() +} + +func (x *brokerServiceSubscribe) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *brokerServiceSubscribe) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *brokerServiceSubscribe) Recv() (*Message, error) { + m := new(Message) + err := x.stream.Recv(m) + if err != nil { + return nil, err + } + return m, nil +} + +// Server API for Broker service + +type BrokerHandler interface { + Publish(context.Context, *PublishRequest, *Empty) error + Subscribe(context.Context, *SubscribeRequest, Broker_SubscribeStream) error +} + +func RegisterBrokerHandler(s server.Server, hdlr BrokerHandler, opts ...server.HandlerOption) error { + type broker interface { + Publish(ctx context.Context, in *PublishRequest, out *Empty) error + Subscribe(ctx context.Context, stream server.Stream) error + } + type Broker struct { + broker + } + h := &brokerHandler{hdlr} + return s.Handle(s.NewHandler(&Broker{h}, opts...)) +} + +type brokerHandler struct { + BrokerHandler +} + +func (h *brokerHandler) Publish(ctx context.Context, in *PublishRequest, out *Empty) error { + return h.BrokerHandler.Publish(ctx, in, out) +} + +func (h *brokerHandler) Subscribe(ctx context.Context, stream server.Stream) error { + m := new(SubscribeRequest) + if err := stream.Recv(m); err != nil { + return err + } + return h.BrokerHandler.Subscribe(ctx, m, &brokerSubscribeStream{stream}) +} + +type Broker_SubscribeStream interface { + SendMsg(interface{}) error + RecvMsg(interface{}) error + Close() error + Send(*Message) error +} + +type brokerSubscribeStream struct { + stream server.Stream +} + +func (x *brokerSubscribeStream) Close() error { + return x.stream.Close() +} + +func (x *brokerSubscribeStream) SendMsg(m interface{}) error { + return x.stream.Send(m) +} + +func (x *brokerSubscribeStream) RecvMsg(m interface{}) error { + return x.stream.Recv(m) +} + +func (x *brokerSubscribeStream) Send(m *Message) error { + return x.stream.Send(m) +} diff --git a/broker/service/proto/broker.pb.go b/broker/service/proto/broker.pb.go new file mode 100644 index 00000000..b54a5480 --- /dev/null +++ b/broker/service/proto/broker.pb.go @@ -0,0 +1,364 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: github.com/micro/go-micro/broker/proto/broker.proto + +package go_micro_broker + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Empty struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} +func (*Empty) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{0} +} + +func (m *Empty) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Empty.Unmarshal(m, b) +} +func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Empty.Marshal(b, m, deterministic) +} +func (m *Empty) XXX_Merge(src proto.Message) { + xxx_messageInfo_Empty.Merge(m, src) +} +func (m *Empty) XXX_Size() int { + return xxx_messageInfo_Empty.Size(m) +} +func (m *Empty) XXX_DiscardUnknown() { + xxx_messageInfo_Empty.DiscardUnknown(m) +} + +var xxx_messageInfo_Empty proto.InternalMessageInfo + +type PublishRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Message *Message `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PublishRequest) Reset() { *m = PublishRequest{} } +func (m *PublishRequest) String() string { return proto.CompactTextString(m) } +func (*PublishRequest) ProtoMessage() {} +func (*PublishRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{1} +} + +func (m *PublishRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_PublishRequest.Unmarshal(m, b) +} +func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) +} +func (m *PublishRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PublishRequest.Merge(m, src) +} +func (m *PublishRequest) XXX_Size() int { + return xxx_messageInfo_PublishRequest.Size(m) +} +func (m *PublishRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PublishRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PublishRequest proto.InternalMessageInfo + +func (m *PublishRequest) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *PublishRequest) GetMessage() *Message { + if m != nil { + return m.Message + } + return nil +} + +type SubscribeRequest struct { + Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` + Queue string `protobuf:"bytes,2,opt,name=queue,proto3" json:"queue,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SubscribeRequest) Reset() { *m = SubscribeRequest{} } +func (m *SubscribeRequest) String() string { return proto.CompactTextString(m) } +func (*SubscribeRequest) ProtoMessage() {} +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{2} +} + +func (m *SubscribeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_SubscribeRequest.Unmarshal(m, b) +} +func (m *SubscribeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_SubscribeRequest.Marshal(b, m, deterministic) +} +func (m *SubscribeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_SubscribeRequest.Merge(m, src) +} +func (m *SubscribeRequest) XXX_Size() int { + return xxx_messageInfo_SubscribeRequest.Size(m) +} +func (m *SubscribeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_SubscribeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_SubscribeRequest proto.InternalMessageInfo + +func (m *SubscribeRequest) GetTopic() string { + if m != nil { + return m.Topic + } + return "" +} + +func (m *SubscribeRequest) GetQueue() string { + if m != nil { + return m.Queue + } + return "" +} + +type Message struct { + Header map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Body []byte `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_5edf81766900dd99, []int{3} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetHeader() map[string]string { + if m != nil { + return m.Header + } + return nil +} + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +func init() { + proto.RegisterType((*Empty)(nil), "go.micro.broker.Empty") + proto.RegisterType((*PublishRequest)(nil), "go.micro.broker.PublishRequest") + proto.RegisterType((*SubscribeRequest)(nil), "go.micro.broker.SubscribeRequest") + proto.RegisterType((*Message)(nil), "go.micro.broker.Message") + proto.RegisterMapType((map[string]string)(nil), "go.micro.broker.Message.HeaderEntry") +} + +func init() { + proto.RegisterFile("github.com/micro/go-micro/broker/proto/broker.proto", fileDescriptor_5edf81766900dd99) +} + +var fileDescriptor_5edf81766900dd99 = []byte{ + // 309 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcf, 0x4a, 0xf3, 0x40, + 0x14, 0xc5, 0x3b, 0xed, 0xd7, 0x86, 0xde, 0x7e, 0x68, 0x19, 0x8a, 0x84, 0x6e, 0x8c, 0xc1, 0x45, + 0x36, 0x4e, 0x24, 0xdd, 0xa8, 0x88, 0x0b, 0xb1, 0xe0, 0x42, 0x41, 0xc6, 0x9d, 0xbb, 0x4c, 0x3a, + 0x24, 0xa1, 0x8d, 0x93, 0x4e, 0x66, 0x84, 0xbc, 0x88, 0x2b, 0x1f, 0x56, 0x3a, 0x93, 0xfa, 0xa7, + 0xa1, 0xee, 0xee, 0x49, 0x7e, 0x73, 0xee, 0xe1, 0x5c, 0x98, 0xa5, 0xb9, 0xca, 0x34, 0x23, 0x89, + 0x28, 0xc2, 0x22, 0x4f, 0xa4, 0x08, 0x53, 0x71, 0x66, 0x07, 0x26, 0xc5, 0x92, 0xcb, 0xb0, 0x94, + 0x42, 0x6d, 0x05, 0x31, 0x02, 0x1f, 0xa6, 0x82, 0x18, 0x86, 0xd8, 0xcf, 0xbe, 0x03, 0xfd, 0x79, + 0x51, 0xaa, 0xda, 0x7f, 0x81, 0x83, 0x27, 0xcd, 0x56, 0x79, 0x95, 0x51, 0xbe, 0xd6, 0xbc, 0x52, + 0x78, 0x02, 0x7d, 0x25, 0xca, 0x3c, 0x71, 0x91, 0x87, 0x82, 0x21, 0xb5, 0x02, 0x47, 0xe0, 0x14, + 0xbc, 0xaa, 0xe2, 0x94, 0xbb, 0x5d, 0x0f, 0x05, 0xa3, 0xc8, 0x25, 0x3b, 0x9e, 0xe4, 0xd1, 0xfe, + 0xa7, 0x5b, 0xd0, 0xbf, 0x81, 0xf1, 0xb3, 0x66, 0x55, 0x22, 0x73, 0xc6, 0xff, 0x76, 0x9f, 0x40, + 0x7f, 0xad, 0xb9, 0xb6, 0xde, 0x43, 0x6a, 0x85, 0xff, 0x8e, 0xc0, 0x69, 0x4c, 0xf1, 0x35, 0x0c, + 0x32, 0x1e, 0x2f, 0xb8, 0x74, 0x91, 0xd7, 0x0b, 0x46, 0xd1, 0xe9, 0xbe, 0xf5, 0xe4, 0xde, 0x60, + 0xf3, 0x57, 0x25, 0x6b, 0xda, 0xbc, 0xc1, 0x18, 0xfe, 0x31, 0xb1, 0xa8, 0x8d, 0xfd, 0x7f, 0x6a, + 0xe6, 0xe9, 0x25, 0x8c, 0x7e, 0xa0, 0x78, 0x0c, 0xbd, 0x25, 0xaf, 0x9b, 0x58, 0x9b, 0x71, 0x13, + 0xea, 0x2d, 0x5e, 0x7d, 0x87, 0x32, 0xe2, 0xaa, 0x7b, 0x81, 0xa2, 0x0f, 0x04, 0x83, 0x5b, 0xb3, + 0x15, 0xdf, 0x81, 0xd3, 0xf4, 0x87, 0x8f, 0x5b, 0x91, 0x7e, 0x37, 0x3b, 0x3d, 0x6a, 0x01, 0xf6, + 0x06, 0x1d, 0xfc, 0x00, 0xc3, 0xaf, 0xa6, 0xf0, 0x49, 0x0b, 0xdb, 0x6d, 0x71, 0xba, 0xb7, 0x7c, + 0xbf, 0x73, 0x8e, 0xd8, 0xc0, 0x1c, 0x7d, 0xf6, 0x19, 0x00, 0x00, 0xff, 0xff, 0x25, 0x38, 0xfa, + 0x02, 0x2b, 0x02, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// BrokerClient is the client API for Broker service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type BrokerClient interface { + Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*Empty, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Broker_SubscribeClient, error) +} + +type brokerClient struct { + cc *grpc.ClientConn +} + +func NewBrokerClient(cc *grpc.ClientConn) BrokerClient { + return &brokerClient{cc} +} + +func (c *brokerClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, "/go.micro.broker.Broker/Publish", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *brokerClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Broker_SubscribeClient, error) { + stream, err := c.cc.NewStream(ctx, &_Broker_serviceDesc.Streams[0], "/go.micro.broker.Broker/Subscribe", opts...) + if err != nil { + return nil, err + } + x := &brokerSubscribeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Broker_SubscribeClient interface { + Recv() (*Message, error) + grpc.ClientStream +} + +type brokerSubscribeClient struct { + grpc.ClientStream +} + +func (x *brokerSubscribeClient) Recv() (*Message, error) { + m := new(Message) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BrokerServer is the server API for Broker service. +type BrokerServer interface { + Publish(context.Context, *PublishRequest) (*Empty, error) + Subscribe(*SubscribeRequest, Broker_SubscribeServer) error +} + +func RegisterBrokerServer(s *grpc.Server, srv BrokerServer) { + s.RegisterService(&_Broker_serviceDesc, srv) +} + +func _Broker_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PublishRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BrokerServer).Publish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/go.micro.broker.Broker/Publish", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BrokerServer).Publish(ctx, req.(*PublishRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Broker_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(BrokerServer).Subscribe(m, &brokerSubscribeServer{stream}) +} + +type Broker_SubscribeServer interface { + Send(*Message) error + grpc.ServerStream +} + +type brokerSubscribeServer struct { + grpc.ServerStream +} + +func (x *brokerSubscribeServer) Send(m *Message) error { + return x.ServerStream.SendMsg(m) +} + +var _Broker_serviceDesc = grpc.ServiceDesc{ + ServiceName: "go.micro.broker.Broker", + HandlerType: (*BrokerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Publish", + Handler: _Broker_Publish_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Subscribe", + Handler: _Broker_Subscribe_Handler, + ServerStreams: true, + }, + }, + Metadata: "github.com/micro/go-micro/broker/proto/broker.proto", +} diff --git a/broker/service/proto/broker.proto b/broker/service/proto/broker.proto new file mode 100644 index 00000000..3fc877e6 --- /dev/null +++ b/broker/service/proto/broker.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package go.micro.broker; + +service Broker { + rpc Publish(PublishRequest) returns (Empty) {}; + rpc Subscribe(SubscribeRequest) returns (stream Message) {}; +} + +message Empty {} + +message PublishRequest { + string topic = 1; + Message message = 2; +} + +message SubscribeRequest { + string topic = 1; + string queue = 2; +} + +message Message { + map header = 1; + bytes body = 2; +} diff --git a/broker/service/service.go b/broker/service/service.go new file mode 100644 index 00000000..63c09624 --- /dev/null +++ b/broker/service/service.go @@ -0,0 +1,102 @@ +// Package service provides the broker service client +package service + +import ( + "context" + + "github.com/micro/go-micro/broker" + pb "github.com/micro/go-micro/broker/service/proto" + "github.com/micro/go-micro/client" +) + +type serviceBroker struct { + Addrs []string + Client pb.BrokerService + options broker.Options +} + +var ( + DefaultName = "go.micro.broker" +) + +func (b *serviceBroker) Address() string { + return b.Addrs[0] +} + +func (b *serviceBroker) Connect() error { + return nil +} + +func (b *serviceBroker) Disconnect() error { + return nil +} + +func (b *serviceBroker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&b.options) + } + return nil +} + +func (b *serviceBroker) Options() broker.Options { + return b.options +} + +func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + _, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ + Topic: topic, + Message: &pb.Message{ + Header: msg.Header, + Body: msg.Body, + }, + }, client.WithAddress(b.Addrs...)) + return err +} + +func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + var options broker.SubscribeOptions + for _, o := range opts { + o(&options) + } + stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ + Topic: topic, + Queue: options.Queue, + }, client.WithAddress(b.Addrs...)) + if err != nil { + return nil, err + } + + sub := &serviceSub{ + topic: topic, + queue: options.Queue, + handler: handler, + stream: stream, + closed: make(chan bool), + options: options, + } + go sub.run() + + return sub, nil +} + +func (b *serviceBroker) String() string { + return "service" +} + +func NewBroker(opts ...broker.Option) broker.Broker { + var options broker.Options + for _, o := range opts { + o(&options) + } + + addrs := options.Addrs + if len(addrs) == 0 { + addrs = []string{"127.0.0.1:8001"} + } + + return &serviceBroker{ + Addrs: addrs, + Client: pb.NewBrokerService(DefaultName, client.DefaultClient), + options: options, + } +} diff --git a/broker/service/subscriber.go b/broker/service/subscriber.go new file mode 100644 index 00000000..09a942c8 --- /dev/null +++ b/broker/service/subscriber.go @@ -0,0 +1,79 @@ +package service + +import ( + "github.com/micro/go-micro/broker" + pb "github.com/micro/go-micro/broker/service/proto" +) + +type serviceSub struct { + topic string + queue string + handler broker.Handler + stream pb.Broker_SubscribeService + closed chan bool + options broker.SubscribeOptions +} + +type serviceEvent struct { + topic string + message *broker.Message +} + +func (s *serviceEvent) Topic() string { + return s.topic +} + +func (s *serviceEvent) Message() *broker.Message { + return s.message +} + +func (s *serviceEvent) Ack() error { + return nil +} + +func (s *serviceSub) run() { + exit := make(chan bool) + go func() { + select { + case <-exit: + return + case <-s.closed: + s.stream.Close() + } + }() + + for { + // TODO: do not fail silently + msg, err := s.stream.Recv() + if err != nil { + close(exit) + return + } + // TODO: handle error + s.handler(&serviceEvent{ + topic: s.topic, + message: &broker.Message{ + Header: msg.Header, + Body: msg.Body, + }, + }) + } +} + +func (s *serviceSub) Options() broker.SubscribeOptions { + return s.options +} + +func (s *serviceSub) Topic() string { + return s.topic +} + +func (s *serviceSub) Unsubscribe() error { + select { + case <-s.closed: + return nil + default: + close(s.closed) + } + return nil +}