86
stream.go
86
stream.go
@@ -1,7 +1,6 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
@@ -12,11 +11,11 @@ import (
|
||||
"github.com/gobwas/httphead"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
"github.com/micro/go-micro/v3/api"
|
||||
"github.com/micro/go-micro/v3/client"
|
||||
raw "github.com/micro/go-micro/v3/codec/bytes"
|
||||
"github.com/micro/go-micro/v3/logger"
|
||||
"github.com/micro/go-micro/v3/util/router"
|
||||
"github.com/unistack-org/micro/v3/api"
|
||||
"github.com/unistack-org/micro/v3/client"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/util/router"
|
||||
)
|
||||
|
||||
// serveWebsocket will stream rpc back over websockets assuming json
|
||||
@@ -29,6 +28,16 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
ct = ct[:idx]
|
||||
}
|
||||
|
||||
// create custom router
|
||||
callOpts := []client.CallOption{client.WithRouter(router.New(service.Services))}
|
||||
|
||||
if t := r.Header.Get("Timeout"); t != "" {
|
||||
// assume timeout integer secodns
|
||||
if td, err := time.ParseDuration(t + "s"); err == nil {
|
||||
callOpts = append(callOpts, client.WithRequestTimeout(td))
|
||||
}
|
||||
}
|
||||
|
||||
// check proto from request
|
||||
switch ct {
|
||||
case "application/json":
|
||||
@@ -44,13 +53,15 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
case "binary":
|
||||
hdr["Sec-WebSocket-Protocol"] = []string{"binary"}
|
||||
op = ws.OpBinary
|
||||
default:
|
||||
op = ws.OpBinary
|
||||
}
|
||||
}
|
||||
}
|
||||
payload, err := requestPayload(r)
|
||||
if err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -72,36 +83,36 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
|
||||
conn, rw, _, err := upgrader.Upgrade(r, w)
|
||||
if err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
var request interface{}
|
||||
if !bytes.Equal(payload, []byte(`{}`)) {
|
||||
switch ct {
|
||||
case "application/json", "":
|
||||
m := json.RawMessage(payload)
|
||||
request = &m
|
||||
default:
|
||||
request = &raw.Frame{Data: payload}
|
||||
}
|
||||
|
||||
switch ct {
|
||||
case "application/json":
|
||||
m := json.RawMessage(payload)
|
||||
request = &m
|
||||
default:
|
||||
request = &codec.Frame{Data: payload}
|
||||
}
|
||||
|
||||
// we always need to set content type for message
|
||||
if ct == "" {
|
||||
ct = "application/json"
|
||||
}
|
||||
|
||||
req := c.NewRequest(
|
||||
service.Name,
|
||||
service.Endpoint.Name,
|
||||
@@ -110,22 +121,19 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
client.StreamingRequest(),
|
||||
)
|
||||
|
||||
// create custom router
|
||||
callOpt := client.WithRouter(router.New(service.Services))
|
||||
|
||||
// create a new stream
|
||||
stream, err := c.Stream(ctx, req, callOpt)
|
||||
stream, err := c.Stream(ctx, req, callOpts...)
|
||||
if err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if request != nil {
|
||||
if err = stream.Send(request); err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -150,22 +158,22 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request,
|
||||
if strings.Contains(err.Error(), "context canceled") {
|
||||
return
|
||||
}
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// write the response
|
||||
if err := wsutil.WriteServerMessage(rw, op, buf); err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = rw.Flush(); err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(ctx, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -195,8 +203,8 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
|
||||
return
|
||||
}
|
||||
}
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(stream.Context(), err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -210,10 +218,10 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) {
|
||||
// send to backend
|
||||
// default to trying json
|
||||
// if the extracted payload isn't empty lets use it
|
||||
request := &raw.Frame{Data: buf}
|
||||
request := &codec.Frame{Data: buf}
|
||||
if err := stream.Send(request); err != nil {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Error(err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Error(stream.Context(), err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user