resurrect broker event (#26)

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Василий Толстов 2020-08-27 11:18:02 +03:00 committed by GitHub
parent 36c53b4917
commit b4ccde2228
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 23 deletions

View File

@ -1,13 +0,0 @@
FROM golang:1.13-alpine
RUN mkdir /user && \
echo 'nobody:x:65534:65534:nobody:/:' > /user/passwd && \
echo 'nobody:x:65534:' > /user/group
ENV GO111MODULE=on
RUN apk --no-cache add make git gcc libtool musl-dev ca-certificates dumb-init && \
rm -rf /var/cache/apk/* /tmp/*
WORKDIR /
COPY ./go.mod ./go.sum ./
RUN go mod download && rm go.mod go.sum

View File

@ -1 +0,0 @@
theme: jekyll-theme-architect

View File

@ -14,7 +14,15 @@ type Broker interface {
} }
// Handler is used to process messages via a subscription of a topic. // Handler is used to process messages via a subscription of a topic.
type Handler func(*Message) error type Handler func(Event) error
// Event is given to a subscription handler for processing
type Event interface {
Topic() string
Message() *Message
Ack() error
Error() error
}
// Message is used to transfer data // Message is used to transfer data
type Message struct { type Message struct {

View File

@ -9,8 +9,6 @@ import (
) )
type Options struct { type Options struct {
AutoAck bool
Addrs []string Addrs []string
Secure bool Secure bool
Codec codec.Marshaler Codec codec.Marshaler
@ -26,6 +24,12 @@ type Options struct {
Context context.Context Context context.Context
} }
func NewOptions() Options {
return Options{
Context: context.Background(),
}
}
type PublishOptions struct { type PublishOptions struct {
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
@ -63,7 +67,10 @@ func PublishContext(ctx context.Context) PublishOption {
type SubscribeOption func(*SubscribeOptions) type SubscribeOption func(*SubscribeOptions)
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
opt := SubscribeOptions{} opt := SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}
for _, o := range opts { for _, o := range opts {
o(&opt) o(&opt)
@ -87,6 +94,14 @@ func Codec(c codec.Marshaler) Option {
} }
} }
// SubscribeAutoAck will disable auto acking of messages
// after they have been handled.
func SubscribeAutoAck(b bool) SubscribeOption {
return func(o *SubscribeOptions) {
o.AutoAck = b
}
}
// ErrorHandler will catch all broker errors that cant be handled // ErrorHandler will catch all broker errors that cant be handled
// in normal way, for example Codec errors // in normal way, for example Codec errors
func ErrorHandler(h Handler) Option { func ErrorHandler(h Handler) Option {

View File

@ -38,9 +38,9 @@ func (w *Wrapper) HandlerFunc(handlerFunction server.HandlerFunc) server.Handler
// Add a result tag: // Add a result tag:
if err != nil { if err != nil {
tags["result"] = "failure" tags["status"] = "failure"
} else { } else {
tags["result"] = "failure" tags["status"] = "success"
} }
// Instrument the result (if the DefaultClient has been configured): // Instrument the result (if the DefaultClient has been configured):

View File

@ -25,6 +25,11 @@ type tunSubscriber struct {
listener tunnel.Listener listener tunnel.Listener
} }
type tunEvent struct {
topic string
message *broker.Message
}
// used to access tunnel from options context // used to access tunnel from options context
type tunnelKey struct{} type tunnelKey struct{}
type tunnelAddr struct{} type tunnelAddr struct{}
@ -123,9 +128,12 @@ func (t *tunSubscriber) run() {
c.Close() c.Close()
// handle the message // handle the message
go t.handler(&broker.Message{ go t.handler(&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header, Header: m.Header,
Body: m.Body, Body: m.Body,
},
}) })
} }
} }
@ -148,6 +156,22 @@ func (t *tunSubscriber) Unsubscribe() error {
} }
} }
func (t *tunEvent) Topic() string {
return t.topic
}
func (t *tunEvent) Message() *broker.Message {
return t.message
}
func (t *tunEvent) Ack() error {
return nil
}
func (t *tunEvent) Error() error {
return nil
}
func NewBroker(opts ...broker.Option) (broker.Broker, error) { func NewBroker(opts ...broker.Option) (broker.Broker, error) {
options := broker.Options{ options := broker.Options{
Context: context.Background(), Context: context.Background(),