Merge pull request #143 from micro/codec

Add configurable codecs for broker/transport
This commit is contained in:
Asim Aslam 2016-12-06 19:44:45 +00:00 committed by GitHub
commit 475e584e15
10 changed files with 165 additions and 4 deletions

View File

@ -31,6 +31,7 @@ type Publication interface {
Ack() error Ack() error
} }
// Subscriber is a convenience return type for the Subscribe method
type Subscriber interface { type Subscriber interface {
Options() SubscribeOptions Options() SubscribeOptions
Topic() string Topic() string

10
broker/codec/codec.go Normal file
View File

@ -0,0 +1,10 @@
package codec
// Codec is used for encoding where the broker doesn't natively support
// headers in the message type. In this case the entire message is
// encoded as the payload
type Codec interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}

25
broker/codec/json/json.go Normal file
View File

@ -0,0 +1,25 @@
package json
import (
"encoding/json"
"github.com/micro/go-micro/broker/codec"
)
type jsonCodec struct{}
func (j jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (j jsonCodec) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}
func (j jsonCodec) String() string {
return "json"
}
func NewCodec() codec.Codec {
return jsonCodec{}
}

35
broker/codec/noop/noop.go Normal file
View File

@ -0,0 +1,35 @@
package noop
import (
"errors"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/broker/codec"
)
type noopCodec struct{}
func (n noopCodec) Marshal(v interface{}) ([]byte, error) {
msg, ok := v.(*broker.Message)
if !ok {
return nil, errors.New("invalid message")
}
return msg.Body, nil
}
func (n noopCodec) Unmarshal(d []byte, v interface{}) error {
msg, ok := v.(*broker.Message)
if !ok {
return errors.New("invalid message")
}
msg.Body = d
return nil
}
func (n noopCodec) String() string {
return "noop"
}
func NewCodec() codec.Codec {
return noopCodec{}
}

View File

@ -3,7 +3,6 @@ package broker
import ( import (
"bytes" "bytes"
"crypto/tls" "crypto/tls"
"encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
@ -18,6 +17,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/micro/go-micro/broker/codec/json"
"github.com/micro/go-micro/errors" "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
mls "github.com/micro/misc/lib/tls" mls "github.com/micro/misc/lib/tls"
@ -96,6 +96,7 @@ func newTransport(config *tls.Config) *http.Transport {
func newHttpBroker(opts ...Option) Broker { func newHttpBroker(opts ...Option) Broker {
options := Options{ options := Options{
Codec: json.NewCodec(),
Context: context.TODO(), Context: context.TODO(),
} }
@ -269,7 +270,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
} }
var m *Message var m *Message
if err = json.Unmarshal(b, &m); err != nil { if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err)) errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err))
w.WriteHeader(500) w.WriteHeader(500)
w.Write([]byte(errr.Error())) w.Write([]byte(errr.Error()))
@ -352,7 +353,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
m.Header[":topic"] = topic m.Header[":topic"] = topic
b, err := json.Marshal(m) b, err := h.opts.Codec.Marshal(m)
if err != nil { if err != nil {
return err return err
} }

View File

@ -3,6 +3,7 @@ package broker
import ( import (
"crypto/tls" "crypto/tls"
"github.com/micro/go-micro/broker/codec"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -10,8 +11,8 @@ import (
type Options struct { type Options struct {
Addrs []string Addrs []string
Secure bool Secure bool
Codec codec.Codec
TLSConfig *tls.Config TLSConfig *tls.Config
// Other options for implementations of the interface // Other options for implementations of the interface
// can be stored in a context // can be stored in a context
Context context.Context Context context.Context
@ -68,6 +69,14 @@ func Addrs(addrs ...string) Option {
} }
} }
// Codec sets the codec used for encoding/decoding used where
// a broker does not support headers
func Codec(c codec.Codec) Option {
return func(o *Options) {
o.Codec = c
}
}
// DisableAutoAck will disable auto acking of messages // DisableAutoAck will disable auto acking of messages
// after they have been handled. // after they have been handled.
func DisableAutoAck() SubscribeOption { func DisableAutoAck() SubscribeOption {

10
transport/codec/codec.go Normal file
View File

@ -0,0 +1,10 @@
package codec
// Codec is used for encoding where the transport doesn't natively support
// headers in the message type. In this case the entire message is
// encoded as the payload
type Codec interface {
Marshal(interface{}) ([]byte, error)
Unmarshal([]byte, interface{}) error
String() string
}

View File

@ -0,0 +1,25 @@
package json
import (
"encoding/json"
"github.com/micro/go-micro/transport/codec"
)
type jsonCodec struct{}
func (j jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (j jsonCodec) Unmarshal(d []byte, v interface{}) error {
return json.Unmarshal(d, v)
}
func (j jsonCodec) String() string {
return "json"
}
func NewCodec() codec.Codec {
return jsonCodec{}
}

View File

@ -0,0 +1,35 @@
package noop
import (
"errors"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/transport/codec"
)
type noopCodec struct{}
func (n noopCodec) Marshal(v interface{}) ([]byte, error) {
msg, ok := v.(*transport.Message)
if !ok {
return nil, errors.New("invalid message")
}
return msg.Body, nil
}
func (n noopCodec) Unmarshal(d []byte, v interface{}) error {
msg, ok := v.(*transport.Message)
if !ok {
return errors.New("invalid message")
}
msg.Body = d
return nil
}
func (n noopCodec) String() string {
return "noop"
}
func NewCodec() codec.Codec {
return noopCodec{}
}

View File

@ -4,11 +4,13 @@ import (
"crypto/tls" "crypto/tls"
"time" "time"
"github.com/micro/go-micro/transport/codec"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type Options struct { type Options struct {
Addrs []string Addrs []string
Codec codec.Codec
Secure bool Secure bool
TLSConfig *tls.Config TLSConfig *tls.Config
// Timeout sets the timeout for Send/Recv // Timeout sets the timeout for Send/Recv
@ -46,6 +48,14 @@ func Addrs(addrs ...string) Option {
} }
} }
// Codec sets the codec used for encoding where the transport
// does not support message headers
func Codec(c codec.Codec) Option {
return func(o *Options) {
o.Codec = c
}
}
// Timeout sets the timeout for Send/Recv execution // Timeout sets the timeout for Send/Recv execution
func Timeout(t time.Duration) Option { func Timeout(t time.Duration) Option {
return func(o *Options) { return func(o *Options) {