broker: refactor (#396)
All checks were successful
coverage / build (push) Successful in 1m6s
test / test (push) Successful in 2m2s

* remove subscribe from server
* remove publish from client
* broker package refactoring

Co-authored-by: vtolstov <vtolstov@users.noreply.github.com>
Reviewed-on: #396
Co-authored-by: Vasiliy Tolstov <v.tolstov@unistack.org>
Co-committed-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2025-01-30 23:26:45 +03:00
parent 816abc2bbc
commit ffa01de78f
54 changed files with 1011 additions and 4298 deletions

View File

@@ -1,20 +1,13 @@
package server
import (
"context"
"fmt"
"reflect"
"runtime/debug"
"strings"
"sync"
"time"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/errors"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/metadata"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/register"
maddr "go.unistack.org/micro/v4/util/addr"
mnet "go.unistack.org/micro/v4/util/net"
@@ -26,10 +19,6 @@ var DefaultCodecs = map[string]codec.Codec{
"application/octet-stream": codec.NewCodec(),
}
const (
defaultContentType = "application/json"
)
type rpcHandler struct {
opts HandlerOptions
handler interface{}
@@ -62,13 +51,12 @@ func (r *rpcHandler) Options() HandlerOptions {
}
type noopServer struct {
h Handler
wg *sync.WaitGroup
rsvc *register.Service
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
exit chan chan error
opts Options
h Handler
wg *sync.WaitGroup
rsvc *register.Service
handlers map[string]Handler
exit chan chan error
opts Options
sync.RWMutex
registered bool
started bool
@@ -80,25 +68,12 @@ func NewServer(opts ...Option) Server {
if n.handlers == nil {
n.handlers = make(map[string]Handler)
}
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber)
}
if n.exit == nil {
n.exit = make(chan chan error)
}
return n
}
func (n *noopServer) newCodec(contentType string) (codec.Codec, error) {
if cf, ok := n.opts.Codecs[contentType]; ok {
return cf, nil
}
if cf, ok := DefaultCodecs[contentType]; ok {
return cf, nil
}
return nil, codec.ErrUnknownContentType
}
func (n *noopServer) Live() bool {
return true
}
@@ -120,65 +95,10 @@ func (n *noopServer) Name() string {
return n.opts.Name
}
func (n *noopServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
} else if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := ValidateSubscriber(sb); err != nil {
return err
}
n.Lock()
if _, ok = n.subscribers[sub]; ok {
n.Unlock()
return fmt.Errorf("subscriber %v already exists", sub)
}
n.subscribers[sub] = nil
n.Unlock()
return nil
}
type rpcMessage struct {
payload interface{}
codec codec.Codec
header metadata.Metadata
topic string
contentType string
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Body() interface{} {
return r.payload
}
func (r *rpcMessage) Header() metadata.Metadata {
return r.header
}
func (r *rpcMessage) Codec() codec.Codec {
return r.codec
}
func (n *noopServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRPCHandler(h, opts...)
}
func (n *noopServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, sb, opts...)
}
func (n *noopServer) Init(opts ...Option) error {
for _, o := range opts {
o(&n.opts)
@@ -187,9 +107,6 @@ func (n *noopServer) Init(opts ...Option) error {
if n.handlers == nil {
n.handlers = make(map[string]Handler, 1)
}
if n.subscribers == nil {
n.subscribers = make(map[*subscriber][]broker.Subscriber, 1)
}
if n.exit == nil {
n.exit = make(chan chan error)
@@ -288,33 +205,6 @@ func (n *noopServer) Deregister() error {
n.registered = false
cx := config.Context
wg := sync.WaitGroup{}
for sb, subs := range n.subscribers {
for idx := range subs {
if sb.Options().Context != nil {
cx = sb.Options().Context
}
ncx := cx
wg.Add(1)
go func(s broker.Subscriber) {
defer wg.Done()
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(n.opts.Context, "unsubscribing from topic: "+s.Topic())
}
if err := s.Unsubscribe(ncx); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "unsubscribing from topic: "+s.Topic(), err)
}
}
}(subs[idx])
}
n.subscribers[sb] = nil
}
wg.Wait()
n.Unlock()
return nil
}
@@ -351,21 +241,6 @@ func (n *noopServer) Start() error {
}
n.Unlock()
// only connect if we're subscribed
if len(n.subscribers) > 0 {
// connect to the broker
if err := config.Broker.Connect(config.Context); err != nil {
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, fmt.Sprintf("broker [%s] connect error", config.Broker.String()), err)
}
return err
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(n.opts.Context, fmt.Sprintf("broker [%s] Connected to %s", config.Broker.String(), config.Broker.Address()))
}
}
// use RegisterCheck func before register
// nolint: nestif
if err := config.RegisterCheck(config.Context); err != nil {
@@ -381,10 +256,6 @@ func (n *noopServer) Start() error {
}
}
if err := n.subscribe(); err != nil {
return err
}
go func() {
t := new(time.Ticker)
@@ -468,42 +339,6 @@ func (n *noopServer) Start() error {
return nil
}
func (n *noopServer) subscribe() error {
config := n.Options()
subCtx := config.Context
for sb := range n.subscribers {
if cx := sb.Options().Context; cx != nil {
subCtx = cx
}
opts := []broker.SubscribeOption{
broker.SubscribeContext(subCtx),
broker.SubscribeAutoAck(sb.Options().AutoAck),
broker.SubscribeBodyOnly(sb.Options().BodyOnly),
}
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.SubscribeGroup(queue))
}
if config.Logger.V(logger.InfoLevel) {
config.Logger.Info(n.opts.Context, "subscribing to topic: "+sb.Topic())
}
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), n.createSubHandler(sb, config), opts...)
if err != nil {
return err
}
n.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (n *noopServer) Stop() error {
n.RLock()
if !n.started {
@@ -523,195 +358,3 @@ func (n *noopServer) Stop() error {
return err
}
func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber {
var handlers []*handler
options := NewSubscriberOptions(opts...)
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
} else {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
opts: options,
}
}
//nolint:gocyclo
func (n *noopServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) (err error) {
defer func() {
if r := recover(); r != nil {
n.RLock()
config := n.opts
n.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Error(n.opts.Context, "panic recovered: ", r)
config.Logger.Error(n.opts.Context, string(debug.Stack()))
}
err = errors.InternalServerError(n.opts.Name+".subscriber", "panic recovered: %v", r)
}
}()
msg := p.Message()
// if we don't have headers, create empty map
if msg.Header == nil {
msg.Header = metadata.New(2)
}
ct := msg.Header["Content-Type"]
if len(ct) == 0 {
msg.Header.Set(metadata.HeaderContentType, defaultContentType)
ct = defaultContentType
}
cf, err := n.newCodec(ct)
if err != nil {
return err
}
hdr := metadata.New(len(msg.Header))
for k, v := range msg.Header {
hdr.Set(k, v)
}
ctx := metadata.NewIncomingContext(sb.opts.Context, hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
if err = cf.Unmarshal(msg.Body, req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Body()))
returnValues := handler.method.Call(vals)
if rerr := returnValues[0].Interface(); rerr != nil {
return rerr.(error)
}
return nil
}
opts.Hooks.EachPrev(func(hook options.Hook) {
if h, ok := hook.(HookSubHandler); ok {
fn = h(fn)
}
})
if n.wg != nil {
n.wg.Add(1)
}
go func() {
if n.wg != nil {
defer n.wg.Done()
}
cerr := fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
})
results <- cerr
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if rerr := <-results; rerr != nil {
errors = append(errors, rerr.Error())
}
}
if len(errors) > 0 {
err = fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return err
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Options() SubscriberOptions {
return s.opts
}
type subscriber struct {
topic string
typ reflect.Type
subscriber interface{}
handlers []*handler
rcvr reflect.Value
opts SubscriberOptions
}
type handler struct {
reqType reflect.Type
ctxType reflect.Type
method reflect.Value
}

View File

@@ -1,124 +0,0 @@
package server_test
import (
"context"
"fmt"
"testing"
"go.unistack.org/micro/v4/broker"
"go.unistack.org/micro/v4/client"
"go.unistack.org/micro/v4/codec"
"go.unistack.org/micro/v4/logger"
"go.unistack.org/micro/v4/options"
"go.unistack.org/micro/v4/server"
)
type TestHandler struct {
t *testing.T
}
type TestMessage struct {
Name string
}
func (h *TestHandler) SingleSubHandler(ctx context.Context, msg *codec.Frame) error {
// fmt.Printf("msg %s\n", msg.Data)
return nil
}
func TestNoopSub(t *testing.T) {
ctx := context.Background()
b := broker.NewBroker()
if err := b.Init(); err != nil {
t.Fatal(err)
}
if err := b.Connect(ctx); err != nil {
t.Fatal(err)
}
if err := logger.DefaultLogger.Init(logger.WithLevel(logger.ErrorLevel)); err != nil {
t.Fatal(err)
}
s := server.NewServer(
server.Broker(b),
server.Codec("application/octet-stream", codec.NewCodec()),
)
if err := s.Init(); err != nil {
t.Fatal(err)
}
c := client.NewClient(
client.Broker(b),
client.Codec("application/octet-stream", codec.NewCodec()),
client.ContentType("application/octet-stream"),
)
if err := c.Init(); err != nil {
t.Fatal(err)
}
h := &TestHandler{t: t}
if err := s.Subscribe(s.NewSubscriber("single_topic", h.SingleSubHandler,
server.SubscriberQueue("queue"),
)); err != nil {
t.Fatal(err)
}
if err := s.Start(); err != nil {
t.Fatal(err)
}
msgs := make([]client.Message, 0, 8)
for i := 0; i < 8; i++ {
msgs = append(msgs, c.NewMessage("batch_topic", &codec.Frame{Data: []byte(fmt.Sprintf(`{"name": "test_name %d"}`, i))}))
}
if err := c.BatchPublish(ctx, msgs); err != nil {
t.Fatal(err)
}
defer func() {
if err := s.Stop(); err != nil {
t.Fatal(err)
}
}()
}
func TestHooks_Wrap(t *testing.T) {
n := 5
fn1 := func(next server.FuncSubHandler) server.FuncSubHandler {
return func(ctx context.Context, msg server.Message) (err error) {
n *= 2
return next(ctx, msg)
}
}
fn2 := func(next server.FuncSubHandler) server.FuncSubHandler {
return func(ctx context.Context, msg server.Message) (err error) {
n -= 10
return next(ctx, msg)
}
}
hs := &options.Hooks{}
hs.Append(server.HookSubHandler(fn1), server.HookSubHandler(fn2))
var fn = func(ctx context.Context, msg server.Message) error {
return nil
}
hs.EachPrev(func(hook options.Hook) {
if h, ok := hook.(server.HookSubHandler); ok {
fn = h(fn)
}
})
if err := fn(nil, nil); err != nil {
t.Fatal(err)
}
if n != 0 {
t.Fatalf("uncorrected hooks call")
}
}

View File

@@ -77,10 +77,6 @@ func NewRegisterService(s Server) (*register.Service, error) {
}
node.Metadata = metadata.Copy(opts.Metadata)
node.Metadata["server"] = s.String()
node.Metadata["broker"] = opts.Broker.String()
node.Metadata["register"] = opts.Register.String()
return &register.Service{
Name: opts.Name,
Version: opts.Version,

View File

@@ -3,6 +3,7 @@ package server
import (
"context"
"errors"
"time"
"go.unistack.org/micro/v4/codec"
@@ -51,10 +52,6 @@ type Server interface {
Handle(h Handler) error
// Create a new handler
NewHandler(h interface{}, opts ...HandlerOption) Handler
// Create a new subscriber
NewSubscriber(topic string, h interface{}, opts ...SubscriberOption) Subscriber
// Register a subscriber
Subscribe(s Subscriber) error
// Start the server
Start() error
// Stop the server
@@ -70,36 +67,10 @@ type Server interface {
}
type (
FuncSubHandler func(ctx context.Context, ms Message) error
HookSubHandler func(next FuncSubHandler) FuncSubHandler
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
HookHandler func(next FuncHandler) FuncHandler
FuncHandler func(ctx context.Context, req Request, rsp interface{}) error
HookHandler func(next FuncHandler) FuncHandler
)
/*
// Router handle serving messages
type Router interface {
// ProcessMessage processes a message
ProcessMessage(ctx context.Context, msg Message) error
// ServeRequest processes a request to completion
ServeRequest(ctx context.Context, req Request, rsp Response) error
}
*/
// Message is an async message interface
type Message interface {
// Topic of the message
Topic() string
// The decoded payload value
Body() interface{}
// The content type of the payload
ContentType() string
// The raw headers of the message
Header() metadata.Metadata
// Codec used to decode the message
Codec() codec.Codec
}
// Request is a synchronous request interface
type Request interface {
// Service name requested
@@ -172,11 +143,20 @@ type Handler interface {
Options() HandlerOptions
}
// Subscriber interface represents a subscription to a given topic using
// a specific subscriber function or object with endpoints. It mirrors
// the handler in its behaviour.
type Subscriber interface {
Topic() string
Subscriber() interface{}
Options() SubscriberOptions
type serverHeaderKey struct{}
func ResponseMetadata(ctx context.Context, md *metadata.Metadata) context.Context {
return context.WithValue(ctx, serverHeaderKey{}, md)
}
func SetResponseMetadata(ctx context.Context, md metadata.Metadata) error {
if md.Len() == 0 {
return nil
}
h, ok := ctx.Value(serverHeaderKey{}).(*metadata.Metadata)
if !ok || h == nil {
return errors.New("missing metadata")
}
md.CopyTo(*h)
return nil
}

View File

@@ -1,86 +0,0 @@
package server
import (
"fmt"
"reflect"
"unicode"
"unicode/utf8"
)
const (
subSig = "func(context.Context, interface{}) error"
)
// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Is this an exported - upper case - name?
func isExported(name string) bool {
r, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(r)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
// ValidateSubscriber func signature
func ValidateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
switch typ.Kind() {
case reflect.Func:
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of return values: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
default:
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of return values: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}

View File

@@ -9,17 +9,9 @@ import (
// request and response types.
type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete
// publication message.
type SubscriberFunc func(ctx context.Context, msg Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc
// SubscriberWrapper wraps the SubscriberFunc and returns the equivalent
type SubscriberWrapper func(SubscriberFunc) SubscriberFunc
// StreamWrapper wraps a Stream interface and returns the equivalent.
// Because streams exist for the lifetime of a method invocation this
// is a convenient way to wrap a Stream as its in use for trace, monitoring,