From 0f3fd4b1f9c5cd14f2b878bcfcfdc91a13c172a7 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 1 Jan 2017 18:39:05 +0000 Subject: [PATCH] add a http client --- README.md | 43 +++++ buffer.go | 14 ++ codec.go | 61 ++++++ http.go | 485 +++++++++++++++++++++++++++++++++++++++++++++++ http_test.go | 280 +++++++++++++++++++++++++++ proto/test.pb.go | 60 ++++++ proto/test.proto | 8 + publication.go | 31 +++ request.go | 48 +++++ stream.go | 128 +++++++++++++ 10 files changed, 1158 insertions(+) create mode 100644 README.md create mode 100644 buffer.go create mode 100644 codec.go create mode 100644 http.go create mode 100644 http_test.go create mode 100644 proto/test.pb.go create mode 100644 proto/test.proto create mode 100644 publication.go create mode 100644 request.go create mode 100644 stream.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..19eb3ff --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# HTTP Client + +This plugin is a http client for go-micro. + +The http client wraps net/http to provide a robust go-micro client with service discovery, load balancing and streaming. + +## Usage + +### Use directly + +```go +import "github.com/micro/go-plugins/client/http" + +service := micro.NewService( + micro.Name("my.service"), + micro.Client(http.NewClient()), +) +``` + +### Use with flags + +```go +import _ "github.com/micro/go-plugins/client/http" +``` + +```shell +go run main.go --client=http +``` + +### Call Service + +Assuming you have a http service "my.service" with path "/foo/bar" +```go +// new client +client := http.NewClient() + +// create request/response +request := client.NewRequest("my.service", "/foo/bar", &proto.Request{}) +response := new(proto.Response) + +// call service +err := client.Call(context.TODO(), request, response) +``` diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..b38c9d8 --- /dev/null +++ b/buffer.go @@ -0,0 +1,14 @@ +package http + +import ( + "bytes" +) + +type buffer struct { + *bytes.Buffer +} + +func (b *buffer) Close() error { + b.Buffer.Reset() + return nil +} diff --git a/codec.go b/codec.go new file mode 100644 index 0000000..745c9dc --- /dev/null +++ b/codec.go @@ -0,0 +1,61 @@ +package http + +import ( + "encoding/json" + + "github.com/golang/protobuf/proto" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/codec/jsonrpc" + "github.com/micro/go-micro/codec/protorpc" +) + +type jsonCodec struct{} + +type protoCodec struct{} + +type Codec interface { + Marshal(v interface{}) ([]byte, error) + Unmarshal(b []byte, v interface{}) error + String() string +} + +var ( + defaultHTTPCodecs = map[string]Codec{ + "application/json": jsonCodec{}, + "application/proto": protoCodec{}, + "application/protobuf": protoCodec{}, + "application/octet-stream": protoCodec{}, + } + + defaultRPCCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } +) + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +func (protoCodec) String() string { + return "proto" +} + +func (jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (jsonCodec) Unmarshal(data []byte, v interface{}) error { + return json.Unmarshal(data, v) +} + +func (jsonCodec) String() string { + return "json" +} diff --git a/http.go b/http.go new file mode 100644 index 0000000..6fad15c --- /dev/null +++ b/http.go @@ -0,0 +1,485 @@ +package http + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "sync" + "time" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec" + errors "github.com/micro/go-micro/errors" + "github.com/micro/go-micro/metadata" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" + "github.com/micro/go-micro/transport" + + "golang.org/x/net/context" +) + +type httpClient struct { + once sync.Once + opts client.Options +} + +func init() { + cmd.DefaultClients["http"] = NewClient +} + +func (h *httpClient) call(ctx context.Context, address string, req client.Request, rsp interface{}, opts client.CallOptions) error { + header := make(http.Header) + if md, ok := metadata.FromContext(ctx); ok { + for k, v := range md { + header.Set(k, v) + } + } + + // set timeout in nanoseconds + header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout)) + // set the content type for the request + header.Set("Content-Type", req.ContentType()) + + // get codec + cf, err := h.newHTTPCodec(req.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // marshal request + b, err := cf.Marshal(req.Request()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + buf := &buffer{bytes.NewBuffer(b)} + defer buf.Close() + + hreq := &http.Request{ + Method: "POST", + URL: &url.URL{ + Scheme: "http", + Host: address, + Path: req.Method(), + }, + Header: header, + Body: buf, + ContentLength: int64(len(b)), + Host: address, + } + + // make the request + hrsp, err := http.DefaultClient.Do(hreq.WithContext(ctx)) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + defer hrsp.Body.Close() + + // parse response + b, err = ioutil.ReadAll(hrsp.Body) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // unmarshal + if err := cf.Unmarshal(b, rsp); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + return nil +} + +func (h *httpClient) stream(ctx context.Context, address string, req client.Request, opts client.CallOptions) (client.Streamer, error) { + header := make(http.Header) + if md, ok := metadata.FromContext(ctx); ok { + for k, v := range md { + header.Set(k, v) + } + } + + // set timeout in nanoseconds + header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout)) + // set the content type for the request + header.Set("Content-Type", req.ContentType()) + + // get codec + cf, err := h.newHTTPCodec(req.ContentType()) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + cc, err := net.Dial("tcp", address) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err)) + } + + return &httpStream{ + address: address, + context: ctx, + closed: make(chan bool), + conn: cc, + codec: cf, + header: header, + reader: bufio.NewReader(cc), + request: req, + }, nil +} + +func (h *httpClient) newHTTPCodec(contentType string) (Codec, error) { + if c, ok := defaultHTTPCodecs[contentType]; ok { + return c, nil + } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) +} + +func (h *httpClient) newCodec(contentType string) (codec.NewCodec, error) { + if c, ok := h.opts.Codecs[contentType]; ok { + return c, nil + } + if cf, ok := defaultRPCCodecs[contentType]; ok { + return cf, nil + } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) +} + +func (h *httpClient) Init(opts ...client.Option) error { + for _, o := range opts { + o(&h.opts) + } + return nil +} + +func (h *httpClient) Options() client.Options { + return h.opts +} + +func (h *httpClient) NewPublication(topic string, msg interface{}) client.Publication { + return newHTTPPublication(topic, msg, "application/proto") +} + +func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { + return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...) +} + +func (h *httpClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { + return newHTTPRequest(service, method, req, "application/proto", reqOpts...) +} + +func (h *httpClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request { + return newHTTPRequest(service, method, req, "application/json", reqOpts...) +} + +func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { + // make a copy of call opts + callOpts := h.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + // get next nodes from the selector + next, err := h.opts.Selector.Select(req.Service(), callOpts.SelectOptions...) + if err != nil && err == selector.ErrNotFound { + return errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // check if we already have a deadline + d, ok := ctx.Deadline() + if !ok { + // no deadline so we create a new one + ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) + } else { + // got a deadline so no need to setup context + // but we need to set the timeout we pass along + opt := client.WithRequestTimeout(d.Sub(time.Now())) + opt(&callOpts) + } + + // should we noop right here? + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + // make copy of call method + hcall := h.call + + // wrap the call in reverse + for i := len(callOpts.CallWrappers); i > 0; i-- { + hcall = callOpts.CallWrappers[i-1](hcall) + } + + // return errors.New("go.micro.client", "request timeout", 408) + call := func(i int) error { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + // select next node + node, err := next() + if err != nil && err == selector.ErrNotFound { + return errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + // set the address + addr := node.Address + if node.Port > 0 { + addr = fmt.Sprintf("%s:%d", addr, node.Port) + } + + // make the call + err = hcall(ctx, addr, req, rsp, callOpts) + h.opts.Selector.Mark(req.Service(), node, err) + return err + } + + ch := make(chan error, callOpts.Retries) + var gerr error + + for i := 0; i < callOpts.Retries; i++ { + go func() { + ch <- call(i) + }() + + select { + case <-ctx.Done(): + return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case err := <-ch: + // if the call succeeded lets bail early + if err == nil { + return nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return rerr + } + + if !retry { + return err + } + + gerr = err + } + } + + return gerr +} + +func (h *httpClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error { + callOpts := h.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + return h.call(ctx, addr, req, rsp, callOpts) +} + +func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) { + // make a copy of call opts + callOpts := h.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + + // get next nodes from the selector + next, err := h.opts.Selector.Select(req.Service(), callOpts.SelectOptions...) + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // check if we already have a deadline + d, ok := ctx.Deadline() + if !ok { + // no deadline so we create a new one + ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout) + } else { + // got a deadline so no need to setup context + // but we need to set the timeout we pass along + opt := client.WithRequestTimeout(d.Sub(time.Now())) + opt(&callOpts) + } + + // should we noop right here? + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + default: + } + + call := func(i int) (client.Streamer, error) { + // call backoff first. Someone may want an initial start delay + t, err := callOpts.Backoff(ctx, req, i) + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + // only sleep if greater than 0 + if t.Seconds() > 0 { + time.Sleep(t) + } + + node, err := next() + if err != nil && err == selector.ErrNotFound { + return nil, errors.NotFound("go.micro.client", err.Error()) + } else if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } + + addr := node.Address + if node.Port > 0 { + addr = fmt.Sprintf("%s:%d", addr, node.Port) + } + + stream, err := h.stream(ctx, addr, req, callOpts) + h.opts.Selector.Mark(req.Service(), node, err) + return stream, err + } + + type response struct { + stream client.Streamer + err error + } + + ch := make(chan response, callOpts.Retries) + var grr error + + for i := 0; i < callOpts.Retries; i++ { + go func() { + s, err := call(i) + ch <- response{s, err} + }() + + select { + case <-ctx.Done(): + return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) + case rsp := <-ch: + // if the call succeeded lets bail early + if rsp.err == nil { + return rsp.stream, nil + } + + retry, rerr := callOpts.Retry(ctx, req, i, err) + if rerr != nil { + return nil, rerr + } + + if !retry { + return nil, rsp.err + } + + grr = rsp.err + } + } + + return nil, grr +} + +func (h *httpClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) { + callOpts := h.opts.CallOptions + for _, opt := range opts { + opt(&callOpts) + } + return h.stream(ctx, addr, req, callOpts) +} + +func (h *httpClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error { + md, ok := metadata.FromContext(ctx) + if !ok { + md = make(map[string]string) + } + md["Content-Type"] = p.ContentType() + + cf, err := h.newCodec(p.ContentType()) + if err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + b := &buffer{bytes.NewBuffer(nil)} + if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil { + return errors.InternalServerError("go.micro.client", err.Error()) + } + + h.once.Do(func() { + h.opts.Broker.Connect() + }) + + return h.opts.Broker.Publish(p.Topic(), &broker.Message{ + Header: md, + Body: b.Bytes(), + }) +} + +func (h *httpClient) String() string { + return "http" +} + +func newClient(opts ...client.Option) client.Client { + options := client.Options{ + CallOptions: client.CallOptions{ + Backoff: client.DefaultBackoff, + Retry: client.DefaultRetry, + Retries: client.DefaultRetries, + RequestTimeout: client.DefaultRequestTimeout, + DialTimeout: transport.DefaultDialTimeout, + }, + } + + for _, o := range opts { + o(&options) + } + + if len(options.ContentType) == 0 { + options.ContentType = "application/proto" + } + + if options.Broker == nil { + options.Broker = broker.DefaultBroker + } + + if options.Registry == nil { + options.Registry = registry.DefaultRegistry + } + + if options.Selector == nil { + options.Selector = selector.NewSelector( + selector.Registry(options.Registry), + ) + } + + rc := &httpClient{ + once: sync.Once{}, + opts: options, + } + + c := client.Client(rc) + + // wrap in reverse + for i := len(options.Wrappers); i > 0; i-- { + c = options.Wrappers[i-1](c) + } + + return c +} + +func NewClient(opts ...client.Option) client.Client { + return newClient(opts...) +} diff --git a/http_test.go b/http_test.go new file mode 100644 index 0000000..b8410fa --- /dev/null +++ b/http_test.go @@ -0,0 +1,280 @@ +package http + +import ( + "bufio" + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "strconv" + "testing" + + "github.com/micro/go-micro/client" + "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/selector" + "github.com/micro/go-plugins/client/http/proto" + "github.com/micro/go-plugins/registry/memory" + + "golang.org/x/net/context" +) + +func TestHTTPClient(t *testing.T) { + r := memory.NewRegistry() + s := selector.NewSelector(selector.Registry(r)) + + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + mux := http.NewServeMux() + mux.HandleFunc("/foo/bar", func(w http.ResponseWriter, r *http.Request) { + // only accept post + if r.Method != "POST" { + http.Error(w, "expect post method", 500) + return + } + + // get codec + ct := r.Header.Get("Content-Type") + codec, ok := defaultHTTPCodecs[ct] + if !ok { + http.Error(w, "codec not found", 500) + return + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + // extract message + msg := new(test.Message) + if err := codec.Unmarshal(b, msg); err != nil { + http.Error(w, err.Error(), 500) + return + } + + // marshal response + b, err = codec.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + // write response + w.Write(b) + }) + go http.Serve(l, mux) + + host, sport, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + t.Fatal(err) + } + port, _ := strconv.Atoi(sport) + + if err := r.Register(®istry.Service{ + Name: "test.service", + Nodes: []*registry.Node{ + { + Id: "test.service.1", + Address: host, + Port: port, + }, + }, + }); err != nil { + t.Fatal(err) + } + + c := NewClient(client.Selector(s)) + + for i := 0; i < 10; i++ { + msg := &test.Message{ + Seq: int64(i), + Data: fmt.Sprintf("message %d", i), + } + req := c.NewRequest("test.service", "/foo/bar", msg) + rsp := new(test.Message) + err := c.Call(context.TODO(), req, rsp) + if err != nil { + t.Fatal(err) + } + if rsp.Seq != msg.Seq { + t.Fatalf("invalid seq %d for %d", rsp.Seq, msg.Seq) + } + } +} + +func TestHTTPClientStream(t *testing.T) { + r := memory.NewRegistry() + s := selector.NewSelector(selector.Registry(r)) + + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer l.Close() + + mux := http.NewServeMux() + mux.HandleFunc("/foo/bar", func(w http.ResponseWriter, r *http.Request) { + // only accept post + if r.Method != "POST" { + http.Error(w, "expect post method", 500) + return + } + + // hijack the connection + hj, ok := w.(http.Hijacker) + if !ok { + http.Error(w, "could not hijack conn", 500) + return + + } + + // hijacked + conn, bufrw, err := hj.Hijack() + if err != nil { + http.Error(w, err.Error(), 500) + return + } + defer conn.Close() + + // read off the first request + // get codec + ct := r.Header.Get("Content-Type") + codec, ok := defaultHTTPCodecs[ct] + if !ok { + http.Error(w, "codec not found", 500) + return + } + b, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + // extract message + msg := new(test.Message) + if err := codec.Unmarshal(b, msg); err != nil { + http.Error(w, err.Error(), 500) + return + } + + // marshal response + b, err = codec.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + // write response + rsp := &http.Response{ + Header: r.Header, + Body: &buffer{bytes.NewBuffer(b)}, + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: int64(len(b)), + } + + // write response + rsp.Write(bufrw) + bufrw.Flush() + + reader := bufio.NewReader(conn) + + for { + r, err := http.ReadRequest(reader) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + b, err = ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + // extract message + msg := new(test.Message) + if err := codec.Unmarshal(b, msg); err != nil { + http.Error(w, err.Error(), 500) + return + } + + // marshal response + b, err = codec.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), 500) + return + } + + rsp := &http.Response{ + Header: r.Header, + Body: &buffer{bytes.NewBuffer(b)}, + Status: "200 OK", + StatusCode: 200, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: int64(len(b)), + } + + // write response + rsp.Write(bufrw) + bufrw.Flush() + } + }) + go http.Serve(l, mux) + + host, sport, err := net.SplitHostPort(l.Addr().String()) + if err != nil { + t.Fatal(err) + } + port, _ := strconv.Atoi(sport) + + if err := r.Register(®istry.Service{ + Name: "test.service", + Nodes: []*registry.Node{ + { + Id: "test.service.1", + Address: host, + Port: port, + }, + }, + }); err != nil { + t.Fatal(err) + } + + c := NewClient(client.Selector(s)) + req := c.NewRequest("test.service", "/foo/bar", new(test.Message)) + stream, err := c.Stream(context.TODO(), req) + if err != nil { + t.Fatal(err) + } + defer stream.Close() + + for i := 0; i < 10; i++ { + msg := &test.Message{ + Seq: int64(i), + Data: fmt.Sprintf("message %d", i), + } + err := stream.Send(msg) + if err != nil { + t.Fatal(err) + } + rsp := new(test.Message) + err = stream.Recv(rsp) + if err != nil { + t.Fatal(err) + } + if rsp.Seq != msg.Seq { + t.Fatalf("invalid seq %d for %d", rsp.Seq, msg.Seq) + } + } +} diff --git a/proto/test.pb.go b/proto/test.pb.go new file mode 100644 index 0000000..4855911 --- /dev/null +++ b/proto/test.pb.go @@ -0,0 +1,60 @@ +// Code generated by protoc-gen-go. +// source: github.com/micro/go-plugins/client/http/proto/test.proto +// DO NOT EDIT! + +/* +Package test is a generated protocol buffer package. + +It is generated from these files: + github.com/micro/go-plugins/client/http/proto/test.proto + +It has these top-level messages: + Message +*/ +package test + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Message struct { + Seq int64 `protobuf:"varint,1,opt,name=seq" json:"seq,omitempty"` + Data string `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func init() { + proto.RegisterType((*Message)(nil), "test.Message") +} + +func init() { + proto.RegisterFile("github.com/micro/go-plugins/client/http/proto/test.proto", fileDescriptor0) +} + +var fileDescriptor0 = []byte{ + // 131 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x1c, 0xcb, 0xb1, 0x0e, 0x82, 0x30, + 0x10, 0x06, 0xe0, 0x54, 0x88, 0xc6, 0x4e, 0xa6, 0x13, 0x23, 0x71, 0x62, 0x91, 0x1b, 0x5c, 0x7c, + 0x09, 0x17, 0xde, 0xa0, 0xd4, 0x4b, 0x69, 0x02, 0x5c, 0xe5, 0x7e, 0xde, 0x9f, 0xd0, 0xed, 0x5b, + 0x3e, 0xfb, 0x89, 0x09, 0xd3, 0x3e, 0xf6, 0x41, 0x16, 0x5a, 0x52, 0xd8, 0x84, 0xa2, 0xbc, 0xf2, + 0xbc, 0xc7, 0xb4, 0x2a, 0x85, 0x39, 0xf1, 0x0a, 0x9a, 0x80, 0x4c, 0x79, 0x13, 0x08, 0x81, 0x15, + 0x7d, 0xa1, 0xab, 0x4f, 0x3f, 0xc9, 0xde, 0xbe, 0xac, 0xea, 0x23, 0xbb, 0x87, 0xad, 0x94, 0xff, + 0x8d, 0x69, 0x4d, 0x57, 0x0d, 0x27, 0x9d, 0xb3, 0xf5, 0xcf, 0xc3, 0x37, 0x97, 0xd6, 0x74, 0xf7, + 0xa1, 0x78, 0xbc, 0x96, 0xfd, 0x3e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xff, 0x16, 0x8d, 0x95, 0x79, + 0x00, 0x00, 0x00, +} diff --git a/proto/test.proto b/proto/test.proto new file mode 100644 index 0000000..80ebd44 --- /dev/null +++ b/proto/test.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package test; + +message Message { + int64 seq = 1; + string data = 2; +} diff --git a/publication.go b/publication.go new file mode 100644 index 0000000..60fa52a --- /dev/null +++ b/publication.go @@ -0,0 +1,31 @@ +package http + +import ( + "github.com/micro/go-micro/client" +) + +type httpPublication struct { + topic string + contentType string + message interface{} +} + +func newHTTPPublication(topic string, message interface{}, contentType string) client.Publication { + return &httpPublication{ + message: message, + topic: topic, + contentType: contentType, + } +} + +func (h *httpPublication) ContentType() string { + return h.contentType +} + +func (h *httpPublication) Topic() string { + return h.topic +} + +func (h *httpPublication) Message() interface{} { + return h.message +} diff --git a/request.go b/request.go new file mode 100644 index 0000000..5ee3b96 --- /dev/null +++ b/request.go @@ -0,0 +1,48 @@ +package http + +import ( + "github.com/micro/go-micro/client" +) + +type httpRequest struct { + service string + method string + contentType string + request interface{} + opts client.RequestOptions +} + +func newHTTPRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request { + var opts client.RequestOptions + for _, o := range reqOpts { + o(&opts) + } + + return &httpRequest{ + service: service, + method: method, + request: request, + contentType: contentType, + opts: opts, + } +} + +func (h *httpRequest) ContentType() string { + return h.contentType +} + +func (h *httpRequest) Service() string { + return h.service +} + +func (h *httpRequest) Method() string { + return h.method +} + +func (h *httpRequest) Request() interface{} { + return h.request +} + +func (h *httpRequest) Stream() bool { + return h.opts.Stream +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..17d5085 --- /dev/null +++ b/stream.go @@ -0,0 +1,128 @@ +package http + +import ( + "bufio" + "bytes" + "errors" + "io/ioutil" + "net" + "net/http" + "net/url" + "sync" + + "github.com/micro/go-micro/client" + + "golang.org/x/net/context" +) + +// Implements the streamer interface +type httpStream struct { + sync.RWMutex + address string + codec Codec + context context.Context + header http.Header + seq uint64 + closed chan bool + err error + conn net.Conn + reader *bufio.Reader + request client.Request +} + +var ( + errShutdown = errors.New("connection is shut down") +) + +func (h *httpStream) isClosed() bool { + select { + case <-h.closed: + return true + default: + return false + } +} + +func (h *httpStream) Context() context.Context { + return h.context +} + +func (h *httpStream) Request() client.Request { + return h.request +} + +func (h *httpStream) Send(msg interface{}) error { + h.Lock() + defer h.Unlock() + + if h.isClosed() { + h.err = errShutdown + return errShutdown + } + + b, err := h.codec.Marshal(msg) + if err != nil { + return err + } + + buf := &buffer{bytes.NewBuffer(b)} + defer buf.Close() + + req := &http.Request{ + Method: "POST", + URL: &url.URL{ + Scheme: "http", + Host: h.address, + Path: h.request.Method(), + }, + Header: h.header, + Body: buf, + ContentLength: int64(len(b)), + Host: h.address, + } + + return req.Write(h.conn) +} + +func (h *httpStream) Recv(msg interface{}) error { + h.Lock() + defer h.Unlock() + + if h.isClosed() { + h.err = errShutdown + return errShutdown + } + + rsp, err := http.ReadResponse(h.reader, new(http.Request)) + if err != nil { + return err + } + defer rsp.Body.Close() + + b, err := ioutil.ReadAll(rsp.Body) + if err != nil { + return err + } + + if rsp.StatusCode != 200 { + return errors.New(rsp.Status + ": " + string(b)) + } + + return h.codec.Unmarshal(b, msg) +} + +func (h *httpStream) Error() error { + h.RLock() + defer h.RUnlock() + return h.err +} + +func (h *httpStream) Close() error { + select { + case <-h.closed: + return nil + default: + close(h.closed) + return h.conn.Close() + } +}