Merge pull request #815 from micro/broker
Add broker service implementation
This commit is contained in:
		| @@ -324,15 +324,21 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { | |||||||
| 	p := &httpEvent{m: m, t: topic} | 	p := &httpEvent{m: m, t: topic} | ||||||
| 	id := req.Form.Get("id") | 	id := req.Form.Get("id") | ||||||
|  |  | ||||||
|  | 	var subs []Handler | ||||||
|  |  | ||||||
| 	h.RLock() | 	h.RLock() | ||||||
| 	for _, subscriber := range h.subscribers[topic] { | 	for _, subscriber := range h.subscribers[topic] { | ||||||
| 		if id == subscriber.id { | 		if id != subscriber.id { | ||||||
| 			// sub is sync; crufty rate limiting | 			continue | ||||||
| 			// so we don't hose the cpu |  | ||||||
| 			subscriber.fn(p) |  | ||||||
| 		} | 		} | ||||||
|  | 		subs = append(subs, subscriber.fn) | ||||||
| 	} | 	} | ||||||
| 	h.RUnlock() | 	h.RUnlock() | ||||||
|  |  | ||||||
|  | 	// execute the handler | ||||||
|  | 	for _, fn := range subs { | ||||||
|  | 		fn(p) | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpBroker) Address() string { | func (h *httpBroker) Address() string { | ||||||
| @@ -420,7 +426,6 @@ func (h *httpBroker) Connect() error { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (h *httpBroker) Disconnect() error { | func (h *httpBroker) Disconnect() error { | ||||||
|  |  | ||||||
| 	h.RLock() | 	h.RLock() | ||||||
| 	if !h.running { | 	if !h.running { | ||||||
| 		h.RUnlock() | 		h.RUnlock() | ||||||
|   | |||||||
							
								
								
									
										66
									
								
								broker/service/handler/handler.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										66
									
								
								broker/service/handler/handler.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,66 @@ | |||||||
|  | package handler | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/broker" | ||||||
|  | 	pb "github.com/micro/go-micro/broker/service/proto" | ||||||
|  | 	"github.com/micro/go-micro/errors" | ||||||
|  | 	"github.com/micro/go-micro/util/log" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type Broker struct { | ||||||
|  | 	Broker broker.Broker | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *Broker) Publish(ctx context.Context, req *pb.PublishRequest, rsp *pb.Empty) error { | ||||||
|  | 	log.Debugf("Publishing message to %s topic", req.Topic) | ||||||
|  | 	err := b.Broker.Publish(req.Topic, &broker.Message{ | ||||||
|  | 		Header: req.Message.Header, | ||||||
|  | 		Body:   req.Message.Body, | ||||||
|  | 	}) | ||||||
|  | 	log.Debugf("Published message to %s topic", req.Topic) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return errors.InternalServerError("go.micro.broker", err.Error()) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *Broker) Subscribe(ctx context.Context, req *pb.SubscribeRequest, stream pb.Broker_SubscribeStream) error { | ||||||
|  | 	errChan := make(chan error, 1) | ||||||
|  |  | ||||||
|  | 	// message handler to stream back messages from broker | ||||||
|  | 	handler := func(p broker.Event) error { | ||||||
|  | 		if err := stream.Send(&pb.Message{ | ||||||
|  | 			Header: p.Message().Header, | ||||||
|  | 			Body:   p.Message().Body, | ||||||
|  | 		}); err != nil { | ||||||
|  | 			select { | ||||||
|  | 			case errChan <- err: | ||||||
|  | 				return err | ||||||
|  | 			default: | ||||||
|  | 				return err | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	log.Debugf("Subscribing to %s topic", req.Topic) | ||||||
|  | 	sub, err := b.Broker.Subscribe(req.Topic, handler, broker.Queue(req.Queue)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return errors.InternalServerError("go.micro.broker", err.Error()) | ||||||
|  | 	} | ||||||
|  | 	defer func() { | ||||||
|  | 		log.Debugf("Unsubscribing from topic %s", req.Topic) | ||||||
|  | 		sub.Unsubscribe() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	select { | ||||||
|  | 	case <-ctx.Done(): | ||||||
|  | 		log.Debugf("Context done for subscription to topic %s", req.Topic) | ||||||
|  | 		return nil | ||||||
|  | 	case err := <-errChan: | ||||||
|  | 		log.Debugf("Subscription error for topic %s: %v", req.Topic, err) | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										173
									
								
								broker/service/proto/broker.micro.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										173
									
								
								broker/service/proto/broker.micro.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,173 @@ | |||||||
|  | // Code generated by protoc-gen-micro. DO NOT EDIT. | ||||||
|  | // source: github.com/micro/go-micro/broker/proto/broker.proto | ||||||
|  |  | ||||||
|  | package go_micro_broker | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	fmt "fmt" | ||||||
|  | 	proto "github.com/golang/protobuf/proto" | ||||||
|  | 	math "math" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	context "context" | ||||||
|  | 	client "github.com/micro/go-micro/client" | ||||||
|  | 	server "github.com/micro/go-micro/server" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Reference imports to suppress errors if they are not otherwise used. | ||||||
|  | var _ = proto.Marshal | ||||||
|  | var _ = fmt.Errorf | ||||||
|  | var _ = math.Inf | ||||||
|  |  | ||||||
|  | // This is a compile-time assertion to ensure that this generated file | ||||||
|  | // is compatible with the proto package it is being compiled against. | ||||||
|  | // A compilation error at this line likely means your copy of the | ||||||
|  | // proto package needs to be updated. | ||||||
|  | const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package | ||||||
|  |  | ||||||
|  | // Reference imports to suppress errors if they are not otherwise used. | ||||||
|  | var _ context.Context | ||||||
|  | var _ client.Option | ||||||
|  | var _ server.Option | ||||||
|  |  | ||||||
|  | // Client API for Broker service | ||||||
|  |  | ||||||
|  | type BrokerService interface { | ||||||
|  | 	Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*Empty, error) | ||||||
|  | 	Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Broker_SubscribeService, error) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerService struct { | ||||||
|  | 	c    client.Client | ||||||
|  | 	name string | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewBrokerService(name string, c client.Client) BrokerService { | ||||||
|  | 	if c == nil { | ||||||
|  | 		c = client.NewClient() | ||||||
|  | 	} | ||||||
|  | 	if len(name) == 0 { | ||||||
|  | 		name = "go.micro.broker" | ||||||
|  | 	} | ||||||
|  | 	return &brokerService{ | ||||||
|  | 		c:    c, | ||||||
|  | 		name: name, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *brokerService) Publish(ctx context.Context, in *PublishRequest, opts ...client.CallOption) (*Empty, error) { | ||||||
|  | 	req := c.c.NewRequest(c.name, "Broker.Publish", in) | ||||||
|  | 	out := new(Empty) | ||||||
|  | 	err := c.c.Call(ctx, req, out, opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return out, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *brokerService) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...client.CallOption) (Broker_SubscribeService, error) { | ||||||
|  | 	req := c.c.NewRequest(c.name, "Broker.Subscribe", &SubscribeRequest{}) | ||||||
|  | 	stream, err := c.c.Stream(ctx, req, opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if err := stream.Send(in); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &brokerServiceSubscribe{stream}, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Broker_SubscribeService interface { | ||||||
|  | 	SendMsg(interface{}) error | ||||||
|  | 	RecvMsg(interface{}) error | ||||||
|  | 	Close() error | ||||||
|  | 	Recv() (*Message, error) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerServiceSubscribe struct { | ||||||
|  | 	stream client.Stream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerServiceSubscribe) Close() error { | ||||||
|  | 	return x.stream.Close() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerServiceSubscribe) SendMsg(m interface{}) error { | ||||||
|  | 	return x.stream.Send(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerServiceSubscribe) RecvMsg(m interface{}) error { | ||||||
|  | 	return x.stream.Recv(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerServiceSubscribe) Recv() (*Message, error) { | ||||||
|  | 	m := new(Message) | ||||||
|  | 	err := x.stream.Recv(m) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return m, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Server API for Broker service | ||||||
|  |  | ||||||
|  | type BrokerHandler interface { | ||||||
|  | 	Publish(context.Context, *PublishRequest, *Empty) error | ||||||
|  | 	Subscribe(context.Context, *SubscribeRequest, Broker_SubscribeStream) error | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func RegisterBrokerHandler(s server.Server, hdlr BrokerHandler, opts ...server.HandlerOption) error { | ||||||
|  | 	type broker interface { | ||||||
|  | 		Publish(ctx context.Context, in *PublishRequest, out *Empty) error | ||||||
|  | 		Subscribe(ctx context.Context, stream server.Stream) error | ||||||
|  | 	} | ||||||
|  | 	type Broker struct { | ||||||
|  | 		broker | ||||||
|  | 	} | ||||||
|  | 	h := &brokerHandler{hdlr} | ||||||
|  | 	return s.Handle(s.NewHandler(&Broker{h}, opts...)) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerHandler struct { | ||||||
|  | 	BrokerHandler | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *brokerHandler) Publish(ctx context.Context, in *PublishRequest, out *Empty) error { | ||||||
|  | 	return h.BrokerHandler.Publish(ctx, in, out) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (h *brokerHandler) Subscribe(ctx context.Context, stream server.Stream) error { | ||||||
|  | 	m := new(SubscribeRequest) | ||||||
|  | 	if err := stream.Recv(m); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return h.BrokerHandler.Subscribe(ctx, m, &brokerSubscribeStream{stream}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Broker_SubscribeStream interface { | ||||||
|  | 	SendMsg(interface{}) error | ||||||
|  | 	RecvMsg(interface{}) error | ||||||
|  | 	Close() error | ||||||
|  | 	Send(*Message) error | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerSubscribeStream struct { | ||||||
|  | 	stream server.Stream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerSubscribeStream) Close() error { | ||||||
|  | 	return x.stream.Close() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerSubscribeStream) SendMsg(m interface{}) error { | ||||||
|  | 	return x.stream.Send(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerSubscribeStream) RecvMsg(m interface{}) error { | ||||||
|  | 	return x.stream.Recv(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerSubscribeStream) Send(m *Message) error { | ||||||
|  | 	return x.stream.Send(m) | ||||||
|  | } | ||||||
							
								
								
									
										364
									
								
								broker/service/proto/broker.pb.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										364
									
								
								broker/service/proto/broker.pb.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,364 @@ | |||||||
|  | // Code generated by protoc-gen-go. DO NOT EDIT. | ||||||
|  | // source: github.com/micro/go-micro/broker/proto/broker.proto | ||||||
|  |  | ||||||
|  | package go_micro_broker | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	context "context" | ||||||
|  | 	fmt "fmt" | ||||||
|  | 	proto "github.com/golang/protobuf/proto" | ||||||
|  | 	grpc "google.golang.org/grpc" | ||||||
|  | 	math "math" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // Reference imports to suppress errors if they are not otherwise used. | ||||||
|  | var _ = proto.Marshal | ||||||
|  | var _ = fmt.Errorf | ||||||
|  | var _ = math.Inf | ||||||
|  |  | ||||||
|  | // This is a compile-time assertion to ensure that this generated file | ||||||
|  | // is compatible with the proto package it is being compiled against. | ||||||
|  | // A compilation error at this line likely means your copy of the | ||||||
|  | // proto package needs to be updated. | ||||||
|  | const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package | ||||||
|  |  | ||||||
|  | type Empty struct { | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Empty) Reset()         { *m = Empty{} } | ||||||
|  | func (m *Empty) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*Empty) ProtoMessage()    {} | ||||||
|  | func (*Empty) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_5edf81766900dd99, []int{0} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Empty) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_Empty.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_Empty.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *Empty) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_Empty.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *Empty) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_Empty.Size(m) | ||||||
|  | } | ||||||
|  | func (m *Empty) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_Empty.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_Empty proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | type PublishRequest struct { | ||||||
|  | 	Topic                string   `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` | ||||||
|  | 	Message              *Message `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *PublishRequest) Reset()         { *m = PublishRequest{} } | ||||||
|  | func (m *PublishRequest) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*PublishRequest) ProtoMessage()    {} | ||||||
|  | func (*PublishRequest) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_5edf81766900dd99, []int{1} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *PublishRequest) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_PublishRequest.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *PublishRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_PublishRequest.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *PublishRequest) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_PublishRequest.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *PublishRequest) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_PublishRequest.Size(m) | ||||||
|  | } | ||||||
|  | func (m *PublishRequest) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_PublishRequest.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_PublishRequest proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *PublishRequest) GetTopic() string { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Topic | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *PublishRequest) GetMessage() *Message { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Message | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type SubscribeRequest struct { | ||||||
|  | 	Topic                string   `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` | ||||||
|  | 	Queue                string   `protobuf:"bytes,2,opt,name=queue,proto3" json:"queue,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{} `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte   `json:"-"` | ||||||
|  | 	XXX_sizecache        int32    `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *SubscribeRequest) Reset()         { *m = SubscribeRequest{} } | ||||||
|  | func (m *SubscribeRequest) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*SubscribeRequest) ProtoMessage()    {} | ||||||
|  | func (*SubscribeRequest) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_5edf81766900dd99, []int{2} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *SubscribeRequest) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_SubscribeRequest.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *SubscribeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_SubscribeRequest.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *SubscribeRequest) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_SubscribeRequest.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *SubscribeRequest) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_SubscribeRequest.Size(m) | ||||||
|  | } | ||||||
|  | func (m *SubscribeRequest) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_SubscribeRequest.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_SubscribeRequest proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *SubscribeRequest) GetTopic() string { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Topic | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *SubscribeRequest) GetQueue() string { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Queue | ||||||
|  | 	} | ||||||
|  | 	return "" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Message struct { | ||||||
|  | 	Header               map[string]string `protobuf:"bytes,1,rep,name=header,proto3" json:"header,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` | ||||||
|  | 	Body                 []byte            `protobuf:"bytes,2,opt,name=body,proto3" json:"body,omitempty"` | ||||||
|  | 	XXX_NoUnkeyedLiteral struct{}          `json:"-"` | ||||||
|  | 	XXX_unrecognized     []byte            `json:"-"` | ||||||
|  | 	XXX_sizecache        int32             `json:"-"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Message) Reset()         { *m = Message{} } | ||||||
|  | func (m *Message) String() string { return proto.CompactTextString(m) } | ||||||
|  | func (*Message) ProtoMessage()    {} | ||||||
|  | func (*Message) Descriptor() ([]byte, []int) { | ||||||
|  | 	return fileDescriptor_5edf81766900dd99, []int{3} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Message) XXX_Unmarshal(b []byte) error { | ||||||
|  | 	return xxx_messageInfo_Message.Unmarshal(m, b) | ||||||
|  | } | ||||||
|  | func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { | ||||||
|  | 	return xxx_messageInfo_Message.Marshal(b, m, deterministic) | ||||||
|  | } | ||||||
|  | func (m *Message) XXX_Merge(src proto.Message) { | ||||||
|  | 	xxx_messageInfo_Message.Merge(m, src) | ||||||
|  | } | ||||||
|  | func (m *Message) XXX_Size() int { | ||||||
|  | 	return xxx_messageInfo_Message.Size(m) | ||||||
|  | } | ||||||
|  | func (m *Message) XXX_DiscardUnknown() { | ||||||
|  | 	xxx_messageInfo_Message.DiscardUnknown(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var xxx_messageInfo_Message proto.InternalMessageInfo | ||||||
|  |  | ||||||
|  | func (m *Message) GetHeader() map[string]string { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Header | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (m *Message) GetBody() []byte { | ||||||
|  | 	if m != nil { | ||||||
|  | 		return m.Body | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	proto.RegisterType((*Empty)(nil), "go.micro.broker.Empty") | ||||||
|  | 	proto.RegisterType((*PublishRequest)(nil), "go.micro.broker.PublishRequest") | ||||||
|  | 	proto.RegisterType((*SubscribeRequest)(nil), "go.micro.broker.SubscribeRequest") | ||||||
|  | 	proto.RegisterType((*Message)(nil), "go.micro.broker.Message") | ||||||
|  | 	proto.RegisterMapType((map[string]string)(nil), "go.micro.broker.Message.HeaderEntry") | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func init() { | ||||||
|  | 	proto.RegisterFile("github.com/micro/go-micro/broker/proto/broker.proto", fileDescriptor_5edf81766900dd99) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var fileDescriptor_5edf81766900dd99 = []byte{ | ||||||
|  | 	// 309 bytes of a gzipped FileDescriptorProto | ||||||
|  | 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x91, 0xcf, 0x4a, 0xf3, 0x40, | ||||||
|  | 	0x14, 0xc5, 0x3b, 0xed, 0xd7, 0x86, 0xde, 0x7e, 0x68, 0x19, 0x8a, 0x84, 0x6e, 0x8c, 0xc1, 0x45, | ||||||
|  | 	0x36, 0x4e, 0x24, 0xdd, 0xa8, 0x88, 0x0b, 0xb1, 0xe0, 0x42, 0x41, 0xc6, 0x9d, 0xbb, 0x4c, 0x3a, | ||||||
|  | 	0x24, 0xa1, 0x8d, 0x93, 0x4e, 0x66, 0x84, 0xbc, 0x88, 0x2b, 0x1f, 0x56, 0x3a, 0x93, 0xfa, 0xa7, | ||||||
|  | 	0xa1, 0xee, 0xee, 0x49, 0x7e, 0x73, 0xee, 0xe1, 0x5c, 0x98, 0xa5, 0xb9, 0xca, 0x34, 0x23, 0x89, | ||||||
|  | 	0x28, 0xc2, 0x22, 0x4f, 0xa4, 0x08, 0x53, 0x71, 0x66, 0x07, 0x26, 0xc5, 0x92, 0xcb, 0xb0, 0x94, | ||||||
|  | 	0x42, 0x6d, 0x05, 0x31, 0x02, 0x1f, 0xa6, 0x82, 0x18, 0x86, 0xd8, 0xcf, 0xbe, 0x03, 0xfd, 0x79, | ||||||
|  | 	0x51, 0xaa, 0xda, 0x7f, 0x81, 0x83, 0x27, 0xcd, 0x56, 0x79, 0x95, 0x51, 0xbe, 0xd6, 0xbc, 0x52, | ||||||
|  | 	0x78, 0x02, 0x7d, 0x25, 0xca, 0x3c, 0x71, 0x91, 0x87, 0x82, 0x21, 0xb5, 0x02, 0x47, 0xe0, 0x14, | ||||||
|  | 	0xbc, 0xaa, 0xe2, 0x94, 0xbb, 0x5d, 0x0f, 0x05, 0xa3, 0xc8, 0x25, 0x3b, 0x9e, 0xe4, 0xd1, 0xfe, | ||||||
|  | 	0xa7, 0x5b, 0xd0, 0xbf, 0x81, 0xf1, 0xb3, 0x66, 0x55, 0x22, 0x73, 0xc6, 0xff, 0x76, 0x9f, 0x40, | ||||||
|  | 	0x7f, 0xad, 0xb9, 0xb6, 0xde, 0x43, 0x6a, 0x85, 0xff, 0x8e, 0xc0, 0x69, 0x4c, 0xf1, 0x35, 0x0c, | ||||||
|  | 	0x32, 0x1e, 0x2f, 0xb8, 0x74, 0x91, 0xd7, 0x0b, 0x46, 0xd1, 0xe9, 0xbe, 0xf5, 0xe4, 0xde, 0x60, | ||||||
|  | 	0xf3, 0x57, 0x25, 0x6b, 0xda, 0xbc, 0xc1, 0x18, 0xfe, 0x31, 0xb1, 0xa8, 0x8d, 0xfd, 0x7f, 0x6a, | ||||||
|  | 	0xe6, 0xe9, 0x25, 0x8c, 0x7e, 0xa0, 0x78, 0x0c, 0xbd, 0x25, 0xaf, 0x9b, 0x58, 0x9b, 0x71, 0x13, | ||||||
|  | 	0xea, 0x2d, 0x5e, 0x7d, 0x87, 0x32, 0xe2, 0xaa, 0x7b, 0x81, 0xa2, 0x0f, 0x04, 0x83, 0x5b, 0xb3, | ||||||
|  | 	0x15, 0xdf, 0x81, 0xd3, 0xf4, 0x87, 0x8f, 0x5b, 0x91, 0x7e, 0x37, 0x3b, 0x3d, 0x6a, 0x01, 0xf6, | ||||||
|  | 	0x06, 0x1d, 0xfc, 0x00, 0xc3, 0xaf, 0xa6, 0xf0, 0x49, 0x0b, 0xdb, 0x6d, 0x71, 0xba, 0xb7, 0x7c, | ||||||
|  | 	0xbf, 0x73, 0x8e, 0xd8, 0xc0, 0x1c, 0x7d, 0xf6, 0x19, 0x00, 0x00, 0xff, 0xff, 0x25, 0x38, 0xfa, | ||||||
|  | 	0x02, 0x2b, 0x02, 0x00, 0x00, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Reference imports to suppress errors if they are not otherwise used. | ||||||
|  | var _ context.Context | ||||||
|  | var _ grpc.ClientConn | ||||||
|  |  | ||||||
|  | // This is a compile-time assertion to ensure that this generated file | ||||||
|  | // is compatible with the grpc package it is being compiled against. | ||||||
|  | const _ = grpc.SupportPackageIsVersion4 | ||||||
|  |  | ||||||
|  | // BrokerClient is the client API for Broker service. | ||||||
|  | // | ||||||
|  | // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. | ||||||
|  | type BrokerClient interface { | ||||||
|  | 	Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*Empty, error) | ||||||
|  | 	Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Broker_SubscribeClient, error) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerClient struct { | ||||||
|  | 	cc *grpc.ClientConn | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewBrokerClient(cc *grpc.ClientConn) BrokerClient { | ||||||
|  | 	return &brokerClient{cc} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *brokerClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*Empty, error) { | ||||||
|  | 	out := new(Empty) | ||||||
|  | 	err := c.cc.Invoke(ctx, "/go.micro.broker.Broker/Publish", in, out, opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return out, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (c *brokerClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (Broker_SubscribeClient, error) { | ||||||
|  | 	stream, err := c.cc.NewStream(ctx, &_Broker_serviceDesc.Streams[0], "/go.micro.broker.Broker/Subscribe", opts...) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	x := &brokerSubscribeClient{stream} | ||||||
|  | 	if err := x.ClientStream.SendMsg(in); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if err := x.ClientStream.CloseSend(); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return x, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Broker_SubscribeClient interface { | ||||||
|  | 	Recv() (*Message, error) | ||||||
|  | 	grpc.ClientStream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerSubscribeClient struct { | ||||||
|  | 	grpc.ClientStream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerSubscribeClient) Recv() (*Message, error) { | ||||||
|  | 	m := new(Message) | ||||||
|  | 	if err := x.ClientStream.RecvMsg(m); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return m, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // BrokerServer is the server API for Broker service. | ||||||
|  | type BrokerServer interface { | ||||||
|  | 	Publish(context.Context, *PublishRequest) (*Empty, error) | ||||||
|  | 	Subscribe(*SubscribeRequest, Broker_SubscribeServer) error | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func RegisterBrokerServer(s *grpc.Server, srv BrokerServer) { | ||||||
|  | 	s.RegisterService(&_Broker_serviceDesc, srv) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func _Broker_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { | ||||||
|  | 	in := new(PublishRequest) | ||||||
|  | 	if err := dec(in); err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	if interceptor == nil { | ||||||
|  | 		return srv.(BrokerServer).Publish(ctx, in) | ||||||
|  | 	} | ||||||
|  | 	info := &grpc.UnaryServerInfo{ | ||||||
|  | 		Server:     srv, | ||||||
|  | 		FullMethod: "/go.micro.broker.Broker/Publish", | ||||||
|  | 	} | ||||||
|  | 	handler := func(ctx context.Context, req interface{}) (interface{}, error) { | ||||||
|  | 		return srv.(BrokerServer).Publish(ctx, req.(*PublishRequest)) | ||||||
|  | 	} | ||||||
|  | 	return interceptor(ctx, in, info, handler) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func _Broker_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { | ||||||
|  | 	m := new(SubscribeRequest) | ||||||
|  | 	if err := stream.RecvMsg(m); err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  | 	return srv.(BrokerServer).Subscribe(m, &brokerSubscribeServer{stream}) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Broker_SubscribeServer interface { | ||||||
|  | 	Send(*Message) error | ||||||
|  | 	grpc.ServerStream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type brokerSubscribeServer struct { | ||||||
|  | 	grpc.ServerStream | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (x *brokerSubscribeServer) Send(m *Message) error { | ||||||
|  | 	return x.ServerStream.SendMsg(m) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var _Broker_serviceDesc = grpc.ServiceDesc{ | ||||||
|  | 	ServiceName: "go.micro.broker.Broker", | ||||||
|  | 	HandlerType: (*BrokerServer)(nil), | ||||||
|  | 	Methods: []grpc.MethodDesc{ | ||||||
|  | 		{ | ||||||
|  | 			MethodName: "Publish", | ||||||
|  | 			Handler:    _Broker_Publish_Handler, | ||||||
|  | 		}, | ||||||
|  | 	}, | ||||||
|  | 	Streams: []grpc.StreamDesc{ | ||||||
|  | 		{ | ||||||
|  | 			StreamName:    "Subscribe", | ||||||
|  | 			Handler:       _Broker_Subscribe_Handler, | ||||||
|  | 			ServerStreams: true, | ||||||
|  | 		}, | ||||||
|  | 	}, | ||||||
|  | 	Metadata: "github.com/micro/go-micro/broker/proto/broker.proto", | ||||||
|  | } | ||||||
							
								
								
									
										25
									
								
								broker/service/proto/broker.proto
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								broker/service/proto/broker.proto
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,25 @@ | |||||||
|  | syntax = "proto3"; | ||||||
|  |  | ||||||
|  | package go.micro.broker; | ||||||
|  |  | ||||||
|  | service Broker { | ||||||
|  | 	rpc Publish(PublishRequest) returns (Empty) {}; | ||||||
|  | 	rpc Subscribe(SubscribeRequest) returns (stream Message) {}; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | message Empty {} | ||||||
|  |  | ||||||
|  | message PublishRequest { | ||||||
|  | 	string topic = 1; | ||||||
|  | 	Message message = 2; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | message SubscribeRequest { | ||||||
|  | 	string topic = 1; | ||||||
|  | 	string queue = 2; | ||||||
|  | } | ||||||
|  |  | ||||||
|  | message Message { | ||||||
|  | 	map<string,string> header = 1; | ||||||
|  | 	bytes body = 2; | ||||||
|  | } | ||||||
							
								
								
									
										132
									
								
								broker/service/service.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										132
									
								
								broker/service/service.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,132 @@ | |||||||
|  | // Package service provides the broker service client | ||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
|  | 	"github.com/micro/go-micro/broker" | ||||||
|  | 	pb "github.com/micro/go-micro/broker/service/proto" | ||||||
|  | 	"github.com/micro/go-micro/client" | ||||||
|  | 	"github.com/micro/go-micro/util/log" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type serviceBroker struct { | ||||||
|  | 	Addrs   []string | ||||||
|  | 	Client  pb.BrokerService | ||||||
|  | 	options broker.Options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | var ( | ||||||
|  | 	DefaultName = "go.micro.broker" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Address() string { | ||||||
|  | 	return b.Addrs[0] | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Connect() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Disconnect() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Init(opts ...broker.Option) error { | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&b.options) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Options() broker.Options { | ||||||
|  | 	return b.options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { | ||||||
|  | 	log.Debugf("Publishing to topic %s broker %v", topic, b.Addrs) | ||||||
|  | 	_, err := b.Client.Publish(context.TODO(), &pb.PublishRequest{ | ||||||
|  | 		Topic: topic, | ||||||
|  | 		Message: &pb.Message{ | ||||||
|  | 			Header: msg.Header, | ||||||
|  | 			Body:   msg.Body, | ||||||
|  | 		}, | ||||||
|  | 	}, client.WithAddress(b.Addrs...)) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { | ||||||
|  | 	var options broker.SubscribeOptions | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  | 	log.Debugf("Subscribing to topic %s queue %s broker %v", topic, options.Queue, b.Addrs) | ||||||
|  | 	stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ | ||||||
|  | 		Topic: topic, | ||||||
|  | 		Queue: options.Queue, | ||||||
|  | 	}, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	sub := &serviceSub{ | ||||||
|  | 		topic:   topic, | ||||||
|  | 		queue:   options.Queue, | ||||||
|  | 		handler: handler, | ||||||
|  | 		stream:  stream, | ||||||
|  | 		closed:  make(chan bool), | ||||||
|  | 		options: options, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-sub.closed: | ||||||
|  | 				log.Debugf("Unsubscribed from topic %s", topic) | ||||||
|  | 				return | ||||||
|  | 			default: | ||||||
|  | 				// run the subscriber | ||||||
|  | 				log.Debugf("Streaming from broker %v to topic [%s] queue [%s]", b.Addrs, topic, options.Queue) | ||||||
|  | 				if err := sub.run(); err != nil { | ||||||
|  | 					log.Debugf("Resubscribing to topic %s broker %v", topic, b.Addrs) | ||||||
|  | 					stream, err := b.Client.Subscribe(context.TODO(), &pb.SubscribeRequest{ | ||||||
|  | 						Topic: topic, | ||||||
|  | 						Queue: options.Queue, | ||||||
|  | 					}, client.WithAddress(b.Addrs...), client.WithRequestTimeout(time.Hour)) | ||||||
|  | 					if err != nil { | ||||||
|  | 						log.Debugf("Failed to resubscribe to topic %s: %v", topic, err) | ||||||
|  | 						time.Sleep(time.Second) | ||||||
|  | 						continue | ||||||
|  | 					} | ||||||
|  | 					// new stream | ||||||
|  | 					sub.stream = stream | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	return sub, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *serviceBroker) String() string { | ||||||
|  | 	return "service" | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewBroker(opts ...broker.Option) broker.Broker { | ||||||
|  | 	var options broker.Options | ||||||
|  | 	for _, o := range opts { | ||||||
|  | 		o(&options) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	addrs := options.Addrs | ||||||
|  | 	if len(addrs) == 0 { | ||||||
|  | 		addrs = []string{"127.0.0.1:8001"} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &serviceBroker{ | ||||||
|  | 		Addrs:   addrs, | ||||||
|  | 		Client:  pb.NewBrokerService(DefaultName, client.DefaultClient), | ||||||
|  | 		options: options, | ||||||
|  | 	} | ||||||
|  | } | ||||||
							
								
								
									
										101
									
								
								broker/service/subscriber.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										101
									
								
								broker/service/subscriber.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,101 @@ | |||||||
|  | package service | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"github.com/micro/go-micro/broker" | ||||||
|  | 	pb "github.com/micro/go-micro/broker/service/proto" | ||||||
|  | 	"github.com/micro/go-micro/util/log" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | type serviceSub struct { | ||||||
|  | 	topic   string | ||||||
|  | 	queue   string | ||||||
|  | 	handler broker.Handler | ||||||
|  | 	stream  pb.Broker_SubscribeService | ||||||
|  | 	closed  chan bool | ||||||
|  | 	options broker.SubscribeOptions | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type serviceEvent struct { | ||||||
|  | 	topic   string | ||||||
|  | 	message *broker.Message | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceEvent) Topic() string { | ||||||
|  | 	return s.topic | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceEvent) Message() *broker.Message { | ||||||
|  | 	return s.message | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceEvent) Ack() error { | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceSub) isClosed() bool { | ||||||
|  | 	select { | ||||||
|  | 	case <-s.closed: | ||||||
|  | 		return true | ||||||
|  | 	default: | ||||||
|  | 		return false | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceSub) run() error { | ||||||
|  | 	exit := make(chan bool) | ||||||
|  | 	go func() { | ||||||
|  | 		select { | ||||||
|  | 		case <-exit: | ||||||
|  | 		case <-s.closed: | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// close the stream | ||||||
|  | 		s.stream.Close() | ||||||
|  | 	}() | ||||||
|  |  | ||||||
|  | 	for { | ||||||
|  | 		// TODO: do not fail silently | ||||||
|  | 		msg, err := s.stream.Recv() | ||||||
|  | 		if err != nil { | ||||||
|  | 			log.Debugf("Streaming error for subcription to topic %s: %v", s.Topic(), err) | ||||||
|  |  | ||||||
|  | 			// close the exit channel | ||||||
|  | 			close(exit) | ||||||
|  |  | ||||||
|  | 			// don't return an error if we unsubscribed | ||||||
|  | 			if s.isClosed() { | ||||||
|  | 				return nil | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// return stream error | ||||||
|  | 			return err | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// TODO: handle error | ||||||
|  | 		s.handler(&serviceEvent{ | ||||||
|  | 			topic: s.topic, | ||||||
|  | 			message: &broker.Message{ | ||||||
|  | 				Header: msg.Header, | ||||||
|  | 				Body:   msg.Body, | ||||||
|  | 			}, | ||||||
|  | 		}) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceSub) Options() broker.SubscribeOptions { | ||||||
|  | 	return s.options | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceSub) Topic() string { | ||||||
|  | 	return s.topic | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (s *serviceSub) Unsubscribe() error { | ||||||
|  | 	select { | ||||||
|  | 	case <-s.closed: | ||||||
|  | 		return nil | ||||||
|  | 	default: | ||||||
|  | 		close(s.closed) | ||||||
|  | 	} | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
| @@ -23,6 +23,7 @@ import ( | |||||||
| 	"github.com/micro/go-micro/broker/http" | 	"github.com/micro/go-micro/broker/http" | ||||||
| 	"github.com/micro/go-micro/broker/memory" | 	"github.com/micro/go-micro/broker/memory" | ||||||
| 	"github.com/micro/go-micro/broker/nats" | 	"github.com/micro/go-micro/broker/nats" | ||||||
|  | 	brokerSrv "github.com/micro/go-micro/broker/service" | ||||||
|  |  | ||||||
| 	// registries | 	// registries | ||||||
| 	"github.com/micro/go-micro/registry" | 	"github.com/micro/go-micro/registry" | ||||||
| @@ -179,6 +180,8 @@ var ( | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	DefaultBrokers = map[string]func(...broker.Option) broker.Broker{ | 	DefaultBrokers = map[string]func(...broker.Option) broker.Broker{ | ||||||
|  | 		"go.micro.broker": brokerSrv.NewBroker, | ||||||
|  | 		"service":         brokerSrv.NewBroker, | ||||||
| 		"http":            http.NewBroker, | 		"http":            http.NewBroker, | ||||||
| 		"memory":          memory.NewBroker, | 		"memory":          memory.NewBroker, | ||||||
| 		"nats":            nats.NewBroker, | 		"nats":            nats.NewBroker, | ||||||
|   | |||||||
							
								
								
									
										16
									
								
								registry/cache/cache.go
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										16
									
								
								registry/cache/cache.go
									
									
									
									
										vendored
									
									
								
							| @@ -100,7 +100,7 @@ func (c *cache) quit() bool { | |||||||
|  |  | ||||||
| func (c *cache) del(service string) { | func (c *cache) del(service string) { | ||||||
| 	// don't blow away cache in error state | 	// don't blow away cache in error state | ||||||
| 	if err := c.getStatus(); err != nil { | 	if err := c.status; err != nil { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	// otherwise delete entries | 	// otherwise delete entries | ||||||
| @@ -116,14 +116,13 @@ func (c *cache) get(service string) ([]*registry.Service, error) { | |||||||
| 	services := c.cache[service] | 	services := c.cache[service] | ||||||
| 	// get cache ttl | 	// get cache ttl | ||||||
| 	ttl := c.ttls[service] | 	ttl := c.ttls[service] | ||||||
|  |  | ||||||
| 	// got services && within ttl so return cache |  | ||||||
| 	if c.isValid(services, ttl) { |  | ||||||
| 	// make a copy | 	// make a copy | ||||||
| 	cp := registry.Copy(services) | 	cp := registry.Copy(services) | ||||||
| 		// unlock the read |  | ||||||
|  | 	// got services && within ttl so return cache | ||||||
|  | 	if c.isValid(cp, ttl) { | ||||||
| 		c.RUnlock() | 		c.RUnlock() | ||||||
| 		// return servics | 		// return services | ||||||
| 		return cp, nil | 		return cp, nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -136,8 +135,9 @@ func (c *cache) get(service string) ([]*registry.Service, error) { | |||||||
| 			if len(cached) > 0 { | 			if len(cached) > 0 { | ||||||
| 				// set the error status | 				// set the error status | ||||||
| 				c.setStatus(err) | 				c.setStatus(err) | ||||||
|  |  | ||||||
| 				// return the stale cache | 				// return the stale cache | ||||||
| 				return registry.Copy(cached), nil | 				return cached, nil | ||||||
| 			} | 			} | ||||||
| 			// otherwise return error | 			// otherwise return error | ||||||
| 			return nil, err | 			return nil, err | ||||||
| @@ -165,7 +165,7 @@ func (c *cache) get(service string) ([]*registry.Service, error) { | |||||||
| 	c.RUnlock() | 	c.RUnlock() | ||||||
|  |  | ||||||
| 	// get and return services | 	// get and return services | ||||||
| 	return get(service, services) | 	return get(service, cp) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (c *cache) set(service string, services []*registry.Service) { | func (c *cache) set(service string, services []*registry.Service) { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user