add for breaking changes in go-micro
This commit is contained in:
parent
4a8855f478
commit
3d3d9bb1e7
40
http.go
40
http.go
@ -95,7 +95,7 @@ func (h *httpClient) call(ctx context.Context, address string, req client.Reques
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) stream(ctx context.Context, address string, req client.Request, opts client.CallOptions) (client.Streamer, error) {
|
func (h *httpClient) stream(ctx context.Context, address string, req client.Request, opts client.CallOptions) (client.Stream, error) {
|
||||||
header := make(http.Header)
|
header := make(http.Header)
|
||||||
if md, ok := metadata.FromContext(ctx); ok {
|
if md, ok := metadata.FromContext(ctx); ok {
|
||||||
for k, v := range md {
|
for k, v := range md {
|
||||||
@ -159,22 +159,14 @@ func (h *httpClient) Options() client.Options {
|
|||||||
return h.opts
|
return h.opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) NewPublication(topic string, msg interface{}) client.Publication {
|
func (h *httpClient) NewMessage(topic string, msg interface{}) client.Message {
|
||||||
return newHTTPPublication(topic, msg, "application/proto")
|
return newHTTPMessage(topic, msg, "application/proto")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
||||||
return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...)
|
return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
|
||||||
return newHTTPRequest(service, method, req, "application/proto", reqOpts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *httpClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
|
|
||||||
return newHTTPRequest(service, method, req, "application/json", reqOpts...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := h.opts.CallOptions
|
callOpts := h.opts.CallOptions
|
||||||
@ -283,15 +275,7 @@ func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
return gerr
|
return gerr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error {
|
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
|
||||||
callOpts := h.opts.CallOptions
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(&callOpts)
|
|
||||||
}
|
|
||||||
return h.call(ctx, addr, req, rsp, callOpts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
|
|
||||||
// make a copy of call opts
|
// make a copy of call opts
|
||||||
callOpts := h.opts.CallOptions
|
callOpts := h.opts.CallOptions
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
@ -325,7 +309,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
call := func(i int) (client.Streamer, error) {
|
call := func(i int) (client.Stream, error) {
|
||||||
// call backoff first. Someone may want an initial start delay
|
// call backoff first. Someone may want an initial start delay
|
||||||
t, err := callOpts.Backoff(ctx, req, i)
|
t, err := callOpts.Backoff(ctx, req, i)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -355,7 +339,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
}
|
}
|
||||||
|
|
||||||
type response struct {
|
type response struct {
|
||||||
stream client.Streamer
|
stream client.Stream
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,15 +377,7 @@ func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
return nil, grr
|
return nil, grr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
|
func (h *httpClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
|
||||||
callOpts := h.opts.CallOptions
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(&callOpts)
|
|
||||||
}
|
|
||||||
return h.stream(ctx, addr, req, callOpts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *httpClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
|
|
||||||
md, ok := metadata.FromContext(ctx)
|
md, ok := metadata.FromContext(ctx)
|
||||||
if !ok {
|
if !ok {
|
||||||
md = make(map[string]string)
|
md = make(map[string]string)
|
||||||
@ -414,7 +390,7 @@ func (h *httpClient) Publish(ctx context.Context, p client.Publication, opts ...
|
|||||||
}
|
}
|
||||||
|
|
||||||
b := &buffer{bytes.NewBuffer(nil)}
|
b := &buffer{bytes.NewBuffer(nil)}
|
||||||
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil {
|
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Payload()); err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,28 +4,28 @@ import (
|
|||||||
"github.com/micro/go-micro/client"
|
"github.com/micro/go-micro/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
type httpPublication struct {
|
type httpMessage struct {
|
||||||
topic string
|
topic string
|
||||||
contentType string
|
contentType string
|
||||||
message interface{}
|
payload interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHTTPPublication(topic string, message interface{}, contentType string) client.Publication {
|
func newHTTPMessage(topic string, payload interface{}, contentType string) client.Message {
|
||||||
return &httpPublication{
|
return &httpMessage{
|
||||||
message: message,
|
payload: payload,
|
||||||
topic: topic,
|
topic: topic,
|
||||||
contentType: contentType,
|
contentType: contentType,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpPublication) ContentType() string {
|
func (h *httpMessage) ContentType() string {
|
||||||
return h.contentType
|
return h.contentType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpPublication) Topic() string {
|
func (h *httpMessage) Topic() string {
|
||||||
return h.topic
|
return h.topic
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpPublication) Message() interface{} {
|
func (h *httpMessage) Payload() interface{} {
|
||||||
return h.message
|
return h.payload
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user