From 3d7d5ce6b4e58be092bb6c09f88c5d598f45410a Mon Sep 17 00:00:00 2001 From: Vasiliy Tolstov Date: Mon, 30 Mar 2020 11:04:59 +0300 Subject: [PATCH] api: add static router and improve path parser in rpc handler (#1437) * api: add static router and improve path parser in rpc handler Signed-off-by: Vasiliy Tolstov * expose metadata context key to be able to get unmodified map keys Signed-off-by: Vasiliy Tolstov * server/grpc: fix jsonpb codec for protobuf msg Signed-off-by: Vasiliy Tolstov * api/handler/rpc: write 204 status code when rsp is nil Signed-off-by: Vasiliy Tolstov * api/handler/rpc: add check for nil response for non javascript Signed-off-by: Vasiliy Tolstov --- api/api.go | 16 ++ api/grpc_test.go | 132 +++++++++++++ api/handler/rpc/rpc.go | 121 +++++++++--- api/router/registry/registry.go | 8 + api/router/router.go | 4 + api/router/static/static.go | 304 ++++++++++++++++++++++++++++++ api/service/proto/api.pb.go | 149 +++++++++++++++ api/service/proto/api.pb.micro.go | 102 ++++++++++ api/service/proto/api.proto | 18 ++ go.mod | 2 +- metadata/metadata.go | 12 +- server/grpc/codec.go | 6 +- 12 files changed, 839 insertions(+), 35 deletions(-) create mode 100644 api/grpc_test.go create mode 100644 api/router/static/static.go create mode 100644 api/service/proto/api.pb.go create mode 100644 api/service/proto/api.pb.micro.go create mode 100644 api/service/proto/api.proto diff --git a/api/api.go b/api/api.go index f04d67e0..1e81b184 100644 --- a/api/api.go +++ b/api/api.go @@ -9,6 +9,20 @@ import ( "github.com/micro/go-micro/v2/server" ) +type Api interface { + // Register a http handler + Register(*Endpoint) error + // Register a route + Deregister(*Endpoint) error + // Init initialises the command line. + // It also parses further options. + //Init(...Option) error + // Options + //Options() Options + // String + String() string +} + // Endpoint is a mapping between an RPC method and HTTP endpoint type Endpoint struct { // RPC Method e.g. Greeter.Hello @@ -23,6 +37,8 @@ type Endpoint struct { Method []string // HTTP Path e.g /greeter. Expect POSIX regex Path []string + // Stream flag + Stream bool } // Service represents an API service diff --git a/api/grpc_test.go b/api/grpc_test.go new file mode 100644 index 00000000..f40734c3 --- /dev/null +++ b/api/grpc_test.go @@ -0,0 +1,132 @@ +package api_test + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "net/http" + "testing" + "time" + + "github.com/micro/go-micro/v2" + "github.com/micro/go-micro/v2/api" + ahandler "github.com/micro/go-micro/v2/api/handler" + apirpc "github.com/micro/go-micro/v2/api/handler/rpc" + "github.com/micro/go-micro/v2/api/router" + rstatic "github.com/micro/go-micro/v2/api/router/static" + bmemory "github.com/micro/go-micro/v2/broker/memory" + "github.com/micro/go-micro/v2/client" + gcli "github.com/micro/go-micro/v2/client/grpc" + rmemory "github.com/micro/go-micro/v2/registry/memory" + "github.com/micro/go-micro/v2/server" + gsrv "github.com/micro/go-micro/v2/server/grpc" + tgrpc "github.com/micro/go-micro/v2/transport/grpc" + + pb "github.com/micro/go-micro/v2/server/grpc/proto" +) + +// server is used to implement helloworld.GreeterServer. +type testServer struct { + msgCount int +} + +// TestHello implements helloworld.GreeterServer +func (s *testServer) Call(ctx context.Context, req *pb.Request, rsp *pb.Response) error { + rsp.Msg = "Hello " + req.Name + return nil +} + +func TestApiAndGRPC(t *testing.T) { + r := rmemory.NewRegistry() + b := bmemory.NewBroker() + tr := tgrpc.NewTransport() + s := gsrv.NewServer( + server.Broker(b), + server.Name("foo"), + server.Registry(r), + server.Transport(tr), + ) + c := gcli.NewClient( + client.Registry(r), + client.Broker(b), + client.Transport(tr), + ) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + svc := micro.NewService( + micro.Server(s), + micro.Client(c), + micro.Broker(b), + micro.Registry(r), + micro.Transport(tr), + micro.Context(ctx)) + h := &testServer{} + pb.RegisterTestHandler(s, h) + + go func() { + if err := svc.Run(); err != nil { + t.Fatalf("failed to start: %v", err) + } + }() + time.Sleep(1 * time.Second) + // check registration + services, err := r.GetService("foo") + if err != nil || len(services) == 0 { + t.Fatalf("failed to get service: %v # %d", err, len(services)) + } + + router := rstatic.NewRouter( + router.WithHandler(apirpc.Handler), + router.WithRegistry(svc.Server().Options().Registry), + ) + err = router.Register(&api.Endpoint{ + Name: "foo.Test.Call", + Method: []string{"GET"}, + Path: []string{"/api/v0/test/call/{name}"}, + Handler: "rpc", + }) + if err != nil { + t.Fatal(err) + } + + hrpc := apirpc.NewHandler( + ahandler.WithService(svc), + ahandler.WithRouter(router), + ) + + hsrv := &http.Server{ + Handler: hrpc, + Addr: "127.0.0.1:6543", + WriteTimeout: 15 * time.Second, + ReadTimeout: 15 * time.Second, + IdleTimeout: 20 * time.Second, + MaxHeaderBytes: 1024 * 1024 * 1, // 1Mb + } + + go func() { + log.Println(hsrv.ListenAndServe()) + }() + + time.Sleep(1 * time.Second) + rsp, err := http.Get(fmt.Sprintf("http://%s/api/v0/test/call/TEST", hsrv.Addr)) + if err != nil { + t.Fatalf("Failed to created http.Request: %v", err) + } + defer rsp.Body.Close() + buf, err := ioutil.ReadAll(rsp.Body) + if err != nil { + t.Fatal(err) + } + + jsonMsg := `{"msg":"Hello TEST"}` + if string(buf) != jsonMsg { + t.Fatalf("invalid message received, parsing error %s != %s", buf, jsonMsg) + } + select { + case <-ctx.Done(): + return + } + +} diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index 13c42070..c6bd6daf 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -12,7 +12,7 @@ import ( "github.com/joncalhoun/qson" "github.com/micro/go-micro/v2/api" "github.com/micro/go-micro/v2/api/handler" - proto "github.com/micro/go-micro/v2/api/internal/proto" + "github.com/micro/go-micro/v2/api/internal/proto" "github.com/micro/go-micro/v2/client" "github.com/micro/go-micro/v2/client/selector" "github.com/micro/go-micro/v2/codec" @@ -20,6 +20,7 @@ import ( "github.com/micro/go-micro/v2/codec/protorpc" "github.com/micro/go-micro/v2/errors" "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/metadata" "github.com/micro/go-micro/v2/registry" "github.com/micro/go-micro/v2/util/ctx" "github.com/oxtoacart/bpool" @@ -128,7 +129,6 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { so := selector.WithStrategy(strategy(service.Services)) // walk the standard call path - // get payload br, err := requestPayload(r) if err != nil { @@ -164,7 +164,12 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // marshall response - rsp, _ = response.Marshal() + rsp, err = response.Marshal() + if err != nil { + writeError(w, r, err) + return + } + default: // if json codec is not present set to json if !hasCodec(ct, jsonCodecs) { @@ -195,7 +200,11 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // marshall response - rsp, _ = response.MarshalJSON() + rsp, err = response.MarshalJSON() + if err != nil { + writeError(w, r, err) + return + } } // write the response @@ -219,8 +228,11 @@ func hasCodec(ct string, codecs []string) bool { // If the request is a GET the query string parameters are extracted and marshaled to JSON and the raw bytes are returned. // If the request method is a POST the request body is read and returned func requestPayload(r *http.Request) ([]byte, error) { + var err error + // we have to decode json-rpc and proto-rpc because we suck // well actually because there's no proxy codec right now + ct := r.Header.Get("Content-Type") switch { case strings.Contains(ct, "application/json-rpc"): @@ -229,11 +241,11 @@ func requestPayload(r *http.Request) ([]byte, error) { Header: make(map[string]string), } c := jsonrpc.NewCodec(&buffer{r.Body}) - if err := c.ReadHeader(&msg, codec.Request); err != nil { + if err = c.ReadHeader(&msg, codec.Request); err != nil { return nil, err } var raw json.RawMessage - if err := c.ReadBody(&raw); err != nil { + if err = c.ReadBody(&raw); err != nil { return nil, err } return ([]byte)(raw), nil @@ -243,15 +255,14 @@ func requestPayload(r *http.Request) ([]byte, error) { Header: make(map[string]string), } c := protorpc.NewCodec(&buffer{r.Body}) - if err := c.ReadHeader(&msg, codec.Request); err != nil { + if err = c.ReadHeader(&msg, codec.Request); err != nil { return nil, err } var raw proto.Message - if err := c.ReadBody(&raw); err != nil { + if err = c.ReadBody(&raw); err != nil { return nil, err } - b, err := raw.Marshal() - return b, err + return raw.Marshal() case strings.Contains(ct, "application/www-x-form-urlencoded"): r.ParseForm() @@ -262,43 +273,94 @@ func requestPayload(r *http.Request) ([]byte, error) { } // marshal - b, err := json.Marshal(vals) - return b, err + return json.Marshal(vals) // TODO: application/grpc } // otherwise as per usual + ctx := r.Context() + // dont user meadata.FromContext as it mangles names + md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata) + if !ok { + md = make(map[string]string) + } + // allocate maximum + matches := make(map[string]string, len(md)) + for k, v := range md { + if strings.HasPrefix(k, "x-api-field-") { + matches[strings.TrimPrefix(k, "x-api-field-")] = v + } + delete(md, k) + } + + // restore context without fields + ctx = metadata.NewContext(ctx, md) + *r = *r.WithContext(ctx) + req := make(map[string]interface{}, len(md)) + for k, v := range matches { + ps := strings.Split(k, ".") + if len(ps) == 1 { + req[k] = v + continue + } + + em := make(map[string]interface{}) + em[ps[len(ps)-1]] = v + for i := len(ps) - 2; i > 0; i-- { + nm := make(map[string]interface{}) + nm[ps[i]] = em + em = nm + } + req[ps[0]] = em + } + pathbuf := []byte("{}") + if len(req) > 0 { + pathbuf, err = json.Marshal(req) + if err != nil { + return nil, err + } + } + urlbuf := []byte("{}") + if len(r.URL.RawQuery) > 0 { + urlbuf, err = qson.ToJSON(r.URL.RawQuery) + if err != nil { + return nil, err + } + } + + out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) + if err != nil { + return nil, err + } switch r.Method { case "GET": - if len(r.URL.RawQuery) > 0 { - return qson.ToJSON(r.URL.RawQuery) + // empty response + if strings.Contains(ct, "application/json") && string(out) == "{}" { + return out, nil + } else if string(out) == "{}" && !strings.Contains(ct, "application/json") { + return []byte{}, nil } + return out, nil case "PATCH", "POST", "PUT", "DELETE": - urlParams := []byte("{}") - bodyParams := []byte("{}") - var err error - if len(r.URL.RawQuery) > 0 { - if urlParams, err = qson.ToJSON(r.URL.RawQuery); err != nil { - return nil, err - } - } - + bodybuf := []byte("{}") buf := bufferPool.Get() defer bufferPool.Put(buf) if _, err := buf.ReadFrom(r.Body); err != nil { return nil, err } if b := buf.Bytes(); len(b) > 0 { - bodyParams = b + bodybuf = b + } else { + return []byte{}, nil } - if out, err := jsonpatch.MergeMergePatches(urlParams, bodyParams); err == nil { + if out, err = jsonpatch.MergeMergePatches(out, bodybuf); err == nil { return out, nil } //fallback to previous unknown behaviour - return buf.Bytes(), nil + return bodybuf, nil } @@ -332,7 +394,7 @@ func writeError(w http.ResponseWriter, r *http.Request, err error) { } _, werr := w.Write([]byte(ce.Error())) - if err != nil { + if werr != nil { if logger.V(logger.ErrorLevel, logger.DefaultLogger) { logger.Error(werr) } @@ -351,6 +413,11 @@ func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) { w.Header().Set("grpc-message", "") } + // write 204 status if rsp is nil + if len(rsp) == 0 { + w.WriteHeader(http.StatusNoContent) + } + // write response _, err := w.Write(rsp) if err != nil { diff --git a/api/router/registry/registry.go b/api/router/registry/registry.go index 5b61fca4..375aa7d0 100644 --- a/api/router/registry/registry.go +++ b/api/router/registry/registry.go @@ -250,6 +250,14 @@ func (r *registryRouter) Close() error { return nil } +func (r *registryRouter) Register(ep *api.Endpoint) error { + return nil +} + +func (r *registryRouter) Deregister(ep *api.Endpoint) error { + return nil +} + func (r *registryRouter) Endpoint(req *http.Request) (*api.Service, error) { if r.isClosed() { return nil, errors.New("router closed") diff --git a/api/router/router.go b/api/router/router.go index 7a2dc24a..18311804 100644 --- a/api/router/router.go +++ b/api/router/router.go @@ -15,6 +15,10 @@ type Router interface { Close() error // Endpoint returns an api.Service endpoint or an error if it does not exist Endpoint(r *http.Request) (*api.Service, error) + // Register endpoint in router + Register(ep *api.Endpoint) error + // Deregister endpoint from router + Deregister(ep *api.Endpoint) error // Route returns an api.Service route Route(r *http.Request) (*api.Service, error) } diff --git a/api/router/static/static.go b/api/router/static/static.go new file mode 100644 index 00000000..2f8a0962 --- /dev/null +++ b/api/router/static/static.go @@ -0,0 +1,304 @@ +package static + +import ( + "context" + "errors" + "fmt" + "net/http" + "regexp" + "strings" + "sync" + + "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway/httprule" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/micro/go-micro/v2/api" + "github.com/micro/go-micro/v2/api/router" + "github.com/micro/go-micro/v2/logger" + "github.com/micro/go-micro/v2/metadata" +) + +type endpoint struct { + apiep *api.Endpoint + hostregs []*regexp.Regexp + pathregs []runtime.Pattern +} + +// router is the default router +type staticRouter struct { + exit chan bool + opts router.Options + sync.RWMutex + eps map[string]*endpoint +} + +func (r *staticRouter) isClosed() bool { + select { + case <-r.exit: + return true + default: + return false + } +} + +/* +// watch for endpoint changes +func (r *staticRouter) watch() { + var attempts int + + for { + if r.isClosed() { + return + } + + // watch for changes + w, err := r.opts.Registry.Watch() + if err != nil { + attempts++ + log.Println("Error watching endpoints", err) + time.Sleep(time.Duration(attempts) * time.Second) + continue + } + + ch := make(chan bool) + + go func() { + select { + case <-ch: + w.Stop() + case <-r.exit: + w.Stop() + } + }() + + // reset if we get here + attempts = 0 + + for { + // process next event + res, err := w.Next() + if err != nil { + log.Println("Error getting next endpoint", err) + close(ch) + break + } + r.process(res) + } + } +} +*/ + +func (r *staticRouter) Register(ep *api.Endpoint) error { + if err := api.Validate(ep); err != nil { + return err + } + + var pathregs []runtime.Pattern + var hostregs []*regexp.Regexp + + for _, h := range ep.Host { + if h == "" || h == "*" { + continue + } + hostreg, err := regexp.CompilePOSIX(h) + if err != nil { + return err + } + hostregs = append(hostregs, hostreg) + } + + for _, p := range ep.Path { + rule, err := httprule.Parse(p) + if err != nil { + return err + } + tpl := rule.Compile() + pathreg, err := runtime.NewPattern(tpl.Version, tpl.OpCodes, tpl.Pool, "") + if err != nil { + return err + } + pathregs = append(pathregs, pathreg) + } + + r.Lock() + r.eps[ep.Name] = &endpoint{apiep: ep, pathregs: pathregs, hostregs: hostregs} + r.Unlock() + return nil +} + +func (r *staticRouter) Deregister(ep *api.Endpoint) error { + if err := api.Validate(ep); err != nil { + return err + } + r.Lock() + delete(r.eps, ep.Name) + r.Unlock() + return nil +} + +func (r *staticRouter) Options() router.Options { + return r.opts +} + +func (r *staticRouter) Close() error { + select { + case <-r.exit: + return nil + default: + close(r.exit) + } + return nil +} + +func (r *staticRouter) Endpoint(req *http.Request) (*api.Service, error) { + ep, err := r.endpoint(req) + if err != nil { + return nil, err + } + + epf := strings.Split(ep.apiep.Name, ".") + services, err := r.opts.Registry.GetService(epf[0]) + if err != nil { + return nil, err + } + + // hack for stream endpoint + if ep.apiep.Stream { + for _, svc := range services { + for _, e := range svc.Endpoints { + e.Name = strings.Join(epf[1:], ".") + e.Metadata = make(map[string]string) + e.Metadata["stream"] = "true" + } + } + } + + svc := &api.Service{ + Name: epf[0], + Endpoint: &api.Endpoint{ + Name: strings.Join(epf[1:], "."), + Handler: "rpc", + Host: ep.apiep.Host, + Method: ep.apiep.Method, + Path: ep.apiep.Path, + }, + Services: services, + } + + return svc, nil +} + +func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { + if r.isClosed() { + return nil, errors.New("router closed") + } + + r.RLock() + defer r.RUnlock() + + var idx int + if len(req.URL.Path) > 0 && req.URL.Path != "/" { + idx = 1 + } + path := strings.Split(req.URL.Path[idx:], "/") + // use the first match + // TODO: weighted matching + + for _, ep := range r.eps { + var mMatch, hMatch, pMatch bool + + // 1. try method + methodLoop: + for _, m := range ep.apiep.Method { + if m == req.Method { + mMatch = true + break methodLoop + } + } + if !mMatch { + continue + } + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("api method match %s", req.Method) + } + + // 2. try host + if len(ep.apiep.Host) == 0 { + hMatch = true + } else { + hostLoop: + for idx, h := range ep.apiep.Host { + if h == "" || h == "*" { + hMatch = true + break hostLoop + } else { + if ep.hostregs[idx].MatchString(req.URL.Host) { + hMatch = true + break hostLoop + } + } + } + } + if !hMatch { + continue + } + if logger.V(logger.DebugLevel, logger.DefaultLogger) { + logger.Debugf("api host match %s", req.URL.Host) + } + + // 3. try path + pathLoop: + for _, pathreg := range ep.pathregs { + matches, err := pathreg.Match(path, "") + if err != nil { + // TODO: log error + continue + } + pMatch = true + ctx := req.Context() + md, ok := metadata.FromContext(ctx) + if !ok { + md = make(metadata.Metadata) + } + for k, v := range matches { + md[fmt.Sprintf("x-api-field-%s", k)] = v + } + *req = *req.WithContext(context.WithValue(ctx, metadata.MetadataKey{}, md)) + break pathLoop + } + if !pMatch { + continue + } + // TODO: Percentage traffic + + // we got here, so its a match + return ep, nil + } + // no match + return nil, fmt.Errorf("endpoint not found for %v", req) +} + +func (r *staticRouter) Route(req *http.Request) (*api.Service, error) { + if r.isClosed() { + return nil, errors.New("router closed") + } + + // try get an endpoint + ep, err := r.Endpoint(req) + if err != nil { + return nil, err + } + + return ep, nil +} + +func NewRouter(opts ...router.Option) *staticRouter { + options := router.NewOptions(opts...) + r := &staticRouter{ + exit: make(chan bool), + opts: options, + eps: make(map[string]*endpoint), + } + //go r.watch() + //go r.refresh() + return r +} diff --git a/api/service/proto/api.pb.go b/api/service/proto/api.pb.go new file mode 100644 index 00000000..b643fd79 --- /dev/null +++ b/api/service/proto/api.pb.go @@ -0,0 +1,149 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: api.proto + +package go_micro_api + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Endpoint struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Host []string `protobuf:"bytes,2,rep,name=host,proto3" json:"host,omitempty"` + Path []string `protobuf:"bytes,3,rep,name=path,proto3" json:"path,omitempty"` + Method []string `protobuf:"bytes,4,rep,name=method,proto3" json:"method,omitempty"` + Stream bool `protobuf:"varint,5,opt,name=stream,proto3" json:"stream,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Endpoint) Reset() { *m = Endpoint{} } +func (m *Endpoint) String() string { return proto.CompactTextString(m) } +func (*Endpoint) ProtoMessage() {} +func (*Endpoint) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{0} +} + +func (m *Endpoint) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Endpoint.Unmarshal(m, b) +} +func (m *Endpoint) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Endpoint.Marshal(b, m, deterministic) +} +func (m *Endpoint) XXX_Merge(src proto.Message) { + xxx_messageInfo_Endpoint.Merge(m, src) +} +func (m *Endpoint) XXX_Size() int { + return xxx_messageInfo_Endpoint.Size(m) +} +func (m *Endpoint) XXX_DiscardUnknown() { + xxx_messageInfo_Endpoint.DiscardUnknown(m) +} + +var xxx_messageInfo_Endpoint proto.InternalMessageInfo + +func (m *Endpoint) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *Endpoint) GetHost() []string { + if m != nil { + return m.Host + } + return nil +} + +func (m *Endpoint) GetPath() []string { + if m != nil { + return m.Path + } + return nil +} + +func (m *Endpoint) GetMethod() []string { + if m != nil { + return m.Method + } + return nil +} + +func (m *Endpoint) GetStream() bool { + if m != nil { + return m.Stream + } + return false +} + +type EmptyResponse struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *EmptyResponse) Reset() { *m = EmptyResponse{} } +func (m *EmptyResponse) String() string { return proto.CompactTextString(m) } +func (*EmptyResponse) ProtoMessage() {} +func (*EmptyResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{1} +} + +func (m *EmptyResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_EmptyResponse.Unmarshal(m, b) +} +func (m *EmptyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_EmptyResponse.Marshal(b, m, deterministic) +} +func (m *EmptyResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_EmptyResponse.Merge(m, src) +} +func (m *EmptyResponse) XXX_Size() int { + return xxx_messageInfo_EmptyResponse.Size(m) +} +func (m *EmptyResponse) XXX_DiscardUnknown() { + xxx_messageInfo_EmptyResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_EmptyResponse proto.InternalMessageInfo + +func init() { + proto.RegisterType((*Endpoint)(nil), "go.micro.api.Endpoint") + proto.RegisterType((*EmptyResponse)(nil), "go.micro.api.EmptyResponse") +} + +func init() { + proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) +} + +var fileDescriptor_00212fb1f9d3bf1c = []byte{ + // 201 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0xd0, 0x41, 0x4a, 0xc5, 0x30, + 0x10, 0x06, 0x60, 0x63, 0x9f, 0x8f, 0xbe, 0x41, 0x11, 0xb2, 0x78, 0x04, 0xdd, 0x94, 0xae, 0xde, + 0x2a, 0x0b, 0x3d, 0x41, 0xd1, 0x5e, 0x20, 0x37, 0x88, 0x76, 0x68, 0xb3, 0x48, 0x66, 0x48, 0x06, + 0xc1, 0x43, 0x78, 0x67, 0x49, 0xad, 0x50, 0xdc, 0xba, 0xfb, 0xe7, 0x5b, 0xfc, 0xfc, 0x0c, 0x9c, + 0x3c, 0x07, 0xcb, 0x99, 0x84, 0xf4, 0xed, 0x4c, 0x36, 0x86, 0xf7, 0x4c, 0xd6, 0x73, 0xe8, 0x3f, + 0xa0, 0x1d, 0xd3, 0xc4, 0x14, 0x92, 0x68, 0x0d, 0x87, 0xe4, 0x23, 0x1a, 0xd5, 0xa9, 0xcb, 0xc9, + 0xad, 0xb9, 0xda, 0x42, 0x45, 0xcc, 0x75, 0xd7, 0x54, 0xab, 0xb9, 0x1a, 0x7b, 0x59, 0x4c, 0xf3, + 0x63, 0x35, 0xeb, 0x33, 0x1c, 0x23, 0xca, 0x42, 0x93, 0x39, 0xac, 0xba, 0x5d, 0xd5, 0x8b, 0x64, + 0xf4, 0xd1, 0xdc, 0x74, 0xea, 0xd2, 0xba, 0xed, 0xea, 0xef, 0xe1, 0x6e, 0x8c, 0x2c, 0x9f, 0x0e, + 0x0b, 0x53, 0x2a, 0xf8, 0xf4, 0xa5, 0xa0, 0x19, 0x38, 0xe8, 0x01, 0x5a, 0x87, 0x73, 0x28, 0x82, + 0x59, 0x9f, 0xed, 0x7e, 0xab, 0xfd, 0x1d, 0xfa, 0xf0, 0xf8, 0xc7, 0xf7, 0x45, 0xfd, 0x95, 0x7e, + 0x01, 0x78, 0xc5, 0xfc, 0xbf, 0x92, 0xb7, 0xe3, 0xfa, 0xad, 0xe7, 0xef, 0x00, 0x00, 0x00, 0xff, + 0xff, 0x1f, 0xf0, 0xd9, 0x19, 0x3a, 0x01, 0x00, 0x00, +} diff --git a/api/service/proto/api.pb.micro.go b/api/service/proto/api.pb.micro.go new file mode 100644 index 00000000..6754bcca --- /dev/null +++ b/api/service/proto/api.pb.micro.go @@ -0,0 +1,102 @@ +// Code generated by protoc-gen-micro. DO NOT EDIT. +// source: api.proto + +package go_micro_api + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +import ( + context "context" + client "github.com/micro/go-micro/v2/client" + server "github.com/micro/go-micro/v2/server" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ client.Option +var _ server.Option + +// Client API for Api service + +type ApiService interface { + Register(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error) + Deregister(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error) +} + +type apiService struct { + c client.Client + name string +} + +func NewApiService(name string, c client.Client) ApiService { + return &apiService{ + c: c, + name: name, + } +} + +func (c *apiService) Register(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error) { + req := c.c.NewRequest(c.name, "Api.Register", in) + out := new(EmptyResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *apiService) Deregister(ctx context.Context, in *Endpoint, opts ...client.CallOption) (*EmptyResponse, error) { + req := c.c.NewRequest(c.name, "Api.Deregister", in) + out := new(EmptyResponse) + err := c.c.Call(ctx, req, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Api service + +type ApiHandler interface { + Register(context.Context, *Endpoint, *EmptyResponse) error + Deregister(context.Context, *Endpoint, *EmptyResponse) error +} + +func RegisterApiHandler(s server.Server, hdlr ApiHandler, opts ...server.HandlerOption) error { + type api interface { + Register(ctx context.Context, in *Endpoint, out *EmptyResponse) error + Deregister(ctx context.Context, in *Endpoint, out *EmptyResponse) error + } + type Api struct { + api + } + h := &apiHandler{hdlr} + return s.Handle(s.NewHandler(&Api{h}, opts...)) +} + +type apiHandler struct { + ApiHandler +} + +func (h *apiHandler) Register(ctx context.Context, in *Endpoint, out *EmptyResponse) error { + return h.ApiHandler.Register(ctx, in, out) +} + +func (h *apiHandler) Deregister(ctx context.Context, in *Endpoint, out *EmptyResponse) error { + return h.ApiHandler.Deregister(ctx, in, out) +} diff --git a/api/service/proto/api.proto b/api/service/proto/api.proto new file mode 100644 index 00000000..c4317d12 --- /dev/null +++ b/api/service/proto/api.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +package go.micro.api; + +service Api { + rpc Register(Endpoint) returns (EmptyResponse) {}; + rpc Deregister(Endpoint) returns (EmptyResponse) {}; +} + +message Endpoint { + string name = 1; + repeated string host = 2; + repeated string path = 3; + repeated string method = 4; + bool stream = 5; +} + +message EmptyResponse {} diff --git a/go.mod b/go.mod index 88d291a8..a0ce00d1 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/gorilla/websocket v1.4.1 github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect - github.com/grpc-ecosystem/grpc-gateway v1.9.5 // indirect + github.com/grpc-ecosystem/grpc-gateway v1.9.5 github.com/hashicorp/hcl v1.0.0 github.com/imdario/mergo v0.3.8 github.com/jonboulle/clockwork v0.1.0 // indirect diff --git a/metadata/metadata.go b/metadata/metadata.go index 63402fa9..5ba0e4c0 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -6,7 +6,7 @@ import ( "strings" ) -type metaKey struct{} +type MetadataKey struct{} // Metadata is our way of representing request headers internally. // They're used at the RPC level and translate back and forth @@ -41,7 +41,7 @@ func Set(ctx context.Context, k, v string) context.Context { md = make(Metadata) } md[k] = v - return context.WithValue(ctx, metaKey{}, md) + return context.WithValue(ctx, MetadataKey{}, md) } // Get returns a single value from metadata in the context @@ -64,7 +64,7 @@ func Get(ctx context.Context, key string) (string, bool) { // FromContext returns metadata from the given context func FromContext(ctx context.Context) (Metadata, bool) { - md, ok := ctx.Value(metaKey{}).(Metadata) + md, ok := ctx.Value(MetadataKey{}).(Metadata) if !ok { return nil, ok } @@ -80,7 +80,7 @@ func FromContext(ctx context.Context) (Metadata, bool) { // NewContext creates a new context with the given metadata func NewContext(ctx context.Context, md Metadata) context.Context { - return context.WithValue(ctx, metaKey{}, md) + return context.WithValue(ctx, MetadataKey{}, md) } // MergeContext merges metadata to existing metadata, overwriting if specified @@ -88,7 +88,7 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context if ctx == nil { ctx = context.Background() } - md, _ := ctx.Value(metaKey{}).(Metadata) + md, _ := ctx.Value(MetadataKey{}).(Metadata) cmd := make(Metadata) for k, v := range md { cmd[k] = v @@ -100,5 +100,5 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context cmd[k] = v } } - return context.WithValue(ctx, metaKey{}, cmd) + return context.WithValue(ctx, MetadataKey{}, cmd) } diff --git a/server/grpc/codec.go b/server/grpc/codec.go index db9706d0..55287f6b 100644 --- a/server/grpc/codec.go +++ b/server/grpc/codec.go @@ -20,7 +20,11 @@ type bytesCodec struct{} type protoCodec struct{} type wrapCodec struct{ encoding.Codec } -var jsonpbMarshaler = &jsonpb.Marshaler{} +var jsonpbMarshaler = &jsonpb.Marshaler{ + EnumsAsInts: false, + EmitDefaults: false, + OrigName: true, +} var ( defaultGRPCCodecs = map[string]encoding.Codec{