From ed83c27f0e384347e7080acab7b2901a3217fc10 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 10 Mar 2020 15:21:43 +0000 Subject: [PATCH] add websocket streaming to api rpc handler (#1326) --- api/handler/rpc/rpc.go | 14 +++- api/handler/rpc/stream.go | 150 ++++++++++++++++++++++++++++++++++++++ client/grpc/codec.go | 8 +- 3 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 api/handler/rpc/stream.go diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index daf601a1..5804dbc8 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -104,9 +104,20 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // micro client c := h.opts.Service.Client() + // create context + cx := ctx.FromRequest(r) + + // if stream we currently only support json + if isStream(r, service) { + serveWebsocket(cx, w, r, service, c) + return + } + // create strategy so := selector.WithStrategy(strategy(service.Services)) + // walk the standard call path + // get payload br, err := requestPayload(r) if err != nil { @@ -114,9 +125,6 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - // create context - cx := ctx.FromRequest(r) - var rsp []byte switch { diff --git a/api/handler/rpc/stream.go b/api/handler/rpc/stream.go new file mode 100644 index 00000000..29eee41b --- /dev/null +++ b/api/handler/rpc/stream.go @@ -0,0 +1,150 @@ +package rpc + +import ( + "context" + "encoding/json" + "net/http" + "strings" + + "github.com/gorilla/websocket" + "github.com/micro/go-micro/v2/api" + "github.com/micro/go-micro/v2/client" + "github.com/micro/go-micro/v2/client/selector" +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +// serveWebsocket will stream rpc back over websockets assuming json +func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *api.Service, c client.Client) { + // upgrade the connection + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + // close on exit + defer conn.Close() + + // wait for the first request so we know + _, p, err := conn.ReadMessage() + if err != nil { + return + } + + // send to backend + // default to trying json + var request json.RawMessage + // if the extracted payload isn't empty lets use it + if len(p) > 0 { + request = json.RawMessage(p) + } + + // create a request to the backend + req := c.NewRequest( + service.Name, + service.Endpoint.Name, + &request, + client.WithContentType("application/json"), + ) + + so := selector.WithStrategy(strategy(service.Services)) + + // create a new stream + stream, err := c.Stream(ctx, req, client.WithSelectOption(so)) + if err != nil { + return + } + + // send the first request for the client + // since + if err := stream.Send(request); err != nil { + return + } + + go writeLoop(conn, stream) + + resp := stream.Response() + + // receive from stream and send to client + for { + // read backend response body + body, err := resp.Read() + if err != nil { + return + } + + // write the response + if err := conn.WriteMessage(websocket.TextMessage, body); err != nil { + return + } + } +} + +// writeLoop +func writeLoop(conn *websocket.Conn, stream client.Stream) { + // close stream when done + defer stream.Close() + + for { + _, p, err := conn.ReadMessage() + if err != nil { + return + } + + // send to backend + // default to trying json + var request json.RawMessage + // if the extracted payload isn't empty lets use it + if len(p) > 0 { + request = json.RawMessage(p) + } + + if err := stream.Send(request); err != nil { + return + } + } +} + +func isStream(r *http.Request, srv *api.Service) bool { + // check if it's a web socket + if !isWebSocket(r) { + return false + } + + // check if the endpoint supports streaming + for _, service := range srv.Services { + for _, ep := range service.Endpoints { + // skip if it doesn't match the name + if ep.Name != srv.Endpoint.Name { + continue + } + + // matched if the name + if v := ep.Metadata["stream"]; v == "true" { + return true + } + } + } + + return false +} + +func isWebSocket(r *http.Request) bool { + contains := func(key, val string) bool { + vv := strings.Split(r.Header.Get(key), ",") + for _, v := range vv { + if val == strings.ToLower(strings.TrimSpace(v)) { + return true + } + } + return false + } + + if contains("Connection", "upgrade") && contains("Upgrade", "websocket") { + return true + } + + return false +} diff --git a/client/grpc/codec.go b/client/grpc/codec.go index 0366675a..9f89647a 100644 --- a/client/grpc/codec.go +++ b/client/grpc/codec.go @@ -117,7 +117,9 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) { return []byte(s), err } - + if b, ok := v.(*bytes.Frame); ok { + return b.Data, nil + } return json.Marshal(v) } @@ -125,6 +127,10 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error { if len(data) == 0 { return nil } + if b, ok := v.(*bytes.Frame); ok { + b.Data = data + return nil + } if pb, ok := v.(proto.Message); ok { return jsonpb.Unmarshal(b.NewReader(data), pb) }