From 6c6c5359b188597d73455bda91ca40f2fde1d3c2 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 31 Mar 2020 17:13:21 +0100 Subject: [PATCH 1/9] Add options to config (#1450) --- config/config.go | 2 ++ config/default.go | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/config/config.go b/config/config.go index d70d5778..cd30f8b2 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,8 @@ type Config interface { reader.Values // Init the config Init(opts ...Option) error + // Options in the config + Options() Options // Stop the config loader/watcher Close() error // Load config sources diff --git a/config/default.go b/config/default.go index 0b70cbcf..905ccad7 100644 --- a/config/default.go +++ b/config/default.go @@ -67,6 +67,10 @@ func (c *config) Init(opts ...Option) error { return nil } +func (c *config) Options() Options { + return c.opts +} + func (c *config) run() { watch := func(w loader.Watcher) error { for { From d6bef84de08b00c3059a073fc26872f9545b2893 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 31 Mar 2020 21:59:35 +0300 Subject: [PATCH 2/9] api/handler/rpc: fix metadata cleanup (#1451) Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index c6bd6daf..d7c3c75c 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -289,8 +289,8 @@ func requestPayload(r *http.Request) ([]byte, error) { for k, v := range md { if strings.HasPrefix(k, "x-api-field-") { matches[strings.TrimPrefix(k, "x-api-field-")] = v + delete(md, k) } - delete(md, k) } // restore context without fields From 18061723bbddef80393b809c2cf9096ea2a4f874 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 31 Mar 2020 22:36:51 +0300 Subject: [PATCH 3/9] fix api metadata extract from context (#1452) Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 16 ++++++++++++++-- api/router/static/static.go | 2 +- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index d7c3c75c..8e830a8c 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -118,6 +118,17 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // create context cx := ctx.FromRequest(r) + // get context from http handler wrappers + md, ok := r.Context().Value(metadata.MetadataKey{}).(metadata.Metadata) + if !ok { + md = make(metadata.Metadata) + } + + // merge context with overwrite + cx = metadata.MergeContext(cx, md, true) + + // set merged context to request + *r = *r.WithContext(cx) // if stream we currently only support json if isStream(r, service) { @@ -287,6 +298,7 @@ func requestPayload(r *http.Request) ([]byte, error) { // allocate maximum matches := make(map[string]string, len(md)) for k, v := range md { + // filter own keys if strings.HasPrefix(k, "x-api-field-") { matches[strings.TrimPrefix(k, "x-api-field-")] = v delete(md, k) @@ -294,8 +306,8 @@ func requestPayload(r *http.Request) ([]byte, error) { } // restore context without fields - ctx = metadata.NewContext(ctx, md) - *r = *r.WithContext(ctx) + *r = *r.WithContext(metadata.NewContext(ctx, md)) + req := make(map[string]interface{}, len(md)) for k, v := range matches { ps := strings.Split(k, ".") diff --git a/api/router/static/static.go b/api/router/static/static.go index 2f8a0962..f391cd5a 100644 --- a/api/router/static/static.go +++ b/api/router/static/static.go @@ -255,7 +255,7 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { } pMatch = true ctx := req.Context() - md, ok := metadata.FromContext(ctx) + md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata) if !ok { md = make(metadata.Metadata) } From 5e65a46be384c8051a89e498a080afe47472762b Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 31 Mar 2020 22:55:33 +0300 Subject: [PATCH 4/9] metadata: allow to remove key from metadata (#1453) Signed-off-by: Vasiliy Tolstov --- metadata/metadata.go | 22 ++++++++++++++++++++-- metadata/metadata_test.go | 21 +++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/metadata/metadata.go b/metadata/metadata.go index 5ba0e4c0..d9d89236 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -25,6 +25,13 @@ func (md Metadata) Get(key string) (string, bool) { return val, ok } +func (md Metadata) Del(key string) { + // delete key as-is + delete(md, key) + // delete also Title key + delete(md, strings.Title(key)) +} + // Copy makes a copy of the metadata func Copy(md Metadata) Metadata { cmd := make(Metadata) @@ -34,13 +41,22 @@ func Copy(md Metadata) Metadata { return cmd } +// Del key from metadata +func Del(ctx context.Context, k string) context.Context { + return Set(ctx, k, "") +} + // Set add key with val to metadata func Set(ctx context.Context, k, v string) context.Context { md, ok := FromContext(ctx) if !ok { md = make(Metadata) } - md[k] = v + if v == "" { + delete(md, k) + } else { + md[k] = v + } return context.WithValue(ctx, MetadataKey{}, md) } @@ -96,8 +112,10 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context for k, v := range patchMd { if _, ok := cmd[k]; ok && !overwrite { // skip - } else { + } else if v != "" { cmd[k] = v + } else { + delete(cmd, k) } } return context.WithValue(ctx, MetadataKey{}, cmd) diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 8dacfa84..485923e2 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -18,6 +18,27 @@ func TestMetadataSet(t *testing.T) { } } +func TestMetadataDel(t *testing.T) { + md := Metadata{ + "Foo": "bar", + "Baz": "empty", + } + + ctx := NewContext(context.TODO(), md) + ctx = Set(ctx, "Baz", "") + + emd, ok := FromContext(ctx) + if !ok { + t.Fatal("key Key not found") + } + + _, ok = emd["Baz"] + if ok { + t.Fatal("key Baz not deleted") + } + +} + func TestMetadataCopy(t *testing.T) { md := Metadata{ "Foo": "bar", From 3a22efbd7dad62d75f6cdb233db6ecfef078053c Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Tue, 31 Mar 2020 23:39:18 +0300 Subject: [PATCH 5/9] metadata: change method name (#1454) Signed-off-by: Vasiliy Tolstov --- metadata/metadata.go | 6 +++--- metadata/metadata_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata/metadata.go b/metadata/metadata.go index d9d89236..96693cf6 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -25,7 +25,7 @@ func (md Metadata) Get(key string) (string, bool) { return val, ok } -func (md Metadata) Del(key string) { +func (md Metadata) Delete(key string) { // delete key as-is delete(md, key) // delete also Title key @@ -41,8 +41,8 @@ func Copy(md Metadata) Metadata { return cmd } -// Del key from metadata -func Del(ctx context.Context, k string) context.Context { +// Delete key from metadata +func Delete(ctx context.Context, k string) context.Context { return Set(ctx, k, "") } diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 485923e2..6a3764b4 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -18,14 +18,14 @@ func TestMetadataSet(t *testing.T) { } } -func TestMetadataDel(t *testing.T) { +func TestMetadataDelete(t *testing.T) { md := Metadata{ "Foo": "bar", "Baz": "empty", } ctx := NewContext(context.TODO(), md) - ctx = Set(ctx, "Baz", "") + ctx = Delete(ctx, "Baz") emd, ok := FromContext(ctx) if !ok { From 1490aff38ee3ee820c7c01bcd7fb4cfc116204f7 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 1 Apr 2020 00:23:17 +0300 Subject: [PATCH 6/9] api/handler/rpc: correctly parse nested url vars (#1455) Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index 8e830a8c..0aeceb43 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -297,6 +297,8 @@ func requestPayload(r *http.Request) ([]byte, error) { } // allocate maximum matches := make(map[string]string, len(md)) + + // get fields from url path for k, v := range md { // filter own keys if strings.HasPrefix(k, "x-api-field-") { @@ -305,6 +307,18 @@ func requestPayload(r *http.Request) ([]byte, error) { } } + // get fields from url values + if len(r.URL.RawQuery) > 0 { + umd := make(map[string]string) + err = qson.Unmarshal(&umd, r.URL.RawQuery) + if err != nil { + return nil, err + } + for k, v := range umd { + matches[k] = v + } + } + // restore context without fields *r = *r.WithContext(metadata.NewContext(ctx, md)) @@ -315,7 +329,6 @@ func requestPayload(r *http.Request) ([]byte, error) { req[k] = v continue } - em := make(map[string]interface{}) em[ps[len(ps)-1]] = v for i := len(ps) - 2; i > 0; i-- { @@ -323,7 +336,15 @@ func requestPayload(r *http.Request) ([]byte, error) { nm[ps[i]] = em em = nm } - req[ps[0]] = em + if vm, ok := req[ps[0]]; ok { + // nested map + nm := vm.(map[string]interface{}) + for vk, vv := range em { + nm[vk] = vv + } + } else { + req[ps[0]] = em + } } pathbuf := []byte("{}") if len(req) > 0 { @@ -332,14 +353,8 @@ func requestPayload(r *http.Request) ([]byte, error) { return nil, err } } - urlbuf := []byte("{}") - if len(r.URL.RawQuery) > 0 { - urlbuf, err = qson.ToJSON(r.URL.RawQuery) - if err != nil { - return nil, err - } - } + urlbuf := []byte("{}") out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) if err != nil { return nil, err From 68b0238a5d37fd48d70be14162bd6b33c4528e54 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Tue, 31 Mar 2020 23:22:11 +0100 Subject: [PATCH 7/9] add stream timeout option which defaults to 0 (#1456) * add stream timeout option which defaults to 0 * fix option --- client/grpc/grpc.go | 4 +++- client/options.go | 16 ++++++++++++++++ client/rpc_client.go | 4 +++- 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 5f14c25d..71a89f9c 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -221,7 +221,9 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client } // set timeout in nanoseconds - header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + if opts.StreamTimeout > time.Duration(0) { + header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } // set the content type for the request header["x-content-type"] = req.ContentType() diff --git a/client/options.go b/client/options.go index 378957ed..da3d592e 100644 --- a/client/options.go +++ b/client/options.go @@ -57,6 +57,8 @@ type CallOptions struct { Retries int // Request/Response timeout RequestTimeout time.Duration + // Stream timeout for the stream + StreamTimeout time.Duration // Use the services own auth token ServiceToken bool @@ -227,6 +229,13 @@ func RequestTimeout(d time.Duration) Option { } } +// StreamTimeout sets the stream timeout +func StreamTimeout(d time.Duration) Option { + return func(o *Options) { + o.CallOptions.StreamTimeout = d + } +} + // Transport dial timeout func DialTimeout(d time.Duration) Option { return func(o *Options) { @@ -295,6 +304,13 @@ func WithRequestTimeout(d time.Duration) CallOption { } } +// WithStreamTimeout sets the stream timeout +func WithStreamTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.StreamTimeout = d + } +} + // WithDialTimeout is a CallOption which overrides that which // set in Options.CallOptions func WithDialTimeout(d time.Duration) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 8b4b806b..ea68bfb2 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -198,7 +198,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request } // set timeout in nanoseconds - msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + if opts.StreamTimeout > time.Duration(0) { + msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } // set the content type for the request msg.Header["Content-Type"] = req.ContentType() // set the accept header From 8a8742f86765688dda3fef4ee27220eed75053e2 Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 1 Apr 2020 01:26:58 +0300 Subject: [PATCH 8/9] api/handler/rpc: dont change types of url fields (#1457) Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index 0aeceb43..543f5e8c 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -295,8 +295,9 @@ func requestPayload(r *http.Request) ([]byte, error) { if !ok { md = make(map[string]string) } + // allocate maximum - matches := make(map[string]string, len(md)) + matches := make(map[string]interface{}, len(md)) // get fields from url path for k, v := range md { @@ -307,9 +308,12 @@ func requestPayload(r *http.Request) ([]byte, error) { } } + // map of all fields + req := make(map[string]interface{}, len(md)) + // get fields from url values if len(r.URL.RawQuery) > 0 { - umd := make(map[string]string) + umd := make(map[string]interface{}) err = qson.Unmarshal(&umd, r.URL.RawQuery) if err != nil { return nil, err @@ -322,7 +326,6 @@ func requestPayload(r *http.Request) ([]byte, error) { // restore context without fields *r = *r.WithContext(metadata.NewContext(ctx, md)) - req := make(map[string]interface{}, len(md)) for k, v := range matches { ps := strings.Split(k, ".") if len(ps) == 1 { @@ -342,6 +345,7 @@ func requestPayload(r *http.Request) ([]byte, error) { for vk, vv := range em { nm[vk] = vv } + req[ps[0]] = nm } else { req[ps[0]] = em } From 7b7a859a03b5da591d839071818027df00a14b5b Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Wed, 1 Apr 2020 01:50:37 +0300 Subject: [PATCH 9/9] api: use http request Clone (#1458) Signed-off-by: Vasiliy Tolstov --- api/handler/rpc/rpc.go | 4 ++-- api/router/static/static.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index 543f5e8c..b3ab0db5 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -128,7 +128,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { cx = metadata.MergeContext(cx, md, true) // set merged context to request - *r = *r.WithContext(cx) + *r = *r.Clone(cx) // if stream we currently only support json if isStream(r, service) { @@ -324,7 +324,7 @@ func requestPayload(r *http.Request) ([]byte, error) { } // restore context without fields - *r = *r.WithContext(metadata.NewContext(ctx, md)) + *r = *r.Clone(metadata.NewContext(ctx, md)) for k, v := range matches { ps := strings.Split(k, ".") diff --git a/api/router/static/static.go b/api/router/static/static.go index f391cd5a..3cf3bebe 100644 --- a/api/router/static/static.go +++ b/api/router/static/static.go @@ -262,7 +262,7 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { for k, v := range matches { md[fmt.Sprintf("x-api-field-%s", k)] = v } - *req = *req.WithContext(context.WithValue(ctx, metadata.MetadataKey{}, md)) + *req = *req.Clone(context.WithValue(ctx, metadata.MetadataKey{}, md)) break pathLoop } if !pMatch {