From 3c3b81d9cd9380c6ea1706a08da1ae7b45e494cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AD=A6=E6=96=B0=E9=A3=9E?= Date: Wed, 19 Dec 2018 15:33:23 +0800 Subject: [PATCH] http server supports subscriber --- buffer.go | 14 +++ extractor.go | 111 ++++++++++++++++++- http.go | 94 +++++++++++++--- message.go | 19 ++++ subscriber.go | 288 +++++++++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 499 insertions(+), 27 deletions(-) create mode 100644 buffer.go create mode 100644 message.go diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..b38c9d8 --- /dev/null +++ b/buffer.go @@ -0,0 +1,14 @@ +package http + +import ( + "bytes" +) + +type buffer struct { + *bytes.Buffer +} + +func (b *buffer) Close() error { + b.Buffer.Reset() + return nil +} diff --git a/extractor.go b/extractor.go index f4d8ebf..0b75741 100644 --- a/extractor.go +++ b/extractor.go @@ -1,12 +1,13 @@ package http import ( + "fmt" + "reflect" "strconv" "strings" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/server" - "github.com/micro/util/go/lib/addr" ) @@ -48,3 +49,111 @@ func serviceDef(opts server.Options) *registry.Service { Nodes: []*registry.Node{node}, } } + +func extractValue(v reflect.Type, d int) *registry.Value { + if d == 3 { + return nil + } + if v == nil { + return nil + } + + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + + arg := ®istry.Value{ + Name: v.Name(), + Type: v.Name(), + } + + switch v.Kind() { + case reflect.Struct: + for i := 0; i < v.NumField(); i++ { + f := v.Field(i) + val := extractValue(f.Type, d+1) + if val == nil { + continue + } + + // if we can find a json tag use it + if tags := f.Tag.Get("json"); len(tags) > 0 { + parts := strings.Split(tags, ",") + val.Name = parts[0] + } + + // if there's no name default it + if len(val.Name) == 0 { + val.Name = v.Field(i).Name + } + + arg.Values = append(arg.Values, val) + } + case reflect.Slice: + p := v.Elem() + if p.Kind() == reflect.Ptr { + p = p.Elem() + } + arg.Type = "[]" + p.Name() + val := extractValue(v.Elem(), d+1) + if val != nil { + arg.Values = append(arg.Values, val) + } + } + + return arg +} + +func extractEndpoint(method reflect.Method) *registry.Endpoint { + if method.PkgPath != "" { + return nil + } + + var rspType, reqType reflect.Type + var stream bool + mt := method.Type + + switch mt.NumIn() { + case 3: + reqType = mt.In(1) + rspType = mt.In(2) + case 4: + reqType = mt.In(2) + rspType = mt.In(3) + default: + return nil + } + + // are we dealing with a stream? + switch rspType.Kind() { + case reflect.Func, reflect.Interface: + stream = true + } + + request := extractValue(reqType, 0) + response := extractValue(rspType, 0) + + return ®istry.Endpoint{ + Name: method.Name, + Request: request, + Response: response, + Metadata: map[string]string{ + "stream": fmt.Sprintf("%v", stream), + }, + } +} + +func extractSubValue(typ reflect.Type) *registry.Value { + var reqType reflect.Type + switch typ.NumIn() { + case 1: + reqType = typ.In(0) + case 2: + reqType = typ.In(1) + case 3: + reqType = typ.In(2) + default: + return nil + } + return extractValue(reqType, 0) +} diff --git a/http.go b/http.go index db8070c..57b5b19 100644 --- a/http.go +++ b/http.go @@ -3,28 +3,54 @@ package http import ( "errors" + "fmt" "net" "net/http" + "sort" "sync" "github.com/micro/go-log" - "github.com/micro/go-micro/cmd" - "github.com/micro/go-micro/registry" + "github.com/micro/go-micro/broker" "github.com/micro/go-micro/server" + "github.com/micro/go-micro/cmd" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/registry" + "github.com/micro/go-plugins/codec/jsonrpc" + "github.com/micro/go-plugins/codec/protorpc" ) +var ( + defaultCodecs = map[string]codec.NewCodec{ + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, + "application/protobuf": protorpc.NewCodec, + "application/proto-rpc": protorpc.NewCodec, + "application/octet-stream": protorpc.NewCodec, + } +) type httpServer struct { sync.Mutex opts server.Options hd server.Handler exit chan chan error registerOnce sync.Once + subscribers map[*subscriber][]broker.Subscriber } func init() { cmd.DefaultServers["http"] = NewServer } +func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) { + if cf, ok := h.opts.Codecs[contentType]; ok { + return cf, nil + } + if cf, ok := defaultCodecs[contentType]; ok { + return cf, nil + } + return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType) +} + func (h *httpServer) Options() server.Options { h.Lock() opts := h.opts @@ -79,20 +105,30 @@ func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOptio } func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber { - var options server.SubscriberOptions - for _, o := range opts { - o(&options) - } - - return &httpSubscriber{ - opts: options, - topic: topic, - hd: handler, - } + return newSubscriber(topic, handler, opts...) } -func (h *httpServer) Subscribe(s server.Subscriber) error { - return errors.New("subscribe is not supported") +func (h *httpServer) Subscribe(sb server.Subscriber) error { + sub, ok := sb.(*subscriber) + if !ok { + return fmt.Errorf("invalid subscriber: expected *subscriber") + } + if len(sub.handlers) == 0 { + return fmt.Errorf("invalid subscriber: no handler functions") + } + + if err := validateSubscriber(sb); err != nil { + return err + } + + h.Lock() + defer h.Unlock() + _, ok = h.subscribers[sub] + if ok { + return fmt.Errorf("subscriber %v already exists", h) + } + h.subscribers[sub] = nil + return nil } func (h *httpServer) Register() error { @@ -104,12 +140,42 @@ func (h *httpServer) Register() error { service := serviceDef(opts) service.Endpoints = eps + h.Lock() + var subscriberList []*subscriber + for e := range h.subscribers { + // Only advertise non internal subscribers + if !e.Options().Internal { + subscriberList = append(subscriberList, e) + } + } + sort.Slice(subscriberList, func(i, j int) bool { + return subscriberList[i].topic > subscriberList[j].topic + }) + for _, e := range subscriberList { + service.Endpoints = append(service.Endpoints, e.Endpoints()...) + } + h.Unlock() + rOpts := []registry.RegisterOption{ registry.RegisterTTL(opts.RegisterTTL), } h.registerOnce.Do(func() { log.Logf("Registering node: %s", opts.Name+"-"+opts.Id) + + for sb, _ := range h.subscribers { + handler := h.createSubHandler(sb, opts) + var subOpts []broker.SubscribeOption + if queue := sb.Options().Queue; len(queue) > 0 { + subOpts = append(subOpts, broker.Queue(queue)) + } + sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...) + if err != nil { + log.Logf("Registering subscriber: %s, err: %s", sb.Topic, err) + return + } + h.subscribers[sb] = []broker.Subscriber{sub} + } }) return opts.Registry.Register(service, rOpts...) diff --git a/message.go b/message.go new file mode 100644 index 0000000..d916a5d --- /dev/null +++ b/message.go @@ -0,0 +1,19 @@ +package http + +type rpcMessage struct { + topic string + contentType string + payload interface{} +} + +func (r *rpcMessage) ContentType() string { + return r.contentType +} + +func (r *rpcMessage) Topic() string { + return r.topic +} + +func (r *rpcMessage) Payload() interface{} { + return r.payload +} diff --git a/subscriber.go b/subscriber.go index 300084b..622471f 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,28 +1,292 @@ package http import ( + "bytes" + "context" + "fmt" + "reflect" + "strings" + "unicode" + "unicode/utf8" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/codec" + "github.com/micro/go-micro/metadata" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/server" ) -type httpSubscriber struct { - opts server.SubscriberOptions - topic string - hd interface{} +const ( + subSig = "func(context.Context, interface{}) error" +) + +var typeOfError = reflect.TypeOf((*error)(nil)).Elem() + +type handler struct { + method reflect.Value + reqType reflect.Type + ctxType reflect.Type } -func (h *httpSubscriber) Topic() string { - return h.topic +type subscriber struct { + topic string + rcvr reflect.Value + typ reflect.Type + subscriber interface{} + handlers []*handler + endpoints []*registry.Endpoint + opts server.SubscriberOptions } -func (h *httpSubscriber) Subscriber() interface{} { - return h.hd +// Is this an exported - upper case - name? +func isExported(name string) bool { + rune, _ := utf8.DecodeRuneInString(name) + return unicode.IsUpper(rune) } -func (h *httpSubscriber) Endpoints() []*registry.Endpoint { - return []*registry.Endpoint{} +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" } -func (h *httpSubscriber) Options() server.SubscriberOptions { - return h.opts +func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber { + var options server.SubscriberOptions + for _, o := range opts { + o(&options) + } + + var endpoints []*registry.Endpoint + var handlers []*handler + + if typ := reflect.TypeOf(sub); typ.Kind() == reflect.Func { + h := &handler{ + method: reflect.ValueOf(sub), + } + + switch typ.NumIn() { + case 1: + h.reqType = typ.In(0) + case 2: + h.ctxType = typ.In(0) + h.reqType = typ.In(1) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: "Func", + Request: extractSubValue(typ), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } else { + hdlr := reflect.ValueOf(sub) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + h := &handler{ + method: method.Func, + } + + switch method.Type.NumIn() { + case 2: + h.reqType = method.Type.In(1) + case 3: + h.ctxType = method.Type.In(1) + h.reqType = method.Type.In(2) + } + + handlers = append(handlers, h) + + endpoints = append(endpoints, ®istry.Endpoint{ + Name: name + "." + method.Name, + Request: extractSubValue(method.Type), + Metadata: map[string]string{ + "topic": topic, + "subscriber": "true", + }, + }) + } + } + + return &subscriber{ + rcvr: reflect.ValueOf(sub), + typ: reflect.TypeOf(sub), + topic: topic, + subscriber: sub, + handlers: handlers, + endpoints: endpoints, + opts: options, + } +} + +func validateSubscriber(sub server.Subscriber) error { + typ := reflect.TypeOf(sub.Subscriber()) + var argType reflect.Type + + if typ.Kind() == reflect.Func { + name := "Func" + switch typ.NumIn() { + case 2: + argType = typ.In(1) + default: + return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig) + } + if !isExportedOrBuiltinType(argType) { + return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType) + } + if typ.NumOut() != 1 { + return fmt.Errorf("subscriber %v has wrong number of outs: %v require signature %s", + name, typ.NumOut(), subSig) + } + if returnType := typ.Out(0); returnType != typeOfError { + return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String()) + } + } else { + hdlr := reflect.ValueOf(sub.Subscriber()) + name := reflect.Indirect(hdlr).Type().Name() + + for m := 0; m < typ.NumMethod(); m++ { + method := typ.Method(m) + + switch method.Type.NumIn() { + case 3: + argType = method.Type.In(2) + default: + return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s", + name, method.Name, method.Type.NumIn(), subSig) + } + + if !isExportedOrBuiltinType(argType) { + return fmt.Errorf("%v argument type not exported: %v", name, argType) + } + if method.Type.NumOut() != 1 { + return fmt.Errorf( + "subscriber %v.%v has wrong number of outs: %v require signature %s", + name, method.Name, method.Type.NumOut(), subSig) + } + if returnType := method.Type.Out(0); returnType != typeOfError { + return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String()) + } + } + } + + return nil +} + +func (s *httpServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler { + return func(p broker.Publication) error { + msg := p.Message() + ct := msg.Header["Content-Type"] + cf, err := s.newCodec(ct) + if err != nil { + return err + } + + hdr := make(map[string]string) + for k, v := range msg.Header { + hdr[k] = v + } + delete(hdr, "Content-Type") + ctx := metadata.NewContext(context.Background(), hdr) + + results := make(chan error, len(sb.handlers)) + + for i := 0; i < len(sb.handlers); i++ { + handler := sb.handlers[i] + + var isVal bool + var req reflect.Value + + if handler.reqType.Kind() == reflect.Ptr { + req = reflect.New(handler.reqType.Elem()) + } else { + req = reflect.New(handler.reqType) + isVal = true + } + if isVal { + req = req.Elem() + } + + b := &buffer{bytes.NewBuffer(msg.Body)} + co := cf(b) + defer co.Close() + + if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil { + return err + } + + if err := co.ReadBody(req.Interface()); err != nil { + return err + } + + fn := func(ctx context.Context, msg server.Message) error { + var vals []reflect.Value + if sb.typ.Kind() != reflect.Func { + vals = append(vals, sb.rcvr) + } + if handler.ctxType != nil { + vals = append(vals, reflect.ValueOf(ctx)) + } + + vals = append(vals, reflect.ValueOf(msg.Payload())) + + returnValues := handler.method.Call(vals) + if err := returnValues[0].Interface(); err != nil { + return err.(error) + } + return nil + } + + for i := len(opts.SubWrappers); i > 0; i-- { + fn = opts.SubWrappers[i-1](fn) + } + + go func() { + results <- fn(ctx, &rpcMessage{ + topic: sb.topic, + contentType: ct, + payload: req.Interface(), + }) + }() + } + + var errors []string + + for i := 0; i < len(sb.handlers); i++ { + if err := <-results; err != nil { + errors = append(errors, err.Error()) + } + } + + if len(errors) > 0 { + return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n")) + } + + return nil + } +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Subscriber() interface{} { + return s.subscriber +} + +func (s *subscriber) Endpoints() []*registry.Endpoint { + return s.endpoints +} + +func (s *subscriber) Options() server.SubscriberOptions { + return s.opts }