Handle streams

This commit is contained in:
Manfred Touron 2016-12-23 19:20:44 +01:00
parent f4f2300417
commit 718f9120e4
No known key found for this signature in database
GPG Key ID: 9CCF47DF1FD978A1
3 changed files with 133 additions and 64 deletions
examples/go-kit/templates/{{.File.Package}}/gen

@ -12,26 +12,59 @@ import (
var _ = fmt.Errorf
type StreamEndpoint func(server interface{}, req interface{}) (err error)
type Endpoints struct {
{{range .Service.Method}}
{{if or (.ClientStreaming) (.ServerStreaming)}}
{{.Name}}Endpoint StreamEndpoint
{{else}}
{{.Name}}Endpoint endpoint.Endpoint
{{end}}
{{end}}
}
{{range .Service.Method}}
/*{{. | prettyjson}}*/
func (e *Endpoints){{.Name}}(ctx context.Context, in *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) {
{{if .ServerStreaming}}
{{if .ClientStreaming}}
func (e *Endpoints){{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return fmt.Errorf("not implemented")
}
{{else}}
func (e *Endpoints){{.Name}}(in *pb.{{.Name}}Request, server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return fmt.Errorf("not implemented")
}
{{end}}
{{else}}
{{if .ClientStreaming}}
func (e *Endpoints){{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return fmt.Errorf("not implemented")
}
{{else}}
func (e *Endpoints){{.Name}}(ctx context.Context, in *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) {
out, err := e.{{.Name}}Endpoint(ctx, in)
if err != nil {
return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err
}
return out.(*pb.{{.OutputType | splitArray "." | last}}), err
}
}
{{end}}
{{end}}
{{end}}
{{range .Service.Method}}
func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoint.Endpoint {
{{if or (.ServerStreaming) (.ClientStreaming)}}
func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) StreamEndpoint {
return func(server interface{}, request interface{}) error {
{{if .ClientStreaming}}
return svc.{{.Name}}(server.(pb.{{$file.Package | title}}Service_{{.Name}}Server))
{{else}}
return svc.{{.Name}}(request.(*pb.{{.Name}}Request), server.(pb.{{$file.Package | title}}Service_{{.Name}}Server))
{{end}}
}
}
{{else}}
func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.{{.InputType | splitArray "." | last}})
rep, err := svc.{{.Name}}(ctx, req)
@ -40,7 +73,8 @@ func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoi
}
return rep, nil
}
}
}
{{end}}
{{end}}
func MakeEndpoints(svc pb.{{.File.Package | title}}ServiceServer) Endpoints {

@ -18,42 +18,73 @@ func MakeGRPCServer(ctx context.Context, endpoints endpoints.Endpoints) pb.{{.Fi
options := []grpctransport.ServerOption{}
return &grpcServer{
{{range .Service.Method}}
{{if not .ServerStreaming}}
{{if not .ClientStreaming}}
{{if or (.ClientStreaming) (.ServerStreaming)}}
{{.Name | lower}}: &server{
e: endpoints.{{.Name}}Endpoint,
},
{{else}}
{{.Name | lower}}: grpctransport.NewServer(
ctx,
endpoints.{{.Name}}Endpoint,
decode{{.Name}}Request,
decodeRequest,
encode{{.Name}}Response,
options...,
),
{{end}}
{{end}}
{{end}}
}
}
type grpcServer struct {
{{range .Service.Method}}
{{if or (.ClientStreaming) (.ServerStreaming)}}
{{.Name | lower}} streamHandler
{{else}}
{{.Name | lower}} grpctransport.Handler
{{end}}
{{end}}
}
{{range .Service.Method}}
func (s *grpcServer) {{.Name}}(ctx context.Context, req *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) {
{{if .ClientStreaming}}
func (s *grpcServer) {{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return s.{{.Name | lower}}.Do(server, nil)
}
{{else if .ServerStreaming}}
func (s *grpcServer) {{.Name}}(req *pb.{{.Name}}Request, server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return s.{{.Name | lower}}.Do(server, req)
}
{{else}}
func (s *grpcServer) {{.Name}}(ctx context.Context, req *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) {
_, rep, err := s.{{.Name | lower}}.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return rep.(*pb.{{.OutputType | splitArray "." | last}}), nil
}
}
func decode{{.Name}}Request(ctx context.Context, grpcReq interface{}) (interface{}, error) {
func encode{{.Name}}Response(ctx context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.{{.OutputType | splitArray "." | last}})
return resp, nil
}
{{end}}
{{end}}
func decodeRequest(ctx context.Context, grpcReq interface{}) (interface{}, error) {
return grpcReq, nil
}
func encode{{.Name}}Response(ctx context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.{{.OutputType | splitArray "." | last}})
return resp, nil
type streamHandler interface{
Do(server interface{}, req interface{}) (err error)
}
type server struct {
e endpoints.StreamEndpoint
}
func (s server) Do(server interface{}, req interface{}) (err error) {
if err := s.e(server, req); err != nil {
return err
}
return nil
}
{{end}}

@ -15,33 +15,37 @@ import (
)
{{range .Service.Method}}
func Make{{.Name}}Handler(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, endpoint gokit_endpoint.Endpoint) *httptransport.Server {
{{if and (not .ServerStreaming) (not .ClientStreaming)}}
func Make{{.Name}}Handler(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, endpoint gokit_endpoint.Endpoint) *httptransport.Server {
return httptransport.NewServer(
ctx,
endpoint,
decode{{.Name}}Request,
encode{{.Name}}Response,
encodeResponse,
[]httptransport.ServerOption{}...,
)
}
}
func decode{{.Name}}Request(ctx context.Context, r *http.Request) (interface{}, error) {
func decode{{.Name}}Request(ctx context.Context, r *http.Request) (interface{}, error) {
var req pb.{{.InputType | splitArray "." | last}}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
return &req, nil
}
}
{{end}}
{{end}}
func encode{{.Name}}Response(ctx context.Context, w http.ResponseWriter, response interface{}) error {
func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response)
}
{{end}}
func RegisterHandlers(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, mux *http.ServeMux, endpoints endpoints.Endpoints) error {
{{range .Service.Method}}
{{if and (not .ServerStreaming) (not .ClientStreaming)}}
log.Println("new HTTP endpoint: \"/{{.Name}}\" (service={{$file.Package | title}})")
mux.Handle("/{{.Name}}", Make{{.Name}}Handler(ctx, svc, endpoints.{{.Name}}Endpoint))
{{end}}
{{end}}
return nil
}