diff --git a/broker/broker.go b/broker/broker.go index 2d02eae5..4e124fb1 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -1,24 +1,19 @@ package broker -import ( - "code.google.com/p/go-uuid/uuid" - "golang.org/x/net/context" -) - type Broker interface { Address() string Connect() error Disconnect() error Init() error - Publish(context.Context, string, []byte) error - Subscribe(string, func(context.Context, *Message)) (Subscriber, error) + Publish(string, *Message) error + Subscribe(string, Handler) (Subscriber, error) } +type Handler func(*Message) + type Message struct { - Id string - Timestamp int64 - Topic string - Body []byte + Header map[string]string + Body []byte } type Subscriber interface { @@ -31,24 +26,14 @@ type options struct{} type Option func(*options) var ( - Address string - Id string - DefaultBroker Broker + DefaultBroker Broker = newHttpBroker([]string{}) ) func NewBroker(addrs []string, opt ...Option) Broker { - return newHttpBroker([]string{Address}, opt...) + return newHttpBroker(addrs, opt...) } func Init() error { - if len(Id) == 0 { - Id = "broker-" + uuid.NewUUID().String() - } - - if DefaultBroker == nil { - DefaultBroker = newHttpBroker([]string{Address}) - } - return DefaultBroker.Init() } @@ -60,10 +45,10 @@ func Disconnect() error { return DefaultBroker.Disconnect() } -func Publish(ctx context.Context, topic string, body []byte) error { - return DefaultBroker.Publish(ctx, topic, body) +func Publish(topic string, msg *Message) error { + return DefaultBroker.Publish(topic, msg) } -func Subscribe(topic string, function func(context.Context, *Message)) (Subscriber, error) { - return DefaultBroker.Subscribe(topic, function) +func Subscribe(topic string, handler Handler) (Subscriber, error) { + return DefaultBroker.Subscribe(topic, handler) } diff --git a/broker/http_broker.go b/broker/http_broker.go index 455e8858..fb0d2623 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -7,21 +7,14 @@ import ( "io/ioutil" "net" "net/http" - "os" - "os/signal" "strconv" "strings" "sync" - "syscall" - "time" "code.google.com/p/go-uuid/uuid" log "github.com/golang/glog" - c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/errors" "github.com/myodc/go-micro/registry" - - "golang.org/x/net/context" ) type httpBroker struct { @@ -39,28 +32,22 @@ type httpSubscriber struct { id string topic string ch chan *httpSubscriber - fn func(context.Context, *Message) + fn Handler svc *registry.Service } -// used in brokers where there is no support for headers -type envelope struct { - Header map[string]string - Message *Message -} - var ( DefaultSubPath = "/_sub" ) func newHttpBroker(addrs []string, opt ...Option) Broker { addr := ":0" - if len(addrs) > 0 { + if len(addrs) > 0 && len(addrs[0]) > 0 { addr = addrs[0] } return &httpBroker{ - id: Id, + id: "broker-" + uuid.NewUUID().String(), address: addr, subscribers: make(map[string][]*httpSubscriber), unsubscribe: make(chan *httpSubscriber), @@ -96,9 +83,6 @@ func (h *httpBroker) start() error { go http.Serve(l, h) go func() { - ce := make(chan os.Signal, 1) - signal.Notify(ce, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) - for { select { case ch := <-h.exit: @@ -107,8 +91,6 @@ func (h *httpBroker) start() error { h.running = false h.Unlock() return - case <-ce: - h.stop() case subscriber := <-h.unsubscribe: h.Lock() var subscribers []*httpSubscriber @@ -150,26 +132,27 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - var e *envelope - if err = json.Unmarshal(b, &e); err != nil { + var m *Message + if err = json.Unmarshal(b, &m); err != nil { errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err)) w.WriteHeader(500) w.Write([]byte(errr.Error())) return } - if len(e.Message.Topic) == 0 { + topic := m.Header[":topic"] + delete(m.Header, ":topic") + + if len(topic) == 0 { errr := errors.InternalServerError("go.micro.broker", "Topic not found") w.WriteHeader(500) w.Write([]byte(errr.Error())) return } - ctx := c.WithMetadata(context.Background(), e.Header) - h.RLock() - for _, subscriber := range h.subscribers[e.Message.Topic] { - subscriber.fn(ctx, e.Message) + for _, subscriber := range h.subscribers[topic] { + subscriber.fn(m) } h.RUnlock() } @@ -195,26 +178,14 @@ func (h *httpBroker) Init() error { return nil } -func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) error { +func (h *httpBroker) Publish(topic string, msg *Message) error { s, err := registry.GetService("topic:" + topic) if err != nil { return err } - message := &Message{ - Id: uuid.NewUUID().String(), - Timestamp: time.Now().Unix(), - Topic: topic, - Body: body, - } - - header, _ := c.GetMetadata(ctx) - - b, err := json.Marshal(&envelope{ - header, - message, - }) - + msg.Header[":topic"] = topic + b, err := json.Marshal(msg) if err != nil { return err } @@ -229,7 +200,7 @@ func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) err return nil } -func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Message)) (Subscriber, error) { +func (h *httpBroker) Subscribe(topic string, handler Handler) (Subscriber, error) { // parse address for host, port parts := strings.Split(h.Address(), ":") host := strings.Join(parts[:len(parts)-1], ":") @@ -251,11 +222,10 @@ func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Mes id: uuid.NewUUID().String(), topic: topic, ch: h.unsubscribe, - fn: function, + fn: handler, svc: service, } - log.Infof("Registering subscriber %s", node.Id) if err := registry.Register(service); err != nil { return nil, err } diff --git a/broker/nats/nats.go b/broker/nats/nats.go index 0f63a17a..d3b50b89 100644 --- a/broker/nats/nats.go +++ b/broker/nats/nats.go @@ -3,14 +3,9 @@ package nats import ( "encoding/json" "strings" - "time" - "code.google.com/p/go-uuid/uuid" "github.com/apcera/nats" "github.com/myodc/go-micro/broker" - c "github.com/myodc/go-micro/context" - - "golang.org/x/net/context" ) type nbroker struct { @@ -22,12 +17,6 @@ type subscriber struct { s *nats.Subscription } -// used in brokers where there is no support for headers -type envelope struct { - Header map[string]string - Message *broker.Message -} - func (n *subscriber) Topic() string { return n.s.Subject } @@ -67,34 +56,21 @@ func (n *nbroker) Init() error { return nil } -func (n *nbroker) Publish(ctx context.Context, topic string, body []byte) error { - header, _ := c.GetMetadata(ctx) - - message := &broker.Message{ - Id: uuid.NewUUID().String(), - Timestamp: time.Now().Unix(), - Topic: topic, - Body: body, - } - - b, err := json.Marshal(&envelope{ - header, - message, - }) +func (n *nbroker) Publish(topic string, msg *broker.Message) error { + b, err := json.Marshal(msg) if err != nil { return err } return n.conn.Publish(topic, b) } -func (n *nbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) { +func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) { sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) { - var e *envelope - if err := json.Unmarshal(msg.Data, &e); err != nil { + var m *broker.Message + if err := json.Unmarshal(msg.Data, &m); err != nil { return } - ctx := c.WithMetadata(context.Background(), e.Header) - function(ctx, e.Message) + handler(m) }) if err != nil { return nil, err diff --git a/broker/rabbitmq/rabbitmq.go b/broker/rabbitmq/rabbitmq.go index 903dfcb5..426bf831 100644 --- a/broker/rabbitmq/rabbitmq.go +++ b/broker/rabbitmq/rabbitmq.go @@ -1,13 +1,8 @@ package rabbitmq import ( - "time" - - "code.google.com/p/go-uuid/uuid" "github.com/myodc/go-micro/broker" - c "github.com/myodc/go-micro/context" "github.com/streadway/amqp" - "golang.org/x/net/context" ) type rbroker struct { @@ -28,24 +23,20 @@ func (s *subscriber) Unsubscribe() error { return s.ch.Close() } -func (r *rbroker) Publish(ctx context.Context, topic string, body []byte) error { - header, _ := c.GetMetadata(ctx) - - msg := amqp.Publishing{ - MessageId: uuid.NewUUID().String(), - Timestamp: time.Now().UTC(), - Body: body, - Headers: amqp.Table{}, +func (r *rbroker) Publish(topic string, msg *broker.Message) error { + m := amqp.Publishing{ + Body: msg.Body, + Headers: amqp.Table{}, } - for k, v := range header { - msg.Headers[k] = v + for k, v := range msg.Header { + m.Headers[k] = v } - return r.conn.Publish("", topic, msg) + return r.conn.Publish("", topic, m) } -func (r *rbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) { +func (r *rbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) { ch, sub, err := r.conn.Consume(topic) if err != nil { return nil, err @@ -56,12 +47,9 @@ func (r *rbroker) Subscribe(topic string, function func(context.Context, *broker for k, v := range msg.Headers { header[k], _ = v.(string) } - ctx := c.WithMetadata(context.Background(), header) - function(ctx, &broker.Message{ - Id: msg.MessageId, - Timestamp: msg.Timestamp.Unix(), - Topic: topic, - Body: msg.Body, + handler(&broker.Message{ + Header: header, + Body: msg.Body, }) } diff --git a/client/client.go b/client/client.go index f6720cdf..8e7a8e4e 100644 --- a/client/client.go +++ b/client/client.go @@ -1,19 +1,32 @@ package client import ( - "github.com/myodc/go-micro/registry" - "github.com/myodc/go-micro/transport" "golang.org/x/net/context" ) type Client interface { - NewRequest(string, string, interface{}) Request - NewProtoRequest(string, string, interface{}) Request - NewJsonRequest(string, string, interface{}) Request - Call(context.Context, Request, interface{}) error - CallRemote(context.Context, string, Request, interface{}) error - Stream(context.Context, Request, interface{}) (Streamer, error) - StreamRemote(context.Context, string, Request, interface{}) (Streamer, error) + NewPublication(topic string, msg interface{}) Publication + NewRequest(service, method string, req interface{}) Request + NewProtoRequest(service, method string, req interface{}) Request + NewJsonRequest(service, method string, req interface{}) Request + Call(ctx context.Context, req Request, rsp interface{}) error + CallRemote(ctx context.Context, addr string, req Request, rsp interface{}) error + Stream(ctx context.Context, req Request, rspChan interface{}) (Streamer, error) + StreamRemote(ctx context.Context, addr string, req Request, rspChan interface{}) (Streamer, error) + Publish(ctx context.Context, p Publication) error +} + +type Publication interface { + Topic() string + Message() interface{} + ContentType() string +} + +type Request interface { + Service() string + Method() string + ContentType() string + Request() interface{} } type Streamer interface { @@ -22,29 +35,12 @@ type Streamer interface { Close() error } -type options struct { - registry registry.Registry - transport transport.Transport -} - type Option func(*options) var ( DefaultClient Client = newRpcClient() ) -func Registry(r registry.Registry) Option { - return func(o *options) { - o.registry = r - } -} - -func Transport(t transport.Transport) Option { - return func(o *options) { - o.transport = t - } -} - func Call(ctx context.Context, request Request, response interface{}) error { return DefaultClient.Call(ctx, request, response) } @@ -61,10 +57,18 @@ func StreamRemote(ctx context.Context, address string, request Request, response return DefaultClient.StreamRemote(ctx, address, request, responseChan) } +func Publish(ctx context.Context, p Publication) error { + return DefaultClient.Publish(ctx, p) +} + func NewClient(opt ...Option) Client { return newRpcClient(opt...) } +func NewPublication(topic string, message interface{}) Publication { + return DefaultClient.NewPublication(topic, message) +} + func NewRequest(service, method string, request interface{}) Request { return DefaultClient.NewRequest(service, method, request) } diff --git a/client/options.go b/client/options.go new file mode 100644 index 00000000..419c5a57 --- /dev/null +++ b/client/options.go @@ -0,0 +1,31 @@ +package client + +import ( + "github.com/myodc/go-micro/broker" + "github.com/myodc/go-micro/registry" + "github.com/myodc/go-micro/transport" +) + +type options struct { + broker broker.Broker + registry registry.Registry + transport transport.Transport +} + +func Broker(b broker.Broker) Option { + return func(o *options) { + o.broker = b + } +} + +func Registry(r registry.Registry) Option { + return func(o *options) { + o.registry = r + } +} + +func Transport(t transport.Transport) Option { + return func(o *options) { + o.transport = t + } +} diff --git a/client/request.go b/client/request.go deleted file mode 100644 index 7a2b7607..00000000 --- a/client/request.go +++ /dev/null @@ -1,8 +0,0 @@ -package client - -type Request interface { - Service() string - Method() string - ContentType() string - Request() interface{} -} diff --git a/client/rpc_client.go b/client/rpc_client.go index 6c185456..1f4b9309 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -1,11 +1,14 @@ package client import ( + "encoding/json" "fmt" "math/rand" "net/http" + "sync" "time" + "github.com/myodc/go-micro/broker" c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/errors" "github.com/myodc/go-micro/registry" @@ -13,6 +16,7 @@ import ( rpc "github.com/youtube/vitess/go/rpcplus" + "github.com/golang/protobuf/proto" "golang.org/x/net/context" ) @@ -21,6 +25,7 @@ type headerRoundTripper struct { } type rpcClient struct { + once sync.Once opts options } @@ -39,7 +44,14 @@ func newRpcClient(opt ...Option) Client { opts.transport = transport.DefaultTransport } + if opts.broker == nil { + opts.broker = broker.DefaultBroker + } + + var once sync.Once + return &rpcClient{ + once: once, opts: opts, } } @@ -152,6 +164,48 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in return r.stream(ctx, address, request, responseChan) } +func (r *rpcClient) Publish(ctx context.Context, p Publication) error { + md, ok := c.GetMetadata(ctx) + if !ok { + md = make(map[string]string) + } + md["Content-Type"] = p.ContentType() + + // encode message body + var body []byte + + switch p.ContentType() { + case "application/octet-stream": + b, err := proto.Marshal(p.Message().(proto.Message)) + if err != nil { + return err + } + body = b + case "application/json": + b, err := json.Marshal(p.Message()) + if err != nil { + return err + } + body = b + } + + r.once.Do(func() { + r.opts.broker.Connect() + }) + + return r.opts.broker.Publish(p.Topic(), &broker.Message{ + Header: md, + Body: body, + }) +} + +func (r *rpcClient) NewPublication(topic string, message interface{}) Publication { + return r.NewProtoPublication(topic, message) +} + +func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { + return newRpcPublication(topic, message, "application/octet-stream") +} func (r *rpcClient) NewRequest(service, method string, request interface{}) Request { return r.NewProtoRequest(service, method, request) } diff --git a/client/rpc_publication.go b/client/rpc_publication.go new file mode 100644 index 00000000..01a0073e --- /dev/null +++ b/client/rpc_publication.go @@ -0,0 +1,27 @@ +package client + +type rpcPublication struct { + topic string + contentType string + message interface{} +} + +func newRpcPublication(topic string, message interface{}, contentType string) Publication { + return &rpcPublication{ + message: message, + topic: topic, + contentType: contentType, + } +} + +func (r *rpcPublication) ContentType() string { + return r.contentType +} + +func (r *rpcPublication) Topic() string { + return r.topic +} + +func (r *rpcPublication) Message() interface{} { + return r.message +} diff --git a/examples/client/main.go b/examples/client/main.go index 5cbbe331..0c13a4ad 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -10,6 +10,26 @@ import ( "golang.org/x/net/context" ) +func pub() { + msg := client.NewPublication("topic.go.micro.srv.example", &example.Message{ + Say: "This is a publication", + }) + + // create context with metadata + ctx := c.WithMetadata(context.Background(), map[string]string{ + "X-User-Id": "john", + "X-From-Id": "script", + }) + + // publish message + if err := client.Publish(ctx, msg); err != nil { + fmt.Println("pub err: ", err) + return + } + + fmt.Printf("Published: %v\n", msg) +} + func call(i int) { // Create new request to service go.micro.srv.example, method Example.Call req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{ @@ -26,7 +46,7 @@ func call(i int) { // Call service if err := client.Call(ctx, req, rsp); err != nil { - fmt.Println("err: ", err, rsp) + fmt.Println("call err: ", err, rsp) return } @@ -52,7 +72,7 @@ func stream() { } if stream.Error() != nil { - fmt.Println("err:", err) + fmt.Println("stream err:", err) return } @@ -67,4 +87,5 @@ func main() { } stream() + pub() } diff --git a/examples/pubsub/main.go b/examples/pubsub/main.go index 9bfd85aa..b531a05e 100644 --- a/examples/pubsub/main.go +++ b/examples/pubsub/main.go @@ -7,9 +7,6 @@ import ( log "github.com/golang/glog" "github.com/myodc/go-micro/broker" "github.com/myodc/go-micro/cmd" - c "github.com/myodc/go-micro/context" - - "golang.org/x/net/context" ) var ( @@ -20,24 +17,24 @@ func pub() { tick := time.NewTicker(time.Second) i := 0 for _ = range tick.C { - ctx := c.WithMetadata(context.Background(), map[string]string{ - "id": fmt.Sprintf("%d", i), - }) - - msg := fmt.Sprintf("%d: %s", i, time.Now().String()) - if err := broker.Publish(ctx, topic, []byte(msg)); err != nil { + msg := &broker.Message{ + Header: map[string]string{ + "id": fmt.Sprintf("%d", i), + }, + Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), + } + if err := broker.Publish(topic, msg); err != nil { log.Errorf("[pub] failed: %v", err) } else { - fmt.Println("[pub] pubbed message:", msg) + fmt.Println("[pub] pubbed message:", string(msg.Body)) } i++ } } func sub() { - _, err := broker.Subscribe(topic, func(ctx context.Context, msg *broker.Message) { - md, _ := c.GetMetadata(ctx) - fmt.Println("[sub] received message:", string(msg.Body), "context", md) + _, err := broker.Subscribe(topic, func(msg *broker.Message) { + fmt.Println("[sub] received message:", string(msg.Body), "header", msg.Header) }) if err != nil { fmt.Println(err) diff --git a/examples/server/main.go b/examples/server/main.go index cc3c6ea1..d94f1444 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -4,6 +4,7 @@ import ( log "github.com/golang/glog" "github.com/myodc/go-micro/cmd" "github.com/myodc/go-micro/examples/server/handler" + "github.com/myodc/go-micro/examples/server/subscriber" "github.com/myodc/go-micro/server" ) @@ -23,6 +24,21 @@ func main() { ), ) + // Register Subscribers + server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + new(subscriber.Example), + ), + ) + + server.Subscribe( + server.NewSubscriber( + "topic.go.micro.srv.example", + subscriber.Handler, + ), + ) + // Run server if err := server.Run(); err != nil { log.Fatal(err) diff --git a/examples/server/proto/example/example.pb.go b/examples/server/proto/example/example.pb.go index 33ebb3bf..ea9e3f36 100644 --- a/examples/server/proto/example/example.pb.go +++ b/examples/server/proto/example/example.pb.go @@ -9,6 +9,7 @@ It is generated from these files: go-micro/examples/server/proto/example/example.proto It has these top-level messages: + Message Request Response StreamingRequest @@ -21,6 +22,14 @@ import proto "github.com/golang/protobuf/proto" // Reference imports to suppress errors if they are not otherwise used. var _ = proto.Marshal +type Message struct { + Say string `protobuf:"bytes,1,opt,name=say" json:"say,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} + type Request struct { Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` } diff --git a/examples/server/proto/example/example.proto b/examples/server/proto/example/example.proto index 676613fd..23ad31bc 100644 --- a/examples/server/proto/example/example.proto +++ b/examples/server/proto/example/example.proto @@ -1,5 +1,9 @@ syntax = "proto3"; +message Message { + string say = 1; +} + message Request { string name = 1; } diff --git a/examples/server/subscriber/subscriber.go b/examples/server/subscriber/subscriber.go new file mode 100644 index 00000000..675a3999 --- /dev/null +++ b/examples/server/subscriber/subscriber.go @@ -0,0 +1,18 @@ +package subscriber + +import ( + log "github.com/golang/glog" + example "github.com/myodc/go-micro/examples/server/proto/example" + "golang.org/x/net/context" +) + +type Example struct{} + +func (e *Example) Handle(ctx context.Context, msg *example.Message) error { + log.Info("Handler Received message: ", msg.Say) + return nil +} + +func Handler(msg *example.Message) { + log.Info("Function Received message: ", msg.Say) +} diff --git a/registry/service.go b/registry/service.go index ac806fd8..319883c5 100644 --- a/registry/service.go +++ b/registry/service.go @@ -19,6 +19,7 @@ type Endpoint struct { Name string Request *Value Response *Value + Metadata map[string]string } type Value struct { diff --git a/server/extractor.go b/server/extractor.go index 9660b24f..4809913b 100644 --- a/server/extractor.go +++ b/server/extractor.go @@ -1,6 +1,7 @@ package server import ( + "fmt" "reflect" "github.com/myodc/go-micro/registry" @@ -37,7 +38,7 @@ func extractEndpoint(method reflect.Method) *registry.Endpoint { } var rspType, reqType reflect.Type - // var stream bool + var stream bool mt := method.Type switch mt.NumIn() { @@ -51,9 +52,9 @@ func extractEndpoint(method reflect.Method) *registry.Endpoint { return nil } - // if rspType.Kind() == reflect.Func { - // stream = true - // } + if rspType.Kind() == reflect.Func { + stream = true + } request := extractValue(reqType) response := extractValue(rspType) @@ -62,5 +63,21 @@ func extractEndpoint(method reflect.Method) *registry.Endpoint { Name: method.Name, Request: request, Response: response, + Metadata: map[string]string{ + "stream": fmt.Sprintf("%v", stream), + }, } } + +func extractSubValue(typ reflect.Type) *registry.Value { + var reqType reflect.Type + switch typ.NumIn() { + case 1: + reqType = typ.In(0) + case 2: + reqType = typ.In(1) + default: + return nil + } + return extractValue(reqType) +} diff --git a/server/handler.go b/server/handler.go index 4ca9cf84..1a50b810 100644 --- a/server/handler.go +++ b/server/handler.go @@ -9,3 +9,9 @@ type Handler interface { Handler() interface{} Endpoints() []*registry.Endpoint } + +type Subscriber interface { + Topic() string + Subscriber() interface{} + Endpoints() []*registry.Endpoint +} diff --git a/server/options.go b/server/options.go index 2cca49ee..633c28be 100644 --- a/server/options.go +++ b/server/options.go @@ -1,11 +1,13 @@ package server import ( + "github.com/myodc/go-micro/broker" "github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/transport" ) type options struct { + broker broker.Broker registry registry.Registry transport transport.Transport metadata map[string]string @@ -22,6 +24,10 @@ func newOptions(opt ...Option) options { o(&opts) } + if opts.broker == nil { + opts.broker = broker.DefaultBroker + } + if opts.registry == nil { opts.registry = registry.DefaultRegistry } @@ -93,6 +99,12 @@ func Address(a string) Option { } } +func Broker(b broker.Broker) Option { + return func(o *options) { + o.broker = b + } +} + func Registry(r registry.Registry) Option { return func(o *options) { o.registry = r diff --git a/server/rpc_server.go b/server/rpc_server.go index 7d6e0941..ae34a02b 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -1,10 +1,12 @@ package server import ( + "fmt" "strconv" "strings" "sync" + "github.com/myodc/go-micro/broker" c "github.com/myodc/go-micro/context" "github.com/myodc/go-micro/registry" "github.com/myodc/go-micro/transport" @@ -20,16 +22,18 @@ type rpcServer struct { exit chan chan error sync.RWMutex - opts options - handlers map[string]Handler + opts options + handlers map[string]Handler + subscribers map[*subscriber][]broker.Subscriber } func newRpcServer(opts ...Option) Server { return &rpcServer{ - opts: newOptions(opts...), - rpc: rpc.NewServer(), - handlers: make(map[string]Handler), - exit: make(chan chan error), + opts: newOptions(opts...), + rpc: rpc.NewServer(), + handlers: make(map[string]Handler), + subscribers: make(map[*subscriber][]broker.Subscriber), + exit: make(chan chan error), } } @@ -84,6 +88,29 @@ func (s *rpcServer) Handle(h Handler) error { return nil } +func (s *rpcServer) NewSubscriber(topic string, sb interface{}) Subscriber { + return newSubscriber(topic, sb) +} + +func (s *rpcServer) Subscribe(sb Subscriber) error { + sub, ok := sb.(*subscriber) + if !ok { + return fmt.Errorf("invalid subscriber: expected *subscriber") + } + if len(sub.handlers) == 0 { + return fmt.Errorf("invalid subscriber: no handler functions") + } + + s.Lock() + _, ok = s.subscribers[sub] + if ok { + return fmt.Errorf("subscriber %v already exists", s) + } + s.subscribers[sub] = nil + s.Unlock() + return nil +} + func (s *rpcServer) Register() error { // parse address for host, port config := s.Config() @@ -110,6 +137,9 @@ func (s *rpcServer) Register() error { for _, e := range s.handlers { endpoints = append(endpoints, e.Endpoints()...) } + for e, _ := range s.subscribers { + endpoints = append(endpoints, e.Endpoints()...) + } s.RUnlock() service := ®istry.Service{ @@ -120,7 +150,23 @@ func (s *rpcServer) Register() error { } log.Infof("Registering node: %s", node.Id) - return config.registry.Register(service) + if err := config.registry.Register(service); err != nil { + return err + } + + s.Lock() + defer s.Unlock() + + for sb, _ := range s.subscribers { + handler := createSubHandler(sb) + sub, err := config.broker.Subscribe(sb.Topic(), handler) + if err != nil { + return err + } + s.subscribers[sb] = []broker.Subscriber{sub} + } + + return nil } func (s *rpcServer) Deregister() error { @@ -147,7 +193,21 @@ func (s *rpcServer) Deregister() error { Nodes: []*registry.Node{node}, } - return config.registry.Deregister(service) + log.Infof("Deregistering node: %s", node.Id) + if err := config.registry.Deregister(service); err != nil { + return err + } + + s.Lock() + for sb, subs := range s.subscribers { + for _, sub := range subs { + log.Infof("Unsubscribing from topic: %s", sub.Topic()) + sub.Unsubscribe() + } + s.subscribers[sb] = nil + } + s.Unlock() + return nil } func (s *rpcServer) Start() error { @@ -169,9 +229,11 @@ func (s *rpcServer) Start() error { go func() { ch := <-s.exit ch <- ts.Close() + config.broker.Disconnect() }() - return nil + // TODO: subscribe to cruft + return config.broker.Connect() } func (s *rpcServer) Stop() error { diff --git a/server/server.go b/server/server.go index 22dfb798..997e623b 100644 --- a/server/server.go +++ b/server/server.go @@ -14,6 +14,8 @@ type Server interface { Init(...Option) Handle(Handler) error NewHandler(interface{}) Handler + NewSubscriber(string, interface{}) Subscriber + Subscribe(Subscriber) error Register() error Deregister() error Start() error @@ -45,6 +47,10 @@ func NewServer(opt ...Option) Server { return newRpcServer(opt...) } +func NewSubscriber(topic string, h interface{}) Subscriber { + return DefaultServer.NewSubscriber(topic, h) +} + func NewHandler(h interface{}) Handler { return DefaultServer.NewHandler(h) } @@ -53,6 +59,10 @@ func Handle(h Handler) error { return DefaultServer.Handle(h) } +func Subscribe(s Subscriber) error { + return DefaultServer.Subscribe(s) +} + func Register() error { return DefaultServer.Register() } @@ -78,9 +88,6 @@ func Run() error { return err } - log.Infof("Deregistering %s", DefaultServer.Config().Id()) - DefaultServer.Deregister() - return Stop() } diff --git a/server/subscriber.go b/server/subscriber.go new file mode 100644 index 00000000..eddfc5a7 --- /dev/null +++ b/server/subscriber.go @@ -0,0 +1,154 @@ +package server + +import ( + "encoding/json" + "reflect" + + "github.com/golang/protobuf/proto" + "github.com/myodc/go-micro/broker" + c "github.com/myodc/go-micro/context" + "github.com/myodc/go-micro/registry" + "golang.org/x/net/context" +) + +type handler struct { + method reflect.Value + reqType reflect.Type + ctxType reflect.Type +} + +type subscriber struct { + topic string + rcvr reflect.Value + typ reflect.Type + subscriber interface{} + handlers []*handler + endpoints []*registry.Endpoint +} + +func newSubscriber(topic string, sub interface{}) Subscriber { + var endpoints []*registry.Endpoint + var handlers []*handler + + if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { + h := &handler{ + method: reflect.ValueOf(sub), + } + + switch typ.NumIn() { + case 1: + h.reqType = typ.In(0) + case 2: + h.ctxType = typ.In(0) + h.reqType = typ.In(1) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: "Func", + Request: extractSubValue(typ), + Metadata: map[string]string{ + "topic": topic, + }, + }) + } else { + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + h := &handler{ + method: method.Func, + } + + switch method.Type.NumIn() { + case 2: + h.reqType = method.Type.In(1) + case 3: + h.ctxType = method.Type.In(1) + h.reqType = method.Type.In(2) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: method.Name, + Request: extractSubValue(method.Type), + Metadata: map[string]string{ + "topic": topic, + }, + }) + } + } + + return &subscriber{ + rcvr: reflect.ValueOf(sub), + typ: reflect.TypeOf(sub), + topic: topic, + subscriber: sub, + handlers: handlers, + endpoints: endpoints, + } +} + +func createSubHandler(sb *subscriber) broker.Handler { + return func(msg *broker.Message) { + hdr := make(map[string]string) + for k, v := range msg.Header { + hdr[k] = v + } + delete(hdr, "Content-Type") + ctx := c.WithMetadata(context.Background(), hdr) + rctx := reflect.ValueOf(ctx) + + for _, handler := range sb.handlers { + var isVal bool + var req reflect.Value + var uerr error + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + + switch msg.Header["Content-Type"] { + case "application/octet-stream": + uerr = proto.Unmarshal(msg.Body, req.Interface().(proto.Message)) + case "application/json": + uerr = json.Unmarshal(msg.Body, req.Interface()) + } + + if uerr != nil { + continue + } + + if isVal { + req = req.Elem() + } + + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + + if handler.ctxType != nil { + vals = append(vals, rctx) + } + + vals = append(vals, req) + go handler.method.Call(vals) + } + } +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Subscriber() interface{} { + return s.subscriber +} + +func (s *subscriber) Endpoints() []*registry.Endpoint { + return s.endpoints +} diff --git a/transport/http_transport.go b/transport/http_transport.go index 818033bc..1c892b53 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -137,7 +137,6 @@ func (h *httpTransportSocket) Send(m *Message) error { ProtoMajor: 1, ProtoMinor: 1, ContentLength: int64(len(m.Body)), - // Request: h.r, } for k, v := range m.Header { diff --git a/transport/transport.go b/transport/transport.go index 1c84b889..3981f8bc 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -1,7 +1,6 @@ package transport type Message struct { - Id string Header map[string]string Body []byte }