add subscriber testing
This commit is contained in:
		
							
								
								
									
										66
									
								
								http_test.go
									
									
									
									
									
								
							
							
						
						
									
										66
									
								
								http_test.go
									
									
									
									
									
								
							| @@ -1,11 +1,17 @@ | ||||
| package http | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	api "github.com/micro/go-api/proto" | ||||
| 	micro "github.com/micro/go-micro" | ||||
| 	"github.com/micro/go-micro/client" | ||||
| 	"github.com/micro/go-micro/registry" | ||||
| 	"github.com/micro/go-micro/registry/mock" | ||||
| 	"github.com/micro/go-micro/server" | ||||
| @@ -87,3 +93,63 @@ func TestHTTPServer(t *testing.T) { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestSubscriber(t *testing.T) { | ||||
| 	reg := mock.NewRegistry() | ||||
|  | ||||
| 	// create server | ||||
| 	srv := NewServer(server.Registry(reg)) | ||||
|  | ||||
| 	// register handler | ||||
| 	if err := srv.Handle(srv.NewHandler(http.NewServeMux())); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	// start server | ||||
| 	if err := srv.Start(); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	topic := "topic" | ||||
| 	ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond*10) | ||||
| 	sub := srv.NewSubscriber(topic, func(context.Context, *api.Event) error { | ||||
| 		cancel() | ||||
| 		return nil | ||||
| 	}) | ||||
| 	// subscribe | ||||
| 	if err := srv.Subscribe(sub); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	// register server | ||||
| 	if err := srv.Register(); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
|  | ||||
| 	// publish | ||||
| 	wg := sync.WaitGroup{} | ||||
| 	go func() { | ||||
| 		wg.Add(1) | ||||
| 		defer wg.Done() | ||||
| 		mClient := client.NewClient( | ||||
| 			client.Registry(reg), | ||||
| 			client.Broker(srv.Options().Broker), | ||||
| 		) | ||||
| 		pub := micro.NewPublisher(topic, mClient) | ||||
| 		if err := pub.Publish(ctx, &api.Event{}); err != nil { | ||||
| 			t.Fatal(err) | ||||
| 		} | ||||
| 	}() | ||||
| 	wg.Wait() | ||||
|  | ||||
| 	// wait cancel | ||||
| 	<-ctx.Done() | ||||
| 	if ctx.Err() != context.Canceled { | ||||
| 		t.Fatalf("subscriber is not working, err: %s", ctx.Err()) | ||||
| 	} | ||||
|  | ||||
| 	// stop server | ||||
| 	if err := srv.Stop(); err != nil { | ||||
| 		t.Fatal(err) | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user