Add pub/sub to client/server and make broker more low level

This commit is contained in:
Asim 2015-06-12 19:52:27 +01:00
parent cdf2f2cbcd
commit b91af916f9
24 changed files with 542 additions and 193 deletions

View File

@ -1,23 +1,18 @@
package broker
import (
"code.google.com/p/go-uuid/uuid"
"golang.org/x/net/context"
)
type Broker interface {
Address() string
Connect() error
Disconnect() error
Init() error
Publish(context.Context, string, []byte) error
Subscribe(string, func(context.Context, *Message)) (Subscriber, error)
Publish(string, *Message) error
Subscribe(string, Handler) (Subscriber, error)
}
type Handler func(*Message)
type Message struct {
Id string
Timestamp int64
Topic string
Header map[string]string
Body []byte
}
@ -31,24 +26,14 @@ type options struct{}
type Option func(*options)
var (
Address string
Id string
DefaultBroker Broker
DefaultBroker Broker = newHttpBroker([]string{})
)
func NewBroker(addrs []string, opt ...Option) Broker {
return newHttpBroker([]string{Address}, opt...)
return newHttpBroker(addrs, opt...)
}
func Init() error {
if len(Id) == 0 {
Id = "broker-" + uuid.NewUUID().String()
}
if DefaultBroker == nil {
DefaultBroker = newHttpBroker([]string{Address})
}
return DefaultBroker.Init()
}
@ -60,10 +45,10 @@ func Disconnect() error {
return DefaultBroker.Disconnect()
}
func Publish(ctx context.Context, topic string, body []byte) error {
return DefaultBroker.Publish(ctx, topic, body)
func Publish(topic string, msg *Message) error {
return DefaultBroker.Publish(topic, msg)
}
func Subscribe(topic string, function func(context.Context, *Message)) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, function)
func Subscribe(topic string, handler Handler) (Subscriber, error) {
return DefaultBroker.Subscribe(topic, handler)
}

View File

