diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index c6bd6daf..b3ab0db5 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -118,6 +118,17 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // create context cx := ctx.FromRequest(r) + // get context from http handler wrappers + md, ok := r.Context().Value(metadata.MetadataKey{}).(metadata.Metadata) + if !ok { + md = make(metadata.Metadata) + } + + // merge context with overwrite + cx = metadata.MergeContext(cx, md, true) + + // set merged context to request + *r = *r.Clone(cx) // if stream we currently only support json if isStream(r, service) { @@ -284,26 +295,43 @@ func requestPayload(r *http.Request) ([]byte, error) { if !ok { md = make(map[string]string) } + // allocate maximum - matches := make(map[string]string, len(md)) + matches := make(map[string]interface{}, len(md)) + + // get fields from url path for k, v := range md { + // filter own keys if strings.HasPrefix(k, "x-api-field-") { matches[strings.TrimPrefix(k, "x-api-field-")] = v + delete(md, k) + } + } + + // map of all fields + req := make(map[string]interface{}, len(md)) + + // get fields from url values + if len(r.URL.RawQuery) > 0 { + umd := make(map[string]interface{}) + err = qson.Unmarshal(&umd, r.URL.RawQuery) + if err != nil { + return nil, err + } + for k, v := range umd { + matches[k] = v } - delete(md, k) } // restore context without fields - ctx = metadata.NewContext(ctx, md) - *r = *r.WithContext(ctx) - req := make(map[string]interface{}, len(md)) + *r = *r.Clone(metadata.NewContext(ctx, md)) + for k, v := range matches { ps := strings.Split(k, ".") if len(ps) == 1 { req[k] = v continue } - em := make(map[string]interface{}) em[ps[len(ps)-1]] = v for i := len(ps) - 2; i > 0; i-- { @@ -311,7 +339,16 @@ func requestPayload(r *http.Request) ([]byte, error) { nm[ps[i]] = em em = nm } - req[ps[0]] = em + if vm, ok := req[ps[0]]; ok { + // nested map + nm := vm.(map[string]interface{}) + for vk, vv := range em { + nm[vk] = vv + } + req[ps[0]] = nm + } else { + req[ps[0]] = em + } } pathbuf := []byte("{}") if len(req) > 0 { @@ -320,14 +357,8 @@ func requestPayload(r *http.Request) ([]byte, error) { return nil, err } } - urlbuf := []byte("{}") - if len(r.URL.RawQuery) > 0 { - urlbuf, err = qson.ToJSON(r.URL.RawQuery) - if err != nil { - return nil, err - } - } + urlbuf := []byte("{}") out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) if err != nil { return nil, err diff --git a/api/router/static/static.go b/api/router/static/static.go index 2f8a0962..3cf3bebe 100644 --- a/api/router/static/static.go +++ b/api/router/static/static.go @@ -255,14 +255,14 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { } pMatch = true ctx := req.Context() - md, ok := metadata.FromContext(ctx) + md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata) if !ok { md = make(metadata.Metadata) } for k, v := range matches { md[fmt.Sprintf("x-api-field-%s", k)] = v } - *req = *req.WithContext(context.WithValue(ctx, metadata.MetadataKey{}, md)) + *req = *req.Clone(context.WithValue(ctx, metadata.MetadataKey{}, md)) break pathLoop } if !pMatch { diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 0fa955bf..9d2fd597 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -221,7 +221,9 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client } // set timeout in nanoseconds - header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + if opts.StreamTimeout > time.Duration(0) { + header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } // set the content type for the request header["x-content-type"] = req.ContentType() diff --git a/client/options.go b/client/options.go index 378957ed..da3d592e 100644 --- a/client/options.go +++ b/client/options.go @@ -57,6 +57,8 @@ type CallOptions struct { Retries int // Request/Response timeout RequestTimeout time.Duration + // Stream timeout for the stream + StreamTimeout time.Duration // Use the services own auth token ServiceToken bool @@ -227,6 +229,13 @@ func RequestTimeout(d time.Duration) Option { } } +// StreamTimeout sets the stream timeout +func StreamTimeout(d time.Duration) Option { + return func(o *Options) { + o.CallOptions.StreamTimeout = d + } +} + // Transport dial timeout func DialTimeout(d time.Duration) Option { return func(o *Options) { @@ -295,6 +304,13 @@ func WithRequestTimeout(d time.Duration) CallOption { } } +// WithStreamTimeout sets the stream timeout +func WithStreamTimeout(d time.Duration) CallOption { + return func(o *CallOptions) { + o.StreamTimeout = d + } +} + // WithDialTimeout is a CallOption which overrides that which // set in Options.CallOptions func WithDialTimeout(d time.Duration) CallOption { diff --git a/client/rpc_client.go b/client/rpc_client.go index 8b4b806b..ea68bfb2 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -198,7 +198,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request } // set timeout in nanoseconds - msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) + if opts.StreamTimeout > time.Duration(0) { + msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) + } // set the content type for the request msg.Header["Content-Type"] = req.ContentType() // set the accept header diff --git a/config/config.go b/config/config.go index d70d5778..cd30f8b2 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,8 @@ type Config interface { reader.Values // Init the config Init(opts ...Option) error + // Options in the config + Options() Options // Stop the config loader/watcher Close() error // Load config sources diff --git a/config/default.go b/config/default.go index 0b70cbcf..905ccad7 100644 --- a/config/default.go +++ b/config/default.go @@ -67,6 +67,10 @@ func (c *config) Init(opts ...Option) error { return nil } +func (c *config) Options() Options { + return c.opts +} + func (c *config) run() { watch := func(w loader.Watcher) error { for { diff --git a/metadata/metadata.go b/metadata/metadata.go index 5ba0e4c0..96693cf6 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -25,6 +25,13 @@ func (md Metadata) Get(key string) (string, bool) { return val, ok } +func (md Metadata) Delete(key string) { + // delete key as-is + delete(md, key) + // delete also Title key + delete(md, strings.Title(key)) +} + // Copy makes a copy of the metadata func Copy(md Metadata) Metadata { cmd := make(Metadata) @@ -34,13 +41,22 @@ func Copy(md Metadata) Metadata { return cmd } +// Delete key from metadata +func Delete(ctx context.Context, k string) context.Context { + return Set(ctx, k, "") +} + // Set add key with val to metadata func Set(ctx context.Context, k, v string) context.Context { md, ok := FromContext(ctx) if !ok { md = make(Metadata) } - md[k] = v + if v == "" { + delete(md, k) + } else { + md[k] = v + } return context.WithValue(ctx, MetadataKey{}, md) } @@ -96,8 +112,10 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context for k, v := range patchMd { if _, ok := cmd[k]; ok && !overwrite { // skip - } else { + } else if v != "" { cmd[k] = v + } else { + delete(cmd, k) } } return context.WithValue(ctx, MetadataKey{}, cmd) diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 8dacfa84..6a3764b4 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -18,6 +18,27 @@ func TestMetadataSet(t *testing.T) { } } +func TestMetadataDelete(t *testing.T) { + md := Metadata{ + "Foo": "bar", + "Baz": "empty", + } + + ctx := NewContext(context.TODO(), md) + ctx = Delete(ctx, "Baz") + + emd, ok := FromContext(ctx) + if !ok { + t.Fatal("key Key not found") + } + + _, ok = emd["Baz"] + if ok { + t.Fatal("key Baz not deleted") + } + +} + func TestMetadataCopy(t *testing.T) { md := Metadata{ "Foo": "bar",