Merge branch 'master' into auth-interface-update
This commit is contained in:
		| @@ -118,6 +118,17 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |||||||
|  |  | ||||||
| 	// create context | 	// create context | ||||||
| 	cx := ctx.FromRequest(r) | 	cx := ctx.FromRequest(r) | ||||||
|  | 	// get context from http handler wrappers | ||||||
|  | 	md, ok := r.Context().Value(metadata.MetadataKey{}).(metadata.Metadata) | ||||||
|  | 	if !ok { | ||||||
|  | 		md = make(metadata.Metadata) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// merge context with overwrite | ||||||
|  | 	cx = metadata.MergeContext(cx, md, true) | ||||||
|  |  | ||||||
|  | 	// set merged context to request | ||||||
|  | 	*r = *r.Clone(cx) | ||||||
|  |  | ||||||
| 	// if stream we currently only support json | 	// if stream we currently only support json | ||||||
| 	if isStream(r, service) { | 	if isStream(r, service) { | ||||||
| @@ -284,26 +295,43 @@ func requestPayload(r *http.Request) ([]byte, error) { | |||||||
| 	if !ok { | 	if !ok { | ||||||
| 		md = make(map[string]string) | 		md = make(map[string]string) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// allocate maximum | 	// allocate maximum | ||||||
| 	matches := make(map[string]string, len(md)) | 	matches := make(map[string]interface{}, len(md)) | ||||||
|  |  | ||||||
|  | 	// get fields from url path | ||||||
| 	for k, v := range md { | 	for k, v := range md { | ||||||
|  | 		// filter own keys | ||||||
| 		if strings.HasPrefix(k, "x-api-field-") { | 		if strings.HasPrefix(k, "x-api-field-") { | ||||||
| 			matches[strings.TrimPrefix(k, "x-api-field-")] = v | 			matches[strings.TrimPrefix(k, "x-api-field-")] = v | ||||||
|  | 			delete(md, k) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// map of all fields | ||||||
|  | 	req := make(map[string]interface{}, len(md)) | ||||||
|  |  | ||||||
|  | 	// get fields from url values | ||||||
|  | 	if len(r.URL.RawQuery) > 0 { | ||||||
|  | 		umd := make(map[string]interface{}) | ||||||
|  | 		err = qson.Unmarshal(&umd, r.URL.RawQuery) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		for k, v := range umd { | ||||||
|  | 			matches[k] = v | ||||||
| 		} | 		} | ||||||
| 		delete(md, k) |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// restore context without fields | 	// restore context without fields | ||||||
| 	ctx = metadata.NewContext(ctx, md) | 	*r = *r.Clone(metadata.NewContext(ctx, md)) | ||||||
| 	*r = *r.WithContext(ctx) |  | ||||||
| 	req := make(map[string]interface{}, len(md)) |  | ||||||
| 	for k, v := range matches { | 	for k, v := range matches { | ||||||
| 		ps := strings.Split(k, ".") | 		ps := strings.Split(k, ".") | ||||||
| 		if len(ps) == 1 { | 		if len(ps) == 1 { | ||||||
| 			req[k] = v | 			req[k] = v | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		em := make(map[string]interface{}) | 		em := make(map[string]interface{}) | ||||||
| 		em[ps[len(ps)-1]] = v | 		em[ps[len(ps)-1]] = v | ||||||
| 		for i := len(ps) - 2; i > 0; i-- { | 		for i := len(ps) - 2; i > 0; i-- { | ||||||
| @@ -311,7 +339,16 @@ func requestPayload(r *http.Request) ([]byte, error) { | |||||||
| 			nm[ps[i]] = em | 			nm[ps[i]] = em | ||||||
| 			em = nm | 			em = nm | ||||||
| 		} | 		} | ||||||
| 		req[ps[0]] = em | 		if vm, ok := req[ps[0]]; ok { | ||||||
|  | 			// nested map | ||||||
|  | 			nm := vm.(map[string]interface{}) | ||||||
|  | 			for vk, vv := range em { | ||||||
|  | 				nm[vk] = vv | ||||||
|  | 			} | ||||||
|  | 			req[ps[0]] = nm | ||||||
|  | 		} else { | ||||||
|  | 			req[ps[0]] = em | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| 	pathbuf := []byte("{}") | 	pathbuf := []byte("{}") | ||||||
| 	if len(req) > 0 { | 	if len(req) > 0 { | ||||||
| @@ -320,14 +357,8 @@ func requestPayload(r *http.Request) ([]byte, error) { | |||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	urlbuf := []byte("{}") |  | ||||||
| 	if len(r.URL.RawQuery) > 0 { |  | ||||||
| 		urlbuf, err = qson.ToJSON(r.URL.RawQuery) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return nil, err |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
|  | 	urlbuf := []byte("{}") | ||||||
| 	out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) | 	out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
|   | |||||||
| @@ -255,14 +255,14 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { | |||||||
| 			} | 			} | ||||||
| 			pMatch = true | 			pMatch = true | ||||||
| 			ctx := req.Context() | 			ctx := req.Context() | ||||||
| 			md, ok := metadata.FromContext(ctx) | 			md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata) | ||||||
| 			if !ok { | 			if !ok { | ||||||
| 				md = make(metadata.Metadata) | 				md = make(metadata.Metadata) | ||||||
| 			} | 			} | ||||||
| 			for k, v := range matches { | 			for k, v := range matches { | ||||||
| 				md[fmt.Sprintf("x-api-field-%s", k)] = v | 				md[fmt.Sprintf("x-api-field-%s", k)] = v | ||||||
| 			} | 			} | ||||||
| 			*req = *req.WithContext(context.WithValue(ctx, metadata.MetadataKey{}, md)) | 			*req = *req.Clone(context.WithValue(ctx, metadata.MetadataKey{}, md)) | ||||||
| 			break pathLoop | 			break pathLoop | ||||||
| 		} | 		} | ||||||
| 		if !pMatch { | 		if !pMatch { | ||||||
|   | |||||||
| @@ -221,7 +221,9 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// set timeout in nanoseconds | 	// set timeout in nanoseconds | ||||||
| 	header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) | 	if opts.StreamTimeout > time.Duration(0) { | ||||||
|  | 		header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) | ||||||
|  | 	} | ||||||
| 	// set the content type for the request | 	// set the content type for the request | ||||||
| 	header["x-content-type"] = req.ContentType() | 	header["x-content-type"] = req.ContentType() | ||||||
|  |  | ||||||
|   | |||||||
| @@ -57,6 +57,8 @@ type CallOptions struct { | |||||||
| 	Retries int | 	Retries int | ||||||
| 	// Request/Response timeout | 	// Request/Response timeout | ||||||
| 	RequestTimeout time.Duration | 	RequestTimeout time.Duration | ||||||
|  | 	// Stream timeout for the stream | ||||||
|  | 	StreamTimeout time.Duration | ||||||
| 	// Use the services own auth token | 	// Use the services own auth token | ||||||
| 	ServiceToken bool | 	ServiceToken bool | ||||||
|  |  | ||||||
| @@ -227,6 +229,13 @@ func RequestTimeout(d time.Duration) Option { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // StreamTimeout sets the stream timeout | ||||||
|  | func StreamTimeout(d time.Duration) Option { | ||||||
|  | 	return func(o *Options) { | ||||||
|  | 		o.CallOptions.StreamTimeout = d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // Transport dial timeout | // Transport dial timeout | ||||||
| func DialTimeout(d time.Duration) Option { | func DialTimeout(d time.Duration) Option { | ||||||
| 	return func(o *Options) { | 	return func(o *Options) { | ||||||
| @@ -295,6 +304,13 @@ func WithRequestTimeout(d time.Duration) CallOption { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // WithStreamTimeout sets the stream timeout | ||||||
|  | func WithStreamTimeout(d time.Duration) CallOption { | ||||||
|  | 	return func(o *CallOptions) { | ||||||
|  | 		o.StreamTimeout = d | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| // WithDialTimeout is a CallOption which overrides that which | // WithDialTimeout is a CallOption which overrides that which | ||||||
| // set in Options.CallOptions | // set in Options.CallOptions | ||||||
| func WithDialTimeout(d time.Duration) CallOption { | func WithDialTimeout(d time.Duration) CallOption { | ||||||
|   | |||||||
| @@ -198,7 +198,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// set timeout in nanoseconds | 	// set timeout in nanoseconds | ||||||
| 	msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) | 	if opts.StreamTimeout > time.Duration(0) { | ||||||
|  | 		msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) | ||||||
|  | 	} | ||||||
| 	// set the content type for the request | 	// set the content type for the request | ||||||
| 	msg.Header["Content-Type"] = req.ContentType() | 	msg.Header["Content-Type"] = req.ContentType() | ||||||
| 	// set the accept header | 	// set the accept header | ||||||
|   | |||||||
| @@ -16,6 +16,8 @@ type Config interface { | |||||||
| 	reader.Values | 	reader.Values | ||||||
| 	// Init the config | 	// Init the config | ||||||
| 	Init(opts ...Option) error | 	Init(opts ...Option) error | ||||||
|  | 	// Options in the config | ||||||
|  | 	Options() Options | ||||||
| 	// Stop the config loader/watcher | 	// Stop the config loader/watcher | ||||||
| 	Close() error | 	Close() error | ||||||
| 	// Load config sources | 	// Load config sources | ||||||
|   | |||||||
| @@ -67,6 +67,10 @@ func (c *config) Init(opts ...Option) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (c *config) Options() Options { | ||||||
|  | 	return c.opts | ||||||
|  | } | ||||||
|  |  | ||||||
| func (c *config) run() { | func (c *config) run() { | ||||||
| 	watch := func(w loader.Watcher) error { | 	watch := func(w loader.Watcher) error { | ||||||
| 		for { | 		for { | ||||||
|   | |||||||
| @@ -25,6 +25,13 @@ func (md Metadata) Get(key string) (string, bool) { | |||||||
| 	return val, ok | 	return val, ok | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (md Metadata) Delete(key string) { | ||||||
|  | 	// delete key as-is | ||||||
|  | 	delete(md, key) | ||||||
|  | 	// delete also Title key | ||||||
|  | 	delete(md, strings.Title(key)) | ||||||
|  | } | ||||||
|  |  | ||||||
| // Copy makes a copy of the metadata | // Copy makes a copy of the metadata | ||||||
| func Copy(md Metadata) Metadata { | func Copy(md Metadata) Metadata { | ||||||
| 	cmd := make(Metadata) | 	cmd := make(Metadata) | ||||||
| @@ -34,13 +41,22 @@ func Copy(md Metadata) Metadata { | |||||||
| 	return cmd | 	return cmd | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Delete key from metadata | ||||||
|  | func Delete(ctx context.Context, k string) context.Context { | ||||||
|  | 	return Set(ctx, k, "") | ||||||
|  | } | ||||||
|  |  | ||||||
| // Set add key with val to metadata | // Set add key with val to metadata | ||||||
| func Set(ctx context.Context, k, v string) context.Context { | func Set(ctx context.Context, k, v string) context.Context { | ||||||
| 	md, ok := FromContext(ctx) | 	md, ok := FromContext(ctx) | ||||||
| 	if !ok { | 	if !ok { | ||||||
| 		md = make(Metadata) | 		md = make(Metadata) | ||||||
| 	} | 	} | ||||||
| 	md[k] = v | 	if v == "" { | ||||||
|  | 		delete(md, k) | ||||||
|  | 	} else { | ||||||
|  | 		md[k] = v | ||||||
|  | 	} | ||||||
| 	return context.WithValue(ctx, MetadataKey{}, md) | 	return context.WithValue(ctx, MetadataKey{}, md) | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -96,8 +112,10 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context | |||||||
| 	for k, v := range patchMd { | 	for k, v := range patchMd { | ||||||
| 		if _, ok := cmd[k]; ok && !overwrite { | 		if _, ok := cmd[k]; ok && !overwrite { | ||||||
| 			// skip | 			// skip | ||||||
| 		} else { | 		} else if v != "" { | ||||||
| 			cmd[k] = v | 			cmd[k] = v | ||||||
|  | 		} else { | ||||||
|  | 			delete(cmd, k) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	return context.WithValue(ctx, MetadataKey{}, cmd) | 	return context.WithValue(ctx, MetadataKey{}, cmd) | ||||||
|   | |||||||
| @@ -18,6 +18,27 @@ func TestMetadataSet(t *testing.T) { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestMetadataDelete(t *testing.T) { | ||||||
|  | 	md := Metadata{ | ||||||
|  | 		"Foo": "bar", | ||||||
|  | 		"Baz": "empty", | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	ctx := NewContext(context.TODO(), md) | ||||||
|  | 	ctx = Delete(ctx, "Baz") | ||||||
|  |  | ||||||
|  | 	emd, ok := FromContext(ctx) | ||||||
|  | 	if !ok { | ||||||
|  | 		t.Fatal("key Key not found") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	_, ok = emd["Baz"] | ||||||
|  | 	if ok { | ||||||
|  | 		t.Fatal("key Baz not deleted") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
| func TestMetadataCopy(t *testing.T) { | func TestMetadataCopy(t *testing.T) { | ||||||
| 	md := Metadata{ | 	md := Metadata{ | ||||||
| 		"Foo": "bar", | 		"Foo": "bar", | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user