package server_test import ( "context" "fmt" "testing" "time" kgo "go.unistack.org/micro-broker-kgo/v3" requestid "go.unistack.org/micro-wrapper-requestid/v3" "go.unistack.org/micro/v3" "go.unistack.org/micro/v3/broker" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/logger" "go.unistack.org/micro/v3/logger/slog" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/server" "github.com/google/uuid" ) type TestHandler struct { t *testing.T } type TestMessage struct { Name string } func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error { log := logger.MustContext(ctx) log.Info(ctx, fmt.Sprintf("%s", msg.Data)) return nil } func TestNoopSub(t *testing.T) { ctx := context.Background() b := broker.NewBroker() if err := b.Init(); err != nil { t.Fatal(err) } if err := b.Connect(ctx); err != nil { t.Fatal(err) } logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)) s := server.NewServer( server.Broker(b), server.Codec("application/octet-stream", codec.NewCodec()), ) if err := s.Init(); err != nil { t.Fatal(err) } c := client.NewClient( client.Broker(b), client.Codec("application/octet-stream", codec.NewCodec()), client.ContentType("application/octet-stream"), ) if err := c.Init(); err != nil { t.Fatal(err) } h := &TestHandler{t: t} if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler, server.SubscriberQueue("queue"), )); err != nil { t.Fatal(err) } if err := s.Start(); err != nil { t.Fatal(err) } msgs := make([]client.Message, 0, 8) for i := 0; i < 8; i++ { msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))})) } if err := c.BatchPublish(ctx, msgs); err != nil { t.Fatal(err) } defer func() { if err := s.Stop(); err != nil { t.Fatal(err) } }() } func TestConsumerAndServer(t *testing.T) { const ( topicTarget = "single_topic" addressKafka = "localhost:9092" ) ctx := context.Background() logger.DefaultLogger = slog.NewLogger() logger.DefaultLogger.Init(logger.WithLevel(logger.DebugLevel)) rh := requestid.NewHook() b := kgo.NewBroker( kgo.CommitInterval(1*time.Second), broker.Addrs(addressKafka), broker.Logger(logger.DefaultLogger.Clone(logger.WithLevel(logger.ParseLevel("error")))), broker.ErrorHandler(func(event broker.Event) error { msg := event.Message() log := logger.DefaultLogger. Fields("topic", event.Topic(), "header", msg.Header, "body", msg.Body) err := event.Ack() if err != nil { log.Fields("ack_error", err).Error(context.Background(), fmt.Sprintf("brokerHandlerErr: Ack error | %v", event.Error())) return err } log.Error(context.Background(), fmt.Sprintf("brokerHandlerErr: %v", event.Error())) return nil }), ) if err := b.Init(); err != nil { t.Fatal(err) } if err := b.Connect(ctx); err != nil { t.Fatal(err) } s := server.NewServer( server.Broker(b), server.Codec("application/octet-stream", codec.NewCodec()), server.Hooks( server.NewLoggerHookSubHandler(metadata.HeaderEndpoint), server.HookSubHandler(rh.ServerSubscriber), ), ) if err := s.Init(); err != nil { t.Fatal(err) } c := client.NewClient( client.Broker(b), client.Codec("application/octet-stream", codec.NewCodec()), client.ContentType("application/octet-stream"), ) if err := c.Init(); err != nil { t.Fatal(err) } h := &TestHandler{t: t} if err := s.Subscribe(s.NewSubscriber(topicTarget, h.SingleSubHandler, server.SubscriberQueue("queue"), )); err != nil { t.Fatal(err) } if err := s.Start(); err != nil { t.Fatal(err) } msgs := make([]client.Message, 0, 8) for i := 0; i < 8; i++ { req := &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))} opts := []client.MessageOption{ client.MessageMetadata(metadata.HeaderXRequestID, uuid.NewString()), client.MessageMetadata(metadata.HeaderEndpoint, "test_endpoint"), client.MessageMetadata(metadata.HeaderService, "test_service"), } msgs = append(msgs, c.NewMessage(topicTarget, req, opts...)) } if err := c.BatchPublish(ctx, msgs); err != nil { t.Fatal(err) } msv := micro.NewService( micro.Server(s), micro.Client(c), micro.Broker(b), micro.Context(ctx), micro.Name("test_server"), micro.Version("latest")) if err := msv.Run(); err != nil { t.Fatal(err) } }