http server supports subscriber

This commit is contained in:
武新飞 2018-12-19 15:33:23 +08:00 committed by Vasiliy Tolstov
parent 24f8e6b137
commit 3c3b81d9cd
5 changed files with 499 additions and 27 deletions

14
buffer.go Normal file
View File

@ -0,0 +1,14 @@
package http
import (
"bytes"
)
type buffer struct {
*bytes.Buffer
}
func (b *buffer) Close() error {
b.Buffer.Reset()
return nil
}

View File

@ -1,12 +1,13 @@
package http package http
import ( import (
"fmt"
"reflect"
"strconv" "strconv"
"strings" "strings"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/util/go/lib/addr" "github.com/micro/util/go/lib/addr"
) )
@ -48,3 +49,111 @@ func serviceDef(opts server.Options) *registry.Service {
Nodes: []*registry.Node{node}, Nodes: []*registry.Node{node},
} }
} }
func extractValue(v reflect.Type, d int) *registry.Value {
if d == 3 {
return nil
}
if v == nil {
return nil
}
if v.Kind() == reflect.Ptr {
v = v.Elem()
}
arg := &registry.Value{
Name: v.Name(),
Type: v.Name(),
}
switch v.Kind() {
case reflect.Struct:
for i := 0; i < v.NumField(); i++ {
f := v.Field(i)
val := extractValue(f.Type, d+1)
if val == nil {
continue
}
// if we can find a json tag use it
if tags := f.Tag.Get("json"); len(tags) > 0 {
parts := strings.Split(tags, ",")
val.Name = parts[0]
}
// if there's no name default it
if len(val.Name) == 0 {
val.Name = v.Field(i).Name
}
arg.Values = append(arg.Values, val)
}
case reflect.Slice:
p := v.Elem()
if p.Kind() == reflect.Ptr {
p = p.Elem()
}
arg.Type = "[]" + p.Name()
val := extractValue(v.Elem(), d+1)
if val != nil {
arg.Values = append(arg.Values, val)
}
}
return arg
}
func extractEndpoint(method reflect.Method) *registry.Endpoint {
if method.PkgPath != "" {
return nil
}
var rspType, reqType reflect.Type
var stream bool
mt := method.Type
switch mt.NumIn() {
case 3:
reqType = mt.In(1)
rspType = mt.In(2)
case 4:
reqType = mt.In(2)
rspType = mt.In(3)
default:
return nil
}
// are we dealing with a stream?
switch rspType.Kind() {
case reflect.Func, reflect.Interface:
stream = true
}
request := extractValue(reqType, 0)
response := extractValue(rspType, 0)
return &registry.Endpoint{
Name: method.Name,
Request: request,
Response: response,
Metadata: map[string]string{
"stream": fmt.Sprintf("%v", stream),
},
}
}
func extractSubValue(typ reflect.Type) *registry.Value {
var reqType reflect.Type
switch typ.NumIn() {
case 1:
reqType = typ.In(0)
case 2:
reqType = typ.In(1)
case 3:
reqType = typ.In(2)
default:
return nil
}
return extractValue(reqType, 0)
}

94
http.go
View File

