342 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			342 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package broker
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 
 | |
| 	"go.unistack.org/micro/v4/broker"
 | |
| 	"go.unistack.org/micro/v4/codec"
 | |
| 	"go.unistack.org/micro/v4/logger"
 | |
| 	"go.unistack.org/micro/v4/metadata"
 | |
| 	"go.unistack.org/micro/v4/options"
 | |
| 	maddr "go.unistack.org/micro/v4/util/addr"
 | |
| 	"go.unistack.org/micro/v4/util/id"
 | |
| 	mnet "go.unistack.org/micro/v4/util/net"
 | |
| 	"go.unistack.org/micro/v4/util/rand"
 | |
| )
 | |
| 
 | |
| type Broker struct {
 | |
| 	funcPublish   broker.FuncPublish
 | |
| 	funcSubscribe broker.FuncSubscribe
 | |
| 	subscribers   map[string][]*Subscriber
 | |
| 	addr          string
 | |
| 	opts          broker.Options
 | |
| 	mu            sync.RWMutex
 | |
| 	connected     bool
 | |
| }
 | |
| 
 | |
| type memoryMessage struct {
 | |
| 	c     codec.Codec
 | |
| 	topic string
 | |
| 	ctx   context.Context
 | |
| 	body  []byte
 | |
| 	hdr   metadata.Metadata
 | |
| 	opts  broker.MessageOptions
 | |
| }
 | |
| 
 | |
| func (m *memoryMessage) Ack() error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (m *memoryMessage) Body() []byte {
 | |
| 	return m.body
 | |
| }
 | |
| 
 | |
| func (m *memoryMessage) Header() metadata.Metadata {
 | |
| 	return m.hdr
 | |
| }
 | |
| 
 | |
| func (m *memoryMessage) Context() context.Context {
 | |
| 	return m.ctx
 | |
| }
 | |
| 
 | |
| func (m *memoryMessage) Topic() string {
 | |
| 	return ""
 | |
| }
 | |
| 
 | |
| func (m *memoryMessage) Unmarshal(dst interface{}, opts ...codec.Option) error {
 | |
| 	return m.c.Unmarshal(m.body, dst)
 | |
| }
 | |
| 
 | |
| type Subscriber struct {
 | |
| 	ctx     context.Context
 | |
| 	exit    chan bool
 | |
| 	handler interface{}
 | |
| 	id      string
 | |
| 	topic   string
 | |
| 	opts    broker.SubscribeOptions
 | |
| }
 | |
| 
 | |
| func (b *Broker) newCodec(ct string) (codec.Codec, error) {
 | |
| 	if idx := strings.IndexRune(ct, ';'); idx >= 0 {
 | |
| 		ct = ct[:idx]
 | |
| 	}
 | |
| 	b.mu.RLock()
 | |
| 	c, ok := b.opts.Codecs[ct]
 | |
| 	b.mu.RUnlock()
 | |
| 	if ok {
 | |
| 		return c, nil
 | |
| 	}
 | |
| 	return nil, codec.ErrUnknownContentType
 | |
| }
 | |
| 
 | |
| func (b *Broker) Options() broker.Options {
 | |
| 	return b.opts
 | |
| }
 | |
| 
 | |
| func (b *Broker) Address() string {
 | |
| 	return b.addr
 | |
| }
 | |
| 
 | |
