Merge branch 'master' into selector

This commit is contained in:
Asim 2016-05-06 23:16:07 +01:00
commit 639172f755
3 changed files with 110 additions and 28 deletions

View File

@ -14,6 +14,8 @@ import (
type service struct { type service struct {
opts Options opts Options
init chan bool
} }
func newService(opts ...Option) Service { func newService(opts ...Option) Service {
@ -28,6 +30,7 @@ func newService(opts ...Option) Service {
return &service{ return &service{
opts: options, opts: options,
init: make(chan bool),
} }
} }
@ -49,24 +52,36 @@ func (s *service) run(exit chan bool) {
} }
} }
// Init initialises options. Additionally it calls cmd.Init
// which parses command line flags. cmd.Init is only called
// on first Init.
func (s *service) Init(opts ...Option) { func (s *service) Init(opts ...Option) {
// We might get more command flags or the action here // If <-s.init blocks, Init has not been called yet
// This is pretty ugly, find a better way // so we can call cmd.Init once.
options := newOptions() select {
options.Cmd = s.opts.Cmd case <-s.init:
for _, o := range opts { default:
o(&options) // close init
} close(s.init)
s.opts.Cmd = options.Cmd
// Initialise the command flags, overriding new service // We might get more command flags or the action here
s.opts.Cmd.Init( // This is pretty ugly, find a better way
cmd.Broker(&s.opts.Broker), options := newOptions()
cmd.Registry(&s.opts.Registry), options.Cmd = s.opts.Cmd
cmd.Transport(&s.opts.Transport), for _, o := range opts {
cmd.Client(&s.opts.Client), o(&options)
cmd.Server(&s.opts.Server), }
) s.opts.Cmd = options.Cmd
// Initialise the command flags, overriding new service
s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Transport(&s.opts.Transport),
cmd.Client(&s.opts.Client),
cmd.Server(&s.opts.Server),
)
}
// Update any options to override command flags // Update any options to override command flags
for _, o := range opts { for _, o := range opts {

View File

@ -174,6 +174,10 @@ func (h *httpTransportClient) Recv(m *Message) error {
return err return err
} }
if rsp.StatusCode != 200 {
return errors.New(rsp.Status + ": " + string(b))
}
mr := &Message{ mr := &Message{
Header: make(map[string]string), Header: make(map[string]string),
Body: b, Body: b,

View File

@ -1,13 +1,11 @@
package transport_test package transport
import ( import (
"strings" "strings"
"testing" "testing"
"github.com/micro/go-micro/transport"
) )
func expectedPort(t *testing.T, expected string, lsn transport.Listener) { func expectedPort(t *testing.T, expected string, lsn Listener) {
parts := strings.Split(lsn.Addr(), ":") parts := strings.Split(lsn.Addr(), ":")
port := parts[len(parts)-1] port := parts[len(parts)-1]
@ -18,7 +16,7 @@ func expectedPort(t *testing.T, expected string, lsn transport.Listener) {
} }
func TestHTTPTransportPortRange(t *testing.T) { func TestHTTPTransportPortRange(t *testing.T) {
tp := transport.NewTransport() tp := NewTransport()
lsn1, err := tp.Listen(":44444-44448") lsn1, err := tp.Listen(":44444-44448")
if err != nil { if err != nil {
@ -43,7 +41,7 @@ func TestHTTPTransportPortRange(t *testing.T) {
} }
func TestHTTPTransportCommunication(t *testing.T) { func TestHTTPTransportCommunication(t *testing.T) {
tr := transport.NewTransport() tr := NewTransport()
l, err := tr.Listen(":0") l, err := tr.Listen(":0")
if err != nil { if err != nil {
@ -51,17 +49,15 @@ func TestHTTPTransportCommunication(t *testing.T) {
} }
defer l.Close() defer l.Close()
fn := func(sock transport.Socket) { fn := func(sock Socket) {
defer sock.Close() defer sock.Close()
for { for {
var m transport.Message var m Message
if err := sock.Recv(&m); err != nil { if err := sock.Recv(&m); err != nil {
return return
} }
t.Logf("Successfully received %+v", m)
if err := sock.Send(&m); err != nil { if err := sock.Send(&m); err != nil {
return return
} }
@ -86,7 +82,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
} }
defer c.Close() defer c.Close()
m := transport.Message{ m := Message{
Header: map[string]string{ Header: map[string]string{
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
@ -97,7 +93,7 @@ func TestHTTPTransportCommunication(t *testing.T) {
t.Errorf("Unexpected send err: %v", err) t.Errorf("Unexpected send err: %v", err)
} }
var rm transport.Message var rm Message
if err := c.Recv(&rm); err != nil { if err := c.Recv(&rm); err != nil {
t.Errorf("Unexpected recv err: %v", err) t.Errorf("Unexpected recv err: %v", err)
@ -109,3 +105,70 @@ func TestHTTPTransportCommunication(t *testing.T) {
close(done) close(done)
} }
func TestHTTPTransportError(t *testing.T) {
tr := NewTransport()
l, err := tr.Listen(":0")
if err != nil {
t.Errorf("Unexpected listen err: %v", err)
}
defer l.Close()
fn := func(sock Socket) {
defer sock.Close()
for {
var m Message
if err := sock.Recv(&m); err != nil {
t.Fatal(err)
}
sock.(*httpTransportSocket).error(&Message{
Body: []byte(`an error occurred`),
})
}
}
done := make(chan bool)
go func() {
if err := l.Accept(fn); err != nil {
select {
case <-done:
default:
t.Errorf("Unexpected accept err: %v", err)
}
}
}()
c, err := tr.Dial(l.Addr())
if err != nil {
t.Errorf("Unexpected dial err: %v", err)
}
defer c.Close()
m := Message{
Header: map[string]string{
"Content-Type": "application/json",
},
Body: []byte(`{"message": "Hello World"}`),
}
if err := c.Send(&m); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
var rm Message
err = c.Recv(&rm)
if err == nil {
t.Fatal("Expected error but got nil")
}
if err.Error() != "500 Internal Server Error: an error occurred" {
t.Fatalf("Did not receive expected error, got: %v", err)
}
close(done)
}