151 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			151 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package rpc
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"net/http"
 | |
| 	"strings"
 | |
| 
 | |
| 	"github.com/gorilla/websocket"
 | |
| 	"github.com/micro/go-micro/v2/api"
 | |
| 	"github.com/micro/go-micro/v2/client"
 | |
| 	"github.com/micro/go-micro/v2/client/selector"
 | |
| )
 | |
| 
 | |
| var upgrader = websocket.Upgrader{
 | |
| 	ReadBufferSize:  1024,
 | |
| 	WriteBufferSize: 1024,
 | |
| }
 | |
| 
 | |
| // serveWebsocket will stream rpc back over websockets assuming json
 | |
| func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, service *api.Service, c client.Client) {
 | |
| 	// upgrade the connection
 | |
| 	conn, err := upgrader.Upgrade(w, r, nil)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	// close on exit
 | |
| 	defer conn.Close()
 | |
| 
 | |
| 	// wait for the first request so we know
 | |
| 	_, p, err := conn.ReadMessage()
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// send to backend
 | |
| 	// default to trying json
 | |
| 	var request json.RawMessage
 | |
| 	// if the extracted payload isn't empty lets use it
 | |
| 	if len(p) > 0 {
 | |
| 		request = json.RawMessage(p)
 | |
| 	}
 | |
| 
 | |
| 	// create a request to the backend
 | |
| 	req := c.NewRequest(
 | |
| 		service.Name,
 | |
| 		service.Endpoint.Name,
 | |
| 		&request,
 | |
| 		client.WithContentType("application/json"),
 | |
| 	)
 | |
| 
 | |
| 	so := selector.WithStrategy(strategy(service.Services))
 | |
| 
 | |
| 	// create a new stream
 | |
| 	stream, err := c.Stream(ctx, req, client.WithSelectOption(so))
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// send the first request for the client
 | |
| 	// since
 | |
| 	if err := stream.Send(request); err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	go writeLoop(conn, stream)
 | |
| 
 | |
| 	resp := stream.Response()
 | |
| 
 | |
| 	// receive from stream and send to client
 | |
| 	for {
 | |
| 		// read backend response body
 | |
| 		body, err := resp.Read()
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// write the response
 | |
| 		if err := conn.WriteMessage(websocket.TextMessage, body); err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // writeLoop
 | |
| func writeLoop(conn *websocket.Conn, stream client.Stream) {
 | |
| 	// close stream when done
 | |
| 	defer stream.Close()
 | |
| 
 | |
| 	for {
 | |
| 		_, p, err := conn.ReadMessage()
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// send to backend
 | |
| 		// default to trying json
 | |
| 		var request json.RawMessage
 | |
| 		// if the extracted payload isn't empty lets use it
 | |
| 		if len(p) > 0 {
 | |
| 			request = json.RawMessage(p)
 | |
| 		}
 | |
| 
 | |
| 		if err := stream.Send(request); err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func isStream(r *http.Request, srv *api.Service) bool {
 | |
| 	// check if it's a web socket
 | |
| 	if !isWebSocket(r) {
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// check if the endpoint supports streaming
 | |
| 	for _, service := range srv.Services {
 | |
| 		for _, ep := range service.Endpoints {
 | |
| 			// skip if it doesn't match the name
 | |
| 			if ep.Name != srv.Endpoint.Name {
 | |
| 				continue
 | |
| 			}
 | |
| 
 | |
| 			// matched if the name
 | |
| 			if v := ep.Metadata["stream"]; v == "true" {
 | |
| 				return true
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func isWebSocket(r *http.Request) bool {
 | |
| 	contains := func(key, val string) bool {
 | |
| 		vv := strings.Split(r.Header.Get(key), ",")
 | |
| 		for _, v := range vv {
 | |
| 			if val == strings.ToLower(strings.TrimSpace(v)) {
 | |
| 				return true
 | |
| 			}
 | |
| 		}
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	if contains("Connection", "upgrade") && contains("Upgrade", "websocket") {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 |