| func (b *Broker) Connect(ctx context.Context) error {
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 
 | |
| 	if b.connected {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// use 127.0.0.1 to avoid scan of all network interfaces
 | |
| 	addr, err := maddr.Extract("127.0.0.1")
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	var rng rand.Rand
 | |
| 	i := rng.Intn(20000)
 | |
| 	// set addr with port
 | |
| 	addr = mnet.HostPort(addr, 10000+i)
 | |
| 
 | |
| 	b.addr = addr
 | |
| 	b.connected = true
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *Broker) Disconnect(ctx context.Context) error {
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	b.mu.Lock()
 | |
| 	defer b.mu.Unlock()
 | |
| 
 | |
| 	if !b.connected {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	b.connected = false
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *Broker) Init(opts ...broker.Option) error {
 | |
| 	for _, o := range opts {
 | |
| 		o(&b.opts)
 | |
| 	}
 | |
| 
 | |
| 	b.funcPublish = b.fnPublish
 | |
| 	b.funcSubscribe = b.fnSubscribe
 | |
| 
 | |
| 	b.opts.Hooks.EachPrev(func(hook options.Hook) {
 | |
| 		switch h := hook.(type) {
 | |
| 		case broker.HookPublish:
 | |
| 			b.funcPublish = h(b.funcPublish)
 | |
| 		case broker.HookSubscribe:
 | |
| 			b.funcSubscribe = h(b.funcSubscribe)
 | |
| 		}
 | |
| 	})
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *Broker) NewMessage(ctx context.Context, hdr metadata.Metadata, body interface{}, opts ...broker.MessageOption) (broker.Message, error) {
 | |
| 	options := broker.NewMessageOptions(opts...)
 | |
| 	if options.ContentType == "" {
 | |
| 		options.ContentType = b.opts.ContentType
 | |
| 	}
 | |
| 	m := &memoryMessage{ctx: ctx, hdr: hdr, opts: options}
 | |
| 	c, err := b.newCodec(m.opts.ContentType)
 | |
| 	if err == nil {
 | |
| 		m.body, err = c.Marshal(body)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return m, nil
 | |
| }
 | |
| 
 | |
| func (b *Broker) Publish(ctx context.Context, topic string, messages ...broker.Message) error {
 | |
| 	return b.funcPublish(ctx, topic, messages...)
 | |
| }
 | |
| 
 | |
| func (b *Broker) fnPublish(ctx context.Context, topic string, messages ...broker.Message) error {
 | |
| 	return b.publish(ctx, topic, messages...)
 | |
| }
 | |
| 
 | |
| func (b *Broker) publish(ctx context.Context, topic string, messages ...broker.Message) error {
 | |
| 	b.mu.RLock()
 | |
| 	if !b.connected {
 | |
| 		b.mu.RUnlock()
 | |
| 		return broker.ErrNotConnected
 | |
| 	}
 | |
| 	b.mu.RUnlock()
 | |
| 
 | |
| 	select {
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	default:
 | |
| 	}
 | |
| 
 | |
| 	b.mu.RLock()
 | |
| 	subs, ok := b.subscribers[topic]
 | |
| 	b.mu.RUnlock()
 | |
| 	if !ok {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 
 | |
| 	for _, sub := range subs {
 | |
| 		switch s := sub.handler.(type) {
 | |
| 		default:
 | |
| 			if b.opts.Logger.V(logger.ErrorLevel) {
 | |
| 				b.opts.Logger.Error(ctx, "broker  handler error", broker.ErrInvalidHandler)
 | |
| 			}
 | |
| 		case func(broker.Message) error:
 | |
| 			for _, message := range messages {
 | |
| 				msg, ok := message.(*memoryMessage)
 | |
| 				if !ok {
 | |
| 					if b.opts.Logger.V(logger.ErrorLevel) {
 | |
| 						b.opts.Logger.Error(ctx, "broker handler error", broker.ErrInvalidMessage)
 | |
| 					}
 | |
| 				}
 | |
| 				msg.topic = topic
 | |
| 				if err = s(msg); err == nil && sub.opts.AutoAck {
 | |
| 					err = msg.Ack()
 | |
| 				}
 | |
| 				if err != nil {
 | |
| 					if b.opts.Logger.V(logger.ErrorLevel) {
 | |
| 						b.opts.Logger.Error(ctx, "broker handler error", err)
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		case func([]broker.Message) error:
 | |
| 			if err = s(messages); err == nil && sub.opts.AutoAck {
 | |
| 				for _, message := range messages {
 | |
| 					err = message.Ack()
 | |
| 					if err != nil {
 | |
| 						if b.opts.Logger.V(logger.ErrorLevel) {
 | |
| 							b.opts.Logger.Error(ctx, "broker handler error", err)
 | |
| 						}
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *Broker) Subscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | |
| 	return b.funcSubscribe(ctx, topic, handler, opts...)
 | |
| }
 | |
| 
 | |
| func (b *Broker) fnSubscribe(ctx context.Context, topic string, handler interface{}, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
 | |
| 	if err := broker.IsValidHandler(handler); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	b.mu.RLock()
 | |
| 	if !b.connected {
 | |
| 		b.mu.RUnlock()
 | |
| 		return nil, broker.ErrNotConnected
 | |
| 	}
 | |
| 	b.mu.RUnlock()
 | |
| 
 | |
| 	sid, err := id.New()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	options := broker.NewSubscribeOptions(opts...)
 | |
| 
 | |
| 	sub := &Subscriber{
 | |
| 		exit:    make(chan bool, 1),
 | |
| 		id:      sid,
 | |
| 		topic:   topic,
 | |
| 		handler: handler,
 | |
| 		opts:    options,
 | |
| 		ctx:     ctx,
 | |
| 	}
 | |
| 
 | |
| 	b.mu.Lock()
 | |
| 	b.subscribers[topic] = append(b.subscribers[topic], sub)
 | |
| 	b.mu.Unlock()
 | |
| 
 | |
| 	go func() {
 | |
| 		<-sub.exit
 | |
| 		b.mu.Lock()
 | |
| 		newSubscribers := make([]*Subscriber, 0, len(b.subscribers)-1)
 | |
| 		for _, sb := range b.subscribers[topic] {
 | |
| 			if sb.id == sub.id {
 | |
| 				continue
 | |
| 			}
 | |
| 			newSubscribers = append(newSubscribers, sb)
 | |
| 		}
 | |
| 		b.subscribers[topic] = newSubscribers
 | |
| 		b.mu.Unlock()
 | |
| 	}()
 | |
| 
 | |
| 	return sub, nil
 | |
| }
 | |
| 
 | |
| func (b *Broker) String() string {
 | |
| 	return "memory"
 | |
| }
 | |
| 
 | |
| func (b *Broker) Name() string {
 | |
| 	return b.opts.Name
 | |
| }
 | |
| 
 | |
| func (b *Broker) Live() bool {
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (b *Broker) Ready() bool {
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (b *Broker) Health() bool {
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| func (m *Subscriber) Options() broker.SubscribeOptions {
 | |
| 	return m.opts
 | |
| }
 | |
| 
 | |
| func (m *Subscriber) Topic() string {
 | |
| 	return m.topic
 | |
| }
 | |
| 
 | |
| func (m *Subscriber) Unsubscribe(ctx context.Context) error {
 | |
| 	m.exit <- true
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // NewBroker return new memory broker
 | |
| func NewBroker(opts ...broker.Option) broker.Broker {
 | |
| 	return &Broker{
 | |
| 		opts:        broker.NewOptions(opts...),
 | |
| 		subscribers: make(map[string][]*Subscriber),
 | |
| 	}
 | |
| }
 |