Merge branch 'master' into pool

This commit is contained in:
Asim 2016-06-06 14:06:03 +01:00
commit 739b094cd2
26 changed files with 327 additions and 119 deletions

4
broker/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package broker is an interface used for asynchronous messaging.
*/
package broker

View File

@ -1,24 +1,3 @@
/*
Package client provides a method to make synchronous, asynchronous and
streaming requests to services. By default json and protobuf codecs are
supported.
import "github.com/micro/go-micro/client"
c := client.NewClient()
req := c.NewRequest("go.micro.srv.greeter", "Greeter.Hello", &greeter.Request{
Name: "John",
})
rsp := &greeter.Response{}
if err := c.Call(context.Background(), req, rsp); err != nil {
return err
}
fmt.Println(rsp.Msg)
*/
package client
import (

23
client/doc.go Normal file
View File

@ -0,0 +1,23 @@
/*
Package client is an interface for making requests.
It provides a method to make synchronous, asynchronous and streaming requests to services.
By default json and protobuf codecs are supported.
import "github.com/micro/go-micro/client"
c := client.NewClient()
req := c.NewRequest("go.micro.srv.greeter", "Greeter.Hello", &greeter.Request{
Name: "John",
})
rsp := &greeter.Response{}
if err := c.Call(context.Background(), req, rsp); err != nil {
return err
}
fmt.Println(rsp.Msg)
*/
package client

View File

@ -123,6 +123,8 @@ func (m *MockClient) CallRemote(ctx context.Context, addr string, req client.Req
}
v.Set(reflect.ValueOf(r.Response))
return nil
}
return fmt.Errorf("rpc: can't find service %s", req.Method())

4
cmd/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package cmd is an interface for parsing the command line.
*/
package cmd

4
codec/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package codec is an interface for encoding messages.
*/
package codec

4
errors/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package errors is an interface for defining detailed errors.
*/
package errors

4
metadata/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package metadata is a way of defining message headers.
*/
package metadata

4
registry/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package registry is an interface for service discovery.
*/
package registry

View File

