From 0374a2de2ccf46b585e7238ee113a15b84c43fd2 Mon Sep 17 00:00:00 2001 From: "xinfei.wu" Date: Thu, 20 Dec 2018 13:21:12 +0800 Subject: [PATCH] add subscriber testing --- http_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/http_test.go b/http_test.go index f5346f4..7aad22e 100644 --- a/http_test.go +++ b/http_test.go @@ -1,11 +1,17 @@ package http import ( + "context" "fmt" "io/ioutil" "net/http" + "sync" "testing" + "time" + api "github.com/micro/go-api/proto" + micro "github.com/micro/go-micro" + "github.com/micro/go-micro/client" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/mock" "github.com/micro/go-micro/server" @@ -87,3 +93,63 @@ func TestHTTPServer(t *testing.T) { t.Fatal(err) } } + +func TestSubscriber(t *testing.T) { + reg := mock.NewRegistry() + + // create server + srv := NewServer(server.Registry(reg)) + + // register handler + if err := srv.Handle(srv.NewHandler(http.NewServeMux())); err != nil { + t.Fatal(err) + } + + // start server + if err := srv.Start(); err != nil { + t.Fatal(err) + } + + topic := "topic" + ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10) + sub := srv.NewSubscriber(topic, func(context.Context, *api.Event) error { + cancel() + return nil + }) + // subscribe + if err := srv.Subscribe(sub); err != nil { + t.Fatal(err) + } + + // register server + if err := srv.Register(); err != nil { + t.Fatal(err) + } + + // publish + wg := sync.WaitGroup{} + go func() { + wg.Add(1) + defer wg.Done() + mClient := client.NewClient( + client.Registry(reg), + client.Broker(srv.Options().Broker), + ) + pub := micro.NewPublisher(topic, mClient) + if err := pub.Publish(ctx, &api.Event{}); err != nil { + t.Fatal(err) + } + }() + wg.Wait() + + // wait cancel + <-ctx.Done() + if ctx.Err() != context.Canceled { + t.Fatalf("subscriber is not working, err: %s", ctx.Err()) + } + + // stop server + if err := srv.Stop(); err != nil { + t.Fatal(err) + } +}