From 3573ac818f5ce53b9bdfb76db08314c57ffa131b Mon Sep 17 00:00:00 2001 From: magodo Date: Sun, 9 Jun 2019 11:50:50 +0800 Subject: [PATCH 1/4] unmarshal json with `jsonpb` if accepter is pb --- codec/json/json.go | 19 +++++++++++++------ codec/json/marshaler.go | 6 ++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/codec/json/json.go b/codec/json/json.go index 42de4a4d..f17eef72 100644 --- a/codec/json/json.go +++ b/codec/json/json.go @@ -5,13 +5,16 @@ import ( "encoding/json" "io" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "github.com/micro/go-micro/codec" ) type Codec struct { - Conn io.ReadWriteCloser - Encoder *json.Encoder - Decoder *json.Decoder + Conn io.ReadWriteCloser + Encoder *json.Encoder + Decoder *json.Decoder + Unmarshaler *jsonpb.Unmarshaler } func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { @@ -22,6 +25,9 @@ func (c *Codec) ReadBody(b interface{}) error { if b == nil { return nil } + if pb, ok := b.(proto.Message); ok { + return c.Unmarshaler.UnmarshalNext(c.Decoder, pb) + } return c.Decoder.Decode(b) } @@ -42,8 +48,9 @@ func (c *Codec) String() string { func NewCodec(c io.ReadWriteCloser) codec.Codec { return &Codec{ - Conn: c, - Decoder: json.NewDecoder(c), - Encoder: json.NewEncoder(c), + Conn: c, + Decoder: json.NewDecoder(c), + Encoder: json.NewEncoder(c), + Unmarshaler: &jsonpb.Unmarshaler{}, } } diff --git a/codec/json/marshaler.go b/codec/json/marshaler.go index b9d0be28..a13e5f39 100644 --- a/codec/json/marshaler.go +++ b/codec/json/marshaler.go @@ -2,6 +2,9 @@ package json import ( "encoding/json" + + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" ) type Marshaler struct{} @@ -11,6 +14,9 @@ func (j Marshaler) Marshal(v interface{}) ([]byte, error) { } func (j Marshaler) Unmarshal(d []byte, v interface{}) error { + if pb, ok := v.(proto.Message); ok { + return jsonpb.UnmarshalString(string(d), pb) + } return json.Unmarshal(d, v) } From 748c20c9794d997aed3f037567902926295cbaef Mon Sep 17 00:00:00 2001 From: magodo Date: Sun, 9 Jun 2019 11:55:36 +0800 Subject: [PATCH 2/4] use package level func for unmarshal --- codec/json/json.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/codec/json/json.go b/codec/json/json.go index f17eef72..5af47a07 100644 --- a/codec/json/json.go +++ b/codec/json/json.go @@ -11,10 +11,9 @@ import ( ) type Codec struct { - Conn io.ReadWriteCloser - Encoder *json.Encoder - Decoder *json.Decoder - Unmarshaler *jsonpb.Unmarshaler + Conn io.ReadWriteCloser + Encoder *json.Encoder + Decoder *json.Decoder } func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { @@ -26,7 +25,7 @@ func (c *Codec) ReadBody(b interface{}) error { return nil } if pb, ok := b.(proto.Message); ok { - return c.Unmarshaler.UnmarshalNext(c.Decoder, pb) + return jsonpb.UnmarshalNext(c.Decoder, pb) } return c.Decoder.Decode(b) } @@ -48,9 +47,8 @@ func (c *Codec) String() string { func NewCodec(c io.ReadWriteCloser) codec.Codec { return &Codec{ - Conn: c, - Decoder: json.NewDecoder(c), - Encoder: json.NewEncoder(c), - Unmarshaler: &jsonpb.Unmarshaler{}, + Conn: c, + Decoder: json.NewDecoder(c), + Encoder: json.NewEncoder(c), } } From 73b0a0ed0e272b9600c602e0a39dad40b42b3543 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Sun, 9 Jun 2019 15:51:27 +0100 Subject: [PATCH 3/4] Return registry.ErrWatcherStopped when consul watcher stops. The original code returns "result chan closed" errors.Error which does not carry higher semantics signal to downstream despite go-micro having a clearly defined Error for this behaviour. This commit fixes that and lets the downstream i.e. consumer of this code to act based on different errors. --- registry/consul/watcher.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/registry/consul/watcher.go b/registry/consul/watcher.go index 2fc81886..1bb9d5b7 100644 --- a/registry/consul/watcher.go +++ b/registry/consul/watcher.go @@ -1,7 +1,6 @@ package consul import ( - "errors" "log" "os" "sync" @@ -246,14 +245,16 @@ func (cw *consulWatcher) handle(idx uint64, data interface{}) { func (cw *consulWatcher) Next() (*registry.Result, error) { select { case <-cw.exit: - return nil, errors.New("result chan closed") + return nil, registry.ErrWatcherStopped case r, ok := <-cw.next: if !ok { - return nil, errors.New("result chan closed") + return nil, registry.ErrWatcherStopped } return r, nil } - return nil, errors.New("result chan closed") + // NOTE: This is a dead code path: e.g. it will never be reached + // as we return in all previous code paths never leading to this return + return nil, registry.ErrWatcherStopped } func (cw *consulWatcher) Stop() { From 46de3ae9a9ec60ef840b59ce3c4c8a96204807c6 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 10 Jun 2019 12:42:43 +0100 Subject: [PATCH 4/4] Fix text codec --- codec/text/text.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/codec/text/text.go b/codec/text/text.go index f6f28859..da43da50 100644 --- a/codec/text/text.go +++ b/codec/text/text.go @@ -30,6 +30,9 @@ func (c *Codec) ReadBody(b interface{}) error { } switch b.(type) { + case *string: + v := b.(*string) + *v = string(buf) case *[]byte: v := b.(*[]byte) *v = buf @@ -51,6 +54,12 @@ func (c *Codec) Write(m *codec.Message, b interface{}) error { case *[]byte: ve := b.(*[]byte) v = *ve + case *string: + ve := b.(*string) + v = []byte(*ve) + case string: + ve := b.(string) + v = []byte(ve) case []byte: v = b.([]byte) default: @@ -65,7 +74,7 @@ func (c *Codec) Close() error { } func (c *Codec) String() string { - return "bytes" + return "text" } func NewCodec(c io.ReadWriteCloser) codec.Codec {