Compare commits
	
		
			6 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 3f3c3a4471 | |||
|  | cee0fde959 | ||
| b122fed3cc | |||
|  | fa00663740 | ||
| 905398dcb6 | |||
| a6dd9c0455 | 
| @@ -1,5 +1,5 @@ | ||||
| # HTTP Client | ||||
|  | ||||
|  | ||||
|  | ||||
| This plugin is an HTTP client for [Micro](https://pkg.go.dev/go.unistack.org/micro/v3). | ||||
| It implements the [micro.Client](https://pkg.go.dev/go.unistack.org/micro/v3/client#Client) interface. | ||||
| @@ -13,6 +13,7 @@ implements HTTP rules defined in the [google/api/http.proto](https://github.com/ | ||||
|  | ||||
| * Streaming is not yet implemented. | ||||
| * Only protobuf-generated messages are supported. | ||||
| * In `micro/v3`, metadata is implemented as `map[string]string`, which works for most headers but not for multiple `Set-Cookie` headers. The HTTP specification forbids the use of commas in `Set-Cookie` headers; therefore, their values cannot be parsed reliably. In `micro/v4`, metadata uses `map[string][]string`, resolving this issue. | ||||
|  | ||||
| ## Usage | ||||
|  | ||||
|   | ||||
							
								
								
									
										74
									
								
								client.go
									
									
									
									
									
								
							
							
						
						
									
										74
									
								
								client.go
									
									
									
									
									
								
							| @@ -12,16 +12,22 @@ import ( | ||||
| 	"go.unistack.org/micro/v3/options" | ||||
| 	"go.unistack.org/micro/v3/semconv" | ||||
| 	"go.unistack.org/micro/v3/tracer" | ||||
|  | ||||
| 	"go.unistack.org/micro-client-http/v3/status" | ||||
| ) | ||||
|  | ||||
| var _ client.Client = (*Client)(nil) | ||||
|  | ||||
| var DefaultContentType = "application/json" | ||||
|  | ||||
| type Client struct { | ||||
| 	funcCall   client.FuncCall | ||||
| 	funcStream client.FuncStream | ||||
| 	httpClient *http.Client | ||||
| 	opts       client.Options | ||||
| 	mu         sync.RWMutex | ||||
| 	funcPublish      client.FuncPublish | ||||
| 	funcBatchPublish client.FuncBatchPublish | ||||
| 	funcCall         client.FuncCall | ||||
| 	funcStream       client.FuncStream | ||||
| 	httpClient       *http.Client | ||||
| 	opts             client.Options | ||||
| 	mu               sync.RWMutex | ||||
| } | ||||
|  | ||||
| func NewClient(opts ...client.Option) *Client { | ||||
| @@ -43,6 +49,8 @@ func NewClient(opts ...client.Option) *Client { | ||||
| 		c.httpClient = defaultHTTPClient(dialer, clientOpts.TLSConfig) | ||||
| 	} | ||||
|  | ||||
| 	c.funcPublish = c.fnPublish | ||||
| 	c.funcBatchPublish = c.fnBatchPublish | ||||
| 	c.funcCall = c.fnCall | ||||
| 	c.funcStream = c.fnStream | ||||
|  | ||||
| @@ -64,6 +72,10 @@ func (c *Client) Init(opts ...client.Option) error { | ||||
| 			c.funcCall = h(c.funcCall) | ||||
| 		case client.HookStream: | ||||
| 			c.funcStream = h(c.funcStream) | ||||
| 		case client.HookPublish: | ||||
| 			c.funcPublish = h(c.funcPublish) | ||||
| 		case client.HookBatchPublish: | ||||
| 			c.funcBatchPublish = h(c.funcBatchPublish) | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| @@ -74,6 +86,20 @@ func (c *Client) Options() client.Options { | ||||
| 	return c.opts | ||||
| } | ||||
|  | ||||
| func (c *Client) NewMessage(topic string, msg interface{}, opts ...client.MessageOption) client.Message { | ||||
| 	msgOpts := client.NewMessageOptions(opts...) | ||||
| 	if msgOpts.ContentType == "" { | ||||
| 		msgOpts.ContentType = c.opts.ContentType | ||||
| 	} | ||||
|  | ||||
| 	return &httpMessage{ | ||||
| 		topic:   topic, | ||||
| 		payload: msg, | ||||
| 		opts:    msgOpts, | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func (c *Client) NewRequest(service, method string, req any, opts ...client.RequestOption) client.Request { | ||||
| 	reqOpts := client.NewRequestOptions(opts...) | ||||
| 	if reqOpts.ContentType == "" { | ||||
| @@ -91,25 +117,45 @@ func (c *Client) NewRequest(service, method string, req any, opts ...client.Requ | ||||
| func (c *Client) Call(ctx context.Context, req client.Request, rsp any, opts ...client.CallOption) error { | ||||
| 	ts := time.Now() | ||||
| 	c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Inc() | ||||
|  | ||||
| 	var sp tracer.Span | ||||
| 	ctx, sp = c.opts.Tracer.Start(ctx, req.Endpoint()+" rpc-client", | ||||
| 		tracer.WithSpanKind(tracer.SpanKindClient), | ||||
| 		tracer.WithSpanLabels("endpoint", req.Endpoint()), | ||||
| 	) | ||||
| 	defer sp.Finish() | ||||
|  | ||||
| 	err := c.funcCall(ctx, req, rsp, opts...) | ||||
|  | ||||
| 	c.opts.Meter.Counter(semconv.ClientRequestInflight, "endpoint", req.Endpoint()).Dec() | ||||
| 	te := time.Since(ts) | ||||
| 	c.opts.Meter.Summary(semconv.ClientRequestLatencyMicroseconds, "endpoint", req.Endpoint()).Update(te.Seconds()) | ||||
| 	c.opts.Meter.Histogram(semconv.ClientRequestDurationSeconds, "endpoint", req.Endpoint()).Update(te.Seconds()) | ||||
|  | ||||
| 	if me := errors.FromError(err); me == nil { | ||||
| 		sp.Finish() | ||||
| 		c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "success", "code", strconv.Itoa(int(200))).Inc() | ||||
| 	} else { | ||||
| 	var ( | ||||
| 		statusCode  int | ||||
| 		statusLabel string | ||||
| 	) | ||||
|  | ||||
| 	if err == nil { | ||||
| 		statusCode = http.StatusOK | ||||
| 		statusLabel = "success" | ||||
| 	} else if st, ok := status.FromError(err); ok { | ||||
| 		statusCode = st.Code() | ||||
| 		statusLabel = "failure" | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 	} else if me := errors.FromError(err); me != nil { | ||||
| 		statusCode = int(me.Code) | ||||
| 		statusLabel = "failure" | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 	} else { | ||||
| 		statusCode = http.StatusInternalServerError | ||||
| 		statusLabel = "failure" | ||||
| 		sp.SetStatus(tracer.SpanStatusError, err.Error()) | ||||
| 		c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", "failure", "code", strconv.Itoa(int(me.Code))).Inc() | ||||
| 	} | ||||
|  | ||||
| 	c.opts.Meter.Counter(semconv.ClientRequestTotal, "endpoint", req.Endpoint(), "status", statusLabel, "code", strconv.Itoa(statusCode)).Inc() | ||||
|  | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| @@ -117,6 +163,14 @@ func (c *Client) Stream(ctx context.Context, req client.Request, opts ...client. | ||||
| 	return c.funcStream(ctx, req, opts...) | ||||
| } | ||||
|  | ||||
| func (c *Client) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { | ||||
| 	return c.funcPublish(ctx, p, opts...) | ||||
| } | ||||
|  | ||||
| func (c *Client) BatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||
| 	return c.funcBatchPublish(ctx, ps, opts...) | ||||
| } | ||||
|  | ||||
| func (c *Client) String() string { | ||||
| 	return "http" | ||||
| } | ||||
|   | ||||
| @@ -119,10 +119,23 @@ func buildHTTPRequest( | ||||
| 	} | ||||
|  | ||||
| 	if log.V(logger.DebugLevel) { | ||||
| 		log.Debug( | ||||
| 			ctx, | ||||
| 			fmt.Sprintf("request %s to %s with headers %v body %s", method, u.String(), hreq.Header, body), | ||||
| 		) | ||||
| 		if shouldLogBody(ct) { | ||||
| 			log.Debug( | ||||
| 				ctx, | ||||
| 				fmt.Sprintf( | ||||
| 					"micro.client http request: method=%s url=%s headers=%v body=%s", | ||||
| 					method, u.String(), hreq.Header, body, | ||||
| 				), | ||||
| 			) | ||||
| 		} else { | ||||
| 			log.Debug( | ||||
| 				ctx, | ||||
| 				fmt.Sprintf( | ||||
| 					"micro.client http request: method=%s url=%s headers=%v", | ||||
| 					method, u.String(), hreq.Header, | ||||
| 				), | ||||
| 			) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return hreq, nil | ||||
| @@ -259,3 +272,18 @@ func validateHeadersAndCookies(r *http.Request, parameters map[string]map[string | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func shouldLogBody(contentType string) bool { | ||||
| 	ct := strings.ToLower(strings.Split(contentType, ";")[0]) | ||||
| 	switch { | ||||
| 	case strings.HasPrefix(ct, "text/"): // => text/html, text/plain, text/csv etc. | ||||
| 		return true | ||||
| 	case ct == "application/json", | ||||
| 		ct == "application/xml", | ||||
| 		ct == "application/x-www-form-urlencoded", | ||||
| 		ct == "application/yaml": | ||||
| 		return true | ||||
| 	default: | ||||
| 		return false | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -399,3 +399,49 @@ func TestValidateHeadersAndCookies(t *testing.T) { | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestShouldLogBody(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		name        string | ||||
| 		contentType string | ||||
| 		want        bool | ||||
| 	}{ | ||||
| 		// --- text/* | ||||
| 		{"plain text", "text/plain", true}, | ||||
| 		{"html", "text/html", true}, | ||||
| 		{"csv", "text/csv", true}, | ||||
| 		{"yaml text", "text/yaml", true}, | ||||
|  | ||||
| 		// --- application/* | ||||
| 		{"json", "application/json", true}, | ||||
| 		{"xml", "application/xml", true}, | ||||
| 		{"form-urlencoded", "application/x-www-form-urlencoded", true}, | ||||
| 		{"yaml", "application/yaml", true}, | ||||
|  | ||||
| 		// --- with parameters | ||||
| 		{"json with charset", "application/json; charset=utf-8", true}, | ||||
| 		{"binary with charset", "application/octet-stream; charset=utf-8", false}, | ||||
|  | ||||
| 		// --- binary | ||||
| 		{"multipart form", "multipart/form-data", false}, | ||||
| 		{"binary stream", "application/octet-stream", false}, | ||||
| 		{"pdf", "application/pdf", false}, | ||||
| 		{"protobuf", "application/protobuf", false}, | ||||
| 		{"image", "image/png", false}, | ||||
|  | ||||
| 		// --- edge cases | ||||
| 		{"upper case type", "APPLICATION/JSON", true}, | ||||
| 		{"mixed case type", "Text/HTML", true}, | ||||
| 		{"unknown text prefix", "TEXT/FOO", true}, | ||||
| 		{"weird semicolon only", ";", false}, | ||||
| 		{"spaces only", "   ", false}, | ||||
| 		{"empty content-type", "", false}, | ||||
| 		{"missing main type", "/plain", false}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tt := range tests { | ||||
| 		t.Run(tt.name, func(t *testing.T) { | ||||
| 			require.Equal(t, tt.want, shouldLogBody(tt.contentType)) | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										84
									
								
								client_publish.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										84
									
								
								client_publish.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,84 @@ | ||||
| package http | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"os" | ||||
|  | ||||
| 	"go.unistack.org/micro/v3/broker" | ||||
| 	"go.unistack.org/micro/v3/client" | ||||
| 	"go.unistack.org/micro/v3/codec" | ||||
| 	"go.unistack.org/micro/v3/errors" | ||||
| 	"go.unistack.org/micro/v3/metadata" | ||||
| ) | ||||
|  | ||||
| func (c *Client) publish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||
| 	var body []byte | ||||
|  | ||||
| 	options := client.NewPublishOptions(opts...) | ||||
|  | ||||
| 	// get proxy | ||||
| 	exchange := "" | ||||
| 	if v, ok := os.LookupEnv("MICRO_PROXY"); ok { | ||||
| 		exchange = v | ||||
| 	} | ||||
| 	// get the exchange | ||||
| 	if len(options.Exchange) > 0 { | ||||
| 		exchange = options.Exchange | ||||
| 	} | ||||
|  | ||||
| 	msgs := make([]*broker.Message, 0, len(ps)) | ||||
|  | ||||
| 	omd, ok := metadata.FromOutgoingContext(ctx) | ||||
| 	if !ok { | ||||
| 		omd = metadata.New(2) | ||||
| 	} | ||||
|  | ||||
| 	for _, p := range ps { | ||||
| 		md := metadata.Copy(omd) | ||||
| 		topic := p.Topic() | ||||
| 		if len(exchange) > 0 { | ||||
| 			topic = exchange | ||||
| 		} | ||||
| 		md.Set(metadata.HeaderTopic, topic) | ||||
| 		iter := p.Metadata().Iterator() | ||||
| 		var k, v string | ||||
| 		for iter.Next(&k, &v) { | ||||
| 			md.Set(k, v) | ||||
| 		} | ||||
|  | ||||
| 		md[metadata.HeaderContentType] = p.ContentType() | ||||
|  | ||||
| 		// passed in raw data | ||||
| 		if d, ok := p.Payload().(*codec.Frame); ok { | ||||
| 			body = d.Data | ||||
| 		} else { | ||||
| 			// use codec for payload | ||||
| 			cf, err := c.newCodec(p.ContentType()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", "%+v", err) | ||||
| 			} | ||||
| 			// set the body | ||||
| 			b, err := cf.Marshal(p.Payload()) | ||||
| 			if err != nil { | ||||
| 				return errors.InternalServerError("go.micro.client", "%+v", err) | ||||
| 			} | ||||
| 			body = b | ||||
| 		} | ||||
| 		msgs = append(msgs, &broker.Message{Header: md, Body: body}) | ||||
| 	} | ||||
|  | ||||
| 	return c.opts.Broker.BatchPublish( | ||||
| 		ctx, | ||||
| 		msgs, | ||||
| 		broker.PublishContext(options.Context), | ||||
| 		broker.PublishBodyOnly(options.BodyOnly), | ||||
| 	) | ||||
| } | ||||
|  | ||||
| func (c *Client) fnPublish(ctx context.Context, p client.Message, opts ...client.PublishOption) error { | ||||
| 	return c.publish(ctx, []client.Message{p}, opts...) | ||||
| } | ||||
|  | ||||
| func (c *Client) fnBatchPublish(ctx context.Context, ps []client.Message, opts ...client.PublishOption) error { | ||||
| 	return c.publish(ctx, ps, opts...) | ||||
| } | ||||
| @@ -207,8 +207,6 @@ func (c *Client) parseRsp(ctx context.Context, hrsp *http.Response, rsp any, opt | ||||
| 	default: | ||||
| 	} | ||||
|  | ||||
| 	var buf []byte | ||||
|  | ||||
| 	if opts.ResponseMetadata != nil { | ||||
| 		for k, v := range hrsp.Header { | ||||
| 			opts.ResponseMetadata.Set(k, strings.Join(v, ",")) | ||||
| @@ -224,6 +222,8 @@ func (c *Client) parseRsp(ctx context.Context, hrsp *http.Response, rsp any, opt | ||||
| 		ct = htype | ||||
| 	} | ||||
|  | ||||
| 	var buf []byte | ||||
|  | ||||
| 	if hrsp.Body != nil { | ||||
| 		var err error | ||||
| 		buf, err = io.ReadAll(hrsp.Body) | ||||
| @@ -232,15 +232,31 @@ func (c *Client) parseRsp(ctx context.Context, hrsp *http.Response, rsp any, opt | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if log.V(logger.DebugLevel) { | ||||
| 		if shouldLogBody(ct) { | ||||
| 			log.Debug( | ||||
| 				ctx, | ||||
| 				fmt.Sprintf( | ||||
| 					"micro.client http response: status=%s headers=%v body=%s", | ||||
| 					hrsp.Status, hrsp.Header, buf, | ||||
| 				), | ||||
| 			) | ||||
| 		} else { | ||||
| 			log.Debug( | ||||
| 				ctx, | ||||
| 				fmt.Sprintf( | ||||
| 					"micro.client http response: status=%s headers=%v", | ||||
| 					hrsp.Status, hrsp.Header, | ||||
| 				), | ||||
| 			) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	cf, err := c.newCodec(ct) | ||||
| 	if err != nil { | ||||
| 		return errors.InternalServerError("go.micro.client", "unknown content-type %s: %v", ct, err) | ||||
| 	} | ||||
|  | ||||
| 	if log.V(logger.DebugLevel) { | ||||
| 		log.Debug(ctx, fmt.Sprintf("response with headers: %v and body: %s", hrsp.Header, buf)) | ||||
| 	} | ||||
|  | ||||
| 	if hrsp.StatusCode < http.StatusBadRequest { | ||||
| 		if err = cf.Unmarshal(buf, rsp); err != nil { | ||||
| 			return errors.InternalServerError("go.micro.client", "unmarshal response: %v", err) | ||||
|   | ||||
							
								
								
									
										2
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.mod
									
									
									
									
									
								
							| @@ -7,7 +7,7 @@ toolchain go1.24.3 | ||||
| require ( | ||||
| 	github.com/stretchr/testify v1.11.1 | ||||
| 	go.unistack.org/micro-codec-json/v3 v3.10.3 | ||||
| 	go.unistack.org/micro/v3 v3.11.45 | ||||
| 	go.unistack.org/micro/v3 v3.11.48 | ||||
| 	google.golang.org/protobuf v1.36.10 | ||||
| ) | ||||
|  | ||||
|   | ||||
							
								
								
									
										4
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								go.sum
									
									
									
									
									
								
							| @@ -29,8 +29,8 @@ go.unistack.org/micro-codec-json/v3 v3.10.3 h1:FwSBfJswov30Dyqxp1XfQW1EG4h77uTEe | ||||
| go.unistack.org/micro-codec-json/v3 v3.10.3/go.mod h1:26OK5MizMNKhspGC6PRVwpDIp5w1GmRb0nE5eRWWDxA= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1 h1:UTjLSRz2YZuaHk9iSlVqqsA50JQNAEK2ZFboGqtEa9Q= | ||||
| go.unistack.org/micro-proto/v3 v3.4.1/go.mod h1:okx/cnOhzuCX0ggl/vToatbCupi0O44diiiLLsZ93Zo= | ||||
| go.unistack.org/micro/v3 v3.11.45 h1:fjTLZYWgsVf9FIMZBxOg8ios2/tmyimnjZrsrxEUeXU= | ||||
| go.unistack.org/micro/v3 v3.11.45/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= | ||||
| go.unistack.org/micro/v3 v3.11.48 h1:lHJYSHU2z1TTcuswItGwG7cZXN6n04EFqY7lk/0gA7w= | ||||
| go.unistack.org/micro/v3 v3.11.48/go.mod h1:fDQ8Mu9wubaFP0L8hNQlpzHiEnWN0wbOlawN9HYo0N4= | ||||
| golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= | ||||
| golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= | ||||
| golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= | ||||
|   | ||||
							
								
								
									
										28
									
								
								message.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								message.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,28 @@ | ||||
| package http | ||||
|  | ||||
| import ( | ||||
| 	"go.unistack.org/micro/v3/client" | ||||
| 	"go.unistack.org/micro/v3/metadata" | ||||
| ) | ||||
|  | ||||
| type httpMessage struct { | ||||
| 	topic   string | ||||
| 	payload interface{} | ||||
| 	opts    client.MessageOptions | ||||
| } | ||||
|  | ||||
| func (m *httpMessage) Topic() string { | ||||
| 	return m.topic | ||||
| } | ||||
|  | ||||
| func (m *httpMessage) Payload() interface{} { | ||||
| 	return m.payload | ||||
| } | ||||
|  | ||||
| func (m *httpMessage) ContentType() string { | ||||
| 	return m.opts.ContentType | ||||
| } | ||||
|  | ||||
| func (m *httpMessage) Metadata() metadata.Metadata { | ||||
| 	return m.opts.Metadata | ||||
| } | ||||
		Reference in New Issue
	
	Block a user