From f49922f6b33378323ca0957c8336763baf60f4ec Mon Sep 17 00:00:00 2001 From: Asim Date: Fri, 27 Nov 2015 00:17:36 +0000 Subject: [PATCH 1/7] experimental codec branch --- client/codec.go | 72 ++++++++++++++++++++++++++++------------- client/options.go | 7 ++-- client/rpc_client.go | 8 +++-- codec/bson/bson.go | 23 +++++++++++++ codec/codec.go | 12 +++++++ codec/pb/pb.go | 24 ++++++++++++++ examples/client/main.go | 7 ++++ examples/server/main.go | 6 ++++ server/codec.go | 72 +++++++++++++++++++++++++++-------------- server/options.go | 9 +++--- 10 files changed, 183 insertions(+), 57 deletions(-) create mode 100644 codec/bson/bson.go create mode 100644 codec/codec.go create mode 100644 codec/pb/pb.go diff --git a/client/codec.go b/client/codec.go index b9044fec..913f8747 100644 --- a/client/codec.go +++ b/client/codec.go @@ -3,6 +3,9 @@ package client import ( "io" "net/rpc" + "sync" + + "github.com/micro/go-micro/codec" "github.com/youtube/vitess/go/rpcplus" "github.com/youtube/vitess/go/rpcplus/jsonrpc" @@ -21,55 +24,78 @@ var ( } ) -type CodecFunc func(io.ReadWriteCloser) rpc.ClientCodec - // only for internal use type codecFunc func(io.ReadWriteCloser) rpcplus.ClientCodec // wraps an net/rpc ClientCodec to provide an rpcplus.ClientCodec // temporary until we strip out use of rpcplus type rpcCodecWrap struct { - r rpc.ClientCodec + sync.Mutex + c codec.Codec + rwc io.ReadWriteCloser } func (cw *rpcCodecWrap) WriteRequest(r *rpcplus.Request, b interface{}) error { - rc := &rpc.Request{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, + cw.Lock() + defer cw.Unlock() + req := &rpc.Request{ServiceMethod: r.ServiceMethod, Seq: r.Seq} + data, err := cw.c.Marshal(req) + if err != nil { + return err } - err := cw.r.WriteRequest(rc, b) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - return err + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + data, err = cw.c.Marshal(b) + if err != nil { + return err + } + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + return nil } func (cw *rpcCodecWrap) ReadResponseHeader(r *rpcplus.Response) error { - rc := &rpc.Response{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - Error: r.Error, + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err } - err := cw.r.ReadResponseHeader(rc) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - r.Error = r.Error - return err + rtmp := new(rpc.Response) + err = cw.c.Unmarshal(data, rtmp) + if err != nil { + return err + } + r.ServiceMethod = rtmp.ServiceMethod + r.Seq = rtmp.Seq + r.Error = rtmp.Error + return nil } func (cw *rpcCodecWrap) ReadResponseBody(b interface{}) error { - return cw.r.ReadResponseBody(b) + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err + } + if b != nil { + return cw.c.Unmarshal(data, b) + } + return nil } func (cw *rpcCodecWrap) Close() error { - return cw.r.Close() + return cw.rwc.Close() } // wraps a CodecFunc to provide an internal codecFunc // temporary until we strip rpcplus out -func codecWrap(cf CodecFunc) codecFunc { +func codecWrap(c codec.Codec) codecFunc { return func(rwc io.ReadWriteCloser) rpcplus.ClientCodec { return &rpcCodecWrap{ - r: cf(rwc), + rwc: rwc, + c: c, } } } diff --git a/client/options.go b/client/options.go index 4d14a804..07db3d66 100644 --- a/client/options.go +++ b/client/options.go @@ -2,14 +2,15 @@ package client import ( "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) type options struct { contentType string - codecs map[string]CodecFunc broker broker.Broker + codecs map[string]codec.Codec registry registry.Registry transport transport.Transport wrappers []Wrapper @@ -23,9 +24,9 @@ func Broker(b broker.Broker) Option { } // Codec to be used to encode/decode requests for a given content type -func Codec(contentType string, cf CodecFunc) Option { +func Codec(contentType string, c codec.Codec) Option { return func(o *options) { - o.codecs[contentType] = cf + o.codecs[contentType] = c } } diff --git a/client/rpc_client.go b/client/rpc_client.go index b2184162..3bcb30ed 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/errors" "github.com/micro/go-micro/registry" @@ -26,7 +27,7 @@ func newRpcClient(opt ...Option) Client { var once sync.Once opts := options{ - codecs: make(map[string]CodecFunc), + codecs: make(map[string]codec.Codec), } for _, o := range opt { @@ -36,6 +37,7 @@ func newRpcClient(opt ...Option) Client { if len(opts.contentType) == 0 { opts.contentType = defaultContentType } + fmt.Println("content type", opts.contentType) if opts.transport == nil { opts.transport = transport.DefaultTransport @@ -61,8 +63,8 @@ func newRpcClient(opt ...Option) Client { } func (r *rpcClient) codecFunc(contentType string) (codecFunc, error) { - if cf, ok := r.opts.codecs[contentType]; ok { - return codecWrap(cf), nil + if c, ok := r.opts.codecs[contentType]; ok { + return codecWrap(c), nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil diff --git a/codec/bson/bson.go b/codec/bson/bson.go new file mode 100644 index 00000000..6e71033a --- /dev/null +++ b/codec/bson/bson.go @@ -0,0 +1,23 @@ +package bson + +import ( + "labix.org/v2/mgo/bson" +) + +var ( + Codec = bsonCodec{} +) + +type bsonCodec struct {} + +func (bsonCodec) Marshal(v interface{}) ([]byte, error) { + return bson.Marshal(v) +} + +func (bsonCodec) Unmarshal(data []byte, v interface{}) error { + return bson.Unmarshal(data, v) +} + +func (bsonCodec) String() string { + return "bson" +} diff --git a/codec/codec.go b/codec/codec.go new file mode 100644 index 00000000..8d59163b --- /dev/null +++ b/codec/codec.go @@ -0,0 +1,12 @@ +package codec + +// Codec used to encode and decode request/responses +type Codec interface { + // Marshal returns the wire format of v. + Marshal(v interface{}) ([]byte, error) + // Unmarshal parses the wire format into v. + Unmarshal(data []byte, v interface{}) error + // String returns the name of the Codec implementation. The returned + // string will be used as part of content type in transmission. + String() string +} diff --git a/codec/pb/pb.go b/codec/pb/pb.go new file mode 100644 index 00000000..50c92b85 --- /dev/null +++ b/codec/pb/pb.go @@ -0,0 +1,24 @@ +package pb + +import ( + "github.com/golang/protobuf/proto" +) + +var ( + Codec = protoCodec{} +) + +type protoCodec struct {} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +func (protoCodec) String() string { + return "proto" +} + diff --git a/examples/client/main.go b/examples/client/main.go index d5602e5d..e0a1470b 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -6,6 +6,7 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec/pb" c "github.com/micro/go-micro/context" example "github.com/micro/go-micro/examples/server/proto/example" "golang.org/x/net/context" @@ -119,6 +120,12 @@ func stream() { func main() { cmd.Init() + client.DefaultClient = client.NewClient( + client.Codec("application/pb", pb.Codec), + client.ContentType("application/pb"), + ) + + fmt.Println("\n--- Call example ---\n") for i := 0; i < 10; i++ { call(i) diff --git a/examples/server/main.go b/examples/server/main.go index ef535233..43b3b67f 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -3,6 +3,7 @@ package main import ( log "github.com/golang/glog" "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec/bson" "github.com/micro/go-micro/examples/server/handler" "github.com/micro/go-micro/examples/server/subscriber" "github.com/micro/go-micro/server" @@ -12,11 +13,16 @@ func main() { // optionally setup command line usage cmd.Init() + server.DefaultServer = server.NewServer( + server.Codec("application/bson", bson.Codec), + ) + // Initialise Server server.Init( server.Name("go.micro.srv.example"), ) + // Register Handlers server.Handle( server.NewHandler( diff --git a/server/codec.go b/server/codec.go index b229d99a..1116495f 100644 --- a/server/codec.go +++ b/server/codec.go @@ -3,6 +3,9 @@ package server import ( "io" "net/rpc" + "sync" + + "github.com/micro/go-micro/codec" "github.com/youtube/vitess/go/rpcplus" "github.com/youtube/vitess/go/rpcplus/jsonrpc" @@ -19,56 +22,77 @@ var ( } ) -// CodecFunc is used to encode/decode requests/responses -type CodecFunc func(io.ReadWriteCloser) rpc.ServerCodec - // for internal use only type codecFunc func(io.ReadWriteCloser) rpcplus.ServerCodec // wraps an net/rpc ServerCodec to provide an rpcplus.ServerCodec // temporary until we strip out use of rpcplus type rpcCodecWrap struct { - r rpc.ServerCodec + sync.Mutex + rwc io.ReadWriteCloser + c codec.Codec } func (cw *rpcCodecWrap) ReadRequestHeader(r *rpcplus.Request) error { - rc := &rpc.Request{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err } - err := cw.r.ReadRequestHeader(rc) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - return err + rtmp := new(rpc.Request) + err = cw.c.Unmarshal(data, rtmp) + if err != nil { + return err + } + r.ServiceMethod = rtmp.ServiceMethod + r.Seq = rtmp.Seq + return nil } func (cw *rpcCodecWrap) ReadRequestBody(b interface{}) error { - return cw.r.ReadRequestBody(b) + data, err := pbrpc.ReadNetString(cw.rwc) + if err != nil { + return err + } + if b != nil { + return cw.c.Unmarshal(data, b) + } + return nil } func (cw *rpcCodecWrap) WriteResponse(r *rpcplus.Response, b interface{}, l bool) error { - rc := &rpc.Response{ - ServiceMethod: r.ServiceMethod, - Seq: r.Seq, - Error: r.Error, + cw.Lock() + defer cw.Unlock() + rtmp := &rpc.Response{ServiceMethod: r.ServiceMethod, Seq: r.Seq, Error: r.Error} + data, err := cw.c.Marshal(rtmp) + if err != nil { + return err } - err := cw.r.WriteResponse(rc, b) - r.ServiceMethod = rc.ServiceMethod - r.Seq = rc.Seq - r.Error = r.Error - return err + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + data, err = cw.c.Marshal(b) + if err != nil { + return err + } + _, err = pbrpc.WriteNetString(cw.rwc, data) + if err != nil { + return err + } + return nil } func (cw *rpcCodecWrap) Close() error { - return cw.r.Close() + return cw.rwc.Close() } // wraps a CodecFunc to provide an internal codecFunc // temporary until we strip rpcplus out -func codecWrap(cf CodecFunc) codecFunc { +func codecWrap(c codec.Codec) codecFunc { return func(rwc io.ReadWriteCloser) rpcplus.ServerCodec { return &rpcCodecWrap{ - r: cf(rwc), + rwc: rwc, + c: c, } } } diff --git a/server/options.go b/server/options.go index ce90c51a..8969cb2f 100644 --- a/server/options.go +++ b/server/options.go @@ -2,12 +2,13 @@ package server import ( "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" ) type options struct { - codecs map[string]CodecFunc + codecs map[string]codec.Codec broker broker.Broker registry registry.Registry transport transport.Transport @@ -21,7 +22,7 @@ type options struct { func newOptions(opt ...Option) options { opts := options{ - codecs: make(map[string]CodecFunc), + codecs: make(map[string]codec.Codec), } for _, o := range opt { @@ -126,9 +127,9 @@ func Broker(b broker.Broker) Option { } // Codec to use to encode/decode requests for a given content type -func Codec(contentType string, cf CodecFunc) Option { +func Codec(contentType string, c codec.Codec) Option { return func(o *options) { - o.codecs[contentType] = cf + o.codecs[contentType] = c } } From 654728027b0ba6d2d13b8008c6b07613d95221e1 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 11:22:29 +0000 Subject: [PATCH 2/7] Updated codec interface and code. Painful stuff --- client/buffer.go | 5 +- client/codec.go | 101 ------------------------ client/options.go | 4 +- client/rpc_client.go | 40 ++++------ client/rpc_codec.go | 39 ++++++++-- codec/bson/bson.go | 23 ------ codec/codec.go | 49 ++++++++++-- codec/pb/pb.go | 24 ------ codec/proto/netstring.go | 36 +++++++++ codec/proto/proto.go | 163 +++++++++++++++++++++++++++++++++++++++ examples/client/main.go | 58 +++++++------- examples/server/main.go | 8 +- server/buffer.go | 5 +- server/codec.go | 98 ----------------------- server/options.go | 6 +- server/rpc_codec.go | 37 +++++++-- server/rpc_server.go | 9 ++- server/subscriber.go | 27 ++++--- 18 files changed, 380 insertions(+), 352 deletions(-) delete mode 100644 client/codec.go delete mode 100644 codec/bson/bson.go delete mode 100644 codec/pb/pb.go create mode 100644 codec/proto/netstring.go create mode 100644 codec/proto/proto.go delete mode 100644 server/codec.go diff --git a/client/buffer.go b/client/buffer.go index 72c3c529..9eee2b3d 100644 --- a/client/buffer.go +++ b/client/buffer.go @@ -1,13 +1,14 @@ package client import ( - "io" + "bytes" ) type buffer struct { - io.ReadWriter + *bytes.Buffer } func (b *buffer) Close() error { + b.Buffer.Reset() return nil } diff --git a/client/codec.go b/client/codec.go deleted file mode 100644 index 913f8747..00000000 --- a/client/codec.go +++ /dev/null @@ -1,101 +0,0 @@ -package client - -import ( - "io" - "net/rpc" - "sync" - - "github.com/micro/go-micro/codec" - - "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/rpcplus/jsonrpc" - "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -var ( - defaultContentType = "application/octet-stream" - - defaultCodecs = map[string]codecFunc{ - "application/json": jsonrpc.NewClientCodec, - "application/json-rpc": jsonrpc.NewClientCodec, - "application/protobuf": pbrpc.NewClientCodec, - "application/proto-rpc": pbrpc.NewClientCodec, - "application/octet-stream": pbrpc.NewClientCodec, - } -) - -// only for internal use -type codecFunc func(io.ReadWriteCloser) rpcplus.ClientCodec - -// wraps an net/rpc ClientCodec to provide an rpcplus.ClientCodec -// temporary until we strip out use of rpcplus -type rpcCodecWrap struct { - sync.Mutex - c codec.Codec - rwc io.ReadWriteCloser -} - -func (cw *rpcCodecWrap) WriteRequest(r *rpcplus.Request, b interface{}) error { - cw.Lock() - defer cw.Unlock() - req := &rpc.Request{ServiceMethod: r.ServiceMethod, Seq: r.Seq} - data, err := cw.c.Marshal(req) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - data, err = cw.c.Marshal(b) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - return nil -} - -func (cw *rpcCodecWrap) ReadResponseHeader(r *rpcplus.Response) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - rtmp := new(rpc.Response) - err = cw.c.Unmarshal(data, rtmp) - if err != nil { - return err - } - r.ServiceMethod = rtmp.ServiceMethod - r.Seq = rtmp.Seq - r.Error = rtmp.Error - return nil -} - -func (cw *rpcCodecWrap) ReadResponseBody(b interface{}) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - if b != nil { - return cw.c.Unmarshal(data, b) - } - return nil -} - -func (cw *rpcCodecWrap) Close() error { - return cw.rwc.Close() -} - -// wraps a CodecFunc to provide an internal codecFunc -// temporary until we strip rpcplus out -func codecWrap(c codec.Codec) codecFunc { - return func(rwc io.ReadWriteCloser) rpcplus.ClientCodec { - return &rpcCodecWrap{ - rwc: rwc, - c: c, - } - } -} diff --git a/client/options.go b/client/options.go index 07db3d66..e8be919f 100644 --- a/client/options.go +++ b/client/options.go @@ -10,7 +10,7 @@ import ( type options struct { contentType string broker broker.Broker - codecs map[string]codec.Codec + codecs map[string]codec.NewCodec registry registry.Registry transport transport.Transport wrappers []Wrapper @@ -24,7 +24,7 @@ func Broker(b broker.Broker) Option { } // Codec to be used to encode/decode requests for a given content type -func Codec(contentType string, c codec.Codec) Option { +func Codec(contentType string, c codec.NewCodec) Option { return func(o *options) { o.codecs[contentType] = c } diff --git a/client/rpc_client.go b/client/rpc_client.go index 3bcb30ed..6fa72d8f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -1,7 +1,7 @@ package client import ( - "encoding/json" + "bytes" "fmt" "sync" @@ -13,8 +13,6 @@ import ( "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" - - "github.com/golang/protobuf/proto" "golang.org/x/net/context" ) @@ -27,7 +25,7 @@ func newRpcClient(opt ...Option) Client { var once sync.Once opts := options{ - codecs: make(map[string]codec.Codec), + codecs: make(map[string]codec.NewCodec), } for _, o := range opt { @@ -37,7 +35,6 @@ func newRpcClient(opt ...Option) Client { if len(opts.contentType) == 0 { opts.contentType = defaultContentType } - fmt.Println("content type", opts.contentType) if opts.transport == nil { opts.transport = transport.DefaultTransport @@ -62,9 +59,9 @@ func newRpcClient(opt ...Option) Client { return c } -func (r *rpcClient) codecFunc(contentType string) (codecFunc, error) { +func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { if c, ok := r.opts.codecs[contentType]; ok { - return codecWrap(c), nil + return c, nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil @@ -86,7 +83,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r msg.Header["Content-Type"] = request.ContentType() - cf, err := r.codecFunc(request.ContentType()) + cf, err := r.newCodec(request.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -119,7 +116,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, msg.Header["Content-Type"] = request.ContentType() - cf, err := r.codecFunc(request.ContentType()) + cf, err := r.newCodec(request.ContentType()) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } @@ -194,30 +191,21 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication) error { md["Content-Type"] = p.ContentType() // encode message body - var body []byte - - switch p.ContentType() { - case "application/octet-stream": - b, err := proto.Marshal(p.Message().(proto.Message)) - if err != nil { - return err - } - body = b - case "application/json": - b, err := json.Marshal(p.Message()) - if err != nil { - return err - } - body = b + cf, err := r.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + b := &buffer{bytes.NewBuffer(nil)} + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) } - r.once.Do(func() { r.opts.broker.Connect() }) return r.opts.broker.Publish(p.Topic(), &broker.Message{ Header: md, - Body: body, + Body: b.Bytes(), }) } diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 95c0b332..d151844f 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -3,13 +3,15 @@ package client import ( "bytes" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) type rpcPlusCodec struct { client transport.Client - codec rpc.ClientCodec + codec codec.Codec req *transport.Message buf *readWriteCloser @@ -20,6 +22,18 @@ type readWriteCloser struct { rbuf *bytes.Buffer } +var ( + defaultContentType = "application/octet-stream" + + defaultCodecs = map[string]codec.NewCodec{ + // "application/json": jsonrpc.NewClientCodec, + // "application/json-rpc": jsonrpc.NewClientCodec, + "application/protobuf": proto.NewCodec, + "application/proto-rpc": proto.NewCodec, + "application/octet-stream": proto.NewCodec, + } +) + func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { return rwc.rbuf.Read(p) } @@ -34,7 +48,7 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFunc) *rpcPlusCodec { +func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcPlusCodec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil), @@ -42,14 +56,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFu r := &rpcPlusCodec{ buf: rwc, client: client, - codec: cf(rwc), + codec: c(rwc), req: req, } return r } func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error { - if err := c.codec.WriteRequest(req, body); err != nil { + m := &codec.Message{ + Id: req.Seq, + Method: req.ServiceMethod, + Type: codec.Request, + } + if err := c.codec.Write(m, body); err != nil { return err } c.req.Body = c.buf.wbuf.Bytes() @@ -63,14 +82,20 @@ func (c *rpcPlusCodec) ReadResponseHeader(r *rpc.Response) error { } c.buf.rbuf.Reset() c.buf.rbuf.Write(m.Body) - return c.codec.ReadResponseHeader(r) + var me codec.Message + err := c.codec.ReadHeader(&me, codec.Response) + r.ServiceMethod = me.Method + r.Seq = me.Id + r.Error = me.Error + return err } -func (c *rpcPlusCodec) ReadResponseBody(r interface{}) error { - return c.codec.ReadResponseBody(r) +func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error { + return c.codec.ReadBody(b) } func (c *rpcPlusCodec) Close() error { c.buf.Close() + c.codec.Close() return c.client.Close() } diff --git a/codec/bson/bson.go b/codec/bson/bson.go deleted file mode 100644 index 6e71033a..00000000 --- a/codec/bson/bson.go +++ /dev/null @@ -1,23 +0,0 @@ -package bson - -import ( - "labix.org/v2/mgo/bson" -) - -var ( - Codec = bsonCodec{} -) - -type bsonCodec struct {} - -func (bsonCodec) Marshal(v interface{}) ([]byte, error) { - return bson.Marshal(v) -} - -func (bsonCodec) Unmarshal(data []byte, v interface{}) error { - return bson.Unmarshal(data, v) -} - -func (bsonCodec) String() string { - return "bson" -} diff --git a/codec/codec.go b/codec/codec.go index 8d59163b..bb67429f 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -1,12 +1,47 @@ package codec -// Codec used to encode and decode request/responses +import ( + "io" +) + +const ( + Error MessageType = iota + Request + Response + Publication +) + +type MessageType int + +// Takes in a connection/buffer and returns a new Codec +type NewCodec func(io.ReadWriteCloser) Codec + +// Codec encodes/decodes various types of +// messages used within go-micro type Codec interface { - // Marshal returns the wire format of v. - Marshal(v interface{}) ([]byte, error) - // Unmarshal parses the wire format into v. - Unmarshal(data []byte, v interface{}) error - // String returns the name of the Codec implementation. The returned - // string will be used as part of content type in transmission. + Encoder + Decoder + Close() error String() string } + +type Encoder interface { + Write(*Message, interface{}) error +} + +type Decoder interface { + ReadHeader(*Message, MessageType) error + ReadBody(interface{}) error +} + +// Message represents detailed information about +// the communication, likely followed by the body. +// In the case of an error, body may be nil. +type Message struct { + Id uint64 + Type MessageType + Target string + Method string + Error string + Headers map[string]string +} diff --git a/codec/pb/pb.go b/codec/pb/pb.go deleted file mode 100644 index 50c92b85..00000000 --- a/codec/pb/pb.go +++ /dev/null @@ -1,24 +0,0 @@ -package pb - -import ( - "github.com/golang/protobuf/proto" -) - -var ( - Codec = protoCodec{} -) - -type protoCodec struct {} - -func (protoCodec) Marshal(v interface{}) ([]byte, error) { - return proto.Marshal(v.(proto.Message)) -} - -func (protoCodec) Unmarshal(data []byte, v interface{}) error { - return proto.Unmarshal(data, v.(proto.Message)) -} - -func (protoCodec) String() string { - return "proto" -} - diff --git a/codec/proto/netstring.go b/codec/proto/netstring.go new file mode 100644 index 00000000..83540328 --- /dev/null +++ b/codec/proto/netstring.go @@ -0,0 +1,36 @@ +package proto + +import ( + "encoding/binary" + "io" +) + +// WriteNetString writes data to a big-endian netstring on a Writer. +// Size is always a 32-bit unsigned int. +func WriteNetString(w io.Writer, data []byte) (written int, err error) { + size := make([]byte, 4) + binary.BigEndian.PutUint32(size, uint32(len(data))) + if written, err = w.Write(size); err != nil { + return + } + return w.Write(data) +} + +// ReadNetString reads data from a big-endian netstring. +func ReadNetString(r io.Reader) (data []byte, err error) { + sizeBuf := make([]byte, 4) + _, err = r.Read(sizeBuf) + if err != nil { + return nil, err + } + size := binary.BigEndian.Uint32(sizeBuf) + if size == 0 { + return nil, nil + } + data = make([]byte, size) + _, err = r.Read(data) + if err != nil { + return nil, err + } + return +} diff --git a/codec/proto/proto.go b/codec/proto/proto.go new file mode 100644 index 00000000..f24126f2 --- /dev/null +++ b/codec/proto/proto.go @@ -0,0 +1,163 @@ +package proto + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" + rpc "github.com/youtube/vitess/go/rpcplus/pbrpc" +) + +type flusher interface { + Flush() error +} + +type protoCodec struct { + sync.Mutex + rwc io.ReadWriteCloser + mt codec.MessageType + buf *bytes.Buffer +} + +func (c *protoCodec) Close() error { + c.buf.Reset() + return c.rwc.Close() +} + +func (c *protoCodec) String() string { + return "proto" +} + +func (c *protoCodec) Write(m *codec.Message, b interface{}) error { + switch m.Type { + case codec.Request: + c.Lock() + defer c.Unlock() + // This is protobuf, of course we copy it. + pbr := &rpc.Request{ServiceMethod: &m.Method, Seq: &m.Id} + data, err := proto.Marshal(pbr) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + // Of course this is a protobuf! Trust me or detonate the program. + data, err = proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if flusher, ok := c.rwc.(flusher); ok { + err = flusher.Flush() + } + case codec.Response: + c.Lock() + defer c.Unlock() + rtmp := &rpc.Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error} + data, err := proto.Marshal(rtmp) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if pb, ok := b.(proto.Message); ok { + data, err = proto.Marshal(pb) + if err != nil { + return err + } + } else { + data = nil + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if flusher, ok := c.rwc.(flusher); ok { + err = flusher.Flush() + } + case codec.Publication: + data, err := proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + c.rwc.Write(data) + default: + return fmt.Errorf("Unrecognised message type: %v", m.Type) + } + return nil +} + +func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + c.buf.Reset() + c.mt = mt + + switch mt { + case codec.Request: + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(rpc.Request) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + m.Method = *rtmp.ServiceMethod + m.Id = *rtmp.Seq + case codec.Response: + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(rpc.Response) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + m.Method = *rtmp.ServiceMethod + m.Id = *rtmp.Seq + m.Error = *rtmp.Error + case codec.Publication: + io.Copy(c.buf, c.rwc) + default: + return fmt.Errorf("Unrecognised message type: %v", mt) + } + return nil +} + +func (c *protoCodec) ReadBody(b interface{}) error { + var data []byte + switch c.mt { + case codec.Request, codec.Response: + var err error + data, err = ReadNetString(c.rwc) + if err != nil { + return err + } + case codec.Publication: + data = c.buf.Bytes() + default: + return fmt.Errorf("Unrecognised message type: %v", c.mt) + } + if b != nil { + return proto.Unmarshal(data, b.(proto.Message)) + } + return nil +} + +func NewCodec(rwc io.ReadWriteCloser) codec.Codec { + return &protoCodec{ + buf: bytes.NewBuffer(nil), + rwc: rwc, + } +} diff --git a/examples/client/main.go b/examples/client/main.go index e0a1470b..8b0a7910 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -6,7 +6,6 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" - "github.com/micro/go-micro/codec/pb" c "github.com/micro/go-micro/context" example "github.com/micro/go-micro/examples/server/proto/example" "golang.org/x/net/context" @@ -120,35 +119,36 @@ func stream() { func main() { cmd.Init() - client.DefaultClient = client.NewClient( - client.Codec("application/pb", pb.Codec), - client.ContentType("application/pb"), - ) + // client.DefaultClient = client.NewClient( + // client.Codec("application/pb", pb.Codec), + // client.ContentType("application/pb"), + // ) + for { + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } + fmt.Println("\n--- Streamer example ---\n") + stream() - fmt.Println("\n--- Call example ---\n") - for i := 0; i < 10; i++ { - call(i) + fmt.Println("\n--- Publisher example ---\n") + pub() + + fmt.Println("\n--- Wrapper example ---\n") + + // Wrap the default client + client.DefaultClient = logWrap(client.DefaultClient) + + call(0) + + // Wrap using client.Wrap option + client.DefaultClient = client.NewClient( + client.Wrap(traceWrap), + client.Wrap(logWrap), + ) + + call(1) + time.Sleep(time.Millisecond * 100) } - - fmt.Println("\n--- Streamer example ---\n") - stream() - - fmt.Println("\n--- Publisher example ---\n") - pub() - - fmt.Println("\n--- Wrapper example ---\n") - - // Wrap the default client - client.DefaultClient = logWrap(client.DefaultClient) - - call(0) - - // Wrap using client.Wrap option - client.DefaultClient = client.NewClient( - client.Wrap(traceWrap), - client.Wrap(logWrap), - ) - - call(1) } diff --git a/examples/server/main.go b/examples/server/main.go index 43b3b67f..846b0241 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -3,7 +3,6 @@ package main import ( log "github.com/golang/glog" "github.com/micro/go-micro/cmd" - "github.com/micro/go-micro/codec/bson" "github.com/micro/go-micro/examples/server/handler" "github.com/micro/go-micro/examples/server/subscriber" "github.com/micro/go-micro/server" @@ -13,16 +12,15 @@ func main() { // optionally setup command line usage cmd.Init() - server.DefaultServer = server.NewServer( - server.Codec("application/bson", bson.Codec), - ) + // server.DefaultServer = server.NewServer( + // server.Codec("application/bson", bson.Codec), + // ) // Initialise Server server.Init( server.Name("go.micro.srv.example"), ) - // Register Handlers server.Handle( server.NewHandler( diff --git a/server/buffer.go b/server/buffer.go index e833f1a6..4df03c27 100644 --- a/server/buffer.go +++ b/server/buffer.go @@ -1,13 +1,14 @@ package server import ( - "io" + "bytes" ) type buffer struct { - io.ReadWriter + *bytes.Buffer } func (b *buffer) Close() error { + b.Buffer.Reset() return nil } diff --git a/server/codec.go b/server/codec.go deleted file mode 100644 index 1116495f..00000000 --- a/server/codec.go +++ /dev/null @@ -1,98 +0,0 @@ -package server - -import ( - "io" - "net/rpc" - "sync" - - "github.com/micro/go-micro/codec" - - "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/rpcplus/jsonrpc" - "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -var ( - defaultCodecs = map[string]codecFunc{ - "application/json": jsonrpc.NewServerCodec, - "application/json-rpc": jsonrpc.NewServerCodec, - "application/protobuf": pbrpc.NewServerCodec, - "application/proto-rpc": pbrpc.NewServerCodec, - "application/octet-stream": pbrpc.NewServerCodec, - } -) - -// for internal use only -type codecFunc func(io.ReadWriteCloser) rpcplus.ServerCodec - -// wraps an net/rpc ServerCodec to provide an rpcplus.ServerCodec -// temporary until we strip out use of rpcplus -type rpcCodecWrap struct { - sync.Mutex - rwc io.ReadWriteCloser - c codec.Codec -} - -func (cw *rpcCodecWrap) ReadRequestHeader(r *rpcplus.Request) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - rtmp := new(rpc.Request) - err = cw.c.Unmarshal(data, rtmp) - if err != nil { - return err - } - r.ServiceMethod = rtmp.ServiceMethod - r.Seq = rtmp.Seq - return nil -} - -func (cw *rpcCodecWrap) ReadRequestBody(b interface{}) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - if b != nil { - return cw.c.Unmarshal(data, b) - } - return nil -} - -func (cw *rpcCodecWrap) WriteResponse(r *rpcplus.Response, b interface{}, l bool) error { - cw.Lock() - defer cw.Unlock() - rtmp := &rpc.Response{ServiceMethod: r.ServiceMethod, Seq: r.Seq, Error: r.Error} - data, err := cw.c.Marshal(rtmp) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - data, err = cw.c.Marshal(b) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - return nil -} - -func (cw *rpcCodecWrap) Close() error { - return cw.rwc.Close() -} - -// wraps a CodecFunc to provide an internal codecFunc -// temporary until we strip rpcplus out -func codecWrap(c codec.Codec) codecFunc { - return func(rwc io.ReadWriteCloser) rpcplus.ServerCodec { - return &rpcCodecWrap{ - rwc: rwc, - c: c, - } - } -} diff --git a/server/options.go b/server/options.go index 8969cb2f..ca17fb3c 100644 --- a/server/options.go +++ b/server/options.go @@ -8,7 +8,7 @@ import ( ) type options struct { - codecs map[string]codec.Codec + codecs map[string]codec.NewCodec broker broker.Broker registry registry.Registry transport transport.Transport @@ -22,7 +22,7 @@ type options struct { func newOptions(opt ...Option) options { opts := options{ - codecs: make(map[string]codec.Codec), + codecs: make(map[string]codec.NewCodec), } for _, o := range opt { @@ -127,7 +127,7 @@ func Broker(b broker.Broker) Option { } // Codec to use to encode/decode requests for a given content type -func Codec(contentType string, c codec.Codec) Option { +func Codec(contentType string, c codec.NewCodec) Option { return func(o *options) { o.codecs[contentType] = c } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 31c1a476..7ec7cdef 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -3,13 +3,15 @@ package server import ( "bytes" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) type rpcPlusCodec struct { socket transport.Socket - codec rpc.ServerCodec + codec codec.Codec req *transport.Message buf *readWriteCloser @@ -20,6 +22,16 @@ type readWriteCloser struct { rbuf *bytes.Buffer } +var ( + defaultCodecs = map[string]codec.NewCodec{ + // "application/json": jsonrpc.NewServerCodec, + // "application/json-rpc": jsonrpc.NewServerCodec, + "application/protobuf": proto.NewCodec, + "application/proto-rpc": proto.NewCodec, + "application/octet-stream": proto.NewCodec, + } +) + func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { return rwc.rbuf.Read(p) } @@ -34,14 +46,14 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFunc) rpc.ServerCodec { +func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) rpc.ServerCodec { rwc := &readWriteCloser{ rbuf: bytes.NewBuffer(req.Body), wbuf: bytes.NewBuffer(nil), } r := &rpcPlusCodec{ buf: rwc, - codec: cf(rwc), + codec: c(rwc), req: req, socket: socket, } @@ -49,16 +61,26 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFu } func (c *rpcPlusCodec) ReadRequestHeader(r *rpc.Request) error { - return c.codec.ReadRequestHeader(r) + var m codec.Message + err := c.codec.ReadHeader(&m, codec.Request) + r.ServiceMethod = m.Method + r.Seq = m.Id + return err } -func (c *rpcPlusCodec) ReadRequestBody(r interface{}) error { - return c.codec.ReadRequestBody(r) +func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error { + return c.codec.ReadBody(b) } func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error { c.buf.wbuf.Reset() - if err := c.codec.WriteResponse(r, body, last); err != nil { + m := &codec.Message{ + Method: r.ServiceMethod, + Id: r.Seq, + Error: r.Error, + Type: codec.Response, + } + if err := c.codec.Write(m, body); err != nil { return err } return c.socket.Send(&transport.Message{ @@ -69,5 +91,6 @@ func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last boo func (c *rpcPlusCodec) Close() error { c.buf.Close() + c.codec.Close() return c.socket.Close() } diff --git a/server/rpc_server.go b/server/rpc_server.go index 98755db7..8e55fe15 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" @@ -43,7 +44,7 @@ func (s *rpcServer) accept(sock transport.Socket) { return } - cf, err := s.codecFunc(msg.Header["Content-Type"]) + cf, err := s.newCodec(msg.Header["Content-Type"]) // TODO: needs better error handling if err != nil { sock.Send(&transport.Message{ @@ -73,9 +74,9 @@ func (s *rpcServer) accept(sock transport.Socket) { } } -func (s *rpcServer) codecFunc(contentType string) (codecFunc, error) { +func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) { if cf, ok := s.opts.codecs[contentType]; ok { - return codecWrap(cf), nil + return cf, nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil @@ -200,7 +201,7 @@ func (s *rpcServer) Register() error { defer s.Unlock() for sb, _ := range s.subscribers { - handler := createSubHandler(sb) + handler := s.createSubHandler(sb) sub, err := config.broker.Subscribe(sb.Topic(), handler) if err != nil { return err diff --git a/server/subscriber.go b/server/subscriber.go index 9395bc44..ea4d82f0 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -1,11 +1,11 @@ package server import ( - "encoding/json" + "bytes" "reflect" - "github.com/golang/protobuf/proto" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/registry" "golang.org/x/net/context" @@ -94,8 +94,19 @@ func newSubscriber(topic string, sub interface{}) Subscriber { } } -func createSubHandler(sb *subscriber) broker.Handler { +func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler { return func(msg *broker.Message) { + cf, err := s.newCodec(msg.Header["Content-Type"]) + if err != nil { + return + } + + b := &buffer{bytes.NewBuffer(msg.Body)} + co := cf(b) + if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + return + } + hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v @@ -107,7 +118,6 @@ func createSubHandler(sb *subscriber) broker.Handler { for _, handler := range sb.handlers { var isVal bool var req reflect.Value - var uerr error if handler.reqType.Kind() == reflect.Ptr { req = reflect.New(handler.reqType.Elem()) @@ -116,14 +126,7 @@ func createSubHandler(sb *subscriber) broker.Handler { isVal = true } - switch msg.Header["Content-Type"] { - case "application/octet-stream": - uerr = proto.Unmarshal(msg.Body, req.Interface().(proto.Message)) - case "application/json": - uerr = json.Unmarshal(msg.Body, req.Interface()) - } - - if uerr != nil { + if err := co.ReadBody(req.Interface()); err != nil { continue } From 5488904404395b9f1df482205dd7462e883b72d9 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 16:34:27 +0000 Subject: [PATCH 3/7] Fix a data race issue with the buffer --- transport/http_transport.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/transport/http_transport.go b/transport/http_transport.go index 429e7627..91527e57 100644 --- a/transport/http_transport.go +++ b/transport/http_transport.go @@ -22,10 +22,12 @@ type httpTransportClient struct { ht *httpTransport addr string conn net.Conn - buff *bufio.Reader dialOpts dialOptions r chan *http.Request once sync.Once + + sync.Mutex + buff *bufio.Reader } type httpTransportSocket struct { @@ -81,6 +83,12 @@ func (h *httpTransportClient) Recv(m *Message) error { r = rc } + h.Lock() + defer h.Unlock() + if h.buff == nil { + return io.EOF + } + rsp, err := http.ReadResponse(h.buff, r) if err != nil { return err @@ -110,11 +118,15 @@ func (h *httpTransportClient) Recv(m *Message) error { } func (h *httpTransportClient) Close() error { - h.buff.Reset(nil) + err := h.conn.Close() h.once.Do(func() { + h.Lock() + h.buff.Reset(nil) + h.buff = nil + h.Unlock() close(h.r) }) - return h.conn.Close() + return err } func (h *httpTransportSocket) Recv(m *Message) error { From c68984eccc50399a4e38f8008399939daa542d86 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 16:34:48 +0000 Subject: [PATCH 4/7] log any stream close error --- examples/client/main.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/examples/client/main.go b/examples/client/main.go index 8b0a7910..b59b8f7c 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -113,7 +113,9 @@ func stream() { return } - stream.Close() + if err := stream.Close(); err != nil { + fmt.Println("stream close err:", err) + } } func main() { From 609a59d3ab7bac45eeed0b2dfd30434872c89215 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 16:39:25 +0000 Subject: [PATCH 5/7] add local envelope --- codec/proto/envelope.pb.go | 83 ++++++++++++++++++++++++++++++++++++++ codec/proto/envelope.proto | 12 ++++++ codec/proto/proto.go | 9 ++--- 3 files changed, 99 insertions(+), 5 deletions(-) create mode 100644 codec/proto/envelope.pb.go create mode 100644 codec/proto/envelope.proto diff --git a/codec/proto/envelope.pb.go b/codec/proto/envelope.pb.go new file mode 100644 index 00000000..426ed6a1 --- /dev/null +++ b/codec/proto/envelope.pb.go @@ -0,0 +1,83 @@ +// Code generated by protoc-gen-go. +// source: envelope.proto +// DO NOT EDIT! + +/* +Package proto is a generated protocol buffer package. + +It is generated from these files: + envelope.proto + +It has these top-level messages: + Request + Response +*/ +package proto + +import proto "github.com/golang/protobuf/proto" +import json "encoding/json" +import math "math" + +// Reference proto, json, and math imports to suppress error if they are not otherwise used. +var _ = proto.Marshal +var _ = &json.SyntaxError{} +var _ = math.Inf + +type Request struct { + ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"` + Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} + +func (m *Request) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *Request) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +type Response struct { + ServiceMethod *string `protobuf:"bytes,1,opt,name=service_method" json:"service_method,omitempty"` + Seq *uint64 `protobuf:"fixed64,2,opt,name=seq" json:"seq,omitempty"` + Error *string `protobuf:"bytes,3,opt,name=error" json:"error,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} + +func (m *Response) GetServiceMethod() string { + if m != nil && m.ServiceMethod != nil { + return *m.ServiceMethod + } + return "" +} + +func (m *Response) GetSeq() uint64 { + if m != nil && m.Seq != nil { + return *m.Seq + } + return 0 +} + +func (m *Response) GetError() string { + if m != nil && m.Error != nil { + return *m.Error + } + return "" +} + +func init() { +} diff --git a/codec/proto/envelope.proto b/codec/proto/envelope.proto new file mode 100644 index 00000000..630714ca --- /dev/null +++ b/codec/proto/envelope.proto @@ -0,0 +1,12 @@ +package proto; + +message Request { + optional string service_method = 1; + optional fixed64 seq = 2; +} + +message Response { + optional string service_method = 1; + optional fixed64 seq = 2; + optional string error = 3; +} diff --git a/codec/proto/proto.go b/codec/proto/proto.go index f24126f2..4cd933c2 100644 --- a/codec/proto/proto.go +++ b/codec/proto/proto.go @@ -8,7 +8,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/micro/go-micro/codec" - rpc "github.com/youtube/vitess/go/rpcplus/pbrpc" ) type flusher interface { @@ -37,7 +36,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { c.Lock() defer c.Unlock() // This is protobuf, of course we copy it. - pbr := &rpc.Request{ServiceMethod: &m.Method, Seq: &m.Id} + pbr := &Request{ServiceMethod: &m.Method, Seq: &m.Id} data, err := proto.Marshal(pbr) if err != nil { return err @@ -61,7 +60,7 @@ func (c *protoCodec) Write(m *codec.Message, b interface{}) error { case codec.Response: c.Lock() defer c.Unlock() - rtmp := &rpc.Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error} + rtmp := &Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error} data, err := proto.Marshal(rtmp) if err != nil { return err @@ -107,7 +106,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - rtmp := new(rpc.Request) + rtmp := new(Request) err = proto.Unmarshal(data, rtmp) if err != nil { return err @@ -119,7 +118,7 @@ func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { if err != nil { return err } - rtmp := new(rpc.Response) + rtmp := new(Response) err = proto.Unmarshal(data, rtmp) if err != nil { return err From 80f53ab17600e3efde94dc1a79961a1ff4e9199b Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 18:40:32 +0000 Subject: [PATCH 6/7] Add json codec --- client/rpc_codec.go | 5 +- codec/json/client.go | 97 +++++++++++++++++++++++++++++++++++++ codec/json/json.go | 88 ++++++++++++++++++++++++++++++++++ codec/json/server.go | 111 +++++++++++++++++++++++++++++++++++++++++++ server/rpc_codec.go | 5 +- 5 files changed, 302 insertions(+), 4 deletions(-) create mode 100644 codec/json/client.go create mode 100644 codec/json/json.go create mode 100644 codec/json/server.go diff --git a/client/rpc_codec.go b/client/rpc_codec.go index d151844f..c75f0029 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" @@ -26,8 +27,8 @@ var ( defaultContentType = "application/octet-stream" defaultCodecs = map[string]codec.NewCodec{ - // "application/json": jsonrpc.NewClientCodec, - // "application/json-rpc": jsonrpc.NewClientCodec, + "application/json": json.NewCodec, + "application/json-rpc": json.NewCodec, "application/protobuf": proto.NewCodec, "application/proto-rpc": proto.NewCodec, "application/octet-stream": proto.NewCodec, diff --git a/codec/json/client.go b/codec/json/client.go new file mode 100644 index 00000000..7a26bd9f --- /dev/null +++ b/codec/json/client.go @@ -0,0 +1,97 @@ +package json + +import ( + "encoding/json" + "fmt" + "io" + "sync" + + "github.com/micro/go-micro/codec" +) + +type clientCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req clientRequest + resp clientResponse + + sync.Mutex + pending map[uint64]string +} + +type clientRequest struct { + Method string `json:"method"` + Params [1]interface{} `json:"params"` + ID uint64 `json:"id"` +} + +type clientResponse struct { + ID uint64 `json:"id"` + Result *json.RawMessage `json:"result"` + Error interface{} `json:"error"` +} + +func newClientCodec(conn io.ReadWriteCloser) *clientCodec { + return &clientCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]string), + } +} + +func (c *clientCodec) Write(m *codec.Message, b interface{}) error { + c.Lock() + c.pending[m.Id] = m.Method + c.Unlock() + c.req.Method = m.Method + c.req.Params[0] = b + c.req.ID = m.Id + return c.enc.Encode(&c.req) +} + +func (r *clientResponse) reset() { + r.ID = 0 + r.Result = nil + r.Error = nil +} + +func (c *clientCodec) ReadHeader(m *codec.Message) error { + c.resp.reset() + if err := c.dec.Decode(&c.resp); err != nil { + return err + } + + c.Lock() + m.Method = c.pending[c.resp.ID] + delete(c.pending, c.resp.ID) + c.Unlock() + + m.Error = "" + m.Id = c.resp.ID + if c.resp.Error != nil { + x, ok := c.resp.Error.(string) + if !ok { + return fmt.Errorf("invalid error %v", c.resp.Error) + } + if x == "" { + x = "unspecified error" + } + m.Error = x + } + return nil +} + +func (c *clientCodec) ReadBody(x interface{}) error { + if x == nil { + return nil + } + return json.Unmarshal(*c.resp.Result, x) +} + +func (c *clientCodec) Close() error { + return c.c.Close() +} diff --git a/codec/json/json.go b/codec/json/json.go new file mode 100644 index 00000000..d2909b04 --- /dev/null +++ b/codec/json/json.go @@ -0,0 +1,88 @@ +package json + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + + "github.com/micro/go-micro/codec" +) + +type jsonCodec struct { + buf *bytes.Buffer + mt codec.MessageType + rwc io.ReadWriteCloser + c *clientCodec + s *serverCodec +} + +func (j *jsonCodec) Close() error { + j.buf.Reset() + return j.rwc.Close() +} + +func (j *jsonCodec) String() string { + return "json" +} + +func (j *jsonCodec) Write(m *codec.Message, b interface{}) error { + switch m.Type { + case codec.Request: + return j.c.Write(m, b) + case codec.Response: + return j.s.Write(m, b) + case codec.Publication: + data, err := json.Marshal(b) + if err != nil { + return err + } + _, err = j.rwc.Write(data) + return err + default: + return fmt.Errorf("Unrecognised message type: %v", m.Type) + } + return nil +} + +func (j *jsonCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + j.buf.Reset() + j.mt = mt + + switch mt { + case codec.Request: + return j.s.ReadHeader(m) + case codec.Response: + return j.c.ReadHeader(m) + case codec.Publication: + io.Copy(j.buf, j.rwc) + default: + return fmt.Errorf("Unrecognised message type: %v", mt) + } + return nil +} + +func (j *jsonCodec) ReadBody(b interface{}) error { + switch j.mt { + case codec.Request: + return j.s.ReadBody(b) + case codec.Response: + return j.c.ReadBody(b) + case codec.Publication: + if b != nil { + return json.Unmarshal(j.buf.Bytes(), b) + } + default: + return fmt.Errorf("Unrecognised message type: %v", j.mt) + } + return nil +} + +func NewCodec(rwc io.ReadWriteCloser) codec.Codec { + return &jsonCodec{ + buf: bytes.NewBuffer(nil), + rwc: rwc, + c: newClientCodec(rwc), + s: newServerCodec(rwc), + } +} diff --git a/codec/json/server.go b/codec/json/server.go new file mode 100644 index 00000000..ee035fcb --- /dev/null +++ b/codec/json/server.go @@ -0,0 +1,111 @@ +package json + +import ( + "encoding/json" + "errors" + "io" + "sync" + + "github.com/micro/go-micro/codec" +) + +type serverCodec struct { + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values + c io.Closer + + // temporary work space + req serverRequest + resp serverResponse + + sync.Mutex + seq uint64 + pending map[uint64]*json.RawMessage +} + +type serverRequest struct { + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + ID *json.RawMessage `json:"id"` +} + +type serverResponse struct { + ID *json.RawMessage `json:"id"` + Result interface{} `json:"result"` + Error interface{} `json:"error"` +} + +func newServerCodec(conn io.ReadWriteCloser) *serverCodec { + return &serverCodec{ + dec: json.NewDecoder(conn), + enc: json.NewEncoder(conn), + c: conn, + pending: make(map[uint64]*json.RawMessage), + } +} + +func (r *serverRequest) reset() { + r.Method = "" + if r.Params != nil { + *r.Params = (*r.Params)[0:0] + } + if r.ID != nil { + *r.ID = (*r.ID)[0:0] + } +} + +func (c *serverCodec) ReadHeader(m *codec.Message) error { + c.req.reset() + if err := c.dec.Decode(&c.req); err != nil { + return err + } + m.Method = c.req.Method + + c.Lock() + c.seq++ + c.pending[c.seq] = c.req.ID + c.req.ID = nil + m.Id = c.seq + c.Unlock() + + return nil +} + +func (c *serverCodec) ReadBody(x interface{}) error { + if x == nil { + return nil + } + var params [1]interface{} + params[0] = x + return json.Unmarshal(*c.req.Params, ¶ms) +} + +var null = json.RawMessage([]byte("null")) + +func (c *serverCodec) Write(m *codec.Message, x interface{}) error { + var resp serverResponse + c.Lock() + b, ok := c.pending[m.Id] + if !ok { + c.Unlock() + return errors.New("invalid sequence number in response") + } + c.Unlock() + + if b == nil { + // Invalid request so no id. Use JSON null. + b = &null + } + resp.ID = b + resp.Result = x + if m.Error == "" { + resp.Error = nil + } else { + resp.Error = m.Error + } + return c.enc.Encode(resp) +} + +func (c *serverCodec) Close() error { + return c.c.Close() +} diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 7ec7cdef..9d567210 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/json" "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" @@ -24,8 +25,8 @@ type readWriteCloser struct { var ( defaultCodecs = map[string]codec.NewCodec{ - // "application/json": jsonrpc.NewServerCodec, - // "application/json-rpc": jsonrpc.NewServerCodec, + "application/json": json.NewCodec, + "application/json-rpc": json.NewCodec, "application/protobuf": proto.NewCodec, "application/proto-rpc": proto.NewCodec, "application/octet-stream": proto.NewCodec, From dddcdc34edcba50827240b8e0db8a906a8b75541 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 28 Nov 2015 18:54:38 +0000 Subject: [PATCH 7/7] rename to protorpc and jsonrpc --- client/rpc_codec.go | 14 +++++++------- codec/{json => jsonrpc}/client.go | 2 +- codec/{json => jsonrpc}/json.go | 4 ++-- codec/{json => jsonrpc}/server.go | 2 +- codec/{proto => protorpc}/envelope.pb.go | 2 +- codec/{proto => protorpc}/envelope.proto | 2 +- codec/{proto => protorpc}/netstring.go | 2 +- codec/{proto => protorpc}/proto.go | 4 ++-- server/rpc_codec.go | 14 +++++++------- 9 files changed, 23 insertions(+), 23 deletions(-) rename codec/{json => jsonrpc}/client.go (99%) rename codec/{json => jsonrpc}/json.go (97%) rename codec/{json => jsonrpc}/server.go (99%) rename codec/{proto => protorpc}/envelope.pb.go (99%) rename codec/{proto => protorpc}/envelope.proto (91%) rename codec/{proto => protorpc}/netstring.go (97%) rename codec/{proto => protorpc}/proto.go (98%) diff --git a/client/rpc_codec.go b/client/rpc_codec.go index c75f0029..867ba100 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -4,8 +4,8 @@ import ( "bytes" "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/codec/json" - "github.com/micro/go-micro/codec/proto" + "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) @@ -27,11 +27,11 @@ var ( defaultContentType = "application/octet-stream" defaultCodecs = map[string]codec.NewCodec{ - "application/json": json.NewCodec, - "application/json-rpc": json.NewCodec, - "application/protobuf": proto.NewCodec, - "application/proto-rpc": proto.NewCodec, - "application/octet-stream": proto.NewCodec, + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, } ) diff --git a/codec/json/client.go b/codec/jsonrpc/client.go similarity index 99% rename from codec/json/client.go rename to codec/jsonrpc/client.go index 7a26bd9f..7b079e05 100644 --- a/codec/json/client.go +++ b/codec/jsonrpc/client.go @@ -1,4 +1,4 @@ -package json +package jsonrpc import ( "encoding/json" diff --git a/codec/json/json.go b/codec/jsonrpc/json.go similarity index 97% rename from codec/json/json.go rename to codec/jsonrpc/json.go index d2909b04..b843dfd4 100644 --- a/codec/json/json.go +++ b/codec/jsonrpc/json.go @@ -1,4 +1,4 @@ -package json +package jsonrpc import ( "bytes" @@ -23,7 +23,7 @@ func (j *jsonCodec) Close() error { } func (j *jsonCodec) String() string { - return "json" + return "json-rpc" } func (j *jsonCodec) Write(m *codec.Message, b interface{}) error { diff --git a/codec/json/server.go b/codec/jsonrpc/server.go similarity index 99% rename from codec/json/server.go rename to codec/jsonrpc/server.go index ee035fcb..bf78ca43 100644 --- a/codec/json/server.go +++ b/codec/jsonrpc/server.go @@ -1,4 +1,4 @@ -package json +package jsonrpc import ( "encoding/json" diff --git a/codec/proto/envelope.pb.go b/codec/protorpc/envelope.pb.go similarity index 99% rename from codec/proto/envelope.pb.go rename to codec/protorpc/envelope.pb.go index 426ed6a1..9af273e0 100644 --- a/codec/proto/envelope.pb.go +++ b/codec/protorpc/envelope.pb.go @@ -12,7 +12,7 @@ It has these top-level messages: Request Response */ -package proto +package protorpc import proto "github.com/golang/protobuf/proto" import json "encoding/json" diff --git a/codec/proto/envelope.proto b/codec/protorpc/envelope.proto similarity index 91% rename from codec/proto/envelope.proto rename to codec/protorpc/envelope.proto index 630714ca..9c4fd19d 100644 --- a/codec/proto/envelope.proto +++ b/codec/protorpc/envelope.proto @@ -1,4 +1,4 @@ -package proto; +package protorpc; message Request { optional string service_method = 1; diff --git a/codec/proto/netstring.go b/codec/protorpc/netstring.go similarity index 97% rename from codec/proto/netstring.go rename to codec/protorpc/netstring.go index 83540328..8204a0e2 100644 --- a/codec/proto/netstring.go +++ b/codec/protorpc/netstring.go @@ -1,4 +1,4 @@ -package proto +package protorpc import ( "encoding/binary" diff --git a/codec/proto/proto.go b/codec/protorpc/proto.go similarity index 98% rename from codec/proto/proto.go rename to codec/protorpc/proto.go index 4cd933c2..51f9e06c 100644 --- a/codec/proto/proto.go +++ b/codec/protorpc/proto.go @@ -1,4 +1,4 @@ -package proto +package protorpc import ( "bytes" @@ -27,7 +27,7 @@ func (c *protoCodec) Close() error { } func (c *protoCodec) String() string { - return "proto" + return "proto-rpc" } func (c *protoCodec) Write(m *codec.Message, b interface{}) error { diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 9d567210..44cd45b5 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -4,8 +4,8 @@ import ( "bytes" "github.com/micro/go-micro/codec" - "github.com/micro/go-micro/codec/json" - "github.com/micro/go-micro/codec/proto" + "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/protorpc" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) @@ -25,11 +25,11 @@ type readWriteCloser struct { var ( defaultCodecs = map[string]codec.NewCodec{ - "application/json": json.NewCodec, - "application/json-rpc": json.NewCodec, - "application/protobuf": proto.NewCodec, - "application/proto-rpc": proto.NewCodec, - "application/octet-stream": proto.NewCodec, + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, } )