diff --git a/broker/http_broker.go b/broker/http_broker.go index 3e9fbc54..385c7fa0 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -126,7 +126,7 @@ func newHttpBroker(opts ...Option) Broker { } h := &httpBroker{ - id: "broker-" + uuid.New().String(), + id: "go.micro.http.broker-" + uuid.New().String(), address: addr, opts: options, r: reg, @@ -472,7 +472,7 @@ func (h *httpBroker) Init(opts ...Option) error { } if len(h.id) == 0 { - h.id = "broker-" + uuid.New().String() + h.id = "go.micro.http.broker-" + uuid.New().String() } // get registry @@ -648,9 +648,6 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO return nil, err } - // create unique id - id := h.id + "." + uuid.New().String() - var secure bool if h.opts.Secure || h.opts.TLSConfig != nil { @@ -659,7 +656,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO // register service node := ®istry.Node{ - Id: id, + Id: h.id, Address: mnet.HostPort(addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), @@ -684,7 +681,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO subscriber := &httpSubscriber{ opts: options, hb: h, - id: id, + id: h.id, topic: topic, fn: handler, svc: service, diff --git a/client/common_test.go b/client/common_test.go index 42d1ed87..25b2b09c 100644 --- a/client/common_test.go +++ b/client/common_test.go @@ -15,10 +15,16 @@ var ( { Id: "foo-1.0.0-123", Address: "localhost:9999", + Metadata: map[string]string{ + "protocol": "mucp", + }, }, { Id: "foo-1.0.0-321", Address: "localhost:9999", + Metadata: map[string]string{ + "protocol": "mucp", + }, }, }, }, @@ -29,6 +35,9 @@ var ( { Id: "foo-1.0.1-321", Address: "localhost:6666", + Metadata: map[string]string{ + "protocol": "mucp", + }, }, }, }, @@ -39,6 +48,9 @@ var ( { Id: "foo-1.0.3-345", Address: "localhost:8888", + Metadata: map[string]string{ + "protocol": "mucp", + }, }, }, }, diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index fd5bc853..6ca92586 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -13,6 +13,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/codec" + raw "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" @@ -70,8 +71,13 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele }, nil } + // only get the things that are of grpc protocol + selectOptions := append(opts.SelectOptions, selector.WithFilter( + selector.FilterLabel("protocol", "grpc"), + )) + // get next nodes from the selector - next, err := g.opts.Selector.Select(service, opts.SelectOptions...) + next, err := g.opts.Selector.Select(service, selectOptions...) if err != nil { if err == selector.ErrNotFound { return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) @@ -510,29 +516,56 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli } func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { + var options client.PublishOptions + for _, o := range opts { + o(&options) + } + md, ok := metadata.FromContext(ctx) if !ok { md = make(map[string]string) } md["Content-Type"] = p.ContentType() + md["Micro-Topic"] = p.Topic() cf, err := g.newGRPCCodec(p.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } - b, err := cf.Marshal(p.Payload()) - if err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + var body []byte + + // passed in raw data + if d, ok := p.Payload().(*raw.Frame); ok { + body = d.Data + } else { + // set the body + b, err := cf.Marshal(p.Payload()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + body = b } g.once.Do(func() { g.opts.Broker.Connect() }) - return g.opts.Broker.Publish(p.Topic(), &broker.Message{ + topic := p.Topic() + + // get proxy topic + if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 { + options.Exchange = prx + } + + // get the exchange + if len(options.Exchange) > 0 { + topic = options.Exchange + } + + return g.opts.Broker.Publish(topic, &broker.Message{ Header: md, - Body: b, + Body: body, }) } diff --git a/client/grpc/grpc_test.go b/client/grpc/grpc_test.go index e437c0ef..6d975ca3 100644 --- a/client/grpc/grpc_test.go +++ b/client/grpc/grpc_test.go @@ -45,6 +45,9 @@ func TestGRPCClient(t *testing.T) { { Id: "test-1", Address: l.Addr().String(), + Metadata: map[string]string{ + "protocol": "grpc", + }, }, }, }) diff --git a/client/rpc_client.go b/client/rpc_client.go index 0a5a0580..71f55708 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -13,6 +13,7 @@ import ( "github.com/micro/go-micro/client/pool" "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/codec" + raw "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" @@ -349,8 +350,13 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro }, nil } + // only get the things that are of mucp protocol + selectOptions := append(opts.SelectOptions, selector.WithFilter( + selector.FilterLabel("protocol", "mucp"), + )) + // get next nodes from the selector - next, err := r.opts.Selector.Select(service, opts.SelectOptions...) + next, err := r.opts.Selector.Select(service, selectOptions...) if err != nil { if err == selector.ErrNotFound { return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) @@ -583,26 +589,37 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt return errors.InternalServerError("go.micro.client", err.Error()) } - // new buffer - b := buf.New(nil) + var body []byte - if err := cf(b).Write(&codec.Message{ - Target: topic, - Type: codec.Event, - Header: map[string]string{ - "Micro-Id": id, - "Micro-Topic": msg.Topic(), - }, - }, msg.Payload()); err != nil { - return errors.InternalServerError("go.micro.client", err.Error()) + // passed in raw data + if d, ok := msg.Payload().(*raw.Frame); ok { + body = d.Data + } else { + // new buffer + b := buf.New(nil) + + if err := cf(b).Write(&codec.Message{ + Target: topic, + Type: codec.Event, + Header: map[string]string{ + "Micro-Id": id, + "Micro-Topic": msg.Topic(), + }, + }, msg.Payload()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // set the body + body = b.Bytes() } + r.once.Do(func() { r.opts.Broker.Connect() }) return r.opts.Broker.Publish(topic, &broker.Message{ Header: md, - Body: b.Bytes(), + Body: body, }) } diff --git a/client/rpc_client_test.go b/client/rpc_client_test.go index f9dbc04d..ace08fff 100644 --- a/client/rpc_client_test.go +++ b/client/rpc_client_test.go @@ -143,6 +143,9 @@ func TestCallWrapper(t *testing.T) { { Id: id, Address: address, + Metadata: map[string]string{ + "protocol": "mucp", + }, }, }, }) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index c20537ea..60dc02b5 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -145,6 +145,11 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec { return nil } + // processing topic publishing + if len(msg.Header["Micro-Topic"]) > 0 { + return nil + } + // no protocol use old codecs switch msg.Header["Content-Type"] { case "application/json": diff --git a/network/default.go b/network/default.go index 7c37517d..5d4a9711 100644 --- a/network/default.go +++ b/network/default.go @@ -19,6 +19,7 @@ import ( "github.com/micro/go-micro/server" "github.com/micro/go-micro/transport" "github.com/micro/go-micro/tunnel" + bun "github.com/micro/go-micro/tunnel/broker" tun "github.com/micro/go-micro/tunnel/transport" "github.com/micro/go-micro/util/backoff" "github.com/micro/go-micro/util/log" @@ -112,6 +113,11 @@ func newNetwork(opts ...Option) Network { tun.WithTunnel(options.Tunnel), ) + // create the tunnel broker + tunBroker := bun.NewBroker( + bun.WithTunnel(options.Tunnel), + ) + // server is network server server := server.NewServer( server.Id(options.Id), @@ -119,10 +125,12 @@ func newNetwork(opts ...Option) Network { server.Advertise(advertise), server.Name(options.Name), server.Transport(tunTransport), + server.Broker(tunBroker), ) // client is network client client := client.NewClient( + client.Broker(tunBroker), client.Transport(tunTransport), client.Selector( rtr.NewSelector( diff --git a/proxy/grpc/grpc.go b/proxy/grpc/grpc.go index 89947054..91333662 100644 --- a/proxy/grpc/grpc.go +++ b/proxy/grpc/grpc.go @@ -10,7 +10,6 @@ import ( "github.com/micro/go-micro/client/grpc" "github.com/micro/go-micro/codec" "github.com/micro/go-micro/config/options" - "github.com/micro/go-micro/errors" "github.com/micro/go-micro/proxy" "github.com/micro/go-micro/server" ) @@ -62,8 +61,14 @@ func readLoop(r server.Request, s client.Stream) error { } } -func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error { - return errors.InternalServerError("go.micro.proxy.grpc", "SendRequest is unsupported") +// ProcessMessage acts as a message exchange and forwards messages to ongoing topics +// TODO: should we look at p.Endpoint and only send to the local endpoint? probably +func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error { + // TODO: check that we're not broadcast storming by sending to the same topic + // that we're actually subscribed to + + // directly publish to the local client + return p.Client.Publish(ctx, msg) } // ServeRequest honours the server.Proxy interface diff --git a/proxy/http/http.go b/proxy/http/http.go index 0df61927..9bcf01db 100644 --- a/proxy/http/http.go +++ b/proxy/http/http.go @@ -10,7 +10,6 @@ import ( "net/url" "path" - "github.com/micro/go-micro/client" "github.com/micro/go-micro/config/options" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/proxy" @@ -45,8 +44,69 @@ func getEndpoint(hdr map[string]string) string { return "" } -func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error { - return errors.InternalServerError("go.micro.proxy.http", "SendRequest is unsupported") +func getTopic(hdr map[string]string) string { + ep := hdr["Micro-Topic"] + if len(ep) > 0 && ep[0] == '/' { + return ep + } + return "/" + hdr["Micro-Topic"] +} + +// ProcessMessage handles incoming asynchronous messages +func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error { + if p.Endpoint == "" { + p.Endpoint = proxy.DefaultEndpoint + } + + // get the header + hdr := msg.Header() + + // get topic + // use /topic as endpoint + endpoint := getTopic(hdr) + + // set the endpoint + if len(endpoint) == 0 { + endpoint = p.Endpoint + } else { + // add endpoint to backend + u, err := url.Parse(p.Endpoint) + if err != nil { + return errors.InternalServerError(msg.Topic(), err.Error()) + } + u.Path = path.Join(u.Path, endpoint) + endpoint = u.String() + } + + // send to backend + hreq, err := http.NewRequest("POST", endpoint, bytes.NewReader(msg.Body())) + if err != nil { + return errors.InternalServerError(msg.Topic(), err.Error()) + } + + // set the headers + for k, v := range hdr { + hreq.Header.Set(k, v) + } + + // make the call + hrsp, err := http.DefaultClient.Do(hreq) + if err != nil { + return errors.InternalServerError(msg.Topic(), err.Error()) + } + + // read body + b, err := ioutil.ReadAll(hrsp.Body) + hrsp.Body.Close() + if err != nil { + return errors.InternalServerError(msg.Topic(), err.Error()) + } + + if hrsp.StatusCode != 200 { + return errors.New(msg.Topic(), string(b), int32(hrsp.StatusCode)) + } + + return nil } // ServeRequest honours the server.Router interface diff --git a/proxy/mucp/mucp.go b/proxy/mucp/mucp.go index d0738358..1036f722 100644 --- a/proxy/mucp/mucp.go +++ b/proxy/mucp/mucp.go @@ -281,8 +281,34 @@ func (p *Proxy) watchRoutes() { } } -func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error { - return errors.InternalServerError("go.micro.proxy", "SendRequest is unsupported") +// ProcessMessage acts as a message exchange and forwards messages to ongoing topics +// TODO: should we look at p.Endpoint and only send to the local endpoint? probably +func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error { + // TODO: check that we're not broadcast storming by sending to the same topic + // that we're actually subscribed to + + log.Tracef("Received message for %s", msg.Topic()) + + var errors []string + + // directly publish to the local client + if err := p.Client.Publish(ctx, msg); err != nil { + errors = append(errors, err.Error()) + } + + // publish to all links + for _, client := range p.Links { + if err := client.Publish(ctx, msg); err != nil { + errors = append(errors, err.Error()) + } + } + + if len(errors) == 0 { + return nil + } + + // there is no error...muahaha + return fmt.Errorf("Message processing error: %s", strings.Join(errors, "\n")) } // ServeRequest honours the server.Router interface @@ -302,6 +328,8 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server return errors.BadRequest("go.micro.proxy", "service name is blank") } + log.Tracef("Received request for %s", service) + // are we network routing or local routing if len(p.Links) == 0 { local = true diff --git a/proxy/proxy.go b/proxy/proxy.go index e91eaf0f..e08926e9 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -13,9 +13,9 @@ import ( // Proxy can be used as a proxy server for go-micro services type Proxy interface { options.Options - // SendRequest honours the client.Router interface - SendRequest(context.Context, client.Request, client.Response) error - // ServeRequest honours the server.Router interface + // ProcessMessage handles inbound messages + ProcessMessage(context.Context, server.Message) error + // ServeRequest handles inbound requests ServeRequest(context.Context, server.Request, server.Response) error } diff --git a/server/grpc/grpc.go b/server/grpc/grpc.go index a09dd594..942d1a8f 100644 --- a/server/grpc/grpc.go +++ b/server/grpc/grpc.go @@ -89,6 +89,11 @@ func newGRPCServer(opts ...server.Option) server.Server { type grpcRouter struct { h func(context.Context, server.Request, interface{}) error + m func(context.Context, server.Message) error +} + +func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error { + return r.m(ctx, msg) } func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { @@ -258,7 +263,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error { handler = g.opts.HdlrWrappers[i-1](handler) } - r := grpcRouter{handler} + r := grpcRouter{h: handler} // serve the actual request using the request router if err := r.ServeRequest(ctx, request, response); err != nil { @@ -564,7 +569,7 @@ func (g *grpcServer) Register() error { node.Metadata["registry"] = config.Registry.String() node.Metadata["server"] = g.String() node.Metadata["transport"] = g.String() - // node.Metadata["transport"] = config.Transport.String() + node.Metadata["protocol"] = "grpc" g.RLock() // Maps are ordered randomly, sort the keys for consistency diff --git a/server/grpc/request.go b/server/grpc/request.go index 617b9a7d..adfed48e 100644 --- a/server/grpc/request.go +++ b/server/grpc/request.go @@ -20,6 +20,9 @@ type rpcMessage struct { topic string contentType string payload interface{} + header map[string]string + body []byte + codec codec.Codec } func (r *rpcRequest) ContentType() string { @@ -73,3 +76,15 @@ func (r *rpcMessage) Topic() string { func (r *rpcMessage) Payload() interface{} { return r.payload } + +func (r *rpcMessage) Header() map[string]string { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Reader { + return r.codec +} diff --git a/server/grpc/subscriber.go b/server/grpc/subscriber.go index 1f885cab..0e4f4f4f 100644 --- a/server/grpc/subscriber.go +++ b/server/grpc/subscriber.go @@ -33,7 +33,6 @@ type subscriber struct { } func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { - options := server.SubscriberOptions{ AutoAck: true, } @@ -239,6 +238,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke topic: sb.topic, contentType: ct, payload: req.Interface(), + header: msg.Header, + body: msg.Body, }) }() } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 2fbddc0e..342110aa 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -135,12 +135,18 @@ func setupProtocol(msg *transport.Message) codec.NewCodec { endpoint := getHeader("Micro-Endpoint", msg.Header) protocol := getHeader("Micro-Protocol", msg.Header) target := getHeader("Micro-Target", msg.Header) + topic := getHeader("Micro-Topic", msg.Header) // if the protocol exists (mucp) do nothing if len(protocol) > 0 { return nil } + // newer method of processing messages over transport + if len(topic) > 0 { + return nil + } + // if no service/method/endpoint then it's the old protocol if len(service) == 0 && len(method) == 0 && len(endpoint) == 0 { return defaultCodecs[msg.Header["Content-Type"]] diff --git a/server/rpc_event.go b/server/rpc_event.go new file mode 100644 index 00000000..4bd39d4d --- /dev/null +++ b/server/rpc_event.go @@ -0,0 +1,33 @@ +package server + +import ( + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/transport" +) + +// event is a broker event we handle on the server transport +type event struct { + message *broker.Message +} + +func (e *event) Ack() error { + // there is no ack support + return nil +} + +func (e *event) Message() *broker.Message { + return e.message +} + +func (e *event) Topic() string { + return e.message.Header["Micro-Topic"] +} + +func newEvent(msg transport.Message) *event { + return &event{ + message: &broker.Message{ + Header: msg.Header, + Body: msg.Body, + }, + } +} diff --git a/server/rpc_request.go b/server/rpc_request.go index 532cf4b1..065d57e4 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -1,8 +1,11 @@ package server import ( + "bytes" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/util/buf" ) type rpcRequest struct { @@ -23,6 +26,9 @@ type rpcMessage struct { topic string contentType string payload interface{} + header map[string]string + body []byte + codec codec.NewCodec } func (r *rpcRequest) Codec() codec.Reader { @@ -86,3 +92,16 @@ func (r *rpcMessage) Topic() string { func (r *rpcMessage) Payload() interface{} { return r.payload } + +func (r *rpcMessage) Header() map[string]string { + return r.header +} + +func (r *rpcMessage) Body() []byte { + return r.body +} + +func (r *rpcMessage) Codec() codec.Reader { + b := buf.New(bytes.NewBuffer(r.body)) + return r.codec(b) +} diff --git a/server/rpc_router.go b/server/rpc_router.go index b1eb66ff..493f2af7 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -9,6 +9,7 @@ package server import ( "context" "errors" + "fmt" "io" "reflect" "strings" @@ -60,19 +61,30 @@ type response struct { // router represents an RPC router. type router struct { - name string - mu sync.Mutex // protects the serviceMap - serviceMap map[string]*service - reqLock sync.Mutex // protects freeReq - freeReq *request - respLock sync.Mutex // protects freeResp - freeResp *response + name string + + mu sync.Mutex // protects the serviceMap + serviceMap map[string]*service + + reqLock sync.Mutex // protects freeReq + freeReq *request + + respLock sync.Mutex // protects freeResp + freeResp *response + + // handler wrappers hdlrWrappers []HandlerWrapper + // subscriber wrappers + subWrappers []SubscriberWrapper + + su sync.RWMutex + subscribers map[string][]*subscriber } func newRpcRouter() *router { return &router{ - serviceMap: make(map[string]*service), + serviceMap: make(map[string]*service), + subscribers: make(map[string][]*subscriber), } } @@ -449,3 +461,144 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response) } return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec()) } + +func (router *router) NewSubscriber(topic string, handler interface{}, opts ...SubscriberOption) Subscriber { + return newSubscriber(topic, handler, opts...) +} + +func (router *router) Subscribe(s Subscriber) error { + sub, ok := s.(*subscriber) + if !ok { + return fmt.Errorf("invalid subscriber: expected *subscriber") + } + if len(sub.handlers) == 0 { + return fmt.Errorf("invalid subscriber: no handler functions") + } + + if err := validateSubscriber(sub); err != nil { + return err + } + + router.su.Lock() + defer router.su.Unlock() + + // append to subscribers + subs := router.subscribers[sub.Topic()] + subs = append(subs, sub) + router.subscribers[sub.Topic()] = subs + + return nil +} + +func (router *router) ProcessMessage(ctx context.Context, msg Message) error { + router.su.RLock() + + // get the subscribers by topic + subs, ok := router.subscribers[msg.Topic()] + if !ok { + router.su.RUnlock() + return nil + } + + // unlock since we only need to get the subs + router.su.RUnlock() + + var results []string + + // we may have multiple subscribers for the topic + for _, sub := range subs { + // we may have multiple handlers per subscriber + for i := 0; i < len(sub.handlers); i++ { + // get the handler + handler := sub.handlers[i] + + var isVal bool + var req reflect.Value + + // check whether the handler is a pointer + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + + // if its a value get the element + if isVal { + req = req.Elem() + } + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + + // if its a value get the element + if isVal { + req = req.Elem() + } + + cc := msg.Codec() + + // read the header. mostly a noop + if err := cc.ReadHeader(&codec.Message{}, codec.Event); err != nil { + return err + } + + // read the body into the handler request value + if err := cc.ReadBody(req.Interface()); err != nil { + return err + } + + // create the handler which will honour the SubscriberFunc type + fn := func(ctx context.Context, msg Message) error { + var vals []reflect.Value + if sub.typ.Kind() != reflect.Func { + vals = append(vals, sub.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + // values to pass the handler + vals = append(vals, reflect.ValueOf(msg.Payload())) + + // execute the actuall call of the handler + returnValues := handler.method.Call(vals) + if err := returnValues[0].Interface(); err != nil { + return err.(error) + } + return nil + } + + // wrap with subscriber wrappers + for i := len(router.subWrappers); i > 0; i-- { + fn = router.subWrappers[i-1](fn) + } + + // create new rpc message + rpcMsg := &rpcMessage{ + topic: msg.Topic(), + contentType: msg.ContentType(), + payload: req.Interface(), + codec: msg.(*rpcMessage).codec, + header: msg.Header(), + body: msg.Body(), + } + + // execute the message handler + if err := fn(ctx, rpcMsg); err != nil { + results = append(results, err.Error()) + } + } + } + + // if no errors just return + if len(results) == 0 { + return nil + } + + return errors.New("subscriber error: " + strings.Join(results, "\n")) +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 850fd97b..607ead53 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -14,6 +14,7 @@ import ( "github.com/micro/go-micro/broker" "github.com/micro/go-micro/codec" + raw "github.com/micro/go-micro/codec/bytes" "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" @@ -30,11 +31,13 @@ type rpcServer struct { sync.RWMutex opts Options handlers map[string]Handler - subscribers map[*subscriber][]broker.Subscriber + subscribers map[Subscriber][]broker.Subscriber // marks the serve as started started bool // used for first registration registered bool + // subscribe to service name + subscriber broker.Subscriber // graceful exit wg *sync.WaitGroup } @@ -43,12 +46,13 @@ func newRpcServer(opts ...Option) Server { options := newOptions(opts...) router := newRpcRouter() router.hdlrWrappers = options.HdlrWrappers + router.subWrappers = options.SubWrappers return &rpcServer{ opts: options, router: router, handlers: make(map[string]Handler), - subscribers: make(map[*subscriber][]broker.Subscriber), + subscribers: make(map[Subscriber][]broker.Subscriber), exit: make(chan chan error), wg: wait(options.Context), } @@ -56,12 +60,85 @@ func newRpcServer(opts ...Option) Server { type rpcRouter struct { h func(context.Context, Request, interface{}) error + m func(context.Context, Message) error +} + +func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error { + return r.m(ctx, msg) } func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error { return r.h(ctx, req, rsp) } +// HandleEvent handles inbound messages to the service directly +// TODO: handle requests from an event. We won't send a response. +func (s *rpcServer) HandleEvent(e broker.Event) error { + // formatting horrible cruft + msg := e.Message() + + if msg.Header == nil { + // create empty map in case of headers empty to avoid panic later + msg.Header = make(map[string]string) + } + + // get codec + ct := msg.Header["Content-Type"] + + // default content type + if len(ct) == 0 { + msg.Header["Content-Type"] = DefaultContentType + ct = DefaultContentType + } + + // get codec + cf, err := s.newCodec(ct) + if err != nil { + return err + } + + // copy headers + hdr := make(map[string]string) + for k, v := range msg.Header { + hdr[k] = v + } + + // create context + ctx := metadata.NewContext(context.Background(), hdr) + + // TODO: inspect message header + // Micro-Service means a request + // Micro-Topic means a message + + rpcMsg := &rpcMessage{ + topic: msg.Header["Micro-Topic"], + contentType: ct, + payload: &raw.Frame{msg.Body}, + codec: cf, + header: msg.Header, + body: msg.Body, + } + + // existing router + r := Router(s.router) + + // if the router is present then execute it + if s.opts.Router != nil { + // create a wrapped function + handler := s.opts.Router.ProcessMessage + + // execute the wrapper for it + for i := len(s.opts.SubWrappers); i > 0; i-- { + handler = s.opts.SubWrappers[i-1](handler) + } + + // set the router + r = rpcRouter{m: handler} + } + + return r.ProcessMessage(ctx, rpcMsg) +} + // ServeConn serves a single connection func (s *rpcServer) ServeConn(sock transport.Socket) { var wg sync.WaitGroup @@ -97,6 +174,26 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { return } + // check the message header for + // Micro-Service is a request + // Micro-Topic is a message + if t := msg.Header["Micro-Topic"]; len(t) > 0 { + // process the event + ev := newEvent(msg) + // TODO: handle the error event + if err := s.HandleEvent(ev); err != nil { + msg.Header["Micro-Error"] = err.Error() + } + // write back some 200 + sock.Send(&transport.Message{ + Header: msg.Header, + }) + // we're done + continue + } + + // business as usual + // use Micro-Stream as the stream identifier // in the event its blank we'll always process // on the same socket @@ -263,7 +360,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) { } // set the router - r = rpcRouter{handler} + r = rpcRouter{h: handler} } // wait for processing to exit @@ -366,6 +463,7 @@ func (s *rpcServer) Init(opts ...Option) error { r := newRpcRouter() r.hdlrWrappers = s.opts.HdlrWrappers r.serviceMap = s.router.serviceMap + r.subWrappers = s.opts.SubWrappers s.router = r } @@ -391,29 +489,18 @@ func (s *rpcServer) Handle(h Handler) error { } func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber { - return newSubscriber(topic, sb, opts...) + return s.router.NewSubscriber(topic, sb, opts...) } func (s *rpcServer) Subscribe(sb Subscriber) error { - sub, ok := sb.(*subscriber) - if !ok { - return fmt.Errorf("invalid subscriber: expected *subscriber") - } - if len(sub.handlers) == 0 { - return fmt.Errorf("invalid subscriber: no handler functions") - } + s.Lock() + defer s.Unlock() - if err := validateSubscriber(sb); err != nil { + if err := s.router.Subscribe(sb); err != nil { return err } - s.Lock() - defer s.Unlock() - _, ok = s.subscribers[sub] - if ok { - return fmt.Errorf("subscriber %v already exists", s) - } - s.subscribers[sub] = nil + s.subscribers[sb] = nil return nil } @@ -483,7 +570,7 @@ func (s *rpcServer) Register() error { } sort.Strings(handlerList) - var subscriberList []*subscriber + var subscriberList []Subscriber for e := range s.subscribers { // Only advertise non internal subscribers if !e.Options().Internal { @@ -491,7 +578,7 @@ func (s *rpcServer) Register() error { } } sort.Slice(subscriberList, func(i, j int) bool { - return subscriberList[i].topic > subscriberList[j].topic + return subscriberList[i].Topic() > subscriberList[j].Topic() }) endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList)) @@ -535,8 +622,17 @@ func (s *rpcServer) Register() error { s.registered = true + // subscribe to the topic with own name + sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent) + if err != nil { + return err + } + + // save the subscriber + s.subscriber = sub + + // subscribe for all of the subscribers for sb := range s.subscribers { - handler := s.createSubHandler(sb, s.opts) var opts []broker.SubscribeOption if queue := sb.Options().Queue; len(queue) > 0 { opts = append(opts, broker.Queue(queue)) @@ -550,10 +646,11 @@ func (s *rpcServer) Register() error { opts = append(opts, broker.DisableAutoAck()) } - sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...) + sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...) if err != nil { return err } + log.Logf("Subscribing %s to topic: %s", node.Id, sub.Topic()) s.subscribers[sb] = []broker.Subscriber{sub} } @@ -621,6 +718,12 @@ func (s *rpcServer) Deregister() error { s.registered = false + // close the subscriber + if s.subscriber != nil { + s.subscriber.Unsubscribe() + s.subscriber = nil + } + for sb, subs := range s.subscribers { for _, sub := range subs { log.Logf("Unsubscribing %s from topic: %s", node.Id, sub.Topic()) diff --git a/server/server.go b/server/server.go index 98f2cddb..34c38258 100644 --- a/server/server.go +++ b/server/server.go @@ -29,15 +29,26 @@ type Server interface { // Router handle serving messages type Router interface { + // ProcessMessage processes a message + ProcessMessage(context.Context, Message) error // ServeRequest processes a request to completion ServeRequest(context.Context, Request, Response) error } // Message is an async message interface type Message interface { + // Topic of the message Topic() string + // The decoded payload value Payload() interface{} + // The content type of the payload ContentType() string + // The raw headers of the message + Header() map[string]string + // The raw body of the message + Body() []byte + // Codec used to decode the message + Codec() codec.Reader } // Request is a synchronous request interface diff --git a/server/subscriber.go b/server/subscriber.go index f0f85cf8..62de0ed8 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -1,17 +1,10 @@ package server import ( - "bytes" - "context" "fmt" "reflect" - "strings" - "github.com/micro/go-micro/broker" - "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/util/buf" ) const ( @@ -165,124 +158,6 @@ func validateSubscriber(sub Subscriber) error { return nil } -func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler { - return func(p broker.Event) error { - msg := p.Message() - - if msg.Header == nil { - // create empty map in case of headers empty to avoid panic later - msg.Header = make(map[string]string) - } - - // get codec - ct := msg.Header["Content-Type"] - - // default content type - if len(ct) == 0 { - msg.Header["Content-Type"] = DefaultContentType - ct = DefaultContentType - } - - // get codec - cf, err := s.newCodec(ct) - if err != nil { - return err - } - - // copy headers - hdr := make(map[string]string) - for k, v := range msg.Header { - hdr[k] = v - } - - // create context - ctx := metadata.NewContext(context.Background(), hdr) - - results := make(chan error, len(sb.handlers)) - - for i := 0; i < len(sb.handlers); i++ { - handler := sb.handlers[i] - - var isVal bool - var req reflect.Value - - if handler.reqType.Kind() == reflect.Ptr { - req = reflect.New(handler.reqType.Elem()) - } else { - req = reflect.New(handler.reqType) - isVal = true - } - if isVal { - req = req.Elem() - } - - b := buf.New(bytes.NewBuffer(msg.Body)) - co := cf(b) - defer co.Close() - - if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil { - return err - } - - if err := co.ReadBody(req.Interface()); err != nil { - return err - } - - fn := func(ctx context.Context, msg Message) error { - var vals []reflect.Value - if sb.typ.Kind() != reflect.Func { - vals = append(vals, sb.rcvr) - } - if handler.ctxType != nil { - vals = append(vals, reflect.ValueOf(ctx)) - } - - vals = append(vals, reflect.ValueOf(msg.Payload())) - - returnValues := handler.method.Call(vals) - if err := returnValues[0].Interface(); err != nil { - return err.(error) - } - return nil - } - - for i := len(opts.SubWrappers); i > 0; i-- { - fn = opts.SubWrappers[i-1](fn) - } - - if s.wg != nil { - s.wg.Add(1) - } - - go func() { - if s.wg != nil { - defer s.wg.Done() - } - - results <- fn(ctx, &rpcMessage{ - topic: sb.topic, - contentType: ct, - payload: req.Interface(), - }) - }() - } - - var errors []string - - for i := 0; i < len(sb.handlers); i++ { - if err := <-results; err != nil { - errors = append(errors, err.Error()) - } - } - - if len(errors) > 0 { - return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) - } - - return nil - } -} - func (s *subscriber) Topic() string { return s.topic } diff --git a/util/mux/mux.go b/util/mux/mux.go index 4741bd18..741dafcd 100644 --- a/util/mux/mux.go +++ b/util/mux/mux.go @@ -22,6 +22,13 @@ var ( once sync.Once ) +func (s *Server) ProcessMessage(ctx context.Context, msg server.Message) error { + if msg.Topic() == s.Name { + return server.DefaultRouter.ProcessMessage(ctx, msg) + } + return s.Proxy.ProcessMessage(ctx, msg) +} + func (s *Server) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error { if req.Service() == s.Name { return server.DefaultRouter.ServeRequest(ctx, req, rsp)