diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 228b07d8..00000000 --- a/Dockerfile +++ /dev/null @@ -1,13 +0,0 @@ -FROM golang:1.13-alpine - -RUN mkdir /user && \ - echo 'nobody:x:65534:65534:nobody:/:' > /user/passwd && \ - echo 'nobody:x:65534:' > /user/group - -ENV GO111MODULE=on -RUN apk --no-cache add make git gcc libtool musl-dev ca-certificates dumb-init && \ - rm -rf /var/cache/apk/* /tmp/* - -WORKDIR / -COPY ./go.mod ./go.sum ./ -RUN go mod download && rm go.mod go.sum diff --git a/_config.yml b/_config.yml deleted file mode 100644 index 3397c9a4..00000000 --- a/_config.yml +++ /dev/null @@ -1 +0,0 @@ -theme: jekyll-theme-architect \ No newline at end of file diff --git a/broker/broker.go b/broker/broker.go index b9026033..adf81d92 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -14,7 +14,15 @@ type Broker interface { } // Handler is used to process messages via a subscription of a topic. -type Handler func(*Message) error +type Handler func(Event) error + +// Event is given to a subscription handler for processing +type Event interface { + Topic() string + Message() *Message + Ack() error + Error() error +} // Message is used to transfer data type Message struct { diff --git a/broker/options.go b/broker/options.go index 5331e784..889b4df4 100644 --- a/broker/options.go +++ b/broker/options.go @@ -9,8 +9,6 @@ import ( ) type Options struct { - AutoAck bool - Addrs []string Secure bool Codec codec.Marshaler @@ -26,6 +24,12 @@ type Options struct { Context context.Context } +func NewOptions() Options { + return Options{ + Context: context.Background(), + } +} + type PublishOptions struct { // Other options for implementations of the interface // can be stored in a context @@ -63,7 +67,10 @@ func PublishContext(ctx context.Context) PublishOption { type SubscribeOption func(*SubscribeOptions) func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { - opt := SubscribeOptions{} + opt := SubscribeOptions{ + AutoAck: true, + Context: context.Background(), + } for _, o := range opts { o(&opt) @@ -87,6 +94,14 @@ func Codec(c codec.Marshaler) Option { } } +// SubscribeAutoAck will disable auto acking of messages +// after they have been handled. +func SubscribeAutoAck(b bool) SubscribeOption { + return func(o *SubscribeOptions) { + o.AutoAck = b + } +} + // ErrorHandler will catch all broker errors that cant be handled // in normal way, for example Codec errors func ErrorHandler(h Handler) Option { diff --git a/metrics/wrapper/metrics_wrapper.go b/metrics/wrapper/metrics_wrapper.go index 2d10fad2..be4a6922 100644 --- a/metrics/wrapper/metrics_wrapper.go +++ b/metrics/wrapper/metrics_wrapper.go @@ -38,9 +38,9 @@ func (w *Wrapper) HandlerFunc(handlerFunction server.HandlerFunc) server.Handler // Add a result tag: if err != nil { - tags["result"] = "failure" + tags["status"] = "failure" } else { - tags["result"] = "failure" + tags["status"] = "success" } // Instrument the result (if the DefaultClient has been configured): diff --git a/tunnel/broker/broker.go b/tunnel/broker/broker.go index a52dffdb..e09a22ef 100644 --- a/tunnel/broker/broker.go +++ b/tunnel/broker/broker.go @@ -25,6 +25,11 @@ type tunSubscriber struct { listener tunnel.Listener } +type tunEvent struct { + topic string + message *broker.Message +} + // used to access tunnel from options context type tunnelKey struct{} type tunnelAddr struct{} @@ -123,9 +128,12 @@ func (t *tunSubscriber) run() { c.Close() // handle the message - go t.handler(&broker.Message{ - Header: m.Header, - Body: m.Body, + go t.handler(&tunEvent{ + topic: t.topic, + message: &broker.Message{ + Header: m.Header, + Body: m.Body, + }, }) } } @@ -148,6 +156,22 @@ func (t *tunSubscriber) Unsubscribe() error { } } +func (t *tunEvent) Topic() string { + return t.topic +} + +func (t *tunEvent) Message() *broker.Message { + return t.message +} + +func (t *tunEvent) Ack() error { + return nil +} + +func (t *tunEvent) Error() error { + return nil +} + func NewBroker(opts ...broker.Option) (broker.Broker, error) { options := broker.Options{ Context: context.Background(),