add a http client

This commit is contained in:
Asim Aslam 2017-01-01 18:39:05 +00:00 committed by Vasiliy Tolstov
commit 0f3fd4b1f9
10 changed files with 1158 additions and 0 deletions

43
README.md Normal file
View File

@ -0,0 +1,43 @@
# HTTP Client
This plugin is a http client for go-micro.
The http client wraps net/http to provide a robust go-micro client with service discovery, load balancing and streaming.
## Usage
### Use directly
```go
import "github.com/micro/go-plugins/client/http"
service := micro.NewService(
micro.Name("my.service"),
micro.Client(http.NewClient()),
)
```
### Use with flags
```go
import _ "github.com/micro/go-plugins/client/http"
```
```shell
go run main.go --client=http
```
### Call Service
Assuming you have a http service "my.service" with path "/foo/bar"
```go
// new client
client := http.NewClient()
// create request/response
request := client.NewRequest("my.service", "/foo/bar", &proto.Request{})
response := new(proto.Response)
// call service
err := client.Call(context.TODO(), request, response)
```

14
buffer.go Normal file
View File

@ -0,0 +1,14 @@
package http
import (
"bytes"
)
type buffer struct {
*bytes.Buffer
}
func (b *buffer) Close() error {
b.Buffer.Reset()
return nil
}

61
codec.go Normal file
View File

@ -0,0 +1,61 @@
package http
import (
"encoding/json"
"github.com/golang/protobuf/proto"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/codec/jsonrpc"
"github.com/micro/go-micro/codec/protorpc"
)
type jsonCodec struct{}
type protoCodec struct{}
type Codec interface {
Marshal(v interface{}) ([]byte, error)
Unmarshal(b []byte, v interface{}) error
String() string
}
var (
defaultHTTPCodecs = map[string]Codec{
"application/json": jsonCodec{},
"application/proto": protoCodec{},
"application/protobuf": protoCodec{},
"application/octet-stream": protoCodec{},
}
defaultRPCCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
func (protoCodec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message))
}
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message))
}
func (protoCodec) String() string {
return "proto"
}
func (jsonCodec) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (jsonCodec) Unmarshal(data []byte, v interface{}) error {
return json.Unmarshal(data, v)
}
func (jsonCodec) String() string {
return "json"
}

485
http.go Normal file
View File

