diff --git a/broker/broker.go b/broker/broker.go index 2cd0de34..c387962b 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -31,6 +31,7 @@ type Publication interface { Ack() error } +// Subscriber is a convenience return type for the Subscribe method type Subscriber interface { Options() SubscribeOptions Topic() string diff --git a/broker/codec/codec.go b/broker/codec/codec.go new file mode 100644 index 00000000..5ebd05f1 --- /dev/null +++ b/broker/codec/codec.go @@ -0,0 +1,10 @@ +package codec + +// Codec is used for encoding where the broker doesn't natively support +// headers in the message type. In this case the entire message is +// encoded as the payload +type Codec interface { + Marshal(interface{}) ([]byte, error) + Unmarshal([]byte, interface{}) error + String() string +} diff --git a/broker/codec/json/json.go b/broker/codec/json/json.go new file mode 100644 index 00000000..25d2ab20 --- /dev/null +++ b/broker/codec/json/json.go @@ -0,0 +1,25 @@ +package json + +import ( + "encoding/json" + + "github.com/micro/go-micro/broker/codec" +) + +type jsonCodec struct{} + +func (j jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (j jsonCodec) Unmarshal(d []byte, v interface{}) error { + return json.Unmarshal(d, v) +} + +func (j jsonCodec) String() string { + return "json" +} + +func NewCodec() codec.Codec { + return jsonCodec{} +} diff --git a/broker/codec/noop/noop.go b/broker/codec/noop/noop.go new file mode 100644 index 00000000..82633a24 --- /dev/null +++ b/broker/codec/noop/noop.go @@ -0,0 +1,35 @@ +package noop + +import ( + "errors" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/broker/codec" +) + +type noopCodec struct{} + +func (n noopCodec) Marshal(v interface{}) ([]byte, error) { + msg, ok := v.(*broker.Message) + if !ok { + return nil, errors.New("invalid message") + } + return msg.Body, nil +} + +func (n noopCodec) Unmarshal(d []byte, v interface{}) error { + msg, ok := v.(*broker.Message) + if !ok { + return errors.New("invalid message") + } + msg.Body = d + return nil +} + +func (n noopCodec) String() string { + return "noop" +} + +func NewCodec() codec.Codec { + return noopCodec{} +} diff --git a/broker/http_broker.go b/broker/http_broker.go index d135683c..782aff9e 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -3,7 +3,6 @@ package broker import ( "bytes" "crypto/tls" - "encoding/json" "fmt" "io" "io/ioutil" @@ -18,6 +17,7 @@ import ( "sync" "time" + "github.com/micro/go-micro/broker/codec/json" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" mls "github.com/micro/misc/lib/tls" @@ -96,6 +96,7 @@ func newTransport(config *tls.Config) *http.Transport { func newHttpBroker(opts ...Option) Broker { options := Options{ + Codec: json.NewCodec(), Context: context.TODO(), } @@ -269,7 +270,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { } var m *Message - if err = json.Unmarshal(b, &m); err != nil { + if err = h.opts.Codec.Unmarshal(b, &m); err != nil { errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err)) w.WriteHeader(500) w.Write([]byte(errr.Error())) @@ -352,7 +353,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) m.Header[":topic"] = topic - b, err := json.Marshal(m) + b, err := h.opts.Codec.Marshal(m) if err != nil { return err } diff --git a/broker/options.go b/broker/options.go index 6256cac5..932c7006 100644 --- a/broker/options.go +++ b/broker/options.go @@ -3,6 +3,7 @@ package broker import ( "crypto/tls" + "github.com/micro/go-micro/broker/codec" "github.com/micro/go-micro/registry" "golang.org/x/net/context" ) @@ -10,8 +11,8 @@ import ( type Options struct { Addrs []string Secure bool + Codec codec.Codec TLSConfig *tls.Config - // Other options for implementations of the interface // can be stored in a context Context context.Context @@ -68,6 +69,14 @@ func Addrs(addrs ...string) Option { } } +// Codec sets the codec used for encoding/decoding used where +// a broker does not support headers +func Codec(c codec.Codec) Option { + return func(o *Options) { + o.Codec = c + } +} + // DisableAutoAck will disable auto acking of messages // after they have been handled. func DisableAutoAck() SubscribeOption { diff --git a/transport/codec/codec.go b/transport/codec/codec.go new file mode 100644 index 00000000..85e89320 --- /dev/null +++ b/transport/codec/codec.go @@ -0,0 +1,10 @@ +package codec + +// Codec is used for encoding where the transport doesn't natively support +// headers in the message type. In this case the entire message is +// encoded as the payload +type Codec interface { + Marshal(interface{}) ([]byte, error) + Unmarshal([]byte, interface{}) error + String() string +} diff --git a/transport/codec/json/json.go b/transport/codec/json/json.go new file mode 100644 index 00000000..0adaab04 --- /dev/null +++ b/transport/codec/json/json.go @@ -0,0 +1,25 @@ +package json + +import ( + "encoding/json" + + "github.com/micro/go-micro/transport/codec" +) + +type jsonCodec struct{} + +func (j jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (j jsonCodec) Unmarshal(d []byte, v interface{}) error { + return json.Unmarshal(d, v) +} + +func (j jsonCodec) String() string { + return "json" +} + +func NewCodec() codec.Codec { + return jsonCodec{} +} diff --git a/transport/codec/noop/noop.go b/transport/codec/noop/noop.go new file mode 100644 index 00000000..44ac91a1 --- /dev/null +++ b/transport/codec/noop/noop.go @@ -0,0 +1,35 @@ +package noop + +import ( + "errors" + + "github.com/micro/go-micro/transport" + "github.com/micro/go-micro/transport/codec" +) + +type noopCodec struct{} + +func (n noopCodec) Marshal(v interface{}) ([]byte, error) { + msg, ok := v.(*transport.Message) + if !ok { + return nil, errors.New("invalid message") + } + return msg.Body, nil +} + +func (n noopCodec) Unmarshal(d []byte, v interface{}) error { + msg, ok := v.(*transport.Message) + if !ok { + return errors.New("invalid message") + } + msg.Body = d + return nil +} + +func (n noopCodec) String() string { + return "noop" +} + +func NewCodec() codec.Codec { + return noopCodec{} +} diff --git a/transport/options.go b/transport/options.go index de5ee039..36b48225 100644 --- a/transport/options.go +++ b/transport/options.go @@ -4,11 +4,13 @@ import ( "crypto/tls" "time" + "github.com/micro/go-micro/transport/codec" "golang.org/x/net/context" ) type Options struct { Addrs []string + Codec codec.Codec Secure bool TLSConfig *tls.Config // Timeout sets the timeout for Send/Recv @@ -46,6 +48,14 @@ func Addrs(addrs ...string) Option { } } +// Codec sets the codec used for encoding where the transport +// does not support message headers +func Codec(c codec.Codec) Option { + return func(o *Options) { + o.Codec = c + } +} + // Timeout sets the timeout for Send/Recv execution func Timeout(t time.Duration) Option { return func(o *Options) {