add combo test example
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
@@ -1,199 +1,72 @@
|
||||
package combo_test
|
||||
|
||||
/*
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"embed"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
drpccli "go.unistack.org/micro-client-drpc/v3"
|
||||
grpccli "go.unistack.org/micro-client-grpc/v3"
|
||||
httpcli "go.unistack.org/micro-client-http/v3"
|
||||
jsonpbcodec "go.unistack.org/micro-codec-jsonpb/v3"
|
||||
protocodec "go.unistack.org/micro-codec-proto/v3"
|
||||
grpcsrv "go.unistack.org/micro-server-grpc/v3"
|
||||
httpsrv "go.unistack.org/micro-server-http/v3"
|
||||
mdpb "go.unistack.org/micro-tests/server/combo/mdpb"
|
||||
mgpb "go.unistack.org/micro-tests/server/combo/mgpb"
|
||||
mhpb "go.unistack.org/micro-tests/server/combo/mhpb"
|
||||
|
||||
// ndpb "go.unistack.org/micro-tests/server/combo/ndpb"
|
||||
// ngpb "go.unistack.org/micro-tests/server/combo/ngpb"
|
||||
ngpb "go.unistack.org/micro-tests/server/combo/ngpb"
|
||||
pb "go.unistack.org/micro-tests/server/combo/proto"
|
||||
"go.unistack.org/micro/v3/client"
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/encoding"
|
||||
gmetadata "google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
"storj.io/drpc"
|
||||
"storj.io/drpc/drpcconn"
|
||||
"storj.io/drpc/drpchttp"
|
||||
)
|
||||
|
||||
//go:embed swagger-ui
|
||||
var assets embed.FS
|
||||
|
||||
type Handler struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
const (
|
||||
grpcDefaultContentType = "application/grpc+proto"
|
||||
drpcDefaultContentType = "application/drpc+proto"
|
||||
httpDefaultContentType = "application/json"
|
||||
)
|
||||
|
||||
type wrapMicroCodec struct{ codec.Codec }
|
||||
|
||||
func (w *wrapMicroCodec) Name() string {
|
||||
return w.Codec.String()
|
||||
}
|
||||
|
||||
func (w *wrapMicroCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
return w.Codec.Marshal(v)
|
||||
}
|
||||
|
||||
func (w *wrapMicroCodec) Unmarshal(d []byte, v interface{}) error {
|
||||
return w.Codec.Unmarshal(d, v)
|
||||
}
|
||||
|
||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// ctx := r.Context()
|
||||
w.Header().Add("Content-Type", httpDefaultContentType)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
_ = jsonpbcodec.NewCodec().Write(w, nil, &pb.CallRsp{Rsp: "name_my_name"})
|
||||
}
|
||||
|
||||
func (h *Handler) ServeGRPC(_ interface{}, stream grpc.ServerStream) error {
|
||||
ctx := stream.Context()
|
||||
|
||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||
if !ok {
|
||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||
}
|
||||
|
||||
serviceName, methodName, err := grpcServiceMethod(fullMethod)
|
||||
if err != nil {
|
||||
return status.New(codes.InvalidArgument, err.Error()).Err()
|
||||
}
|
||||
_, _ = serviceName, methodName
|
||||
// get grpc metadata
|
||||
gmd, ok := gmetadata.FromIncomingContext(stream.Context())
|
||||
if !ok {
|
||||
gmd = gmetadata.MD{}
|
||||
}
|
||||
|
||||
md := metadata.New(len(gmd))
|
||||
for k, v := range gmd {
|
||||
md.Set(k, strings.Join(v, ", "))
|
||||
}
|
||||
|
||||
// timeout for server deadline
|
||||
to, ok := md.Get("timeout")
|
||||
if ok {
|
||||
md.Del("timeout")
|
||||
}
|
||||
|
||||
// get content type
|
||||
ct := grpcDefaultContentType
|
||||
|
||||
if ctype, ok := md.Get("content-type"); ok {
|
||||
ct = ctype
|
||||
} else if ctype, ok := md.Get("x-content-type"); ok {
|
||||
ct = ctype
|
||||
md.Del("x-content-type")
|
||||
}
|
||||
|
||||
_ = ct
|
||||
|
||||
// get peer from context
|
||||
if p, ok := peer.FromContext(ctx); ok {
|
||||
md["Remote"] = p.Addr.String()
|
||||
ctx = peer.NewContext(ctx, p)
|
||||
}
|
||||
|
||||
// create new context
|
||||
ctx = metadata.NewIncomingContext(ctx, md)
|
||||
|
||||
// set the timeout if we have it
|
||||
if len(to) > 0 {
|
||||
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Duration(n))
|
||||
defer cancel()
|
||||
func newComboMux(httph http.Handler, grpch http.Handler, drpch http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 {
|
||||
ct := r.Header.Get("content-type")
|
||||
switch {
|
||||
case strings.HasPrefix(ct, "application/grpc"):
|
||||
if grpch != nil {
|
||||
grpch.ServeHTTP(w, r)
|
||||
}
|
||||
return
|
||||
case strings.HasPrefix(ct, "application/drpc"):
|
||||
if drpch != nil {
|
||||
drpch.ServeHTTP(w, r)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
frame := &codec.Frame{}
|
||||
if err := stream.RecvMsg(frame); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// logger.Infof(ctx, "frame: %s", frame.Data)
|
||||
|
||||
if err := stream.SendMsg(&pb.CallRsp{Rsp: "name_my_name"}); err != nil {
|
||||
return err
|
||||
}
|
||||
httph.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func (h *Handler) Call(ctx context.Context, req *pb.CallReq, rsp *pb.CallRsp) error {
|
||||
rsp.Rsp = "name_my_name"
|
||||
return nil
|
||||
}
|
||||
|
||||
func grpcServiceMethod(m string) (string, string, error) {
|
||||
if len(m) == 0 {
|
||||
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||
}
|
||||
|
||||
// grpc method
|
||||
if m[0] == '/' {
|
||||
// [ , Foo, Bar]
|
||||
// [ , package.Foo, Bar]
|
||||
// [ , a.package.Foo, Bar]
|
||||
parts := strings.Split(m, "/")
|
||||
if len(parts) != 3 || len(parts[1]) == 0 || len(parts[2]) == 0 {
|
||||
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||
}
|
||||
service := strings.Split(parts[1], ".")
|
||||
return service[len(service)-1], parts[2], nil
|
||||
}
|
||||
|
||||
// non grpc method
|
||||
parts := strings.Split(m, ".")
|
||||
|
||||
// expect [Foo, Bar]
|
||||
if len(parts) != 2 {
|
||||
return "", "", fmt.Errorf("malformed method name: %q", m)
|
||||
}
|
||||
|
||||
return parts[0], parts[1], nil
|
||||
}
|
||||
|
||||
func (h *Handler) ServeDRPC(stream drpc.Stream, rpc string) error {
|
||||
ctx := stream.Context()
|
||||
logger.Infof(ctx, "drpc: %#+v", rpc)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Handler) ServeWS(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
logger.Infof(ctx, "ws: %#+v", r)
|
||||
}
|
||||
|
||||
func (h *Handler) HandleRPC(stream drpc.Stream, rpc string) error {
|
||||
return h.ServeDRPC(stream, rpc)
|
||||
}
|
||||
|
||||
func TestComboServer(t *testing.T) {
|
||||
reg := register.NewRegister()
|
||||
ctx := context.Background()
|
||||
@@ -201,29 +74,51 @@ func TestComboServer(t *testing.T) {
|
||||
h := &Handler{t: t}
|
||||
|
||||
_ = logger.DefaultLogger.Init(logger.WithCallerSkipCount(3))
|
||||
encoding.RegisterCodec(&wrapMicroCodec{protocodec.NewCodec()})
|
||||
|
||||
gsrv := grpc.NewServer(grpc.UnknownServiceHandler(h.ServeGRPC))
|
||||
// create grpc server
|
||||
gsrv := grpcsrv.NewServer(
|
||||
server.Name("helloworld"),
|
||||
server.Register(reg),
|
||||
server.Codec("application/json", jsonpbcodec.NewCodec()),
|
||||
server.Codec("application/grpc", protocodec.NewCodec()),
|
||||
server.Codec("application/grpc+proto", protocodec.NewCodec()),
|
||||
server.Codec("application/grpc+json", jsonpbcodec.NewCodec()),
|
||||
)
|
||||
|
||||
comboHandler := newComboMux(h, gsrv, drpchttp.New(h))
|
||||
http2Server := &http2.Server{}
|
||||
hs := &http.Server{Handler: h2c.NewHandler(comboHandler, http2Server)}
|
||||
// init grpc server
|
||||
if err := gsrv.Init(); err != nil {
|
||||
t.Fatalf("grpc err: %v", err)
|
||||
}
|
||||
|
||||
// create server
|
||||
srv := httpsrv.NewServer(
|
||||
if err := mgpb.RegisterTestServer(gsrv, h); err != nil {
|
||||
t.Fatalf("grpc err: %v", err)
|
||||
}
|
||||
|
||||
swaggerdir, _ := fs.Sub(assets, "swagger-ui")
|
||||
|
||||
// create http server
|
||||
hsrv := httpsrv.NewServer(
|
||||
server.Address("127.0.0.1:0"),
|
||||
server.Name("helloworld"),
|
||||
server.Register(reg),
|
||||
httpsrv.Server(hs),
|
||||
server.Codec("application/json", jsonpbcodec.NewCodec()),
|
||||
httpsrv.PathHandler(http.MethodGet, "/swagger-ui/*", http.StripPrefix("/swagger-ui", http.FileServer(http.FS(swaggerdir))).ServeHTTP),
|
||||
)
|
||||
|
||||
// init server
|
||||
if err := srv.Init(); err != nil {
|
||||
// fill http server handler struct
|
||||
hs := &http.Server{Handler: h2c.NewHandler(newComboMux(hsrv, gsrv.GRPCServer(), nil), &http2.Server{})}
|
||||
|
||||
// init http server
|
||||
if err := hsrv.Init(httpsrv.Server(hs)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// start server
|
||||
if err := srv.Start(); err != nil {
|
||||
if err := mhpb.RegisterTestServer(hsrv, h); err != nil {
|
||||
t.Fatalf("grpc err: %v", err)
|
||||
}
|
||||
|
||||
// start http server
|
||||
if err := hsrv.Start(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -249,10 +144,6 @@ func TestComboServer(t *testing.T) {
|
||||
|
||||
mgrpcsvc := mgpb.NewTestClient("helloworld", mgcli)
|
||||
|
||||
mdcli := client.NewClientCallOptions(drpccli.NewClient(client.ContentType(drpcDefaultContentType), client.Codec(drpcDefaultContentType, protocodec.NewCodec())), client.WithAddress("http://"+service[0].Nodes[0].Address))
|
||||
|
||||
mdrpcsvc := mdpb.NewTestClient("helloworld", mdcli)
|
||||
|
||||
t.Logf("call via micro grpc")
|
||||
rsp, err := mgrpcsvc.Call(ctx, &pb.CallReq{Req: "my_name"})
|
||||
if err != nil {
|
||||
@@ -263,34 +154,21 @@ func TestComboServer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", fmt.Sprintf(":0"))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
pb.RegisterGreeterServer(s, &server{})
|
||||
log.Printf("server listening at %v", lis.Addr())
|
||||
if err := s.Serve(lis); err != nil {
|
||||
log.Fatalf("failed to serve: %v", err)
|
||||
}
|
||||
|
||||
ngcli, err := grpc.DialContext(ctx, service[0].Nodes[0].Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ngcli.Close()
|
||||
|
||||
/*
|
||||
ngrpcsvc := ngpb.NewTestClient(ngcli)
|
||||
t.Logf("call via native grpc")
|
||||
if rsp, err := ngrpcsvc.Call(ctx, &ngpb.CallReq{Req: "my_name"}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if rsp.Rsp != "name_my_name" {
|
||||
t.Fatalf("invalid response: %#+v\n", rsp)
|
||||
}
|
||||
ngrpcsvc := ngpb.NewTestClient(ngcli)
|
||||
t.Logf("call via native grpc")
|
||||
if rsp, err := ngrpcsvc.Call(ctx, &ngpb.CallReq{Req: "my_name"}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
if rsp.Rsp != "name_my_name" {
|
||||
t.Fatalf("invalid response: %#+v\n", rsp)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
t.Logf("call via micro http")
|
||||
if rsp, err := mhttpsvc.Call(ctx, &pb.CallReq{Req: "my_name"}); err != nil {
|
||||
@@ -301,62 +179,5 @@ func TestComboServer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
tc, err := net.Dial("tcp", service[0].Nodes[0].Address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ndcli := drpcconn.New(tc)
|
||||
defer ndcli.Close()
|
||||
/*
|
||||
ndrpcsvc := ndpb.NewDRPCTestClient(ndcli)
|
||||
|
||||
t.Logf("call via native drpc")
|
||||
if rsp, err := ndrpcsvc.Call(context.TODO(), &ndpb.CallReq{Req: "my_name"}); err != nil {
|
||||
t.Logf("native drpc err: %v", err)
|
||||
// t.Fatal(err)
|
||||
} else {
|
||||
if rsp.Rsp != "name_my_name" {
|
||||
t.Fatalf("invalid response: %#+v\n", rsp)
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("call via micro drpc")
|
||||
if rsp, err = mdrpcsvc.Call(ctx, &pb.CallReq{Req: "my_name"}); err != nil {
|
||||
t.Logf("micro drpc err: %v", err)
|
||||
// t.Fatal(err)
|
||||
} else {
|
||||
if rsp.Rsp != "name_my_name" {
|
||||
t.Fatalf("invalid response: %#+v\n", rsp)
|
||||
}
|
||||
}
|
||||
select {}
|
||||
}
|
||||
|
||||
func newComboMux(httph http.Handler, grpch http.Handler, drpch http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.ProtoMajor == 2 {
|
||||
ct := r.Header.Get("content-type")
|
||||
switch {
|
||||
case strings.HasPrefix(ct, "application/grpc"):
|
||||
grpch.ServeHTTP(w, r)
|
||||
return
|
||||
case strings.HasPrefix(ct, "application/drpc"):
|
||||
drpch.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
}
|
||||
ws := false
|
||||
for _, header := range r.Header["Upgrade"] {
|
||||
if header == "websocket" {
|
||||
ws = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if ws {
|
||||
httph.(*Handler).ServeWS(w, r)
|
||||
return
|
||||
}
|
||||
httph.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
*/
|
||||
|
Reference in New Issue
Block a user