Change initialisation and add metadata

This commit is contained in:
Asim 2015-05-26 22:39:48 +01:00
parent 7aa2c82ced
commit 36b5ca46fe
16 changed files with 182 additions and 93 deletions

View File

@ -77,7 +77,7 @@ import (
type Example struct{} type Example struct{}
func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
md, _ := c.GetMetaData(ctx) md, _ := c.GetMetadata(ctx)
log.Info("Received Example.Call request with metadata: %v", md) log.Info("Received Example.Call request with metadata: %v", md)
rsp.Msg = server.Id + ": Hello " + req.Name rsp.Msg = server.Id + ": Hello " + req.Name
return nil return nil

View File

@ -165,7 +165,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return return
} }
ctx := c.WithMetaData(context.Background(), e.Header) ctx := c.WithMetadata(context.Background(), e.Header)
h.RLock() h.RLock()
for _, subscriber := range h.subscribers[e.Message.Topic] { for _, subscriber := range h.subscribers[e.Message.Topic] {
@ -208,7 +208,7 @@ func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) err
Body: body, Body: body,
} }
header, _ := c.GetMetaData(ctx) header, _ := c.GetMetadata(ctx)
b, err := json.Marshal(&envelope{ b, err := json.Marshal(&envelope{
header, header,

View File

@ -68,7 +68,7 @@ func (n *nbroker) Init() error {
} }
func (n *nbroker) Publish(ctx context.Context, topic string, body []byte) error { func (n *nbroker) Publish(ctx context.Context, topic string, body []byte) error {
header, _ := c.GetMetaData(ctx) header, _ := c.GetMetadata(ctx)
message := &broker.Message{ message := &broker.Message{
Id: uuid.NewUUID().String(), Id: uuid.NewUUID().String(),
@ -93,7 +93,7 @@ func (n *nbroker) Subscribe(topic string, function func(context.Context, *broker
if err := json.Unmarshal(msg.Data, &e); err != nil { if err := json.Unmarshal(msg.Data, &e); err != nil {
return return
} }
ctx := c.WithMetaData(context.Background(), e.Header) ctx := c.WithMetadata(context.Background(), e.Header)
function(ctx, e.Message) function(ctx, e.Message)
}) })
if err != nil { if err != nil {

View File

@ -96,7 +96,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r
Body: reqB.Bytes(), Body: reqB.Bytes(),
} }
md, ok := c.GetMetaData(ctx) md, ok := c.GetMetadata(ctx)
if ok { if ok {
for k, v := range md { for k, v := range md {
msg.Header[k] = v msg.Header[k] = v

View File

@ -30,12 +30,28 @@ import (
var ( var (
Flags = []cli.Flag{ Flags = []cli.Flag{
cli.StringFlag{
Name: "server_name",
EnvVar: "MICRO_SERVER_NAME",
Usage: "Name of the server. go.micro.srv.example",
},
cli.StringFlag{
Name: "server_id",
EnvVar: "MICRO_SERVER_ID",
Usage: "Id of the server. Auto-generated if not specified",
},
cli.StringFlag{ cli.StringFlag{
Name: "server_address", Name: "server_address",
EnvVar: "MICRO_SERVER_ADDRESS", EnvVar: "MICRO_SERVER_ADDRESS",
Value: ":0", Value: ":0",
Usage: "Bind address for the server. 127.0.0.1:8080", Usage: "Bind address for the server. 127.0.0.1:8080",
}, },
cli.StringSliceFlag{
Name: "server_metadata",
EnvVar: "MICRO_SERVER_METADATA",
Value: &cli.StringSlice{},
Usage: "A list of key-value pairs defining metadata. version=1.0.0",
},
cli.StringFlag{ cli.StringFlag{
Name: "broker", Name: "broker",
EnvVar: "MICRO_BROKER", EnvVar: "MICRO_BROKER",
@ -73,8 +89,6 @@ var (
) )
func Setup(c *cli.Context) error { func Setup(c *cli.Context) error {
server.Address = c.String("server_address")
bAddrs := strings.Split(c.String("broker_address"), ",") bAddrs := strings.Split(c.String("broker_address"), ",")
switch c.String("broker") { switch c.String("broker") {
@ -104,6 +118,24 @@ func Setup(c *cli.Context) error {
transport.DefaultTransport = tnats.NewTransport(tAddrs) transport.DefaultTransport = tnats.NewTransport(tAddrs)
} }
metadata := make(map[string]string)
for _, d := range c.StringSlice("server_metadata") {
var key, val string
parts := strings.Split(d, "=")
key = parts[0]
if len(parts) > 1 {
val = strings.Join(parts[1:], "=")
}
metadata[key] = val
}
server.DefaultServer = server.NewServer(
server.Name(c.String("server_name")),
server.Id(c.String("server_id")),
server.Address(c.String("server_address")),
server.Metadata(metadata),
)
client.DefaultClient = client.NewClient() client.DefaultClient = client.NewClient()
return nil return nil

View File

@ -10,13 +10,13 @@ const (
mdKey = key(0) mdKey = key(0)
) )
type MetaData map[string]string type Metadata map[string]string
func GetMetaData(ctx context.Context) (MetaData, bool) { func GetMetadata(ctx context.Context) (Metadata, bool) {
md, ok := ctx.Value(mdKey).(MetaData) md, ok := ctx.Value(mdKey).(Metadata)
return md, ok return md, ok
} }
func WithMetaData(ctx context.Context, md MetaData) context.Context { func WithMetadata(ctx context.Context, md Metadata) context.Context {
return context.WithValue(ctx, mdKey, md) return context.WithValue(ctx, mdKey, md)
} }

View File

@ -19,7 +19,7 @@ func main() {
}) })
// create context with metadata // create context with metadata
ctx := c.WithMetaData(context.Background(), map[string]string{ ctx := c.WithMetadata(context.Background(), map[string]string{
"X-User-Id": "john", "X-User-Id": "john",
"X-From-Id": "script", "X-From-Id": "script",
}) })

View File

@ -20,7 +20,7 @@ func pub() {
tick := time.NewTicker(time.Second) tick := time.NewTicker(time.Second)
i := 0 i := 0
for _ = range tick.C { for _ = range tick.C {
ctx := c.WithMetaData(context.Background(), map[string]string{ ctx := c.WithMetadata(context.Background(), map[string]string{
"id": fmt.Sprintf("%d", i), "id": fmt.Sprintf("%d", i),
}) })
@ -36,7 +36,7 @@ func pub() {
func sub() { func sub() {
_, err := broker.Subscribe(topic, func(ctx context.Context, msg *broker.Message) { _, err := broker.Subscribe(topic, func(ctx context.Context, msg *broker.Message) {
md, _ := c.GetMetaData(ctx) md, _ := c.GetMetadata(ctx)
fmt.Println("[sub] received message:", string(msg.Body), "context", md) fmt.Println("[sub] received message:", string(msg.Body), "context", md)
}) })
if err != nil { if err != nil {

View File

@ -12,8 +12,8 @@ import (
type Example struct{} type Example struct{}
func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error { func (e *Example) Call(ctx context.Context, req *example.Request, rsp *example.Response) error {
md, _ := c.GetMetaData(ctx) md, _ := c.GetMetadata(ctx)
log.Info("Received Example.Call request with metadata: %v", md) log.Info("Received Example.Call request with metadata: %v", md)
rsp.Msg = server.Id + ": Hello " + req.Name rsp.Msg = server.Config().Id() + ": Hello " + req.Name
return nil return nil
} }

View File

@ -11,10 +11,10 @@ func main() {
// optionally setup command line usage // optionally setup command line usage
cmd.Init() cmd.Init()
server.Name = "go.micro.srv.example"
// Initialise Server // Initialise Server
server.Init() server.Init(
server.Name("go.micro.srv.example"),
)
// Register Handlers // Register Handlers
server.Register( server.Register(

View File

@ -16,7 +16,7 @@ type consulRegistry struct {
services map[string]*Service services map[string]*Service
} }
func encodeMetaData(md map[string]string) []string { func encodeMetadata(md map[string]string) []string {
var tags []string var tags []string
for k, v := range md { for k, v := range md {
if b, err := json.Marshal(map[string]string{ if b, err := json.Marshal(map[string]string{
@ -28,7 +28,7 @@ func encodeMetaData(md map[string]string) []string {
return tags return tags
} }
func decodeMetaData(tags []string) map[string]string { func decodeMetadata(tags []string) map[string]string {
md := make(map[string]string) md := make(map[string]string)
for _, tag := range tags { for _, tag := range tags {
var kv map[string]string var kv map[string]string
@ -81,7 +81,7 @@ func (c *consulRegistry) Register(s *Service) error {
node := s.Nodes[0] node := s.Nodes[0]
tags := encodeMetaData(node.MetaData) tags := encodeMetadata(node.Metadata)
_, err := c.Client.Catalog().Register(&consul.CatalogRegistration{ _, err := c.Client.Catalog().Register(&consul.CatalogRegistration{
Node: node.Id, Node: node.Id,
@ -123,7 +123,7 @@ func (c *consulRegistry) GetService(name string) (*Service, error) {
Id: s.ServiceID, Id: s.ServiceID,
Address: s.Address, Address: s.Address,
Port: s.ServicePort, Port: s.ServicePort,
MetaData: decodeMetaData(s.ServiceTags), Metadata: decodeMetadata(s.ServiceTags),
}) })
} }

View File

@ -45,7 +45,7 @@ func (cw *consulWatcher) serviceHandler(idx uint64, data interface{}) {
Id: e.Service.ID, Id: e.Service.ID,
Address: e.Node.Address, Address: e.Node.Address,
Port: e.Service.Port, Port: e.Service.Port,
MetaData: decodeMetaData(e.Service.Tags), Metadata: decodeMetadata(e.Service.Tags),
}) })
} }

View File

@ -9,7 +9,7 @@ type Registry interface {
type Service struct { type Service struct {
Name string Name string
MetaData map[string]string Metadata map[string]string
Nodes []*Node Nodes []*Node
} }
@ -17,7 +17,7 @@ type Node struct {
Id string Id string
Address string Address string
Port int Port int
MetaData map[string]string Metadata map[string]string
} }
type options struct{} type options struct{}

55
server/options.go Normal file
View File

@ -0,0 +1,55 @@
package server
import (
"github.com/myodc/go-micro/transport"
)
type options struct {
transport transport.Transport
metadata map[string]string
name string
address string
id string
}
func newOptions(opt ...Option) options {
var opts options
for _, o := range opt {
o(&opts)
}
if opts.transport == nil {
opts.transport = transport.DefaultTransport
}
if len(opts.address) == 0 {
opts.address = DefaultAddress
}
if len(opts.name) == 0 {
opts.name = DefaultName
}
if len(opts.id) == 0 {
opts.id = DefaultId
}
return opts
}
func (o options) Name() string {
return o.name
}
func (o options) Id() string {
return o.name + "-" + o.id
}
func (o options) Address() string {
return o.address
}
func (o options) Metadata() map[string]string {
return o.metadata
}

View File

@ -2,7 +2,6 @@ package server
import ( import (
"bytes" "bytes"
"sync"
c "github.com/myodc/go-micro/context" c "github.com/myodc/go-micro/context"
"github.com/myodc/go-micro/transport" "github.com/myodc/go-micro/transport"
@ -16,27 +15,14 @@ import (
) )
type rpcServer struct { type rpcServer struct {
mtx sync.RWMutex
address string
opts options opts options
rpc *rpc.Server rpc *rpc.Server
exit chan chan error exit chan chan error
} }
func newRpcServer(address string, opt ...Option) Server { func newRpcServer(opts ...Option) Server {
var opts options
for _, o := range opt {
o(&opts)
}
if opts.transport == nil {
opts.transport = transport.DefaultTransport
}
return &rpcServer{ return &rpcServer{
opts: opts, opts: newOptions(opts...),
address: address,
rpc: rpc.NewServer(), rpc: rpc.NewServer(),
exit: make(chan chan error), exit: make(chan chan error),
} }
@ -72,7 +58,7 @@ func (s *rpcServer) accept(sock transport.Socket) {
ct := msg.Header["Content-Type"] ct := msg.Header["Content-Type"]
delete(msg.Header, "Content-Type") delete(msg.Header, "Content-Type")
ctx := c.WithMetaData(context.Background(), msg.Header) ctx := c.WithMetadata(context.Background(), msg.Header)
if err := s.rpc.ServeRequestWithContext(ctx, cc); err != nil { if err := s.rpc.ServeRequestWithContext(ctx, cc); err != nil {
return return
@ -86,15 +72,17 @@ func (s *rpcServer) accept(sock transport.Socket) {
}) })
} }
func (s *rpcServer) Address() string { func (s *rpcServer) Config() options {
s.mtx.RLock() return s.opts
address := s.address
s.mtx.RUnlock()
return address
} }
func (s *rpcServer) Init() error { func (s *rpcServer) Init(opts ...Option) {
return nil for _, opt := range opts {
opt(&s.opts)
}
if len(s.opts.id) == 0 {
s.opts.id = s.opts.name + "-" + DefaultId
}
} }
func (s *rpcServer) NewReceiver(handler interface{}) Receiver { func (s *rpcServer) NewReceiver(handler interface{}) Receiver {
@ -118,16 +106,14 @@ func (s *rpcServer) Register(r Receiver) error {
func (s *rpcServer) Start() error { func (s *rpcServer) Start() error {
registerHealthChecker(s) registerHealthChecker(s)
ts, err := s.opts.transport.Listen(s.address) ts, err := s.opts.transport.Listen(s.opts.address)
if err != nil { if err != nil {
return err return err
} }
log.Infof("Listening on %s", ts.Addr()) log.Infof("Listening on %s", ts.Addr())
s.mtx.RLock() s.opts.address = ts.Addr()
s.address = ts.Addr()
s.mtx.RUnlock()
go ts.Accept(s.accept) go ts.Accept(s.accept)

View File

@ -14,8 +14,8 @@ import (
) )
type Server interface { type Server interface {
Address() string Config() options
Init() error Init(...Option)
NewReceiver(interface{}) Receiver NewReceiver(interface{}) Receiver
NewNamedReceiver(string, interface{}) Receiver NewNamedReceiver(string, interface{}) Receiver
Register(Receiver) error Register(Receiver) error
@ -23,45 +23,58 @@ type Server interface {
Stop() error Stop() error
} }
type options struct {
transport transport.Transport
}
type Option func(*options) type Option func(*options)
var ( var (
Address string DefaultAddress = ":0"
Name string DefaultName = "go-server"
Id string DefaultId = uuid.NewUUID().String()
DefaultServer Server DefaultServer Server = newRpcServer()
) )
func Name(n string) Option {
return func(o *options) {
o.name = n
}
}
func Id(id string) Option {
return func(o *options) {
o.id = id
}
}
func Address(a string) Option {
return func(o *options) {
o.address = a
}
}
func Transport(t transport.Transport) Option { func Transport(t transport.Transport) Option {
return func(o *options) { return func(o *options) {
o.transport = t o.transport = t
} }
} }
func Init() error { func Metadata(md map[string]string) Option {
defer log.Flush() return func(o *options) {
o.metadata = md
if len(Name) == 0 { }
Name = "go-server"
} }
if len(Id) == 0 { func Config() options {
Id = Name + "-" + uuid.NewUUID().String() return DefaultServer.Config()
} }
func Init(opt ...Option) {
if DefaultServer == nil { if DefaultServer == nil {
DefaultServer = newRpcServer(Address) DefaultServer = newRpcServer(opt...)
}
DefaultServer.Init(opt...)
} }
return DefaultServer.Init() func NewServer(opt ...Option) Server {
} return newRpcServer(opt...)
func NewServer(address string, opt ...Option) Server {
return newRpcServer(address, opt...)
} }
func NewReceiver(handler interface{}) Receiver { func NewReceiver(handler interface{}) Receiver {
@ -82,9 +95,10 @@ func Run() error {
} }
// parse address for host, port // parse address for host, port
config := DefaultServer.Config()
var host string var host string
var port int var port int
parts := strings.Split(DefaultServer.Address(), ":") parts := strings.Split(config.Address(), ":")
if len(parts) > 1 { if len(parts) > 1 {
host = strings.Join(parts[:len(parts)-1], ":") host = strings.Join(parts[:len(parts)-1], ":")
port, _ = strconv.Atoi(parts[len(parts)-1]) port, _ = strconv.Atoi(parts[len(parts)-1])
@ -94,13 +108,14 @@ func Run() error {
// register service // register service
node := &registry.Node{ node := &registry.Node{
Id: Id, Id: config.Id(),
Address: host, Address: host,
Port: port, Port: port,
Metadata: config.Metadata(),
} }
service := &registry.Service{ service := &registry.Service{
Name: Name, Name: config.Name(),
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
@ -122,7 +137,8 @@ func Run() error {
} }
func Start() error { func Start() error {
log.Infof("Starting server %s id %s", Name, Id) config := DefaultServer.Config()
log.Infof("Starting server %s id %s", config.Name(), config.Id())
return DefaultServer.Start() return DefaultServer.Start()
} }