* api/handler/rpc: binary streaming support Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * fixup Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * fix Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * fix sec webscoekt protol Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
		
			
				
	
	
		
			478 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			478 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package rpc is a go-micro rpc handler.
 | |
| package rpc
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 
 | |
| 	jsonpatch "github.com/evanphx/json-patch/v5"
 | |
| 	"github.com/joncalhoun/qson"
 | |
| 	"github.com/micro/go-micro/v2/api"
 | |
| 	"github.com/micro/go-micro/v2/api/handler"
 | |
| 	"github.com/micro/go-micro/v2/api/internal/proto"
 | |
| 	"github.com/micro/go-micro/v2/client"
 | |
| 	"github.com/micro/go-micro/v2/client/selector"
 | |
| 	"github.com/micro/go-micro/v2/codec"
 | |
| 	"github.com/micro/go-micro/v2/codec/jsonrpc"
 | |
| 	"github.com/micro/go-micro/v2/codec/protorpc"
 | |
| 	"github.com/micro/go-micro/v2/errors"
 | |
| 	"github.com/micro/go-micro/v2/logger"
 | |
| 	"github.com/micro/go-micro/v2/metadata"
 | |
| 	"github.com/micro/go-micro/v2/registry"
 | |
| 	"github.com/micro/go-micro/v2/util/ctx"
 | |
| 	"github.com/oxtoacart/bpool"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	Handler = "rpc"
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	// supported json codecs
 | |
| 	jsonCodecs = []string{
 | |
| 		"application/grpc+json",
 | |
| 		"application/json",
 | |
| 		"application/json-rpc",
 | |
| 	}
 | |
| 
 | |
| 	// support proto codecs
 | |
| 	protoCodecs = []string{
 | |
| 		"application/grpc",
 | |
| 		"application/grpc+proto",
 | |
| 		"application/proto",
 | |
| 		"application/protobuf",
 | |
| 		"application/proto-rpc",
 | |
| 		"application/octet-stream",
 | |
| 	}
 | |
| 
 | |
| 	bufferPool = bpool.NewSizedBufferPool(1024, 8)
 | |
| )
 | |
| 
 | |
| type rpcHandler struct {
 | |
| 	opts handler.Options
 | |
| 	s    *api.Service
 | |
| }
 | |
| 
 | |
| type buffer struct {
 | |
| 	io.ReadCloser
 | |
| }
 | |
| 
 | |
| func (b *buffer) Write(_ []byte) (int, error) {
 | |
| 	return 0, nil
 | |
| }
 | |
| 
 | |
| // strategy is a hack for selection
 | |