@ -0,0 +1,485 @@
package http
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"time"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/codec"
errors "github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-micro/transport"
"golang.org/x/net/context"
)
type httpClient struct {
once sync.Once
opts client.Options
}
func init() {
cmd.DefaultClients["http"] = NewClient
}
func (h *httpClient) call(ctx context.Context, address string, req client.Request, rsp interface{}, opts client.CallOptions) error {
header := make(http.Header)
if md, ok := metadata.FromContext(ctx); ok {
for k, v := range md {
header.Set(k, v)
}
}
// set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
// set the content type for the request
header.Set("Content-Type", req.ContentType())
// get codec
cf, err := h.newHTTPCodec(req.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// marshal request
b, err := cf.Marshal(req.Request())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
buf := &buffer{bytes.NewBuffer(b)}
defer buf.Close()
hreq := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: address,
Path: req.Method(),
},
Header: header,
Body: buf,
ContentLength: int64(len(b)),
Host: address,
}
// make the request
hrsp, err := http.DefaultClient.Do(hreq.WithContext(ctx))
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
defer hrsp.Body.Close()
// parse response
b, err = ioutil.ReadAll(hrsp.Body)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// unmarshal
if err := cf.Unmarshal(b, rsp); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
return nil
}
func (h *httpClient) stream(ctx context.Context, address string, req client.Request, opts client.CallOptions) (client.Streamer, error) {
header := make(http.Header)
if md, ok := metadata.FromContext(ctx); ok {
for k, v := range md {
header.Set(k, v)
}
}
// set timeout in nanoseconds
header.Set("Timeout", fmt.Sprintf("%d", opts.RequestTimeout))
// set the content type for the request
header.Set("Content-Type", req.ContentType())
// get codec
cf, err := h.newHTTPCodec(req.ContentType())
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
cc, err := net.Dial("tcp", address)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error dialing: %v", err))
}
return &httpStream{
address: address,
context: ctx,
closed: make(chan bool),
conn: cc,
codec: cf,
header: header,
reader: bufio.NewReader(cc),
request: req,
}, nil
}
func (h *httpClient) newHTTPCodec(contentType string) (Codec, error) {
if c, ok := defaultHTTPCodecs[contentType]; ok {
return c, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (h *httpClient) newCodec(contentType string) (codec.NewCodec, error) {
if c, ok := h.opts.Codecs[contentType]; ok {
return c, nil
}
if cf, ok := defaultRPCCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (h *httpClient) Init(opts ...client.Option) error {
for _, o := range opts {
o(&h.opts)
}
return nil
}
func (h *httpClient) Options() client.Options {
return h.opts
}
func (h *httpClient) NewPublication(topic string, msg interface{}) client.Publication {
return newHTTPPublication(topic, msg, "application/proto")
}
func (h *httpClient) NewRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newHTTPRequest(service, method, req, h.opts.ContentType, reqOpts...)
}
func (h *httpClient) NewProtoRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newHTTPRequest(service, method, req, "application/proto", reqOpts...)
}
func (h *httpClient) NewJsonRequest(service, method string, req interface{}, reqOpts ...client.RequestOption) client.Request {
return newHTTPRequest(service, method, req, "application/json", reqOpts...)
}
func (h *httpClient) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
// make a copy of call opts
callOpts := h.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// get next nodes from the selector
next, err := h.opts.Selector.Select(req.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := client.WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
// make copy of call method
hcall := h.call
// wrap the call in reverse
for i := len(callOpts.CallWrappers); i > 0; i-- {
hcall = callOpts.CallWrappers[i-1](hcall)
}
// return errors.New("go.micro.client", "request timeout", 408)
call := func(i int) error {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
// select next node
node, err := next()
if err != nil && err == selector.ErrNotFound {
return errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the address
addr := node.Address
if node.Port > 0 {
addr = fmt.Sprintf("%s:%d", addr, node.Port)
}
// make the call
err = hcall(ctx, addr, req, rsp, callOpts)
h.opts.Selector.Mark(req.Service(), node, err)
return err
}
ch := make(chan error, callOpts.Retries)
var gerr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
ch <- call(i)
}()
select {
case <-ctx.Done():
return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case err := <-ch:
// if the call succeeded lets bail early
if err == nil {
return nil
}
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {
return rerr
}
if !retry {
return err
}
gerr = err
}
}
return gerr
}
func (h *httpClient) CallRemote(ctx context.Context, addr string, req client.Request, rsp interface{}, opts ...client.CallOption) error {
callOpts := h.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return h.call(ctx, addr, req, rsp, callOpts)
}
func (h *httpClient) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
// make a copy of call opts
callOpts := h.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
// get next nodes from the selector
next, err := h.opts.Selector.Select(req.Service(), callOpts.SelectOptions...)
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// check if we already have a deadline
d, ok := ctx.Deadline()
if !ok {
// no deadline so we create a new one
ctx, _ = context.WithTimeout(ctx, callOpts.RequestTimeout)
} else {
// got a deadline so no need to setup context
// but we need to set the timeout we pass along
opt := client.WithRequestTimeout(d.Sub(time.Now()))
opt(&callOpts)
}
// should we noop right here?
select {
case <-ctx.Done():
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
default:
}
call := func(i int) (client.Streamer, error) {
// call backoff first. Someone may want an initial start delay
t, err := callOpts.Backoff(ctx, req, i)
if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
// only sleep if greater than 0
if t.Seconds() > 0 {
time.Sleep(t)
}
node, err := next()
if err != nil && err == selector.ErrNotFound {
return nil, errors.NotFound("go.micro.client", err.Error())
} else if err != nil {
return nil, errors.InternalServerError("go.micro.client", err.Error())
}
addr := node.Address
if node.Port > 0 {
addr = fmt.Sprintf("%s:%d", addr, node.Port)
}
stream, err := h.stream(ctx, addr, req, callOpts)
h.opts.Selector.Mark(req.Service(), node, err)
return stream, err
}
type response struct {
stream client.Streamer
err error
}
ch := make(chan response, callOpts.Retries)
var grr error
for i := 0; i < callOpts.Retries; i++ {
go func() {
s, err := call(i)
ch <- response{s, err}
}()
select {
case <-ctx.Done():
return nil, errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408)
case rsp := <-ch:
// if the call succeeded lets bail early
if rsp.err == nil {
return rsp.stream, nil
}
retry, rerr := callOpts.Retry(ctx, req, i, err)
if rerr != nil {
return nil, rerr
}
if !retry {
return nil, rsp.err
}
grr = rsp.err
}
}
return nil, grr
}
func (h *httpClient) StreamRemote(ctx context.Context, addr string, req client.Request, opts ...client.CallOption) (client.Streamer, error) {
callOpts := h.opts.CallOptions
for _, opt := range opts {
opt(&callOpts)
}
return h.stream(ctx, addr, req, callOpts)
}
func (h *httpClient) Publish(ctx context.Context, p client.Publication, opts ...client.PublishOption) error {
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
cf, err := h.newCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
b := &buffer{bytes.NewBuffer(nil)}
if err := cf(b).Write(&codec.Message{Type: codec.Publication}, p.Message()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
h.once.Do(func() {
h.opts.Broker.Connect()
})
return h.opts.Broker.Publish(p.Topic(), &broker.Message{
Header: md,
Body: b.Bytes(),
})
}
func (h *httpClient) String() string {
return "http"
}
func newClient(opts ...client.Option) client.Client {
options := client.Options{
CallOptions: client.CallOptions{
Backoff: client.DefaultBackoff,
Retry: client.DefaultRetry,
Retries: client.DefaultRetries,
RequestTimeout: client.DefaultRequestTimeout,
DialTimeout: transport.DefaultDialTimeout,
},
}
for _, o := range opts {
o(&options)
}
if len(options.ContentType) == 0 {
options.ContentType = "application/proto"
}
if options.Broker == nil {
options.Broker = broker.DefaultBroker
}
if options.Registry == nil {
options.Registry = registry.DefaultRegistry
}
if options.Selector == nil {
options.Selector = selector.NewSelector(
selector.Registry(options.Registry),
)
}
rc := &httpClient{
once: sync.Once{},
opts: options,
}
c := client.Client(rc)
// wrap in reverse
for i := len(options.Wrappers); i > 0; i-- {
c = options.Wrappers[i-1](c)
}
return c
}
func NewClient(opts ...client.Option) client.Client {
return newClient(opts...)
}

280
http_test.go Normal file
View File

@ -0,0 +1,280 @@
package http
import (
"bufio"
"bytes"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"testing"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/selector"
"github.com/micro/go-plugins/client/http/proto"
"github.com/micro/go-plugins/registry/memory"
"golang.org/x/net/context"
)
func TestHTTPClient(t *testing.T) {
r := memory.NewRegistry()
s := selector.NewSelector(selector.Registry(r))
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l.Close()
mux := http.NewServeMux()
mux.HandleFunc("/foo/bar", func(w http.ResponseWriter, r *http.Request) {
// only accept post
if r.Method != "POST" {
http.Error(w, "expect post method", 500)
return
}
// get codec
ct := r.Header.Get("Content-Type")
codec, ok := defaultHTTPCodecs[ct]
if !ok {
http.Error(w, "codec not found", 500)
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// extract message
msg := new(test.Message)
if err := codec.Unmarshal(b, msg); err != nil {
http.Error(w, err.Error(), 500)
return
}
// marshal response
b, err = codec.Marshal(msg)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// write response
w.Write(b)
})
go http.Serve(l, mux)
host, sport, err := net.SplitHostPort(l.Addr().String())
if err != nil {
t.Fatal(err)
}
port, _ := strconv.Atoi(sport)
if err := r.Register(&registry.Service{
Name: "test.service",
Nodes: []*registry.Node{
{
Id: "test.service.1",
Address: host,
Port: port,
},
},
}); err != nil {
t.Fatal(err)
}
c := NewClient(client.Selector(s))
for i := 0; i < 10; i++ {
msg := &test.Message{
Seq: int64(i),
Data: fmt.Sprintf("message %d", i),
}
req := c.NewRequest("test.service", "/foo/bar", msg)
rsp := new(test.Message)
err := c.Call(context.TODO(), req, rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Seq != msg.Seq {
t.Fatalf("invalid seq %d for %d", rsp.Seq, msg.Seq)
}
}
}
func TestHTTPClientStream(t *testing.T) {
r := memory.NewRegistry()
s := selector.NewSelector(selector.Registry(r))
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l.Close()
mux := http.NewServeMux()
mux.HandleFunc("/foo/bar", func(w http.ResponseWriter, r *http.Request) {
// only accept post
if r.Method != "POST" {
http.Error(w, "expect post method", 500)
return
}
// hijack the connection
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "could not hijack conn", 500)
return
}
// hijacked
conn, bufrw, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), 500)
return
}
defer conn.Close()
// read off the first request
// get codec
ct := r.Header.Get("Content-Type")
codec, ok := defaultHTTPCodecs[ct]
if !ok {
http.Error(w, "codec not found", 500)
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// extract message
msg := new(test.Message)
if err := codec.Unmarshal(b, msg); err != nil {
http.Error(w, err.Error(), 500)
return
}
// marshal response
b, err = codec.Marshal(msg)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// write response
rsp := &http.Response{
Header: r.Header,
Body: &buffer{bytes.NewBuffer(b)},
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(b)),
}
// write response
rsp.Write(bufrw)
bufrw.Flush()
reader := bufio.NewReader(conn)
for {
r, err := http.ReadRequest(reader)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
b, err = ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
// extract message
msg := new(test.Message)
if err := codec.Unmarshal(b, msg); err != nil {
http.Error(w, err.Error(), 500)
return
}
// marshal response
b, err = codec.Marshal(msg)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
rsp := &http.Response{
Header: r.Header,
Body: &buffer{bytes.NewBuffer(b)},
Status: "200 OK",
StatusCode: 200,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(b)),
}
// write response
rsp.Write(bufrw)
bufrw.Flush()
}
})
go http.Serve(l, mux)
host, sport, err := net.SplitHostPort(l.Addr().String())
if err != nil {
t.Fatal(err)
}
port, _ := strconv.Atoi(sport)
if err := r.Register(&registry.Service{
Name: "test.service",
Nodes: []*registry.Node{
{
Id: "test.service.1",
Address: host,
Port: port,
},
},
}); err != nil {
t.Fatal(err)
}
c := NewClient(client.Selector(s))
req := c.NewRequest("test.service", "/foo/bar", new(test.Message))
stream, err := c.Stream(context.TODO(), req)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
for i := 0; i < 10; i++ {
msg := &test.Message{
Seq: int64(i),
Data: fmt.Sprintf("message %d", i),
}
err := stream.Send(msg)
if err != nil {
t.Fatal(err)
}
rsp := new(test.Message)
err = stream.Recv(rsp)
if err != nil {
t.Fatal(err)
}
if rsp.Seq != msg.Seq {
t.Fatalf("invalid seq %d for %d", rsp.Seq, msg.Seq)
}
}
}

