// Package events is for event streaming package events import ( "encoding/json" "errors" "time" ) var ( // ErrMissingTopic is returned if a blank topic was provided to publish ErrMissingTopic = errors.New("Missing topic") // ErrEncodingMessage is returned from publish if there was an error encoding the message option ErrEncodingMessage = errors.New("Error encoding message") ) // Stream is an events streaming interface type Stream interface { Publish(topic string, msg interface{}, opts ...PublishOption) error Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) String() string } type AckFunc func() error type NackFunc func() error // Event is the object returned by the broker when you subscribe to a topic type Event struct { // ID to uniquely identify the event ID string // Topic of event, e.g. "registry.service.created" Topic string // Timestamp of the event Timestamp time.Time // Metadata contains the values the event was indexed by Metadata map[string]string // Payload contains the encoded message Payload []byte ackFunc AckFunc nackFunc NackFunc } // Unmarshal the events message into an object func (e *Event) Unmarshal(v interface{}) error { return json.Unmarshal(e.Payload, v) } // Ack acknowledges successful processing of the event in ManualAck mode func (e *Event) Ack() error { return e.ackFunc() } func (e *Event) SetAckFunc(f AckFunc) { e.ackFunc = f } // Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode func (e *Event) Nack() error { return e.nackFunc() } func (e *Event) SetNackFunc(f NackFunc) { e.nackFunc = f }