@ -45,10 +45,6 @@ func encodeEndpoints(en []*Endpoint) []string {
var tags []string
for _, e := range en {
if b, err := json.Marshal(e); err == nil {
// old encoding
// TODO: remove in 09/2016
tags = append(tags, "e="+string(b))
// new encoding
tags = append(tags, "e-"+encode(b))
}
}
@ -100,9 +96,6 @@ func encodeMetadata(md map[string]string) []string {
if b, err := json.Marshal(map[string]string{
k: v,
}); err == nil {
// old encoding
// TODO: remove in 09/2016
tags = append(tags, "t="+string(b))
// new encoding
tags = append(tags, "t-"+encode(b))
}
@ -152,13 +145,7 @@ func decodeMetadata(tags []string) map[string]string {
}
func encodeVersion(v string) []string {
return []string{
// old encoding,
// TODO: remove in 09/2016
"v=" + v,
// new encoding,
"v-" + encode([]byte(v)),
}
return []string{"v-" + encode([]byte(v))}
}
func decodeVersion(tags []string) (string, bool) {

View File

@ -57,8 +57,8 @@ func TestEncodingEndpoints(t *testing.T) {
e := encodeEndpoints([]*Endpoint{ep})
// check there are two tags; old and new
if len(e) != 2 {
t.Fatalf("Expected 2 encoded tags, got %v", e)
if len(e) != 1 {
t.Fatalf("Expected 1 encoded tags, got %v", e)
}
// check old encoding
@ -104,30 +104,24 @@ func TestEncodingEndpoints(t *testing.T) {
// HEX encoded
hencoded := encode(jencoded)
// endpoint tag
jepTag := "e=" + string(jencoded)
hepTag := "e-" + hencoded
// test old
testEp(ep, jepTag)
// test new
testEp(ep, hepTag)
}
}
func TestEncodingVersion(t *testing.T) {
testData := []struct {
decoded string
encoded string
oldEncoded string
decoded string
encoded string
}{
{"1.0.0", "v-789c32d433d03300040000ffff02ce00ee", "v=1.0.0"},
{"latest", "v-789cca492c492d2e01040000ffff08cc028e", "v=latest"},
{"1.0.0", "v-789c32d433d03300040000ffff02ce00ee"},
{"latest", "v-789cca492c492d2e01040000ffff08cc028e"},
}
for _, data := range testData {
e := encodeVersion(data.decoded)
if e[1] != data.encoded {
if e[0] != data.encoded {
t.Fatalf("Expected %s got %s", data.encoded, e)
}
@ -148,14 +142,5 @@ func TestEncodingVersion(t *testing.T) {
if d != data.decoded {
t.Fatalf("Expected %s got %s", data.decoded, d)
}
d, ok = decodeVersion([]string{data.oldEncoded})
if !ok {
t.Fatalf("Unexpected %t for %s", ok, data.oldEncoded)
}
if d != data.decoded {
t.Fatalf("Expected %s got %s", data.decoded, d)
}
}
}

View File

@ -90,7 +90,7 @@ func TestBlackList(t *testing.T) {
}
// blacklist all of it
for i := 0; i < 9; i++ {
for i := 0; i < 20; i++ {
node, err = next()
if err != nil {
t.Fatal(err)

8
selector/doc.go Normal file
View File

@ -0,0 +1,8 @@
/*
Package selector is a way to load balance service nodes.
It algorithmically filter and return nodes required by the client or any other system.
Selector's implemented by Micro build on the registry but it's of optional use. One could
provide a static Selector that has a fixed pool.
*/
package selector

View File

@ -1,9 +1,3 @@
/*
The Selector package provides a way to algorithmically filter and return
nodes required by the client or any other system. Selector's implemented
by Micro build on the registry but it's of optional use. One could
provide a static Selector that has a fixed pool.
*/
package selector
import (

View File

@ -2,9 +2,6 @@ package server
import (
"github.com/micro/go-micro/server/debug"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
)
// We use this to wrap any debug handlers so we preserve the signature Debug.{Method}
@ -12,10 +9,6 @@ type Debug struct {
debug.DebugHandler
}
func (d *Debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error {
return d.DebugHandler.Health(ctx, req, rsp)
}
func registerDebugHandler(s Server) {
s.Handle(s.NewHandler(&Debug{s.Options().DebugHandler}, InternalHandler(true)))
}

View File

@ -1,6 +1,9 @@
package debug
import (
"runtime"
"time"
proto "github.com/micro/go-micro/server/debug/proto"
"golang.org/x/net/context"
@ -12,16 +15,37 @@ import (
// and /varz
type DebugHandler interface {
Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error
Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error
}
// Our own internal handler
type debug struct{}
type debug struct {
started int64
}
var (
DefaultDebugHandler DebugHandler = new(debug)
DefaultDebugHandler DebugHandler = newDebug()
)
func newDebug() *debug {
return &debug{
started: time.Now().Unix(),
}
}
func (d *debug) Health(ctx context.Context, req *proto.HealthRequest, rsp *proto.HealthResponse) error {
rsp.Status = "ok"
return nil
}
func (d *debug) Stats(ctx context.Context, req *proto.StatsRequest, rsp *proto.StatsResponse) error {
var mstat runtime.MemStats
runtime.ReadMemStats(&mstat)
rsp.Started = uint64(d.started)
rsp.Uptime = uint64(time.Now().Unix() - d.started)
rsp.Memory = mstat.Alloc
rsp.Gc = mstat.PauseTotalNs
rsp.Threads = uint64(runtime.NumGoroutine())
return nil
}

View File

@ -1,16 +1,18 @@
// Code generated by protoc-gen-go.
// source: go-micro/server/debug/proto/debug.proto
// source: github.com/micro/go-micro/server/debug/proto/debug.proto
// DO NOT EDIT!
/*
Package debug is a generated protocol buffer package.
It is generated from these files:
go-micro/server/debug/proto/debug.proto
github.com/micro/go-micro/server/debug/proto/debug.proto
It has these top-level messages:
HealthRequest
HealthResponse
StatsRequest
StatsResponse
*/
package debug
@ -23,6 +25,12 @@ var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type HealthRequest struct {
}
@ -32,6 +40,7 @@ func (*HealthRequest) ProtoMessage() {}
func (*HealthRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type HealthResponse struct {
// default: ok
Status string `protobuf:"bytes,1,opt,name=status" json:"status,omitempty"`
}
@ -40,18 +49,52 @@ func (m *HealthResponse) String() string { return proto.CompactTextSt
func (*HealthResponse) ProtoMessage() {}
func (*HealthResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type StatsRequest struct {
}
func (m *StatsRequest) Reset() { *m = StatsRequest{} }
func (m *StatsRequest) String() string { return proto.CompactTextString(m) }
func (*StatsRequest) ProtoMessage() {}
func (*StatsRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type StatsResponse struct {
// unix timestamp
Started uint64 `protobuf:"varint,1,opt,name=started" json:"started,omitempty"`
// in seconds
Uptime uint64 `protobuf:"varint,2,opt,name=uptime" json:"uptime,omitempty"`
// in bytes
Memory uint64 `protobuf:"varint,3,opt,name=memory" json:"memory,omitempty"`
// num threads
Threads uint64 `protobuf:"varint,4,opt,name=threads" json:"threads,omitempty"`
// in nanoseconds
Gc uint64 `protobuf:"varint,5,opt,name=gc" json:"gc,omitempty"`
}
func (m *StatsResponse) Reset() { *m = StatsResponse{} }
func (m *StatsResponse) String() string { return proto.CompactTextString(m) }
func (*StatsResponse) ProtoMessage() {}
func (*StatsResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func init() {
proto.RegisterType((*HealthRequest)(nil), "HealthRequest")
proto.RegisterType((*HealthResponse)(nil), "HealthResponse")
proto.RegisterType((*StatsRequest)(nil), "StatsRequest")
proto.RegisterType((*StatsResponse)(nil), "StatsResponse")
}
var fileDescriptor0 = []byte{
// 107 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0x52, 0x4f, 0xcf, 0xd7, 0xcd,
0xcd, 0x4c, 0x2e, 0xca, 0xd7, 0x2f, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xd2, 0x4f, 0x49, 0x4d, 0x2a,
0x4d, 0xd7, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x87, 0xb0, 0xf5, 0xc0, 0x6c, 0x25, 0x7e, 0x2e, 0x5e,
0x8f, 0xd4, 0xc4, 0x9c, 0x92, 0x8c, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0x25, 0x05, 0x2e,
0x3e, 0x98, 0x40, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x10, 0x1f, 0x17, 0x5b, 0x71, 0x49, 0x62,
0x49, 0x69, 0xb1, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x67, 0x12, 0x1b, 0x58, 0xa7, 0x31, 0x20, 0x00,
0x00, 0xff, 0xff, 0x1c, 0xef, 0x98, 0xac, 0x64, 0x00, 0x00, 0x00,
// 201 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x34, 0x8f, 0xbd, 0x6e, 0x83, 0x30,
0x14, 0x85, 0x05, 0xa5, 0x54, 0xbd, 0x2a, 0x54, 0x62, 0xa8, 0x3c, 0x56, 0x4c, 0x2c, 0xc5, 0x43,
0x97, 0x3e, 0x42, 0x67, 0xf2, 0x04, 0xfc, 0x5c, 0x19, 0xa4, 0x38, 0x26, 0xbe, 0xd7, 0x91, 0x32,
0xe7, 0xc5, 0x03, 0xb6, 0xd9, 0xce, 0xf7, 0xd9, 0xe7, 0x48, 0x17, 0xfe, 0xd4, 0xc2, 0xb3, 0x1b,
0xda, 0xd1, 0x68, 0xa9, 0x97, 0xd1, 0x1a, 0xa9, 0xcc, 0x4f, 0x08, 0x84, 0xf6, 0x86, 0x56, 0x4e,
0x38, 0x38, 0x25, 0x57, 0x6b, 0xd8, 0x84, 0xdc, 0xfa, 0x5c, 0x7f, 0x42, 0xf1, 0x8f, 0xfd, 0x99,
0xe7, 0x0e, 0xaf, 0x0e, 0x89, 0xeb, 0x06, 0xca, 0x43, 0xd0, 0x6a, 0x2e, 0x84, 0xd5, 0x17, 0xe4,
0xc4, 0x3d, 0x3b, 0x12, 0xc9, 0x77, 0xd2, 0xbc, 0x77, 0x91, 0xea, 0x12, 0x3e, 0x4e, 0x5b, 0xa2,
0xa3, 0xf9, 0x48, 0xa0, 0x88, 0x22, 0x36, 0x05, 0xbc, 0x6d, 0x7f, 0x2d, 0xe3, 0xe4, 0xab, 0x59,
0x77, 0xe0, 0xbe, 0xe9, 0x56, 0x5e, 0x34, 0x8a, 0xd4, 0x3f, 0x44, 0xda, 0xbd, 0x46, 0x6d, 0xec,
0x5d, 0xbc, 0x04, 0x1f, 0x68, 0x5f, 0xe2, 0xd9, 0x62, 0x3f, 0x91, 0xc8, 0xc2, 0x52, 0xc4, 0xaa,
0x84, 0x54, 0x8d, 0xe2, 0xd5, 0xcb, 0x2d, 0x0d, 0xb9, 0xbf, 0xeb, 0xf7, 0x19, 0x00, 0x00, 0xff,
0xff, 0xc6, 0x75, 0x51, 0x35, 0x13, 0x01, 0x00, 0x00,
}

View File

@ -6,11 +6,29 @@ syntax = "proto3";
//
// service Debug {
// rpc Health(HealthRequest) returns (HealthResponse) {}
// rpc Stats(StatsRequest) returns (StatsResponse) {}
// }
message HealthRequest {
}
message HealthResponse {
// default: ok
string status = 1;
}
message StatsRequest {
}
message StatsResponse {
// unix timestamp
uint64 started = 1;
// in seconds
uint64 uptime = 2;
// in bytes
uint64 memory = 3;
// num threads
uint64 threads = 4;
// total gc in nanoseconds
uint64 gc = 5;
}

31
server/doc.go Normal file
View File

@ -0,0 +1,31 @@
/*
Package server is an interface for a micro server.
It represents a server instance in go-micro which handles synchronous
requests via handlers and asynchronous requests via subscribers that
register with a broker.
The server combines the all the packages in go-micro to create a whole unit
used for building applications including discovery, client/server communication
and pub/sub.
import "github.com/micro/go-micro/server"
type Greeter struct {}
func (g *Greeter) Hello(ctx context.Context, req *greeter.Request, rsp *greeter.Response) error {
rsp.Msg = "Hello " + req.Name
return nil
}
s := server.NewServer()
s.Handle(
s.NewHandler(&Greeter{}),
)
s.Start()
*/
package server

View File

@ -34,6 +34,7 @@ type Subscriber interface {
type HandlerOptions struct {
Internal bool
Metadata map[string]map[string]string
}
type SubscriberOptions struct {
@ -41,6 +42,14 @@ type SubscriberOptions struct {
Internal bool
}
// EndpointMetadata is a Handler option that allows metadata to be added to
// individual endpoints.
func EndpointMetadata(name string, md map[string]string) HandlerOption {
return func(o *HandlerOptions) {
o.Metadata[name] = md
}
}
// Internal Handler options specifies that a handler is not advertised
// to the discovery system. In the future this may also limit request
// to the internal network or authorised user.

View File

@ -14,7 +14,10 @@ type rpcHandler struct {
}
func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
var options HandlerOptions
options := HandlerOptions{
Metadata: make(map[string]map[string]string),
}
for _, o := range opts {
o(&options)
}
@ -28,6 +31,11 @@ func newRpcHandler(handler interface{}, opts ...HandlerOption) Handler {
for m := 0; m < typ.NumMethod(); m++ {
if e := extractEndpoint(typ.Method(m)); e != nil {
e.Name = name + "." + e.Name
for k, v := range options.Metadata[e.Name] {
e.Metadata[k] = v
}
endpoints = append(endpoints, e)
}
}

View File

@ -1,31 +1,3 @@
/*
Server represents a server instance in go-micro which handles synchronous
requests via handlers and asynchronous requests via subscribers that
register with a broker.
The server combines the all the packages in go-micro to create a whole unit
used for building applications including discovery, client/server communication
and pub/sub.
import "github.com/micro/go-micro/server"
type Greeter struct {}
func (g *Greeter) Hello(ctx context.Context, req *greeter.Request, rsp *greeter.Response) error {
rsp.Msg = "Hello " + req.Name
return nil
}
s := server.NewServer()
s.Handle(
s.NewHandler(&Greeter{}),
)
s.Start()
*/
package server
import (

4
transport/doc.go Normal file
View File

@ -0,0 +1,4 @@
/*
Package is an interface for synchronous communication.
*/
package transport

View File

@ -409,7 +409,19 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
fn := func(addr string) (net.Listener, error) {
if config == nil {
cert, err := mls.Certificate(addr)
hosts := []string{addr}
// check if its a valid host:port
if host, _, err := net.SplitHostPort(addr); err == nil {
if len(host) == 0 {
hosts = getIPAddrs()
} else {
hosts = []string{host}
}
}
// generate a certificate
cert, err := mls.Certificate(hosts...)
if err != nil {
return nil, err
}
@ -440,6 +452,45 @@ func (h *httpTransport) String() string {
return "http"
}
func getIPAddrs() []string {
ifaces, err := net.Interfaces()
if err != nil {
return nil
}
var ipAddrs []string
for _, i := range ifaces {
addrs, err := i.Addrs()
if err != nil {
continue
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip == nil {
continue
}
ip = ip.To4()
if ip == nil {
continue
}
ipAddrs = append(ipAddrs, ip.String())
}
}
return ipAddrs
}
func newHTTPTransport(opts ...Option) *httpTransport {
var options Options
for _, o := range opts {

View File

@ -2,7 +2,11 @@ package mock
import (
"errors"
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/micro/go-micro/transport"
)
@ -59,7 +63,12 @@ func (ms *mockSocket) Send(m *transport.Message) error {
}
func (ms *mockSocket) Close() error {
close(ms.exit)
select {
case <-ms.exit:
return nil
default:
close(ms.exit)
}
return nil
}
@ -68,7 +77,12 @@ func (m *mockListener) Addr() string {
}
func (m *mockListener) Close() error {
close(m.exit)
select {
case <-m.exit:
return nil
default:
close(m.exit)
}
return nil
}
@ -126,15 +140,25 @@ func (m *mockTransport) Listen(addr string, opts ...transport.ListenOption) (tra
m.Lock()
defer m.Unlock()
if _, ok := m.listeners[addr]; ok {
return nil, errors.New("already listening on " + addr)
}
var options transport.ListenOptions
for _, o := range opts {
o(&options)
}
parts := strings.Split(addr, ":")
// if zero port then randomly assign one
if len(parts) > 1 && parts[len(parts)-1] == "0" {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
i := r.Intn(10000)
// set addr with port
addr = fmt.Sprintf("%s:%d", parts[:len(parts)-1], 10000+i)
}
if _, ok := m.listeners[addr]; ok {
return nil, errors.New("already listening on " + addr)
}
listener := &mockListener{
opts: options,
addr: addr,

View File

@ -58,3 +58,32 @@ func TestTransport(t *testing.T) {
}
}
func TestListener(t *testing.T) {
tr := NewTransport()
// bind / listen on random port
l, err := tr.Listen(":0")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l.Close()
// try again
l2, err := tr.Listen(":0")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l2.Close()
// now make sure it still fails
l3, err := tr.Listen(":8080")
if err != nil {
t.Fatalf("Unexpected error listening %v", err)
}
defer l3.Close()
if _, err := tr.Listen(":8080"); err == nil {
t.Fatal("Expected error binding to :8080 got nil")
}
}