60
proto/test.pb.go Normal file
View File

@ -0,0 +1,60 @@
// Code generated by protoc-gen-go.
// source: github.com/micro/go-plugins/client/http/proto/test.proto
// DO NOT EDIT!
/*
Package test is a generated protocol buffer package.
It is generated from these files:
github.com/micro/go-plugins/client/http/proto/test.proto
It has these top-level messages:
Message
*/
package test
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Message struct {
Seq int64 `protobuf:"varint,1,opt,name=seq" json:"seq,omitempty"`
Data string `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func init() {
proto.RegisterType((*Message)(nil), "test.Message")
}
func init() {
proto.RegisterFile("github.com/micro/go-plugins/client/http/proto/test.proto", fileDescriptor0)
}
var fileDescriptor0 = []byte{
// 131 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x1c, 0xcb, 0xb1, 0x0e, 0x82, 0x30,
0x10, 0x06, 0xe0, 0x54, 0x88, 0xc6, 0x4e, 0xa6, 0x13, 0x23, 0x71, 0x62, 0x91, 0x1b, 0x5c, 0x7c,
0x09, 0x17, 0xde, 0xa0, 0xd4, 0x4b, 0x69, 0x02, 0x5c, 0xe5, 0x7e, 0xde, 0x9f, 0xd0, 0xed, 0x5b,
0x3e, 0xfb, 0x89, 0x09, 0xd3, 0x3e, 0xf6, 0x41, 0x16, 0x5a, 0x52, 0xd8, 0x84, 0xa2, 0xbc, 0xf2,
0xbc, 0xc7, 0xb4, 0x2a, 0x85, 0x39, 0xf1, 0x0a, 0x9a, 0x80, 0x4c, 0x79, 0x13, 0x08, 0x81, 0x15,
0x7d, 0xa1, 0xab, 0x4f, 0x3f, 0xc9, 0xde, 0xbe, 0xac, 0xea, 0x23, 0xbb, 0x87, 0xad, 0x94, 0xff,
0x8d, 0x69, 0x4d, 0x57, 0x0d, 0x27, 0x9d, 0xb3, 0xf5, 0xcf, 0xc3, 0x37, 0x97, 0xd6, 0x74, 0xf7,
0xa1, 0x78, 0xbc, 0x96, 0xfd, 0x3e, 0x02, 0x00, 0x00, 0xff, 0xff, 0xff, 0x16, 0x8d, 0x95, 0x79,
0x00, 0x00, 0x00,
}

8
proto/test.proto Normal file
View File

@ -0,0 +1,8 @@
syntax = "proto3";
package test;
message Message {
int64 seq = 1;
string data = 2;
}

31
publication.go Normal file
View File

@ -0,0 +1,31 @@
package http
import (
"github.com/micro/go-micro/client"
)
type httpPublication struct {
topic string
contentType string
message interface{}
}
func newHTTPPublication(topic string, message interface{}, contentType string) client.Publication {
return &httpPublication{
message: message,
topic: topic,
contentType: contentType,
}
}
func (h *httpPublication) ContentType() string {
return h.contentType
}
func (h *httpPublication) Topic() string {
return h.topic
}
func (h *httpPublication) Message() interface{} {
return h.message
}

48
request.go Normal file
View File

@ -0,0 +1,48 @@
package http
import (
"github.com/micro/go-micro/client"
)
type httpRequest struct {
service string
method string
contentType string
request interface{}
opts client.RequestOptions
}
func newHTTPRequest(service, method string, request interface{}, contentType string, reqOpts ...client.RequestOption) client.Request {
var opts client.RequestOptions
for _, o := range reqOpts {
o(&opts)
}
return &httpRequest{
service: service,
method: method,
request: request,
contentType: contentType,
opts: opts,
}
}
func (h *httpRequest) ContentType() string {
return h.contentType
}
func (h *httpRequest) Service() string {
return h.service
}
func (h *httpRequest) Method() string {
return h.method
}
func (h *httpRequest) Request() interface{} {
return h.request
}
func (h *httpRequest) Stream() bool {
return h.opts.Stream
}

128
stream.go Normal file
View File

@ -0,0 +1,128 @@
package http
import (
"bufio"
"bytes"
"errors"
"io/ioutil"
"net"
"net/http"
"net/url"
"sync"
"github.com/micro/go-micro/client"
"golang.org/x/net/context"
)
// Implements the streamer interface
type httpStream struct {
sync.RWMutex
address string
codec Codec
context context.Context
header http.Header
seq uint64
closed chan bool
err error
conn net.Conn
reader *bufio.Reader
request client.Request
}
var (
errShutdown = errors.New("connection is shut down")
)
func (h *httpStream) isClosed() bool {
select {
case <-h.closed:
return true
default:
return false
}
}
func (h *httpStream) Context() context.Context {
return h.context
}
func (h *httpStream) Request() client.Request {
return h.request
}
func (h *httpStream) Send(msg interface{}) error {
h.Lock()
defer h.Unlock()
if h.isClosed() {
h.err = errShutdown
return errShutdown
}
b, err := h.codec.Marshal(msg)
if err != nil {
return err
}
buf := &buffer{bytes.NewBuffer(b)}
defer buf.Close()
req := &http.Request{
Method: "POST",
URL: &url.URL{
Scheme: "http",
Host: h.address,
Path: h.request.Method(),
},
Header: h.header,
Body: buf,
ContentLength: int64(len(b)),
Host: h.address,
}
return req.Write(h.conn)
}
func (h *httpStream) Recv(msg interface{}) error {
h.Lock()
defer h.Unlock()
if h.isClosed() {
h.err = errShutdown
return errShutdown
}
rsp, err := http.ReadResponse(h.reader, new(http.Request))
if err != nil {
return err
}
defer rsp.Body.Close()
b, err := ioutil.ReadAll(rsp.Body)
if err != nil {
return err
}
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
return h.codec.Unmarshal(b, msg)
}
func (h *httpStream) Error() error {
h.RLock()
defer h.RUnlock()
return h.err
}
func (h *httpStream) Close() error {
select {
case <-h.closed:
return nil
default:
close(h.closed)
return h.conn.Close()
}
}