From 718f9120e4af4ad15e7a40a7d7697f2e9856bfaf Mon Sep 17 00:00:00 2001 From: Manfred Touron Date: Fri, 23 Dec 2016 19:20:44 +0100 Subject: [PATCH] Handle streams --- .../gen/endpoints/endpoints.go.tmpl | 74 ++++++++++++----- .../gen/transports/grpc/grpc.go.tmpl | 79 +++++++++++++------ .../gen/transports/http/http.go.tmpl | 44 ++++++----- 3 files changed, 133 insertions(+), 64 deletions(-) diff --git a/examples/go-kit/templates/{{.File.Package}}/gen/endpoints/endpoints.go.tmpl b/examples/go-kit/templates/{{.File.Package}}/gen/endpoints/endpoints.go.tmpl index b970831..d61d065 100644 --- a/examples/go-kit/templates/{{.File.Package}}/gen/endpoints/endpoints.go.tmpl +++ b/examples/go-kit/templates/{{.File.Package}}/gen/endpoints/endpoints.go.tmpl @@ -12,41 +12,75 @@ import ( var _ = fmt.Errorf +type StreamEndpoint func(server interface{}, req interface{}) (err error) + type Endpoints struct { {{range .Service.Method}} - {{.Name}}Endpoint endpoint.Endpoint + {{if or (.ClientStreaming) (.ServerStreaming)}} + {{.Name}}Endpoint StreamEndpoint + {{else}} + {{.Name}}Endpoint endpoint.Endpoint + {{end}} {{end}} } {{range .Service.Method}} -/*{{. | prettyjson}}*/ - -func (e *Endpoints){{.Name}}(ctx context.Context, in *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) { - out, err := e.{{.Name}}Endpoint(ctx, in) - if err != nil { - return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err - } - return out.(*pb.{{.OutputType | splitArray "." | last}}), err -} + {{if .ServerStreaming}} + {{if .ClientStreaming}} + func (e *Endpoints){{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error { + return fmt.Errorf("not implemented") + } + {{else}} + func (e *Endpoints){{.Name}}(in *pb.{{.Name}}Request, server pb.{{$file.Package | title}}Service_{{.Name}}Server) error { + return fmt.Errorf("not implemented") + } + {{end}} + {{else}} + {{if .ClientStreaming}} + func (e *Endpoints){{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error { + return fmt.Errorf("not implemented") + } + {{else}} + func (e *Endpoints){{.Name}}(ctx context.Context, in *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) { + out, err := e.{{.Name}}Endpoint(ctx, in) + if err != nil { + return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err + } + return out.(*pb.{{.OutputType | splitArray "." | last}}), err + } + {{end}} + {{end}} {{end}} {{range .Service.Method}} -func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(*pb.{{.InputType | splitArray "." | last}}) - rep, err := svc.{{.Name}}(ctx, req) - if err != nil { - return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err + {{if or (.ServerStreaming) (.ClientStreaming)}} + func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) StreamEndpoint { + return func(server interface{}, request interface{}) error { + {{if .ClientStreaming}} + return svc.{{.Name}}(server.(pb.{{$file.Package | title}}Service_{{.Name}}Server)) + {{else}} + return svc.{{.Name}}(request.(*pb.{{.Name}}Request), server.(pb.{{$file.Package | title}}Service_{{.Name}}Server)) + {{end}} + } } - return rep, nil - } -} + {{else}} + func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(*pb.{{.InputType | splitArray "." | last}}) + rep, err := svc.{{.Name}}(ctx, req) + if err != nil { + return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err + } + return rep, nil + } + } + {{end}} {{end}} func MakeEndpoints(svc pb.{{.File.Package | title}}ServiceServer) Endpoints { return Endpoints{ {{range .Service.Method}} - {{.Name}}Endpoint: Make{{.Name}}Endpoint(svc), + {{.Name}}Endpoint: Make{{.Name}}Endpoint(svc), {{end}} } } diff --git a/examples/go-kit/templates/{{.File.Package}}/gen/transports/grpc/grpc.go.tmpl b/examples/go-kit/templates/{{.File.Package}}/gen/transports/grpc/grpc.go.tmpl index 5062394..260fec5 100644 --- a/examples/go-kit/templates/{{.File.Package}}/gen/transports/grpc/grpc.go.tmpl +++ b/examples/go-kit/templates/{{.File.Package}}/gen/transports/grpc/grpc.go.tmpl @@ -18,42 +18,73 @@ func MakeGRPCServer(ctx context.Context, endpoints endpoints.Endpoints) pb.{{.Fi options := []grpctransport.ServerOption{} return &grpcServer{ {{range .Service.Method}} - {{if not .ServerStreaming}} - {{if not .ClientStreaming}} - {{.Name | lower}}: grpctransport.NewServer( - ctx, - endpoints.{{.Name}}Endpoint, - decode{{.Name}}Request, - encode{{.Name}}Response, - options..., - ), - {{end}} - {{end}} + {{if or (.ClientStreaming) (.ServerStreaming)}} + {{.Name | lower}}: &server{ + e: endpoints.{{.Name}}Endpoint, + }, + {{else}} + {{.Name | lower}}: grpctransport.NewServer( + ctx, + endpoints.{{.Name}}Endpoint, + decodeRequest, + encode{{.Name}}Response, + options..., + ), + {{end}} {{end}} } } type grpcServer struct { {{range .Service.Method}} - {{.Name | lower}} grpctransport.Handler + {{if or (.ClientStreaming) (.ServerStreaming)}} + {{.Name | lower}} streamHandler + {{else}} + {{.Name | lower}} grpctransport.Handler + {{end}} {{end}} } {{range .Service.Method}} -func (s *grpcServer) {{.Name}}(ctx context.Context, req *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) { - _, rep, err := s.{{.Name | lower}}.ServeGRPC(ctx, req) - if err != nil { - return nil, err - } - return rep.(*pb.{{.OutputType | splitArray "." | last}}), nil -} + {{if .ClientStreaming}} + func (s *grpcServer) {{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error { + return s.{{.Name | lower}}.Do(server, nil) + } + {{else if .ServerStreaming}} + func (s *grpcServer) {{.Name}}(req *pb.{{.Name}}Request, server pb.{{$file.Package | title}}Service_{{.Name}}Server) error { + return s.{{.Name | lower}}.Do(server, req) + } + {{else}} + func (s *grpcServer) {{.Name}}(ctx context.Context, req *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) { + _, rep, err := s.{{.Name | lower}}.ServeGRPC(ctx, req) + if err != nil { + return nil, err + } + return rep.(*pb.{{.OutputType | splitArray "." | last}}), nil + } -func decode{{.Name}}Request(ctx context.Context, grpcReq interface{}) (interface{}, error) { + func encode{{.Name}}Response(ctx context.Context, response interface{}) (interface{}, error) { + resp := response.(*pb.{{.OutputType | splitArray "." | last}}) + return resp, nil + } + {{end}} +{{end}} + +func decodeRequest(ctx context.Context, grpcReq interface{}) (interface{}, error) { return grpcReq, nil } -func encode{{.Name}}Response(ctx context.Context, response interface{}) (interface{}, error) { - resp := response.(*pb.{{.OutputType | splitArray "." | last}}) - return resp, nil +type streamHandler interface{ + Do(server interface{}, req interface{}) (err error) +} + +type server struct { + e endpoints.StreamEndpoint +} + +func (s server) Do(server interface{}, req interface{}) (err error) { + if err := s.e(server, req); err != nil { + return err + } + return nil } -{{end}} diff --git a/examples/go-kit/templates/{{.File.Package}}/gen/transports/http/http.go.tmpl b/examples/go-kit/templates/{{.File.Package}}/gen/transports/http/http.go.tmpl index cf27b2a..fd541b1 100644 --- a/examples/go-kit/templates/{{.File.Package}}/gen/transports/http/http.go.tmpl +++ b/examples/go-kit/templates/{{.File.Package}}/gen/transports/http/http.go.tmpl @@ -15,33 +15,37 @@ import ( ) {{range .Service.Method}} -func Make{{.Name}}Handler(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, endpoint gokit_endpoint.Endpoint) *httptransport.Server { - return httptransport.NewServer( - ctx, - endpoint, - decode{{.Name}}Request, - encode{{.Name}}Response, - []httptransport.ServerOption{}..., - ) -} + {{if and (not .ServerStreaming) (not .ClientStreaming)}} + func Make{{.Name}}Handler(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, endpoint gokit_endpoint.Endpoint) *httptransport.Server { + return httptransport.NewServer( + ctx, + endpoint, + decode{{.Name}}Request, + encodeResponse, + []httptransport.ServerOption{}..., + ) + } -func decode{{.Name}}Request(ctx context.Context, r *http.Request) (interface{}, error) { - var req pb.{{.InputType | splitArray "." | last}} - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - return nil, err - } - return &req, nil -} + func decode{{.Name}}Request(ctx context.Context, r *http.Request) (interface{}, error) { + var req pb.{{.InputType | splitArray "." | last}} + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return nil, err + } + return &req, nil + } + {{end}} +{{end}} -func encode{{.Name}}Response(ctx context.Context, w http.ResponseWriter, response interface{}) error { +func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { return json.NewEncoder(w).Encode(response) } -{{end}} func RegisterHandlers(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, mux *http.ServeMux, endpoints endpoints.Endpoints) error { {{range .Service.Method}} - log.Println("new HTTP endpoint: \"/{{.Name}}\" (service={{$file.Package | title}})") - mux.Handle("/{{.Name}}", Make{{.Name}}Handler(ctx, svc, endpoints.{{.Name}}Endpoint)) + {{if and (not .ServerStreaming) (not .ClientStreaming)}} + log.Println("new HTTP endpoint: \"/{{.Name}}\" (service={{$file.Package | title}})") + mux.Handle("/{{.Name}}", Make{{.Name}}Handler(ctx, svc, endpoints.{{.Name}}Endpoint)) + {{end}} {{end}} return nil }