diff --git a/grpc.go b/grpc.go index 2ee03ba..6653913 100644 --- a/grpc.go +++ b/grpc.go @@ -555,8 +555,8 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error { } g.Lock() - if _, ok = g.subscribers[sub]; ok { + g.Unlock() return fmt.Errorf("subscriber %v already exists", sub) } diff --git a/grpc_test.go b/grpc_test.go index 829a7f1..5003583 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -1,12 +1,19 @@ -package grpc +package grpc_test import ( "context" + "fmt" "testing" + "github.com/micro/go-micro/v2" + bmemory "github.com/micro/go-micro/v2/broker/memory" + "github.com/micro/go-micro/v2/client" + gcli "github.com/micro/go-micro/v2/client/grpc" "github.com/micro/go-micro/v2/errors" - "github.com/micro/go-micro/v2/registry/memory" + rmemory "github.com/micro/go-micro/v2/registry/memory" "github.com/micro/go-micro/v2/server" + gsrv "github.com/micro/go-micro/v2/server/grpc" + tgrpc "github.com/micro/go-micro/v2/transport/grpc" "google.golang.org/grpc" "google.golang.org/grpc/status" @@ -14,7 +21,17 @@ import ( ) // server is used to implement helloworld.GreeterServer. -type testServer struct{} +type testServer struct { + msgCount int +} + +func (s *testServer) Handle(ctx context.Context, msg *pb.Request) error { + s.msgCount++ + return nil +} +func (s *testServer) HandleError(ctx context.Context, msg *pb.Request) error { + return fmt.Errorf("fake") +} // TestHello implements helloworld.GreeterServer func (s *testServer) Call(ctx context.Context, req *pb.Request, rsp *pb.Response) error { @@ -26,14 +43,75 @@ func (s *testServer) Call(ctx context.Context, req *pb.Request, rsp *pb.Response return nil } -func TestGRPCServer(t *testing.T) { - r := memory.NewRegistry() - s := NewServer( +/* +func BenchmarkServer(b *testing.B) { + r := rmemory.NewRegistry() + br := bmemory.NewBroker() + tr := tgrpc.NewTransport() + s := gsrv.NewServer( + server.Broker(br), server.Name("foo"), server.Registry(r), + server.Transport(tr), ) + c := gcli.NewClient( + client.Registry(r), + client.Broker(br), + client.Transport(tr), + ) + ctx := context.TODO() - pb.RegisterTestHandler(s, &testServer{}) + h := &testServer{} + pb.RegisterTestHandler(s, h) + if err := s.Start(); err != nil { + b.Fatalf("failed to start: %v", err) + } + + // check registration + services, err := r.GetService("foo") + if err != nil || len(services) == 0 { + b.Fatalf("failed to get service: %v # %d", err, len(services)) + } + + defer func() { + if err := s.Stop(); err != nil { + b.Fatalf("failed to stop: %v", err) + } + }() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.Call() + } + +} +*/ +func TestGRPCServer(t *testing.T) { + r := rmemory.NewRegistry() + b := bmemory.NewBroker() + tr := tgrpc.NewTransport() + s := gsrv.NewServer( + server.Broker(b), + server.Name("foo"), + server.Registry(r), + server.Transport(tr), + ) + c := gcli.NewClient( + client.Registry(r), + client.Broker(b), + client.Transport(tr), + ) + ctx := context.TODO() + + h := &testServer{} + pb.RegisterTestHandler(s, h) + + if err := micro.RegisterSubscriber("test_topic", s, h.Handle); err != nil { + t.Fatal(err) + } + if err := micro.RegisterSubscriber("error_topic", s, h.HandleError); err != nil { + t.Fatal(err) + } if err := s.Start(); err != nil { t.Fatalf("failed to start: %v", err) @@ -51,6 +129,22 @@ func TestGRPCServer(t *testing.T) { } }() + pub := micro.NewEvent("test_topic", c) + pubErr := micro.NewEvent("error_topic", c) + cnt := 4 + for i := 0; i < cnt; i++ { + if err = pub.Publish(ctx, &pb.Request{Name: fmt.Sprintf("msg %d", i)}); err != nil { + t.Fatal(err) + } + } + + if h.msgCount != cnt { + t.Fatalf("pub/sub not work, or invalid message count %d", h.msgCount) + } + if err = pubErr.Publish(ctx, &pb.Request{}); err == nil { + t.Fatal("this must return error, as we return error from handler") + } + cc, err := grpc.Dial(s.Options().Address, grpc.WithInsecure()) if err != nil { t.Fatalf("failed to dial server: %v", err) diff --git a/subscriber.go b/subscriber.go index a29ebcb..bfe54b8 100644 --- a/subscriber.go +++ b/subscriber.go @@ -246,18 +246,19 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke if g.wg != nil { defer g.wg.Done() } - results <- fn(ctx, &rpcMessage{ + err := fn(ctx, &rpcMessage{ topic: sb.topic, contentType: ct, payload: req.Interface(), header: msg.Header, body: msg.Body, }) + results <- err }() } var errors []string for i := 0; i < len(sb.handlers); i++ { - if rerr := <-results; err != nil { + if rerr := <-results; rerr != nil { errors = append(errors, rerr.Error()) } }