| func strategy(services []*registry.Service) selector.Strategy {
 | |
| 	return func(_ []*registry.Service) selector.Next {
 | |
| 		// ignore input to this function, use services above
 | |
| 		return selector.Random(services)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| 	bsize := handler.DefaultMaxRecvSize
 | |
| 	if h.opts.MaxRecvSize > 0 {
 | |
| 		bsize = h.opts.MaxRecvSize
 | |
| 	}
 | |
| 
 | |
| 	r.Body = http.MaxBytesReader(w, r.Body, bsize)
 | |
| 
 | |
| 	defer r.Body.Close()
 | |
| 	var service *api.Service
 | |
| 
 | |
| 	if h.s != nil {
 | |
| 		// we were given the service
 | |
| 		service = h.s
 | |
| 	} else if h.opts.Router != nil {
 | |
| 		// try get service from router
 | |
| 		s, err := h.opts.Router.Route(r)
 | |
| 		if err != nil {
 | |
| 			writeError(w, r, errors.InternalServerError("go.micro.api", err.Error()))
 | |
| 			return
 | |
| 		}
 | |
| 		service = s
 | |
| 	} else {
 | |
| 		// we have no way of routing the request
 | |
| 		writeError(w, r, errors.InternalServerError("go.micro.api", "no route found"))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// only allow post when we have the router
 | |
| 	if r.Method != "GET" && (h.opts.Router != nil && r.Method != "POST") {
 | |
| 		writeError(w, r, errors.MethodNotAllowed("go.micro.api", "method not allowed"))
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	ct := r.Header.Get("Content-Type")
 | |
| 
 | |
| 	// Strip charset from Content-Type (like `application/json; charset=UTF-8`)
 | |
| 	if idx := strings.IndexRune(ct, ';'); idx >= 0 {
 | |
| 		ct = ct[:idx]
 | |
| 	}
 | |
| 
 | |
| 	// micro client
 | |
| 	c := h.opts.Service.Client()
 | |
| 
 | |
| 	// create context
 | |
| 	cx := ctx.FromRequest(r)
 | |
| 	// get context from http handler wrappers
 | |
| 	md, ok := r.Context().Value(metadata.MetadataKey{}).(metadata.Metadata)
 | |
| 	if !ok {
 | |
| 		md = make(metadata.Metadata)
 | |
| 	}
 | |
| 
 | |
| 	// merge context with overwrite
 | |
| 	cx = metadata.MergeContext(cx, md, true)
 | |
| 
 | |
| 	// set merged context to request
 | |
| 	*r = *r.Clone(cx)
 | |
| 	// if stream we currently only support json
 | |
| 	if isStream(r, service) {
 | |
| 		// drop older context as it can have timeouts and create new
 | |
| 		//		md, _ := metadata.FromContext(cx)
 | |
| 		//serveWebsocket(context.TODO(), w, r, service, c)
 | |
| 		serveWebsocket(cx, w, r, service, c)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// create strategy
 | |
| 	so := selector.WithStrategy(strategy(service.Services))
 | |
| 
 | |
| 	// walk the standard call path
 | |
| 	// get payload
 | |
| 	br, err := requestPayload(r)
 | |
| 	if err != nil {
 | |
| 		writeError(w, r, err)
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	var rsp []byte
 | |
| 
 | |
| 	switch {
 | |
| 	// proto codecs
 | |
| 	case hasCodec(ct, protoCodecs):
 | |
| 		request := &proto.Message{}
 | |
| 		// if the extracted payload isn't empty lets use it
 | |
| 		if len(br) > 0 {
 | |
| 			request = proto.NewMessage(br)
 | |
| 		}
 | |
| 
 | |
| 		// create request/response
 | |
| 		response := &proto.Message{}
 | |
| 
 | |
| 		req := c.NewRequest(
 | |
| 			service.Name,
 | |
| 			service.Endpoint.Name,
 | |
| 			request,
 | |
| 			client.WithContentType(ct),
 | |
| 		)
 | |
| 
 | |
| 		// make the call
 | |
| 		if err := c.Call(cx, req, response, client.WithSelectOption(so)); err != nil {
 | |
| 			writeError(w, r, err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// marshall response
 | |
| 		rsp, err = response.Marshal()
 | |
| 		if err != nil {
 | |
| 			writeError(w, r, err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 	default:
 | |
| 		// if json codec is not present set to json
 | |
| 		if !hasCodec(ct, jsonCodecs) {
 | |
| 			ct = "application/json"
 | |
| 		}
 | |
| 
 | |
| 		// default to trying json
 | |
| 		var request json.RawMessage
 | |
| 		// if the extracted payload isn't empty lets use it
 | |
| 		if len(br) > 0 {
 | |
| 			request = json.RawMessage(br)
 | |
| 		}
 | |
| 
 | |
| 		// create request/response
 | |
| 		var response json.RawMessage
 | |
| 
 | |
| 		req := c.NewRequest(
 | |
| 			service.Name,
 | |
| 			service.Endpoint.Name,
 | |
| 			&request,
 | |
| 			client.WithContentType(ct),
 | |
| 		)
 | |
| 
 | |
| 		// make the call
 | |
| 		if err := c.Call(cx, req, &response, client.WithSelectOption(so)); err != nil {
 | |
| 			writeError(w, r, err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// marshall response
 | |
| 		rsp, err = response.MarshalJSON()
 | |
| 		if err != nil {
 | |
| 			writeError(w, r, err)
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// write the response
 | |
| 	writeResponse(w, r, rsp)
 | |
| }
 | |
| 
 | |
| func (rh *rpcHandler) String() string {
 | |
| 	return "rpc"
 | |
| }
 | |
| 
 | |
| func hasCodec(ct string, codecs []string) bool {
 | |
| 	for _, codec := range codecs {
 | |
| 		if ct == codec {
 | |
| 			return true
 | |
| 		}
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // requestPayload takes a *http.Request.
 | |
| // If the request is a GET the query string parameters are extracted and marshaled to JSON and the raw bytes are returned.
 | |
| // If the request method is a POST the request body is read and returned
 | |
| func requestPayload(r *http.Request) ([]byte, error) {
 | |
| 	var err error
 | |
| 
 | |
| 	// we have to decode json-rpc and proto-rpc because we suck
 | |
| 	// well actually because there's no proxy codec right now
 | |
| 
 | |
| 	ct := r.Header.Get("Content-Type")
 | |
| 	switch {
 | |
| 	case strings.Contains(ct, "application/json-rpc"):
 | |
| 		msg := codec.Message{
 | |
| 			Type:   codec.Request,
 | |
| 			Header: make(map[string]string),
 | |
| 		}
 | |
| 		c := jsonrpc.NewCodec(&buffer{r.Body})
 | |
| 		if err = c.ReadHeader(&msg, codec.Request); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		var raw json.RawMessage
 | |
| 		if err = c.ReadBody(&raw); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		return ([]byte)(raw), nil
 | |
| 	case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"):
 | |
| 		msg := codec.Message{
 | |
| 			Type:   codec.Request,
 | |
| 			Header: make(map[string]string),
 | |
| 		}
 | |
| 		c := protorpc.NewCodec(&buffer{r.Body})
 | |
| 		if err = c.ReadHeader(&msg, codec.Request); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		var raw proto.Message
 | |
| 		if err = c.ReadBody(&raw); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		return raw.Marshal()
 | |
| 	case strings.Contains(ct, "application/www-x-form-urlencoded"):
 | |
| 		r.ParseForm()
 | |
| 
 | |
| 		// generate a new set of values from the form
 | |
| 		vals := make(map[string]string)
 | |
| 		for k, v := range r.Form {
 | |
| 			vals[k] = strings.Join(v, ",")
 | |
| 		}
 | |
| 
 | |
| 		// marshal
 | |
| 		return json.Marshal(vals)
 | |
| 		// TODO: application/grpc
 | |
| 	}
 | |
| 
 | |
| 	// otherwise as per usual
 | |
| 	ctx := r.Context()
 | |
| 	// dont user meadata.FromContext as it mangles names
 | |
| 	md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata)
 | |
| 	if !ok {
 | |
| 		md = make(map[string]string)
 | |
| 	}
 | |
| 
 | |
| 	// allocate maximum
 | |
| 	matches := make(map[string]interface{}, len(md))
 | |
| 
 | |
| 	// get fields from url path
 | |
| 	for k, v := range md {
 | |
| 		// filter own keys
 | |
| 		if strings.HasPrefix(k, "x-api-field-") {
 | |
| 			matches[strings.TrimPrefix(k, "x-api-field-")] = v
 | |
| 			delete(md, k)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// map of all fields
 | |
| 	req := make(map[string]interface{}, len(md))
 | |
| 
 | |
| 	// get fields from url values
 | |
| 	if len(r.URL.RawQuery) > 0 {
 | |
| 		umd := make(map[string]interface{})
 | |
| 		err = qson.Unmarshal(&umd, r.URL.RawQuery)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		for k, v := range umd {
 | |
| 			matches[k] = v
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// restore context without fields
 | |
| 	*r = *r.Clone(metadata.NewContext(ctx, md))
 | |
| 
 | |
| 	for k, v := range matches {
 | |
| 		ps := strings.Split(k, ".")
 | |
| 		if len(ps) == 1 {
 | |
| 			req[k] = v
 | |
| 			continue
 | |
| 		}
 | |
| 		em := make(map[string]interface{})
 | |
| 		em[ps[len(ps)-1]] = v
 | |
| 		for i := len(ps) - 2; i > 0; i-- {
 | |
| 			nm := make(map[string]interface{})
 | |
| 			nm[ps[i]] = em
 | |
| 			em = nm
 | |
| 		}
 | |
| 		if vm, ok := req[ps[0]]; ok {
 | |
| 			// nested map
 | |
| 			nm := vm.(map[string]interface{})
 | |
| 			for vk, vv := range em {
 | |
| 				nm[vk] = vv
 | |
| 			}
 | |
| 			req[ps[0]] = nm
 | |
| 		} else {
 | |
| 			req[ps[0]] = em
 | |
| 		}
 | |
| 	}
 | |
| 	pathbuf := []byte("{}")
 | |
| 	if len(req) > 0 {
 | |
| 		pathbuf, err = json.Marshal(req)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	urlbuf := []byte("{}")
 | |
| 	out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	switch r.Method {
 | |
| 	case "GET":
 | |
| 		// empty response
 | |
| 		if strings.Contains(ct, "application/json") && string(out) == "{}" {
 | |
| 			return out, nil
 | |
| 		} else if string(out) == "{}" && !strings.Contains(ct, "application/json") {
 | |
| 			return []byte{}, nil
 | |
| 		}
 | |
| 		return out, nil
 | |
| 	case "PATCH", "POST", "PUT", "DELETE":
 | |
| 		bodybuf := []byte("{}")
 | |
| 		buf := bufferPool.Get()
 | |
| 		defer bufferPool.Put(buf)
 | |
| 		if _, err := buf.ReadFrom(r.Body); err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if b := buf.Bytes(); len(b) > 0 {
 | |
| 			bodybuf = b
 | |
| 		} else {
 | |
| 			return []byte{}, nil
 | |
| 		}
 | |
| 
 | |
| 		if out, err = jsonpatch.MergeMergePatches(out, bodybuf); err == nil {
 | |
| 			return out, nil
 | |
| 		}
 | |
| 
 | |
| 		//fallback to previous unknown behaviour
 | |
| 		return bodybuf, nil
 | |
| 
 | |
| 	}
 | |
| 
 | |
| 	return []byte{}, nil
 | |
| }
 | |
| 
 | |
| func writeError(w http.ResponseWriter, r *http.Request, err error) {
 | |
| 	ce := errors.Parse(err.Error())
 | |
| 
 | |
| 	switch ce.Code {
 | |
| 	case 0:
 | |
| 		// assuming it's totally screwed
 | |
| 		ce.Code = 500
 | |
| 		ce.Id = "go.micro.api"
 | |
| 		ce.Status = http.StatusText(500)
 | |
| 		ce.Detail = "error during request: " + ce.Detail
 | |
| 		w.WriteHeader(500)
 | |
| 	default:
 | |
| 		w.WriteHeader(int(ce.Code))
 | |
| 	}
 | |
| 
 | |
| 	// response content type
 | |
| 	w.Header().Set("Content-Type", "application/json")
 | |
| 
 | |
| 	// Set trailers
 | |
| 	if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
 | |
| 		w.Header().Set("Trailer", "grpc-status")
 | |
| 		w.Header().Set("Trailer", "grpc-message")
 | |
| 		w.Header().Set("grpc-status", "13")
 | |
| 		w.Header().Set("grpc-message", ce.Detail)
 | |
| 	}
 | |
| 
 | |
| 	_, werr := w.Write([]byte(ce.Error()))
 | |
| 	if werr != nil {
 | |
| 		if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | |
| 			logger.Error(werr)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) {
 | |
| 	w.Header().Set("Content-Type", r.Header.Get("Content-Type"))
 | |
| 	w.Header().Set("Content-Length", strconv.Itoa(len(rsp)))
 | |
| 
 | |
| 	// Set trailers
 | |
| 	if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
 | |
| 		w.Header().Set("Trailer", "grpc-status")
 | |
| 		w.Header().Set("Trailer", "grpc-message")
 | |
| 		w.Header().Set("grpc-status", "0")
 | |
| 		w.Header().Set("grpc-message", "")
 | |
| 	}
 | |
| 
 | |
| 	// write 204 status if rsp is nil
 | |
| 	if len(rsp) == 0 {
 | |
| 		w.WriteHeader(http.StatusNoContent)
 | |
| 	}
 | |
| 
 | |
| 	// write response
 | |
| 	_, err := w.Write(rsp)
 | |
| 	if err != nil {
 | |
| 		if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
 | |
| 			logger.Error(err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| }
 | |
| 
 | |
| func NewHandler(opts ...handler.Option) handler.Handler {
 | |
| 	options := handler.NewOptions(opts...)
 | |
| 	return &rpcHandler{
 | |
| 		opts: options,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func WithService(s *api.Service, opts ...handler.Option) handler.Handler {
 | |
| 	options := handler.NewOptions(opts...)
 | |
| 	return &rpcHandler{
 | |
| 		opts: options,
 | |
| 		s:    s,
 | |
| 	}
 | |
| }
 |