353 lines
8.7 KiB
Go
353 lines
8.7 KiB
Go
|
package combo_test
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
drpccli "go.unistack.org/micro-client-drpc/v3"
|
||
|
grpccli "go.unistack.org/micro-client-grpc/v3"
|
||
|
httpcli "go.unistack.org/micro-client-http/v3"
|
||
|
jsonpbcodec "go.unistack.org/micro-codec-jsonpb/v3"
|
||
|
protocodec "go.unistack.org/micro-codec-proto/v3"
|
||
|
httpsrv "go.unistack.org/micro-server-http/v3"
|
||
|
mdpb "go.unistack.org/micro-tests/server/combo/mdpb"
|
||
|
mgpb "go.unistack.org/micro-tests/server/combo/mgpb"
|
||
|
mhpb "go.unistack.org/micro-tests/server/combo/mhpb"
|
||
|
ndpb "go.unistack.org/micro-tests/server/combo/ndpb"
|
||
|
ngpb "go.unistack.org/micro-tests/server/combo/ngpb"
|
||
|
pb "go.unistack.org/micro-tests/server/combo/proto"
|
||
|
"go.unistack.org/micro/v3/client"
|
||
|
"go.unistack.org/micro/v3/codec"
|
||
|
"go.unistack.org/micro/v3/logger"
|
||
|
"go.unistack.org/micro/v3/metadata"
|
||
|
"go.unistack.org/micro/v3/register"
|
||
|
"go.unistack.org/micro/v3/server"
|
||
|
"golang.org/x/net/http2"
|
||
|
"golang.org/x/net/http2/h2c"
|
||
|
"google.golang.org/grpc"
|
||
|
"google.golang.org/grpc/codes"
|
||
|
"google.golang.org/grpc/credentials/insecure"
|
||
|
"google.golang.org/grpc/encoding"
|
||
|
gmetadata "google.golang.org/grpc/metadata"
|
||
|
"google.golang.org/grpc/peer"
|
||
|
"google.golang.org/grpc/status"
|
||
|
"storj.io/drpc"
|
||
|
"storj.io/drpc/drpcconn"
|
||
|
"storj.io/drpc/drpchttp"
|
||
|
)
|
||
|
|
||
|
type Handler struct {
|
||
|
t *testing.T
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
grpcDefaultContentType = "application/grpc+proto"
|
||
|
drpcDefaultContentType = "application/drpc+proto"
|
||
|
httpDefaultContentType = "application/json"
|
||
|
)
|
||
|
|
||
|
type wrapMicroCodec struct{ codec.Codec }
|
||
|
|
||
|
func (w *wrapMicroCodec) Name() string {
|
||
|
return w.Codec.String()
|
||
|
}
|
||
|
|
||
|
func (w *wrapMicroCodec) Marshal(v interface{}) ([]byte, error) {
|
||
|
return w.Codec.Marshal(v)
|
||
|
}
|
||
|
|
||
|
func (w *wrapMicroCodec) Unmarshal(d []byte, v interface{}) error {
|
||
|
return w.Codec.Unmarshal(d, v)
|
||
|
}
|
||
|
|
||
|
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||
|
// ctx := r.Context()
|
||
|
w.Header().Add("Content-Type", httpDefaultContentType)
|
||
|
w.WriteHeader(http.StatusOK)
|
||
|
_ = jsonpbcodec.NewCodec().Write(w, nil, &pb.CallRsp{Rsp: "name_my_name"})
|
||
|
}
|
||
|
|
||
|
func (h *Handler) ServeGRPC(_ interface{}, stream grpc.ServerStream) error {
|
||
|
ctx := stream.Context()
|
||
|
|
||
|
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||
|
if !ok {
|
||
|
return status.Errorf(codes.Internal, "method does not exist in context")
|
||
|
}
|
||
|
|
||
|
serviceName, methodName, err := grpcServiceMethod(fullMethod)
|
||
|
if err != nil {
|
||
|
return status.New(codes.InvalidArgument, err.Error()).Err()
|
||
|
}
|
||
|
_, _ = serviceName, methodName
|
||
|
// get grpc metadata
|
||
|
gmd, ok := gmetadata.FromIncomingContext(stream.Context())
|
||
|
if !ok {
|
||
|
gmd = gmetadata.MD{}
|
||
|
}
|
||
|
|
||
|
md := metadata.New(len(gmd))
|
||
|
for k, v := range gmd {
|
||
|
md.Set(k, strings.Join(v, ", "))
|
||
|
}
|
||
|
|
||
|
// timeout for server deadline
|
||
|
to, ok := md.Get("timeout")
|
||
|
if ok {
|
||
|
md.Del("timeout")
|
||
|
}
|
||
|
|
||
|
// get content type
|
||
|
ct := grpcDefaultContentType
|
||
|
|
||
|
if ctype, ok := md.Get("content-type"); ok {
|
||
|
ct = ctype
|
||
|
} else if ctype, ok := md.Get("x-content-type"); ok {
|
||
|
ct = ctype
|
||
|
md.Del("x-content-type")
|
||
|
}
|
||
|
|
||
|
_ = ct
|
||
|
|
||
|
// get peer from context
|
||
|
if p, ok := peer.FromContext(ctx); ok {
|
||
|
md["Remote"] = p.Addr.String()
|
||
|
ctx = peer.NewContext(ctx, p)
|
||
|
}
|
||
|
|
||
|
// create new context
|
||
|
ctx = metadata.NewIncomingContext(ctx, md)
|
||
|
|
||
|
// set the timeout if we have it
|
||
|
if len(to) > 0 {
|
||
|
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
|
||
|
var cancel context.CancelFunc
|
||
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
|
||
|
defer cancel()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
frame := &codec.Frame{}
|
||
|
if err := stream.RecvMsg(frame); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// logger.Infof(ctx, "frame: %s", frame.Data)
|
||
|
|
||
|
if err := stream.SendMsg(&pb.CallRsp{Rsp: "name_my_name"}); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func grpcServiceMethod(m string) (string, string, error) {
|
||
|
if len(m) == 0 {
|
||
|
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||
|
}
|
||
|
|
||
|
// grpc method
|
||
|
if m[0] == '/' {
|
||
|
// [ , Foo, Bar]
|
||
|
// [ , package.Foo, Bar]
|
||
|
// [ , a.package.Foo, Bar]
|
||
|
parts := strings.Split(m, "/")
|
||
|
if len(parts) != 3 || len(parts[1]) == 0 || len(parts[2]) == 0 {
|
||
|
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||
|
}
|
||
|
service := strings.Split(parts[1], ".")
|
||
|
return service[len(service)-1], parts[2], nil
|
||
|
}
|
||
|
|
||
|
// non grpc method
|
||
|
parts := strings.Split(m, ".")
|
||
|
|
||
|
// expect [Foo, Bar]
|
||
|
if len(parts) != 2 {
|
||
|
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||
|
}
|
||
|
|
||
|
return parts[0], parts[1], nil
|
||
|
}
|
||
|
|
||
|
func (h *Handler) ServeDRPC(stream drpc.Stream, rpc string) error {
|
||
|
ctx := stream.Context()
|
||
|
logger.Infof(ctx, "drpc: %#+v", rpc)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (h *Handler) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||
|
ctx := r.Context()
|
||
|
logger.Infof(ctx, "ws: %#+v", r)
|
||
|
}
|
||
|
|
||
|
func (h *Handler) HandleRPC(stream drpc.Stream, rpc string) error {
|
||
|
return h.ServeDRPC(stream, rpc)
|
||
|
}
|
||
|
|
||
|
func TestComboServer(t *testing.T) {
|
||
|
req := &pb.CallReq{Req: "my_name"}
|
||
|
var rsp *pb.CallRsp
|
||
|
|
||
|
reg := register.NewRegister()
|
||
|
ctx := context.Background()
|
||
|
|
||
|
h := &Handler{t: t}
|
||
|
|
||
|
_ = logger.DefaultLogger.Init(logger.WithCallerSkipCount(3))
|
||
|
encoding.RegisterCodec(&wrapMicroCodec{protocodec.NewCodec()})
|
||
|
|
||
|
gsrv := grpc.NewServer(grpc.UnknownServiceHandler(h.ServeGRPC))
|
||
|
|
||
|
comboHandler := newComboMux(h, gsrv, drpchttp.New(h))
|
||
|
http2Server := &http2.Server{}
|
||
|
hs := &http.Server{Handler: h2c.NewHandler(comboHandler, http2Server)}
|
||
|
|
||
|
// create server
|
||
|
srv := httpsrv.NewServer(
|
||
|
server.Address("127.0.0.1:0"),
|
||
|
server.Name("helloworld"),
|
||
|
server.Register(reg),
|
||
|
httpsrv.Server(hs),
|
||
|
)
|
||
|
|
||
|
// init server
|
||
|
if err := srv.Init(); err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
|
||
|
// start server
|
||
|
if err := srv.Start(); err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
|
||
|
// lookup server
|
||
|
service, err := reg.LookupService(ctx, "helloworld")
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
|
||
|
if len(service) != 1 {
|
||
|
t.Fatalf("Expected 1 service got %d: %+v", len(service), service)
|
||
|
}
|
||
|
|
||
|
if len(service[0].Nodes) != 1 {
|
||
|
t.Fatalf("Expected 1 node got %d: %+v", len(service[0].Nodes), service[0].Nodes)
|
||
|
}
|
||
|
|
||
|
mhcli := client.NewClientCallOptions(httpcli.NewClient(client.ContentType(httpDefaultContentType), client.Codec(httpDefaultContentType, jsonpbcodec.NewCodec())), client.WithAddress("http://"+service[0].Nodes[0].Address))
|
||
|
|
||
|
mhttpsvc := mhpb.NewTestClient("helloworld", mhcli)
|
||
|
|
||
|
mgcli := client.NewClientCallOptions(grpccli.NewClient(client.ContentType(grpcDefaultContentType), client.Codec(grpcDefaultContentType, protocodec.NewCodec())), client.WithAddress("http://"+service[0].Nodes[0].Address))
|
||
|
|
||
|
mgrpcsvc := mgpb.NewTestClient("helloworld", mgcli)
|
||
|
|
||
|
mdcli := client.NewClientCallOptions(drpccli.NewClient(client.ContentType(drpcDefaultContentType), client.Codec(drpcDefaultContentType, protocodec.NewCodec())), client.WithAddress("http://"+service[0].Nodes[0].Address))
|
||
|
|
||
|
mdrpcsvc := mdpb.NewTestClient("helloworld", mdcli)
|
||
|
|
||
|
t.Logf("call via micro grpc")
|
||
|
rsp, err = mgrpcsvc.Call(ctx, req)
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
} else {
|
||
|
if rsp.Rsp != "name_my_name" {
|
||
|
t.Fatalf("invalid response: %#+v\n", rsp)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
ngcli, err := grpc.DialContext(ctx, service[0].Nodes[0].Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
defer ngcli.Close()
|
||
|
|
||
|
ngrpcsvc := ngpb.NewTestClient(ngcli)
|
||
|
t.Logf("call via native grpc")
|
||
|
rsp, err = ngrpcsvc.Call(ctx, req)
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
} else {
|
||
|
if rsp.Rsp != "name_my_name" {
|
||
|
t.Fatalf("invalid response: %#+v\n", rsp)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
t.Logf("call via micro http")
|
||
|
rsp, err = mhttpsvc.Call(ctx, req)
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
} else {
|
||
|
if rsp.Rsp != "name_my_name" {
|
||
|
t.Fatalf("invalid response: %#+v\n", rsp)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
tc, err := net.Dial("tcp", service[0].Nodes[0].Address)
|
||
|
if err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
|
||
|
ndcli := drpcconn.New(tc)
|
||
|
defer ndcli.Close()
|
||
|
|
||
|
ndrpcsvc := ndpb.NewDRPCTestClient(ndcli)
|
||
|
|
||
|
t.Logf("call via native drpc")
|
||
|
rsp, err = ndrpcsvc.Call(context.TODO(), req)
|
||
|
if err != nil {
|
||
|
t.Logf("native drpc err: %v", err)
|
||
|
// t.Fatal(err)
|
||
|
} else {
|
||
|
if rsp.Rsp != "name_my_name" {
|
||
|
t.Fatalf("invalid response: %#+v\n", rsp)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
t.Logf("call via micro drpc")
|
||
|
rsp, err = mdrpcsvc.Call(ctx, req)
|
||
|
if err != nil {
|
||
|
t.Logf("micro drpc err: %v", err)
|
||
|
// t.Fatal(err)
|
||
|
} else {
|
||
|
if rsp.Rsp != "name_my_name" {
|
||
|
t.Fatalf("invalid response: %#+v\n", rsp)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func newComboMux(httph http.Handler, grpch http.Handler, drpch http.Handler) http.Handler {
|
||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
|
if r.ProtoMajor == 2 {
|
||
|
ct := r.Header.Get("content-type")
|
||
|
switch {
|
||
|
case strings.HasPrefix(ct, "application/grpc"):
|
||
|
grpch.ServeHTTP(w, r)
|
||
|
return
|
||
|
case strings.HasPrefix(ct, "application/drpc"):
|
||
|
drpch.ServeHTTP(w, r)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
ws := false
|
||
|
for _, header := range r.Header["Upgrade"] {
|
||
|
if header == "websocket" {
|
||
|
ws = true
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if ws {
|
||
|
httph.(*Handler).ServeWS(w, r)
|
||
|
return
|
||
|
}
|
||
|
httph.ServeHTTP(w, r)
|
||
|
})
|
||
|
}
|