add websocket streaming to api rpc handler (#1326)

This commit is contained in:
Asim Aslam 2020-03-10 15:21:43 +00:00 committed by GitHub
parent 241614ff68
commit ed83c27f0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 168 additions and 4 deletions

View File

@ -104,9 +104,20 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// micro client // micro client
c := h.opts.Service.Client() c := h.opts.Service.Client()
// create context
cx := ctx.FromRequest(r)
// if stream we currently only support json
if isStream(r, service) {
serveWebsocket(cx, w, r, service, c)
return
}
// create strategy // create strategy
so := selector.WithStrategy(strategy(service.Services)) so := selector.WithStrategy(strategy(service.Services))
// walk the standard call path
// get payload // get payload
br, err := requestPayload(r) br, err := requestPayload(r)
if err != nil { if err != nil {
@ -114,9 +125,6 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
// create context
cx := ctx.FromRequest(r)
var rsp []byte var rsp []byte
switch { switch {

150
api/handler/rpc/stream.go Normal file
View File

@ -0,0 +1,150 @@
package rpc
import (
"context"
"encoding/json"
"net/http"
"strings"
"github.com/gorilla/websocket"
"github.com/micro/go-micro/v2/api"
"github.com/micro/go-micro/v2/client"
"github.com/micro/go-micro/v2/client/selector"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
// serveWebsocket will stream rpc back over websockets assuming json
func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *api.Service, c client.Client) {
// upgrade the connection
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
// close on exit
defer conn.Close()
// wait for the first request so we know
_, p, err := conn.ReadMessage()
if err != nil {
return
}
// send to backend
// default to trying json
var request json.RawMessage
// if the extracted payload isn't empty lets use it
if len(p) > 0 {
request = json.RawMessage(p)
}
// create a request to the backend
req := c.NewRequest(
service.Name,
service.Endpoint.Name,
&request,
client.WithContentType("application/json"),
)
so := selector.WithStrategy(strategy(service.Services))
// create a new stream
stream, err := c.Stream(ctx, req, client.WithSelectOption(so))
if err != nil {
return
}
// send the first request for the client
// since
if err := stream.Send(request); err != nil {
return
}
go writeLoop(conn, stream)
resp := stream.Response()
// receive from stream and send to client
for {
// read backend response body
body, err := resp.Read()
if err != nil {
return
}
// write the response
if err := conn.WriteMessage(websocket.TextMessage, body); err != nil {
return
}
}
}
// writeLoop
func writeLoop(conn *websocket.Conn, stream client.Stream) {
// close stream when done
defer stream.Close()
for {
_, p, err := conn.ReadMessage()
if err != nil {
return
}
// send to backend
// default to trying json
var request json.RawMessage
// if the extracted payload isn't empty lets use it
if len(p) > 0 {
request = json.RawMessage(p)
}
if err := stream.Send(request); err != nil {
return
}
}
}
func isStream(r *http.Request, srv *api.Service) bool {
// check if it's a web socket
if !isWebSocket(r) {
return false
}
// check if the endpoint supports streaming
for _, service := range srv.Services {
for _, ep := range service.Endpoints {
// skip if it doesn't match the name
if ep.Name != srv.Endpoint.Name {
continue
}
// matched if the name
if v := ep.Metadata["stream"]; v == "true" {
return true
}
}
}
return false
}
func isWebSocket(r *http.Request) bool {
contains := func(key, val string) bool {
vv := strings.Split(r.Header.Get(key), ",")
for _, v := range vv {
if val == strings.ToLower(strings.TrimSpace(v)) {
return true
}
}
return false
}
if contains("Connection", "upgrade") && contains("Upgrade", "websocket") {
return true
}
return false
}

View File

@ -117,7 +117,9 @@ func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
return []byte(s), err return []byte(s), err
} }
if b, ok := v.(*bytes.Frame); ok {
return b.Data, nil
}
return json.Marshal(v) return json.Marshal(v)
} }
@ -125,6 +127,10 @@ func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
if len(data) == 0 { if len(data) == 0 {
return nil return nil
} }
if b, ok := v.(*bytes.Frame); ok {
b.Data = data
return nil
}
if pb, ok := v.(proto.Message); ok { if pb, ok := v.(proto.Message); ok {
return jsonpb.Unmarshal(b.NewReader(data), pb) return jsonpb.Unmarshal(b.NewReader(data), pb)
} }