diff --git a/client/codec.go b/client/codec.go index b9044fec..913f8747 100644 --- a/client/codec.go +++ b/client/codec.go @@ -3,6 +3,9 @@ package client import ( "io" "net/rpc" + "sync" + + "github.com/micro/go-micro/codec" "github.com/youtube/vitess/go/rpcplus" "github.com/youtube/vitess/go/rpcplus/jsonrpc" @@ -21,55 +24,78 @@ var ( } ) -type CodecFunc func(io.ReadWriteCloser) rpc.ClientCodec - // only for internal use type codecFunc func(io.ReadWriteCloser) rpcplus.ClientCodec // wraps an net/rpc ClientCodec to provide an rpcplus.ClientCodec // temporary until we strip out use of rpcplus type rpcCodecWrap struct { - r rpc.ClientCodec + sync.Mutex + c codec.Codec + rwc io.ReadWriteCloser } func (cw *rpcCodecWrap) WriteRequest(r *rpcplus.Request, b interface{}) error { - rc := &rpc.Request{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, + cw.Lock() + defer cw.Unlock() + req := &rpc.Request{ServiceMethod: r.ServiceMethod, Seq: r.Seq} + data, err := cw.c.Marshal(req) + if err != nil { + return err } - err := cw.r.WriteRequest(rc, b) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - return err + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + data, err = cw.c.Marshal(b) + if err != nil { + return err + } + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + return nil } func (cw *rpcCodecWrap) ReadResponseHeader(r *rpcplus.Response) error { - rc := &rpc.Response{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - Error: r.Error, + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err } - err := cw.r.ReadResponseHeader(rc) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - r.Error = r.Error - return err + rtmp := new(rpc.Response) + err = cw.c.Unmarshal(data, rtmp) + if err != nil { + return err + } + r.ServiceMethod = rtmp.ServiceMethod + r.Seq = rtmp.Seq + r.Error = rtmp.Error + return nil } func (cw *rpcCodecWrap) ReadResponseBody(b interface{}) error { - return cw.r.ReadResponseBody(b) + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err + } + if b != nil { + return cw.c.Unmarshal(data, b) + } + return nil } func (cw *rpcCodecWrap) Close() error { - return cw.r.Close() + return cw.rwc.Close() } // wraps a CodecFunc to provide an internal codecFunc // temporary until we strip rpcplus out -func codecWrap(cf CodecFunc) codecFunc { +func codecWrap(c codec.Codec) codecFunc { return func(rwc io.ReadWriteCloser) rpcplus.ClientCodec { return &rpcCodecWrap{ - r: cf(rwc), + rwc: rwc, + c: c, } } } diff --git a/client/options.go b/client/options.go index 4d14a804..07db3d66 100644 --- a/client/options.go +++ b/client/options.go @@ -2,14 +2,15 @@ package client import ( "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) type options struct { contentType string - codecs map[string]CodecFunc broker broker.Broker + codecs map[string]codec.Codec registry registry.Registry transport transport.Transport wrappers []Wrapper @@ -23,9 +24,9 @@ func Broker(b broker.Broker) Option { } // Codec to be used to encode/decode requests for a given content type -func Codec(contentType string, cf CodecFunc) Option { +func Codec(contentType string, c codec.Codec) Option { return func(o *options) { - o.codecs[contentType] = cf + o.codecs[contentType] = c } } diff --git a/client/rpc_client.go b/client/rpc_client.go index b2184162..3bcb30ed 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" @@ -26,7 +27,7 @@ func newRpcClient(opt ...Option) Client { var once sync.Once opts := options{ - codecs: make(map[string]CodecFunc), + codecs: make(map[string]codec.Codec), } for _, o := range opt { @@ -36,6 +37,7 @@ func newRpcClient(opt ...Option) Client { if len(opts.contentType) == 0 { opts.contentType = defaultContentType } + fmt.Println("content type", opts.contentType) if opts.transport == nil { opts.transport = transport.DefaultTransport @@ -61,8 +63,8 @@ func newRpcClient(opt ...Option) Client { } func (r *rpcClient) codecFunc(contentType string) (codecFunc, error) { - if cf, ok := r.opts.codecs[contentType]; ok { - return codecWrap(cf), nil + if c, ok := r.opts.codecs[contentType]; ok { + return codecWrap(c), nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil diff --git a/codec/bson/bson.go b/codec/bson/bson.go new file mode 100644 index 00000000..6e71033a --- /dev/null +++ b/codec/bson/bson.go @@ -0,0 +1,23 @@ +package bson + +import ( + "labix.org/v2/mgo/bson" +) + +var ( + Codec = bsonCodec{} +) + +type bsonCodec struct {} + +func (bsonCodec) Marshal(v interface{}) ([]byte, error) { + return bson.Marshal(v) +} + +func (bsonCodec) Unmarshal(data []byte, v interface{}) error { + return bson.Unmarshal(data, v) +} + +func (bsonCodec) String() string { + return "bson" +} diff --git a/codec/codec.go b/codec/codec.go new file mode 100644 index 00000000..8d59163b --- /dev/null +++ b/codec/codec.go @@ -0,0 +1,12 @@ +package codec + +// Codec used to encode and decode request/responses +type Codec interface { + // Marshal returns the wire format of v. + Marshal(v interface{}) ([]byte, error) + // Unmarshal parses the wire format into v. + Unmarshal(data []byte, v interface{}) error + // String returns the name of the Codec implementation. The returned + // string will be used as part of content type in transmission. + String() string +} diff --git a/codec/pb/pb.go b/codec/pb/pb.go new file mode 100644 index 00000000..50c92b85 --- /dev/null +++ b/codec/pb/pb.go @@ -0,0 +1,24 @@ +package pb + +import ( + "github.com/golang/protobuf/proto" +) + +var ( + Codec = protoCodec{} +) + +type protoCodec struct {} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +func (protoCodec) String() string { + return "proto" +} + diff --git a/examples/client/main.go b/examples/client/main.go index d5602e5d..e0a1470b 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -6,6 +6,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec/pb" c "github.com/micro/go-micro/context" example "github.com/micro/go-micro/examples/server/proto/example" "golang.org/x/net/context" @@ -119,6 +120,12 @@ func stream() { func main() { cmd.Init() + client.DefaultClient = client.NewClient( + client.Codec("application/pb", pb.Codec), + client.ContentType("application/pb"), + ) + + fmt.Println("\n--- Call example ---\n") for i := 0; i < 10; i++ { call(i) diff --git a/examples/server/main.go b/examples/server/main.go index ef535233..43b3b67f 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -3,6 +3,7 @@ package main import ( log "github.com/golang/glog" "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec/bson" "github.com/micro/go-micro/examples/server/handler" "github.com/micro/go-micro/examples/server/subscriber" "github.com/micro/go-micro/server" @@ -12,11 +13,16 @@ func main() { // optionally setup command line usage cmd.Init() + server.DefaultServer = server.NewServer( + server.Codec("application/bson", bson.Codec), + ) + // Initialise Server server.Init( server.Name("go.micro.srv.example"), ) + // Register Handlers server.Handle( server.NewHandler( diff --git a/server/codec.go b/server/codec.go index b229d99a..1116495f 100644 --- a/server/codec.go +++ b/server/codec.go @@ -3,6 +3,9 @@ package server import ( "io" "net/rpc" + "sync" + + "github.com/micro/go-micro/codec" "github.com/youtube/vitess/go/rpcplus" "github.com/youtube/vitess/go/rpcplus/jsonrpc" @@ -19,56 +22,77 @@ var ( } ) -// CodecFunc is used to encode/decode requests/responses -type CodecFunc func(io.ReadWriteCloser) rpc.ServerCodec - // for internal use only type codecFunc func(io.ReadWriteCloser) rpcplus.ServerCodec // wraps an net/rpc ServerCodec to provide an rpcplus.ServerCodec // temporary until we strip out use of rpcplus type rpcCodecWrap struct { - r rpc.ServerCodec + sync.Mutex + rwc io.ReadWriteCloser + c codec.Codec } func (cw *rpcCodecWrap) ReadRequestHeader(r *rpcplus.Request) error { - rc := &rpc.Request{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err } - err := cw.r.ReadRequestHeader(rc) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - return err + rtmp := new(rpc.Request) + err = cw.c.Unmarshal(data, rtmp) + if err != nil { + return err + } + r.ServiceMethod = rtmp.ServiceMethod + r.Seq = rtmp.Seq + return nil } func (cw *rpcCodecWrap) ReadRequestBody(b interface{}) error { - return cw.r.ReadRequestBody(b) + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err + } + if b != nil { + return cw.c.Unmarshal(data, b) + } + return nil } func (cw *rpcCodecWrap) WriteResponse(r *rpcplus.Response, b interface{}, l bool) error { - rc := &rpc.Response{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - Error: r.Error, + cw.Lock() + defer cw.Unlock() + rtmp := &rpc.Response{ServiceMethod: r.ServiceMethod, Seq: r.Seq, Error: r.Error} + data, err := cw.c.Marshal(rtmp) + if err != nil { + return err } - err := cw.r.WriteResponse(rc, b) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - r.Error = r.Error - return err + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + data, err = cw.c.Marshal(b) + if err != nil { + return err + } + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + return nil } func (cw *rpcCodecWrap) Close() error { - return cw.r.Close() + return cw.rwc.Close() } // wraps a CodecFunc to provide an internal codecFunc // temporary until we strip rpcplus out -func codecWrap(cf CodecFunc) codecFunc { +func codecWrap(c codec.Codec) codecFunc { return func(rwc io.ReadWriteCloser) rpcplus.ServerCodec { return &rpcCodecWrap{ - r: cf(rwc), + rwc: rwc, + c: c, } } } diff --git a/server/options.go b/server/options.go index ce90c51a..8969cb2f 100644 --- a/server/options.go +++ b/server/options.go @@ -2,12 +2,13 @@ package server import ( "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) type options struct { - codecs map[string]CodecFunc + codecs map[string]codec.Codec broker broker.Broker registry registry.Registry transport transport.Transport @@ -21,7 +22,7 @@ type options struct { func newOptions(opt ...Option) options { opts := options{ - codecs: make(map[string]CodecFunc), + codecs: make(map[string]codec.Codec), } for _, o := range opt { @@ -126,9 +127,9 @@ func Broker(b broker.Broker) Option { } // Codec to use to encode/decode requests for a given content type -func Codec(contentType string, cf CodecFunc) Option { +func Codec(contentType string, c codec.Codec) Option { return func(o *options) { - o.codecs[contentType] = cf + o.codecs[contentType] = c } }