@ -7,21 +7,14 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
"code.google.com/p/go-uuid/uuid"
log "github.com/golang/glog"
c "github.com/myodc/go-micro/context"
"github.com/myodc/go-micro/errors"
"github.com/myodc/go-micro/registry"
"golang.org/x/net/context"
)
type httpBroker struct {
@ -39,28 +32,22 @@ type httpSubscriber struct {
id string
topic string
ch chan *httpSubscriber
fn func(context.Context, *Message)
fn Handler
svc *registry.Service
}
// used in brokers where there is no support for headers
type envelope struct {
Header map[string]string
Message *Message
}
var (
DefaultSubPath = "/_sub"
)
func newHttpBroker(addrs []string, opt ...Option) Broker {
addr := ":0"
if len(addrs) > 0 {
if len(addrs) > 0 && len(addrs[0]) > 0 {
addr = addrs[0]
}
return &httpBroker{
id: Id,
id: "broker-" + uuid.NewUUID().String(),
address: addr,
subscribers: make(map[string][]*httpSubscriber),
unsubscribe: make(chan *httpSubscriber),
@ -96,9 +83,6 @@ func (h *httpBroker) start() error {
go http.Serve(l, h)
go func() {
ce := make(chan os.Signal, 1)
signal.Notify(ce, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
for {
select {
case ch := <-h.exit:
@ -107,8 +91,6 @@ func (h *httpBroker) start() error {
h.running = false
h.Unlock()
return
case <-ce:
h.stop()
case subscriber := <-h.unsubscribe:
h.Lock()
var subscribers []*httpSubscriber
@ -150,26 +132,27 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
var e *envelope
if err = json.Unmarshal(b, &e); err != nil {
var m *Message
if err = json.Unmarshal(b, &m); err != nil {
errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err))
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
if len(e.Message.Topic) == 0 {
topic := m.Header[":topic"]
delete(m.Header, ":topic")
if len(topic) == 0 {
errr := errors.InternalServerError("go.micro.broker", "Topic not found")
w.WriteHeader(500)
w.Write([]byte(errr.Error()))
return
}
ctx := c.WithMetadata(context.Background(), e.Header)
h.RLock()
for _, subscriber := range h.subscribers[e.Message.Topic] {
subscriber.fn(ctx, e.Message)
for _, subscriber := range h.subscribers[topic] {
subscriber.fn(m)
}
h.RUnlock()
}
@ -195,26 +178,14 @@ func (h *httpBroker) Init() error {
return nil
}
func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) error {
func (h *httpBroker) Publish(topic string, msg *Message) error {
s, err := registry.GetService("topic:" + topic)
if err != nil {
return err
}
message := &Message{
Id: uuid.NewUUID().String(),
Timestamp: time.Now().Unix(),
Topic: topic,
Body: body,
}
header, _ := c.GetMetadata(ctx)
b, err := json.Marshal(&envelope{
header,
message,
})
msg.Header[":topic"] = topic
b, err := json.Marshal(msg)
if err != nil {
return err
}
@ -229,7 +200,7 @@ func (h *httpBroker) Publish(ctx context.Context, topic string, body []byte) err
return nil
}
func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Message)) (Subscriber, error) {
func (h *httpBroker) Subscribe(topic string, handler Handler) (Subscriber, error) {
// parse address for host, port
parts := strings.Split(h.Address(), ":")
host := strings.Join(parts[:len(parts)-1], ":")
@ -251,11 +222,10 @@ func (h *httpBroker) Subscribe(topic string, function func(context.Context, *Mes
id: uuid.NewUUID().String(),
topic: topic,
ch: h.unsubscribe,
fn: function,
fn: handler,
svc: service,
}
log.Infof("Registering subscriber %s", node.Id)
if err := registry.Register(service); err != nil {
return nil, err
}

View File

@ -3,14 +3,9 @@ package nats
import (
"encoding/json"
"strings"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/apcera/nats"
"github.com/myodc/go-micro/broker"
c "github.com/myodc/go-micro/context"
"golang.org/x/net/context"
)
type nbroker struct {
@ -22,12 +17,6 @@ type subscriber struct {
s *nats.Subscription
}
// used in brokers where there is no support for headers
type envelope struct {
Header map[string]string
Message *broker.Message
}
func (n *subscriber) Topic() string {
return n.s.Subject
}
@ -67,34 +56,21 @@ func (n *nbroker) Init() error {
return nil
}
func (n *nbroker) Publish(ctx context.Context, topic string, body []byte) error {
header, _ := c.GetMetadata(ctx)
message := &broker.Message{
Id: uuid.NewUUID().String(),
Timestamp: time.Now().Unix(),
Topic: topic,
Body: body,
}
b, err := json.Marshal(&envelope{
header,
message,
})
func (n *nbroker) Publish(topic string, msg *broker.Message) error {
b, err := json.Marshal(msg)
if err != nil {
return err
}
return n.conn.Publish(topic, b)
}
func (n *nbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) {
func (n *nbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) {
sub, err := n.conn.Subscribe(topic, func(msg *nats.Msg) {
var e *envelope
if err := json.Unmarshal(msg.Data, &e); err != nil {
var m *broker.Message
if err := json.Unmarshal(msg.Data, &m); err != nil {
return
}
ctx := c.WithMetadata(context.Background(), e.Header)
function(ctx, e.Message)
handler(m)
})
if err != nil {
return nil, err

View File

@ -1,13 +1,8 @@
package rabbitmq
import (
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/myodc/go-micro/broker"
c "github.com/myodc/go-micro/context"
"github.com/streadway/amqp"
"golang.org/x/net/context"
)
type rbroker struct {
@ -28,24 +23,20 @@ func (s *subscriber) Unsubscribe() error {
return s.ch.Close()
}
func (r *rbroker) Publish(ctx context.Context, topic string, body []byte) error {
header, _ := c.GetMetadata(ctx)
msg := amqp.Publishing{
MessageId: uuid.NewUUID().String(),
Timestamp: time.Now().UTC(),
Body: body,
func (r *rbroker) Publish(topic string, msg *broker.Message) error {
m := amqp.Publishing{
Body: msg.Body,
Headers: amqp.Table{},
}
for k, v := range header {
msg.Headers[k] = v
for k, v := range msg.Header {
m.Headers[k] = v
}
return r.conn.Publish("", topic, msg)
return r.conn.Publish("", topic, m)
}
func (r *rbroker) Subscribe(topic string, function func(context.Context, *broker.Message)) (broker.Subscriber, error) {
func (r *rbroker) Subscribe(topic string, handler broker.Handler) (broker.Subscriber, error) {
ch, sub, err := r.conn.Consume(topic)
if err != nil {
return nil, err
@ -56,11 +47,8 @@ func (r *rbroker) Subscribe(topic string, function func(context.Context, *broker
for k, v := range msg.Headers {
header[k], _ = v.(string)
}
ctx := c.WithMetadata(context.Background(), header)
function(ctx, &broker.Message{
Id: msg.MessageId,
Timestamp: msg.Timestamp.Unix(),
Topic: topic,
handler(&broker.Message{
Header: header,
Body: msg.Body,
})
}

View File

@ -1,19 +1,32 @@
package client
import (
"github.com/myodc/go-micro/registry"
"github.com/myodc/go-micro/transport"
"golang.org/x/net/context"
)
type Client interface {
NewRequest(string, string, interface{}) Request
NewProtoRequest(string, string, interface{}) Request
NewJsonRequest(string, string, interface{}) Request
Call(context.Context, Request, interface{}) error
CallRemote(context.Context, string, Request, interface{}) error
Stream(context.Context, Request, interface{}) (Streamer, error)
StreamRemote(context.Context, string, Request, interface{}) (Streamer, error)
NewPublication(topic string, msg interface{}) Publication
NewRequest(service, method string, req interface{}) Request
NewProtoRequest(service, method string, req interface{}) Request
NewJsonRequest(service, method string, req interface{}) Request
Call(ctx context.Context, req Request, rsp interface{}) error
CallRemote(ctx context.Context, addr string, req Request, rsp interface{}) error
Stream(ctx context.Context, req Request, rspChan interface{}) (Streamer, error)
StreamRemote(ctx context.Context, addr string, req Request, rspChan interface{}) (Streamer, error)
Publish(ctx context.Context, p Publication) error
}
type Publication interface {
Topic() string
Message() interface{}
ContentType() string
}
type Request interface {
Service() string
Method() string
ContentType() string
Request() interface{}
}
type Streamer interface {
@ -22,29 +35,12 @@ type Streamer interface {
Close() error
}
type options struct {
registry registry.Registry
transport transport.Transport
}
type Option func(*options)
var (
DefaultClient Client = newRpcClient()
)
func Registry(r registry.Registry) Option {
return func(o *options) {
o.registry = r
}
}
func Transport(t transport.Transport) Option {
return func(o *options) {
o.transport = t
}
}
func Call(ctx context.Context, request Request, response interface{}) error {
return DefaultClient.Call(ctx, request, response)
}
@ -61,10 +57,18 @@ func StreamRemote(ctx context.Context, address string, request Request, response
return DefaultClient.StreamRemote(ctx, address, request, responseChan)
}
func Publish(ctx context.Context, p Publication) error {
return DefaultClient.Publish(ctx, p)
}
func NewClient(opt ...Option) Client {
return newRpcClient(opt...)
}
func NewPublication(topic string, message interface{}) Publication {
return DefaultClient.NewPublication(topic, message)
}
func NewRequest(service, method string, request interface{}) Request {
return DefaultClient.NewRequest(service, method, request)
}

31
client/options.go Normal file
View File

@ -0,0 +1,31 @@
package client
import (
"github.com/myodc/go-micro/broker"
"github.com/myodc/go-micro/registry"
"github.com/myodc/go-micro/transport"
)
type options struct {
broker broker.Broker
registry registry.Registry
transport transport.Transport
}
func Broker(b broker.Broker) Option {
return func(o *options) {
o.broker = b
}
}
func Registry(r registry.Registry) Option {
return func(o *options) {
o.registry = r
}
}
func Transport(t transport.Transport) Option {
return func(o *options) {
o.transport = t
}
}

View File

@ -1,8 +0,0 @@
package client
type Request interface {
Service() string
Method() string
ContentType() string
Request() interface{}
}

View File

@ -1,11 +1,14 @@
package client
import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync"
"time"
"github.com/myodc/go-micro/broker"
c "github.com/myodc/go-micro/context"
"github.com/myodc/go-micro/errors"
"github.com/myodc/go-micro/registry"
@ -13,6 +16,7 @@ import (
rpc "github.com/youtube/vitess/go/rpcplus"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
)
@ -21,6 +25,7 @@ type headerRoundTripper struct {
}
type rpcClient struct {
once sync.Once
opts options
}
@ -39,7 +44,14 @@ func newRpcClient(opt ...Option) Client {
opts.transport = transport.DefaultTransport
}
if opts.broker == nil {
opts.broker = broker.DefaultBroker
}
var once sync.Once
return &rpcClient{
once: once,
opts: opts,
}
}
@ -152,6 +164,48 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in
return r.stream(ctx, address, request, responseChan)
}
func (r *rpcClient) Publish(ctx context.Context, p Publication) error {
md, ok := c.GetMetadata(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
// encode message body
var body []byte
switch p.ContentType() {
case "application/octet-stream":
b, err := proto.Marshal(p.Message().(proto.Message))
if err != nil {
return err
}
body = b
case "application/json":
b, err := json.Marshal(p.Message())
if err != nil {
return err
}
body = b
}
r.once.Do(func() {
r.opts.broker.Connect()
})
return r.opts.broker.Publish(p.Topic(), &broker.Message{
Header: md,
Body: body,
})
}
func (r *rpcClient) NewPublication(topic string, message interface{}) Publication {
return r.NewProtoPublication(topic, message)
}
func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication {
return newRpcPublication(topic, message, "application/octet-stream")
}
func (r *rpcClient) NewRequest(service, method string, request interface{}) Request {
return r.NewProtoRequest(service, method, request)
}

27
client/rpc_publication.go Normal file
View File

@ -0,0 +1,27 @@
package client
type rpcPublication struct {
topic string
contentType string
message interface{}
}
func newRpcPublication(topic string, message interface{}, contentType string) Publication {
return &rpcPublication{
message: message,
topic: topic,
contentType: contentType,
}
}
func (r *rpcPublication) ContentType() string {
return r.contentType
}
func (r *rpcPublication) Topic() string {
return r.topic
}
func (r *rpcPublication) Message() interface{} {
return r.message
}

View File

@ -10,6 +10,26 @@ import (
"golang.org/x/net/context"
)
func pub() {
msg := client.NewPublication("topic.go.micro.srv.example", &example.Message{
Say: "This is a publication",
})
// create context with metadata
ctx := c.WithMetadata(context.Background(), map[string]string{
"X-User-Id": "john",
"X-From-Id": "script",
})
// publish message
if err := client.Publish(ctx, msg); err != nil {
fmt.Println("pub err: ", err)
return
}
fmt.Printf("Published: %v\n", msg)
}
func call(i int) {
// Create new request to service go.micro.srv.example, method Example.Call
req := client.NewRequest("go.micro.srv.example", "Example.Call", &example.Request{
@ -26,7 +46,7 @@ func call(i int) {
// Call service
if err := client.Call(ctx, req, rsp); err != nil {
fmt.Println("err: ", err, rsp)
fmt.Println("call err: ", err, rsp)
return
}
@ -52,7 +72,7 @@ func stream() {
}
if stream.Error() != nil {
fmt.Println("err:", err)
fmt.Println("stream err:", err)
return
}
@ -67,4 +87,5 @@ func main() {
}
stream()
pub()
}

View File

@ -7,9 +7,6 @@ import (
log "github.com/golang/glog"
"github.com/myodc/go-micro/broker"
"github.com/myodc/go-micro/cmd"
c "github.com/myodc/go-micro/context"
"golang.org/x/net/context"
)
var (
@ -20,24 +17,24 @@ func pub() {
tick := time.NewTicker(time.Second)
i := 0
for _ = range tick.C {
ctx := c.WithMetadata(context.Background(), map[string]string{
msg := &broker.Message{
Header: map[string]string{
"id": fmt.Sprintf("%d", i),
})
msg := fmt.Sprintf("%d: %s", i, time.Now().String())
if err := broker.Publish(ctx, topic, []byte(msg)); err != nil {
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
if err := broker.Publish(topic, msg); err != nil {
log.Errorf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", msg)
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
i++
}
}
func sub() {
_, err := broker.Subscribe(topic, func(ctx context.Context, msg *broker.Message) {
md, _ := c.GetMetadata(ctx)
fmt.Println("[sub] received message:", string(msg.Body), "context", md)
_, err := broker.Subscribe(topic, func(msg *broker.Message) {
fmt.Println("[sub] received message:", string(msg.Body), "header", msg.Header)
})
if err != nil {
fmt.Println(err)

View File

@ -4,6 +4,7 @@ import (
log "github.com/golang/glog"
"github.com/myodc/go-micro/cmd"
"github.com/myodc/go-micro/examples/server/handler"
"github.com/myodc/go-micro/examples/server/subscriber"
"github.com/myodc/go-micro/server"
)
@ -23,6 +24,21 @@ func main() {
),
)
// Register Subscribers
server.Subscribe(
server.NewSubscriber(
"topic.go.micro.srv.example",
new(subscriber.Example),
),
)
server.Subscribe(
server.NewSubscriber(
"topic.go.micro.srv.example",
subscriber.Handler,
),
)
// Run server
if err := server.Run(); err != nil {
log.Fatal(err)

View File

@ -9,6 +9,7 @@ It is generated from these files:
go-micro/examples/server/proto/example/example.proto
It has these top-level messages:
Message
Request
Response
StreamingRequest
@ -21,6 +22,14 @@ import proto "github.com/golang/protobuf/proto"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
type Message struct {
Say string `protobuf:"bytes,1,opt,name=say" json:"say,omitempty"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
type Request struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
}

View File

@ -1,5 +1,9 @@
syntax = "proto3";
message Message {
string say = 1;
}
message Request {
string name = 1;
}

View File

@ -0,0 +1,18 @@
package subscriber
import (
log "github.com/golang/glog"
example "github.com/myodc/go-micro/examples/server/proto/example"
"golang.org/x/net/context"
)
type Example struct{}
func (e *Example) Handle(ctx context.Context, msg *example.Message) error {
log.Info("Handler Received message: ", msg.Say)
return nil
}
func Handler(msg *example.Message) {
log.Info("Function Received message: ", msg.Say)
}

View File

@ -19,6 +19,7 @@ type Endpoint struct {
Name string
Request *Value
Response *Value
Metadata map[string]string
}
type Value struct {

View File

@ -1,6 +1,7 @@
package server
import (
"fmt"
"reflect"
"github.com/myodc/go-micro/registry"
@ -37,7 +38,7 @@ func extractEndpoint(method reflect.Method) *registry.Endpoint {
}
var rspType, reqType reflect.Type
// var stream bool
var stream bool
mt := method.Type
switch mt.NumIn() {
@ -51,9 +52,9 @@ func extractEndpoint(method reflect.Method) *registry.Endpoint {
return nil
}
// if rspType.Kind() == reflect.Func {
// stream = true
// }
if rspType.Kind() == reflect.Func {
stream = true
}
request := extractValue(reqType)
response := extractValue(rspType)
@ -62,5 +63,21 @@ func extractEndpoint(method reflect.Method) *registry.Endpoint {
Name: method.Name,
Request: request,
Response: response,
Metadata: map[string]string{
"stream": fmt.Sprintf("%v", stream),
},
}
}
func extractSubValue(typ reflect.Type) *registry.Value {
var reqType reflect.Type
switch typ.NumIn() {
case 1:
reqType = typ.In(0)
case 2:
reqType = typ.In(1)
default:
return nil
}
return extractValue(reqType)
}

View File

@ -9,3 +9,9 @@ type Handler interface {
Handler() interface{}
Endpoints() []*registry.Endpoint
}
type Subscriber interface {
Topic() string
Subscriber() interface{}
Endpoints() []*registry.Endpoint
}

View File

@ -1,11 +1,13 @@
package server
import (
"github.com/myodc/go-micro/broker"
"github.com/myodc/go-micro/registry"
"github.com/myodc/go-micro/transport"
)
type options struct {
broker broker.Broker
registry registry.Registry
transport transport.Transport
metadata map[string]string
@ -22,6 +24,10 @@ func newOptions(opt ...Option) options {
o(&opts)
}
if opts.broker == nil {
opts.broker = broker.DefaultBroker
}
if opts.registry == nil {
opts.registry = registry.DefaultRegistry
}
@ -93,6 +99,12 @@ func Address(a string) Option {
}
}
func Broker(b broker.Broker) Option {
return func(o *options) {
o.broker = b
}
}
func Registry(r registry.Registry) Option {
return func(o *options) {
o.registry = r

View File

@ -1,10 +1,12 @@
package server
import (
"fmt"
"strconv"
"strings"
"sync"
"github.com/myodc/go-micro/broker"
c "github.com/myodc/go-micro/context"
"github.com/myodc/go-micro/registry"
"github.com/myodc/go-micro/transport"
@ -22,6 +24,7 @@ type rpcServer struct {
sync.RWMutex
opts options
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
}
func newRpcServer(opts ...Option) Server {
@ -29,6 +32,7 @@ func newRpcServer(opts ...Option) Server {
opts: newOptions(opts...),
rpc: rpc.NewServer(),
handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
}
}
@ -84,6 +88,29 @@ func (s *rpcServer) Handle(h Handler) error {
return nil
}
func (s *rpcServer) NewSubscriber(topic string, sb interface{}) Subscriber {
return newSubscriber(topic, sb)
}
func (s *rpcServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
s.Lock()
_, ok = s.subscribers[sub]
if ok {
return fmt.Errorf("subscriber %v already exists", s)
}
s.subscribers[sub] = nil
s.Unlock()
return nil
}
func (s *rpcServer) Register() error {
// parse address for host, port
config := s.Config()
@ -110,6 +137,9 @@ func (s *rpcServer) Register() error {
for _, e := range s.handlers {
endpoints = append(endpoints, e.Endpoints()...)
}
for e, _ := range s.subscribers {
endpoints = append(endpoints, e.Endpoints()...)
}
s.RUnlock()
service := &registry.Service{
@ -120,7 +150,23 @@ func (s *rpcServer) Register() error {
}
log.Infof("Registering node: %s", node.Id)
return config.registry.Register(service)
if err := config.registry.Register(service); err != nil {
return err
}
s.Lock()
defer s.Unlock()
for sb, _ := range s.subscribers {
handler := createSubHandler(sb)
sub, err := config.broker.Subscribe(sb.Topic(), handler)
if err != nil {
return err
}
s.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (s *rpcServer) Deregister() error {
@ -147,7 +193,21 @@ func (s *rpcServer) Deregister() error {
Nodes: []*registry.Node{node},
}
return config.registry.Deregister(service)
log.Infof("Deregistering node: %s", node.Id)
if err := config.registry.Deregister(service); err != nil {
return err
}
s.Lock()
for sb, subs := range s.subscribers {
for _, sub := range subs {
log.Infof("Unsubscribing from topic: %s", sub.Topic())
sub.Unsubscribe()
}
s.subscribers[sb] = nil
}
s.Unlock()
return nil
}
func (s *rpcServer) Start() error {
@ -169,9 +229,11 @@ func (s *rpcServer) Start() error {
go func() {
ch := <-s.exit
ch <- ts.Close()
config.broker.Disconnect()
}()
return nil
// TODO: subscribe to cruft
return config.broker.Connect()
}
func (s *rpcServer) Stop() error {

View File

@ -14,6 +14,8 @@ type Server interface {
Init(...Option)
Handle(Handler) error
NewHandler(interface{}) Handler
NewSubscriber(string, interface{}) Subscriber
Subscribe(Subscriber) error
Register() error
Deregister() error
Start() error
@ -45,6 +47,10 @@ func NewServer(opt ...Option) Server {
return newRpcServer(opt...)
}
func NewSubscriber(topic string, h interface{}) Subscriber {
return DefaultServer.NewSubscriber(topic, h)
}
func NewHandler(h interface{}) Handler {
return DefaultServer.NewHandler(h)
}
@ -53,6 +59,10 @@ func Handle(h Handler) error {
return DefaultServer.Handle(h)
}
func Subscribe(s Subscriber) error {
return DefaultServer.Subscribe(s)
}
func Register() error {
return DefaultServer.Register()
}
@ -78,9 +88,6 @@ func Run() error {
return err
}
log.Infof("Deregistering %s", DefaultServer.Config().Id())
DefaultServer.Deregister()
return Stop()
}

154
server/subscriber.go Normal file
View File

@ -0,0 +1,154 @@
package server
import (
"encoding/json"
"reflect"
"github.com/golang/protobuf/proto"
"github.com/myodc/go-micro/broker"
c "github.com/myodc/go-micro/context"
"github.com/myodc/go-micro/registry"
"golang.org/x/net/context"
)
type handler struct {
method reflect.Value
reqType reflect.Type
ctxType reflect.Type
}
type subscriber struct {
topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
}
func newSubscriber(topic string, sub interface{}) Subscriber {
var endpoints []*registry.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: "Func",
Request: extractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
},
})
} else {
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: method.Name,
Request: extractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
},
})
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
}
}
func createSubHandler(sb *subscriber) broker.Handler {
return func(msg *broker.Message) {
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
ctx := c.WithMetadata(context.Background(), hdr)
rctx := reflect.ValueOf(ctx)
for _, handler := range sb.handlers {
var isVal bool
var req reflect.Value
var uerr error
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
switch msg.Header["Content-Type"] {
case "application/octet-stream":
uerr = proto.Unmarshal(msg.Body, req.Interface().(proto.Message))
case "application/json":
uerr = json.Unmarshal(msg.Body, req.Interface())
}
if uerr != nil {
continue
}
if isVal {
req = req.Elem()
}
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, rctx)
}
vals = append(vals, req)
go handler.method.Call(vals)
}
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*registry.Endpoint {
return s.endpoints
}

View File

@ -137,7 +137,6 @@ func (h *httpTransportSocket) Send(m *Message) error {
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(m.Body)),
// Request: h.r,
}
for k, v := range m.Header {

View File

@ -1,7 +1,6 @@
package transport
type Message struct {
Id string
Header map[string]string
Body []byte
}