diff --git a/client/buffer.go b/client/buffer.go index 72c3c529..9eee2b3d 100644 --- a/client/buffer.go +++ b/client/buffer.go @@ -1,13 +1,14 @@ package client import ( - "io" + "bytes" ) type buffer struct { - io.ReadWriter + *bytes.Buffer } func (b *buffer) Close() error { + b.Buffer.Reset() return nil } diff --git a/client/codec.go b/client/codec.go deleted file mode 100644 index 913f8747..00000000 --- a/client/codec.go +++ /dev/null @@ -1,101 +0,0 @@ -package client - -import ( - "io" - "net/rpc" - "sync" - - "github.com/micro/go-micro/codec" - - "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/rpcplus/jsonrpc" - "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -var ( - defaultContentType = "application/octet-stream" - - defaultCodecs = map[string]codecFunc{ - "application/json": jsonrpc.NewClientCodec, - "application/json-rpc": jsonrpc.NewClientCodec, - "application/protobuf": pbrpc.NewClientCodec, - "application/proto-rpc": pbrpc.NewClientCodec, - "application/octet-stream": pbrpc.NewClientCodec, - } -) - -// only for internal use -type codecFunc func(io.ReadWriteCloser) rpcplus.ClientCodec - -// wraps an net/rpc ClientCodec to provide an rpcplus.ClientCodec -// temporary until we strip out use of rpcplus -type rpcCodecWrap struct { - sync.Mutex - c codec.Codec - rwc io.ReadWriteCloser -} - -func (cw *rpcCodecWrap) WriteRequest(r *rpcplus.Request, b interface{}) error { - cw.Lock() - defer cw.Unlock() - req := &rpc.Request{ServiceMethod: r.ServiceMethod, Seq: r.Seq} - data, err := cw.c.Marshal(req) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - data, err = cw.c.Marshal(b) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - return nil -} - -func (cw *rpcCodecWrap) ReadResponseHeader(r *rpcplus.Response) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - rtmp := new(rpc.Response) - err = cw.c.Unmarshal(data, rtmp) - if err != nil { - return err - } - r.ServiceMethod = rtmp.ServiceMethod - r.Seq = rtmp.Seq - r.Error = rtmp.Error - return nil -} - -func (cw *rpcCodecWrap) ReadResponseBody(b interface{}) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - if b != nil { - return cw.c.Unmarshal(data, b) - } - return nil -} - -func (cw *rpcCodecWrap) Close() error { - return cw.rwc.Close() -} - -// wraps a CodecFunc to provide an internal codecFunc -// temporary until we strip rpcplus out -func codecWrap(c codec.Codec) codecFunc { - return func(rwc io.ReadWriteCloser) rpcplus.ClientCodec { - return &rpcCodecWrap{ - rwc: rwc, - c: c, - } - } -} diff --git a/client/options.go b/client/options.go index 07db3d66..e8be919f 100644 --- a/client/options.go +++ b/client/options.go @@ -10,7 +10,7 @@ import ( type options struct { contentType string broker broker.Broker - codecs map[string]codec.Codec + codecs map[string]codec.NewCodec registry registry.Registry transport transport.Transport wrappers []Wrapper @@ -24,7 +24,7 @@ func Broker(b broker.Broker) Option { } // Codec to be used to encode/decode requests for a given content type -func Codec(contentType string, c codec.Codec) Option { +func Codec(contentType string, c codec.NewCodec) Option { return func(o *options) { o.codecs[contentType] = c } diff --git a/client/rpc_client.go b/client/rpc_client.go index 3bcb30ed..6fa72d8f 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -1,7 +1,7 @@ package client import ( - "encoding/json" + "bytes" "fmt" "sync" @@ -13,8 +13,6 @@ import ( "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" - - "github.com/golang/protobuf/proto" "golang.org/x/net/context" ) @@ -27,7 +25,7 @@ func newRpcClient(opt ...Option) Client { var once sync.Once opts := options{ - codecs: make(map[string]codec.Codec), + codecs: make(map[string]codec.NewCodec), } for _, o := range opt { @@ -37,7 +35,6 @@ func newRpcClient(opt ...Option) Client { if len(opts.contentType) == 0 { opts.contentType = defaultContentType } - fmt.Println("content type", opts.contentType) if opts.transport == nil { opts.transport = transport.DefaultTransport @@ -62,9 +59,9 @@ func newRpcClient(opt ...Option) Client { return c } -func (r *rpcClient) codecFunc(contentType string) (codecFunc, error) { +func (r *rpcClient) newCodec(contentType string) (codec.NewCodec, error) { if c, ok := r.opts.codecs[contentType]; ok { - return codecWrap(c), nil + return c, nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil @@ -86,7 +83,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r msg.Header["Content-Type"] = request.ContentType() - cf, err := r.codecFunc(request.ContentType()) + cf, err := r.newCodec(request.ContentType()) if err != nil { return errors.InternalServerError("go.micro.client", err.Error()) } @@ -119,7 +116,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, msg.Header["Content-Type"] = request.ContentType() - cf, err := r.codecFunc(request.ContentType()) + cf, err := r.newCodec(request.ContentType()) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } @@ -194,30 +191,21 @@ func (r *rpcClient) Publish(ctx context.Context, p Publication) error { md["Content-Type"] = p.ContentType() // encode message body - var body []byte - - switch p.ContentType() { - case "application/octet-stream": - b, err := proto.Marshal(p.Message().(proto.Message)) - if err != nil { - return err - } - body = b - case "application/json": - b, err := json.Marshal(p.Message()) - if err != nil { - return err - } - body = b + cf, err := r.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + b := &buffer{bytes.NewBuffer(nil)} + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) } - r.once.Do(func() { r.opts.broker.Connect() }) return r.opts.broker.Publish(p.Topic(), &broker.Message{ Header: md, - Body: body, + Body: b.Bytes(), }) } diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 95c0b332..d151844f 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -3,13 +3,15 @@ package client import ( "bytes" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) type rpcPlusCodec struct { client transport.Client - codec rpc.ClientCodec + codec codec.Codec req *transport.Message buf *readWriteCloser @@ -20,6 +22,18 @@ type readWriteCloser struct { rbuf *bytes.Buffer } +var ( + defaultContentType = "application/octet-stream" + + defaultCodecs = map[string]codec.NewCodec{ + // "application/json": jsonrpc.NewClientCodec, + // "application/json-rpc": jsonrpc.NewClientCodec, + "application/protobuf": proto.NewCodec, + "application/proto-rpc": proto.NewCodec, + "application/octet-stream": proto.NewCodec, + } +) + func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { return rwc.rbuf.Read(p) } @@ -34,7 +48,7 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFunc) *rpcPlusCodec { +func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.NewCodec) *rpcPlusCodec { rwc := &readWriteCloser{ wbuf: bytes.NewBuffer(nil), rbuf: bytes.NewBuffer(nil), @@ -42,14 +56,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, cf codecFu r := &rpcPlusCodec{ buf: rwc, client: client, - codec: cf(rwc), + codec: c(rwc), req: req, } return r } func (c *rpcPlusCodec) WriteRequest(req *rpc.Request, body interface{}) error { - if err := c.codec.WriteRequest(req, body); err != nil { + m := &codec.Message{ + Id: req.Seq, + Method: req.ServiceMethod, + Type: codec.Request, + } + if err := c.codec.Write(m, body); err != nil { return err } c.req.Body = c.buf.wbuf.Bytes() @@ -63,14 +82,20 @@ func (c *rpcPlusCodec) ReadResponseHeader(r *rpc.Response) error { } c.buf.rbuf.Reset() c.buf.rbuf.Write(m.Body) - return c.codec.ReadResponseHeader(r) + var me codec.Message + err := c.codec.ReadHeader(&me, codec.Response) + r.ServiceMethod = me.Method + r.Seq = me.Id + r.Error = me.Error + return err } -func (c *rpcPlusCodec) ReadResponseBody(r interface{}) error { - return c.codec.ReadResponseBody(r) +func (c *rpcPlusCodec) ReadResponseBody(b interface{}) error { + return c.codec.ReadBody(b) } func (c *rpcPlusCodec) Close() error { c.buf.Close() + c.codec.Close() return c.client.Close() } diff --git a/codec/bson/bson.go b/codec/bson/bson.go deleted file mode 100644 index 6e71033a..00000000 --- a/codec/bson/bson.go +++ /dev/null @@ -1,23 +0,0 @@ -package bson - -import ( - "labix.org/v2/mgo/bson" -) - -var ( - Codec = bsonCodec{} -) - -type bsonCodec struct {} - -func (bsonCodec) Marshal(v interface{}) ([]byte, error) { - return bson.Marshal(v) -} - -func (bsonCodec) Unmarshal(data []byte, v interface{}) error { - return bson.Unmarshal(data, v) -} - -func (bsonCodec) String() string { - return "bson" -} diff --git a/codec/codec.go b/codec/codec.go index 8d59163b..bb67429f 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -1,12 +1,47 @@ package codec -// Codec used to encode and decode request/responses +import ( + "io" +) + +const ( + Error MessageType = iota + Request + Response + Publication +) + +type MessageType int + +// Takes in a connection/buffer and returns a new Codec +type NewCodec func(io.ReadWriteCloser) Codec + +// Codec encodes/decodes various types of +// messages used within go-micro type Codec interface { - // Marshal returns the wire format of v. - Marshal(v interface{}) ([]byte, error) - // Unmarshal parses the wire format into v. - Unmarshal(data []byte, v interface{}) error - // String returns the name of the Codec implementation. The returned - // string will be used as part of content type in transmission. + Encoder + Decoder + Close() error String() string } + +type Encoder interface { + Write(*Message, interface{}) error +} + +type Decoder interface { + ReadHeader(*Message, MessageType) error + ReadBody(interface{}) error +} + +// Message represents detailed information about +// the communication, likely followed by the body. +// In the case of an error, body may be nil. +type Message struct { + Id uint64 + Type MessageType + Target string + Method string + Error string + Headers map[string]string +} diff --git a/codec/pb/pb.go b/codec/pb/pb.go deleted file mode 100644 index 50c92b85..00000000 --- a/codec/pb/pb.go +++ /dev/null @@ -1,24 +0,0 @@ -package pb - -import ( - "github.com/golang/protobuf/proto" -) - -var ( - Codec = protoCodec{} -) - -type protoCodec struct {} - -func (protoCodec) Marshal(v interface{}) ([]byte, error) { - return proto.Marshal(v.(proto.Message)) -} - -func (protoCodec) Unmarshal(data []byte, v interface{}) error { - return proto.Unmarshal(data, v.(proto.Message)) -} - -func (protoCodec) String() string { - return "proto" -} - diff --git a/codec/proto/netstring.go b/codec/proto/netstring.go new file mode 100644 index 00000000..83540328 --- /dev/null +++ b/codec/proto/netstring.go @@ -0,0 +1,36 @@ +package proto + +import ( + "encoding/binary" + "io" +) + +// WriteNetString writes data to a big-endian netstring on a Writer. +// Size is always a 32-bit unsigned int. +func WriteNetString(w io.Writer, data []byte) (written int, err error) { + size := make([]byte, 4) + binary.BigEndian.PutUint32(size, uint32(len(data))) + if written, err = w.Write(size); err != nil { + return + } + return w.Write(data) +} + +// ReadNetString reads data from a big-endian netstring. +func ReadNetString(r io.Reader) (data []byte, err error) { + sizeBuf := make([]byte, 4) + _, err = r.Read(sizeBuf) + if err != nil { + return nil, err + } + size := binary.BigEndian.Uint32(sizeBuf) + if size == 0 { + return nil, nil + } + data = make([]byte, size) + _, err = r.Read(data) + if err != nil { + return nil, err + } + return +} diff --git a/codec/proto/proto.go b/codec/proto/proto.go new file mode 100644 index 00000000..f24126f2 --- /dev/null +++ b/codec/proto/proto.go @@ -0,0 +1,163 @@ +package proto + +import ( + "bytes" + "fmt" + "io" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" + rpc "github.com/youtube/vitess/go/rpcplus/pbrpc" +) + +type flusher interface { + Flush() error +} + +type protoCodec struct { + sync.Mutex + rwc io.ReadWriteCloser + mt codec.MessageType + buf *bytes.Buffer +} + +func (c *protoCodec) Close() error { + c.buf.Reset() + return c.rwc.Close() +} + +func (c *protoCodec) String() string { + return "proto" +} + +func (c *protoCodec) Write(m *codec.Message, b interface{}) error { + switch m.Type { + case codec.Request: + c.Lock() + defer c.Unlock() + // This is protobuf, of course we copy it. + pbr := &rpc.Request{ServiceMethod: &m.Method, Seq: &m.Id} + data, err := proto.Marshal(pbr) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + // Of course this is a protobuf! Trust me or detonate the program. + data, err = proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if flusher, ok := c.rwc.(flusher); ok { + err = flusher.Flush() + } + case codec.Response: + c.Lock() + defer c.Unlock() + rtmp := &rpc.Response{ServiceMethod: &m.Method, Seq: &m.Id, Error: &m.Error} + data, err := proto.Marshal(rtmp) + if err != nil { + return err + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if pb, ok := b.(proto.Message); ok { + data, err = proto.Marshal(pb) + if err != nil { + return err + } + } else { + data = nil + } + _, err = WriteNetString(c.rwc, data) + if err != nil { + return err + } + if flusher, ok := c.rwc.(flusher); ok { + err = flusher.Flush() + } + case codec.Publication: + data, err := proto.Marshal(b.(proto.Message)) + if err != nil { + return err + } + c.rwc.Write(data) + default: + return fmt.Errorf("Unrecognised message type: %v", m.Type) + } + return nil +} + +func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { + c.buf.Reset() + c.mt = mt + + switch mt { + case codec.Request: + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(rpc.Request) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + m.Method = *rtmp.ServiceMethod + m.Id = *rtmp.Seq + case codec.Response: + data, err := ReadNetString(c.rwc) + if err != nil { + return err + } + rtmp := new(rpc.Response) + err = proto.Unmarshal(data, rtmp) + if err != nil { + return err + } + m.Method = *rtmp.ServiceMethod + m.Id = *rtmp.Seq + m.Error = *rtmp.Error + case codec.Publication: + io.Copy(c.buf, c.rwc) + default: + return fmt.Errorf("Unrecognised message type: %v", mt) + } + return nil +} + +func (c *protoCodec) ReadBody(b interface{}) error { + var data []byte + switch c.mt { + case codec.Request, codec.Response: + var err error + data, err = ReadNetString(c.rwc) + if err != nil { + return err + } + case codec.Publication: + data = c.buf.Bytes() + default: + return fmt.Errorf("Unrecognised message type: %v", c.mt) + } + if b != nil { + return proto.Unmarshal(data, b.(proto.Message)) + } + return nil +} + +func NewCodec(rwc io.ReadWriteCloser) codec.Codec { + return &protoCodec{ + buf: bytes.NewBuffer(nil), + rwc: rwc, + } +} diff --git a/examples/client/main.go b/examples/client/main.go index e0a1470b..8b0a7910 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -6,7 +6,6 @@ import ( "github.com/micro/go-micro/client" "github.com/micro/go-micro/cmd" - "github.com/micro/go-micro/codec/pb" c "github.com/micro/go-micro/context" example "github.com/micro/go-micro/examples/server/proto/example" "golang.org/x/net/context" @@ -120,35 +119,36 @@ func stream() { func main() { cmd.Init() - client.DefaultClient = client.NewClient( - client.Codec("application/pb", pb.Codec), - client.ContentType("application/pb"), - ) + // client.DefaultClient = client.NewClient( + // client.Codec("application/pb", pb.Codec), + // client.ContentType("application/pb"), + // ) + for { + fmt.Println("\n--- Call example ---\n") + for i := 0; i < 10; i++ { + call(i) + } + fmt.Println("\n--- Streamer example ---\n") + stream() - fmt.Println("\n--- Call example ---\n") - for i := 0; i < 10; i++ { - call(i) + fmt.Println("\n--- Publisher example ---\n") + pub() + + fmt.Println("\n--- Wrapper example ---\n") + + // Wrap the default client + client.DefaultClient = logWrap(client.DefaultClient) + + call(0) + + // Wrap using client.Wrap option + client.DefaultClient = client.NewClient( + client.Wrap(traceWrap), + client.Wrap(logWrap), + ) + + call(1) + time.Sleep(time.Millisecond * 100) } - - fmt.Println("\n--- Streamer example ---\n") - stream() - - fmt.Println("\n--- Publisher example ---\n") - pub() - - fmt.Println("\n--- Wrapper example ---\n") - - // Wrap the default client - client.DefaultClient = logWrap(client.DefaultClient) - - call(0) - - // Wrap using client.Wrap option - client.DefaultClient = client.NewClient( - client.Wrap(traceWrap), - client.Wrap(logWrap), - ) - - call(1) } diff --git a/examples/server/main.go b/examples/server/main.go index 43b3b67f..846b0241 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -3,7 +3,6 @@ package main import ( log "github.com/golang/glog" "github.com/micro/go-micro/cmd" - "github.com/micro/go-micro/codec/bson" "github.com/micro/go-micro/examples/server/handler" "github.com/micro/go-micro/examples/server/subscriber" "github.com/micro/go-micro/server" @@ -13,16 +12,15 @@ func main() { // optionally setup command line usage cmd.Init() - server.DefaultServer = server.NewServer( - server.Codec("application/bson", bson.Codec), - ) + // server.DefaultServer = server.NewServer( + // server.Codec("application/bson", bson.Codec), + // ) // Initialise Server server.Init( server.Name("go.micro.srv.example"), ) - // Register Handlers server.Handle( server.NewHandler( diff --git a/server/buffer.go b/server/buffer.go index e833f1a6..4df03c27 100644 --- a/server/buffer.go +++ b/server/buffer.go @@ -1,13 +1,14 @@ package server import ( - "io" + "bytes" ) type buffer struct { - io.ReadWriter + *bytes.Buffer } func (b *buffer) Close() error { + b.Buffer.Reset() return nil } diff --git a/server/codec.go b/server/codec.go deleted file mode 100644 index 1116495f..00000000 --- a/server/codec.go +++ /dev/null @@ -1,98 +0,0 @@ -package server - -import ( - "io" - "net/rpc" - "sync" - - "github.com/micro/go-micro/codec" - - "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/rpcplus/jsonrpc" - "github.com/youtube/vitess/go/rpcplus/pbrpc" -) - -var ( - defaultCodecs = map[string]codecFunc{ - "application/json": jsonrpc.NewServerCodec, - "application/json-rpc": jsonrpc.NewServerCodec, - "application/protobuf": pbrpc.NewServerCodec, - "application/proto-rpc": pbrpc.NewServerCodec, - "application/octet-stream": pbrpc.NewServerCodec, - } -) - -// for internal use only -type codecFunc func(io.ReadWriteCloser) rpcplus.ServerCodec - -// wraps an net/rpc ServerCodec to provide an rpcplus.ServerCodec -// temporary until we strip out use of rpcplus -type rpcCodecWrap struct { - sync.Mutex - rwc io.ReadWriteCloser - c codec.Codec -} - -func (cw *rpcCodecWrap) ReadRequestHeader(r *rpcplus.Request) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - rtmp := new(rpc.Request) - err = cw.c.Unmarshal(data, rtmp) - if err != nil { - return err - } - r.ServiceMethod = rtmp.ServiceMethod - r.Seq = rtmp.Seq - return nil -} - -func (cw *rpcCodecWrap) ReadRequestBody(b interface{}) error { - data, err := pbrpc.ReadNetString(cw.rwc) - if err != nil { - return err - } - if b != nil { - return cw.c.Unmarshal(data, b) - } - return nil -} - -func (cw *rpcCodecWrap) WriteResponse(r *rpcplus.Response, b interface{}, l bool) error { - cw.Lock() - defer cw.Unlock() - rtmp := &rpc.Response{ServiceMethod: r.ServiceMethod, Seq: r.Seq, Error: r.Error} - data, err := cw.c.Marshal(rtmp) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - data, err = cw.c.Marshal(b) - if err != nil { - return err - } - _, err = pbrpc.WriteNetString(cw.rwc, data) - if err != nil { - return err - } - return nil -} - -func (cw *rpcCodecWrap) Close() error { - return cw.rwc.Close() -} - -// wraps a CodecFunc to provide an internal codecFunc -// temporary until we strip rpcplus out -func codecWrap(c codec.Codec) codecFunc { - return func(rwc io.ReadWriteCloser) rpcplus.ServerCodec { - return &rpcCodecWrap{ - rwc: rwc, - c: c, - } - } -} diff --git a/server/options.go b/server/options.go index 8969cb2f..ca17fb3c 100644 --- a/server/options.go +++ b/server/options.go @@ -8,7 +8,7 @@ import ( ) type options struct { - codecs map[string]codec.Codec + codecs map[string]codec.NewCodec broker broker.Broker registry registry.Registry transport transport.Transport @@ -22,7 +22,7 @@ type options struct { func newOptions(opt ...Option) options { opts := options{ - codecs: make(map[string]codec.Codec), + codecs: make(map[string]codec.NewCodec), } for _, o := range opt { @@ -127,7 +127,7 @@ func Broker(b broker.Broker) Option { } // Codec to use to encode/decode requests for a given content type -func Codec(contentType string, c codec.Codec) Option { +func Codec(contentType string, c codec.NewCodec) Option { return func(o *options) { o.codecs[contentType] = c } diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 31c1a476..7ec7cdef 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -3,13 +3,15 @@ package server import ( "bytes" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/proto" "github.com/micro/go-micro/transport" rpc "github.com/youtube/vitess/go/rpcplus" ) type rpcPlusCodec struct { socket transport.Socket - codec rpc.ServerCodec + codec codec.Codec req *transport.Message buf *readWriteCloser @@ -20,6 +22,16 @@ type readWriteCloser struct { rbuf *bytes.Buffer } +var ( + defaultCodecs = map[string]codec.NewCodec{ + // "application/json": jsonrpc.NewServerCodec, + // "application/json-rpc": jsonrpc.NewServerCodec, + "application/protobuf": proto.NewCodec, + "application/proto-rpc": proto.NewCodec, + "application/octet-stream": proto.NewCodec, + } +) + func (rwc *readWriteCloser) Read(p []byte) (n int, err error) { return rwc.rbuf.Read(p) } @@ -34,14 +46,14 @@ func (rwc *readWriteCloser) Close() error { return nil } -func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFunc) rpc.ServerCodec { +func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.NewCodec) rpc.ServerCodec { rwc := &readWriteCloser{ rbuf: bytes.NewBuffer(req.Body), wbuf: bytes.NewBuffer(nil), } r := &rpcPlusCodec{ buf: rwc, - codec: cf(rwc), + codec: c(rwc), req: req, socket: socket, } @@ -49,16 +61,26 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, cf codecFu } func (c *rpcPlusCodec) ReadRequestHeader(r *rpc.Request) error { - return c.codec.ReadRequestHeader(r) + var m codec.Message + err := c.codec.ReadHeader(&m, codec.Request) + r.ServiceMethod = m.Method + r.Seq = m.Id + return err } -func (c *rpcPlusCodec) ReadRequestBody(r interface{}) error { - return c.codec.ReadRequestBody(r) +func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error { + return c.codec.ReadBody(b) } func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error { c.buf.wbuf.Reset() - if err := c.codec.WriteResponse(r, body, last); err != nil { + m := &codec.Message{ + Method: r.ServiceMethod, + Id: r.Seq, + Error: r.Error, + Type: codec.Response, + } + if err := c.codec.Write(m, body); err != nil { return err } return c.socket.Send(&transport.Message{ @@ -69,5 +91,6 @@ func (c *rpcPlusCodec) WriteResponse(r *rpc.Response, body interface{}, last boo func (c *rpcPlusCodec) Close() error { c.buf.Close() + c.codec.Close() return c.socket.Close() } diff --git a/server/rpc_server.go b/server/rpc_server.go index 98755db7..8e55fe15 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/transport" @@ -43,7 +44,7 @@ func (s *rpcServer) accept(sock transport.Socket) { return } - cf, err := s.codecFunc(msg.Header["Content-Type"]) + cf, err := s.newCodec(msg.Header["Content-Type"]) // TODO: needs better error handling if err != nil { sock.Send(&transport.Message{ @@ -73,9 +74,9 @@ func (s *rpcServer) accept(sock transport.Socket) { } } -func (s *rpcServer) codecFunc(contentType string) (codecFunc, error) { +func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) { if cf, ok := s.opts.codecs[contentType]; ok { - return codecWrap(cf), nil + return cf, nil } if cf, ok := defaultCodecs[contentType]; ok { return cf, nil @@ -200,7 +201,7 @@ func (s *rpcServer) Register() error { defer s.Unlock() for sb, _ := range s.subscribers { - handler := createSubHandler(sb) + handler := s.createSubHandler(sb) sub, err := config.broker.Subscribe(sb.Topic(), handler) if err != nil { return err diff --git a/server/subscriber.go b/server/subscriber.go index 9395bc44..ea4d82f0 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -1,11 +1,11 @@ package server import ( - "encoding/json" + "bytes" "reflect" - "github.com/golang/protobuf/proto" "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" c "github.com/micro/go-micro/context" "github.com/micro/go-micro/registry" "golang.org/x/net/context" @@ -94,8 +94,19 @@ func newSubscriber(topic string, sub interface{}) Subscriber { } } -func createSubHandler(sb *subscriber) broker.Handler { +func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler { return func(msg *broker.Message) { + cf, err := s.newCodec(msg.Header["Content-Type"]) + if err != nil { + return + } + + b := &buffer{bytes.NewBuffer(msg.Body)} + co := cf(b) + if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + return + } + hdr := make(map[string]string) for k, v := range msg.Header { hdr[k] = v @@ -107,7 +118,6 @@ func createSubHandler(sb *subscriber) broker.Handler { for _, handler := range sb.handlers { var isVal bool var req reflect.Value - var uerr error if handler.reqType.Kind() == reflect.Ptr { req = reflect.New(handler.reqType.Elem()) @@ -116,14 +126,7 @@ func createSubHandler(sb *subscriber) broker.Handler { isVal = true } - switch msg.Header["Content-Type"] { - case "application/octet-stream": - uerr = proto.Unmarshal(msg.Body, req.Interface().(proto.Message)) - case "application/json": - uerr = json.Unmarshal(msg.Body, req.Interface()) - } - - if uerr != nil { + if err := co.ReadBody(req.Interface()); err != nil { continue }