@ -3,28 +3,54 @@ package http
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"net/http" "net/http"
"sort"
"sync" "sync"
"github.com/micro/go-log" "github.com/micro/go-log"
"github.com/micro/go-micro/cmd" "github.com/micro/go-micro/broker"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
"github.com/micro/go-micro/cmd"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/registry"
"github.com/micro/go-plugins/codec/jsonrpc"
"github.com/micro/go-plugins/codec/protorpc"
) )
var (
defaultCodecs = map[string]codec.NewCodec{
"application/json": jsonrpc.NewCodec,
"application/json-rpc": jsonrpc.NewCodec,
"application/protobuf": protorpc.NewCodec,
"application/proto-rpc": protorpc.NewCodec,
"application/octet-stream": protorpc.NewCodec,
}
)
type httpServer struct { type httpServer struct {
sync.Mutex sync.Mutex
opts server.Options opts server.Options
hd server.Handler hd server.Handler
exit chan chan error exit chan chan error
registerOnce sync.Once registerOnce sync.Once
subscribers map[*subscriber][]broker.Subscriber
} }
func init() { func init() {
cmd.DefaultServers["http"] = NewServer cmd.DefaultServers["http"] = NewServer
} }
func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := h.opts.Codecs[contentType]; ok {
return cf, nil
}
if cf, ok := defaultCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (h *httpServer) Options() server.Options { func (h *httpServer) Options() server.Options {
h.Lock() h.Lock()
opts := h.opts opts := h.opts
@ -79,20 +105,30 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio
} }
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
var options server.SubscriberOptions return newSubscriber(topic, handler, opts...)
for _, o := range opts {
o(&options)
}
return &httpSubscriber{
opts: options,
topic: topic,
hd: handler,
}
} }
func (h *httpServer) Subscribe(s server.Subscriber) error { func (h *httpServer) Subscribe(sb server.Subscriber) error {
return errors.New("subscribe is not supported") sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := validateSubscriber(sb); err != nil {
return err
}
h.Lock()
defer h.Unlock()
_, ok = h.subscribers[sub]
if ok {
return fmt.Errorf("subscriber %v already exists", h)
}
h.subscribers[sub] = nil
return nil
} }
func (h *httpServer) Register() error { func (h *httpServer) Register() error {
@ -104,12 +140,42 @@ func (h *httpServer) Register() error {
service := serviceDef(opts) service := serviceDef(opts)
service.Endpoints = eps service.Endpoints = eps
h.Lock()
var subscriberList []*subscriber
for e := range h.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
for _, e := range subscriberList {
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
}
h.Unlock()
rOpts := []registry.RegisterOption{ rOpts := []registry.RegisterOption{
registry.RegisterTTL(opts.RegisterTTL), registry.RegisterTTL(opts.RegisterTTL),
} }
h.registerOnce.Do(func() { h.registerOnce.Do(func() {
log.Logf("Registering node: %s", opts.Name+"-"+opts.Id) log.Logf("Registering node: %s", opts.Name+"-"+opts.Id)
for sb, _ := range h.subscribers {
handler := h.createSubHandler(sb, opts)
var subOpts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
subOpts = append(subOpts, broker.Queue(queue))
}
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
if err != nil {
log.Logf("Registering subscriber: %s, err: %s", sb.Topic, err)
return
}
h.subscribers[sb] = []broker.Subscriber{sub}
}
}) })
return opts.Registry.Register(service, rOpts...) return opts.Registry.Register(service, rOpts...)

19
message.go Normal file
View File

@ -0,0 +1,19 @@
package http
type rpcMessage struct {
topic string
contentType string
payload interface{}
}
func (r *rpcMessage) ContentType() string {
return r.contentType
}
func (r *rpcMessage) Topic() string {
return r.topic
}
func (r *rpcMessage) Payload() interface{} {
return r.payload
}

View File

@ -1,28 +1,292 @@
package http package http
import ( import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"unicode"
"unicode/utf8"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry"
"github.com/micro/go-micro/server" "github.com/micro/go-micro/server"
) )
type httpSubscriber struct { const (
opts server.SubscriberOptions subSig = "func(context.Context, interface{}) error"
topic string )
hd interface{}
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
type handler struct {
method reflect.Value
reqType reflect.Type
ctxType reflect.Type
} }
func (h *httpSubscriber) Topic() string { type subscriber struct {
return h.topic topic string
rcvr reflect.Value
typ reflect.Type
subscriber interface{}
handlers []*handler
endpoints []*registry.Endpoint
opts server.SubscriberOptions
} }
func (h *httpSubscriber) Subscriber() interface{} { // Is this an exported - upper case - name?
return h.hd func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
} }
func (h *httpSubscriber) Endpoints() []*registry.Endpoint { // Is this type exported or a builtin?
return []*registry.Endpoint{} func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
} }
func (h *httpSubscriber) Options() server.SubscriberOptions { func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
return h.opts var options server.SubscriberOptions
for _, o := range opts {
o(&options)
}
var endpoints []*registry.Endpoint
var handlers []*handler
if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func {
h := &handler{
method: reflect.ValueOf(sub),
}
switch typ.NumIn() {
case 1:
h.reqType = typ.In(0)
case 2:
h.ctxType = typ.In(0)
h.reqType = typ.In(1)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: "Func",
Request: extractSubValue(typ),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
} else {
hdlr := reflect.ValueOf(sub)
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
h := &handler{
method: method.Func,
}
switch method.Type.NumIn() {
case 2:
h.reqType = method.Type.In(1)
case 3:
h.ctxType = method.Type.In(1)
h.reqType = method.Type.In(2)
}
handlers = append(handlers, h)
endpoints = append(endpoints, &registry.Endpoint{
Name: name + "." + method.Name,
Request: extractSubValue(method.Type),
Metadata: map[string]string{
"topic": topic,
"subscriber": "true",
},
})
}
}
return &subscriber{
rcvr: reflect.ValueOf(sub),
typ: reflect.TypeOf(sub),
topic: topic,
subscriber: sub,
handlers: handlers,
endpoints: endpoints,
opts: options,
}
}
func validateSubscriber(sub server.Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
if typ.Kind() == reflect.Func {
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
} else {
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}
func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
return func(p broker.Publication) error {
msg := p.Message()
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
if err != nil {
return err
}
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
b := &buffer{bytes.NewBuffer(msg.Body)}
co := cf(b)
defer co.Close()
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
return err
}
if err := co.ReadBody(req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg server.Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
go func() {
results <- fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Subscriber() interface{} {
return s.subscriber
}
func (s *subscriber) Endpoints() []*registry.Endpoint {
return s.endpoints
}
func (s *subscriber) Options() server.SubscriberOptions {
return s.opts
} }