Merge pull request #30 from moul/dev/moul/handle-streams

Go-kit example: Handle streams
This commit is contained in:
Manfred Touron 2016-12-24 11:00:56 +01:00 committed by GitHub
commit 4c53f8ebdd
3 changed files with 133 additions and 64 deletions

View File

@ -12,41 +12,75 @@ import (
var _ = fmt.Errorf var _ = fmt.Errorf
type StreamEndpoint func(server interface{}, req interface{}) (err error)
type Endpoints struct { type Endpoints struct {
{{range .Service.Method}} {{range .Service.Method}}
{{.Name}}Endpoint endpoint.Endpoint {{if or (.ClientStreaming) (.ServerStreaming)}}
{{.Name}}Endpoint StreamEndpoint
{{else}}
{{.Name}}Endpoint endpoint.Endpoint
{{end}}
{{end}} {{end}}
} }
{{range .Service.Method}} {{range .Service.Method}}
/*{{. | prettyjson}}*/ {{if .ServerStreaming}}
{{if .ClientStreaming}}
func (e *Endpoints){{.Name}}(ctx context.Context, in *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) { func (e *Endpoints){{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
out, err := e.{{.Name}}Endpoint(ctx, in) return fmt.Errorf("not implemented")
if err != nil { }
return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err {{else}}
} func (e *Endpoints){{.Name}}(in *pb.{{.Name}}Request, server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return out.(*pb.{{.OutputType | splitArray "." | last}}), err return fmt.Errorf("not implemented")
} }
{{end}}
{{else}}
{{if .ClientStreaming}}
func (e *Endpoints){{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
return fmt.Errorf("not implemented")
}
{{else}}
func (e *Endpoints){{.Name}}(ctx context.Context, in *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) {
out, err := e.{{.Name}}Endpoint(ctx, in)
if err != nil {
return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err
}
return out.(*pb.{{.OutputType | splitArray "." | last}}), err
}
{{end}}
{{end}}
{{end}} {{end}}
{{range .Service.Method}} {{range .Service.Method}}
func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoint.Endpoint { {{if or (.ServerStreaming) (.ClientStreaming)}}
return func(ctx context.Context, request interface{}) (interface{}, error) { func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) StreamEndpoint {
req := request.(*pb.{{.InputType | splitArray "." | last}}) return func(server interface{}, request interface{}) error {
rep, err := svc.{{.Name}}(ctx, req) {{if .ClientStreaming}}
if err != nil { return svc.{{.Name}}(server.(pb.{{$file.Package | title}}Service_{{.Name}}Server))
return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err {{else}}
return svc.{{.Name}}(request.(*pb.{{.Name}}Request), server.(pb.{{$file.Package | title}}Service_{{.Name}}Server))
{{end}}
}
} }
return rep, nil {{else}}
} func Make{{.Name}}Endpoint(svc pb.{{$file.Package | title}}ServiceServer) endpoint.Endpoint {
} return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(*pb.{{.InputType | splitArray "." | last}})
rep, err := svc.{{.Name}}(ctx, req)
if err != nil {
return &pb.{{.OutputType | splitArray "." | last}}{ErrMsg: err.Error()}, err
}
return rep, nil
}
}
{{end}}
{{end}} {{end}}
func MakeEndpoints(svc pb.{{.File.Package | title}}ServiceServer) Endpoints { func MakeEndpoints(svc pb.{{.File.Package | title}}ServiceServer) Endpoints {
return Endpoints{ return Endpoints{
{{range .Service.Method}} {{range .Service.Method}}
{{.Name}}Endpoint: Make{{.Name}}Endpoint(svc), {{.Name}}Endpoint: Make{{.Name}}Endpoint(svc),
{{end}} {{end}}
} }
} }

View File

@ -18,42 +18,73 @@ func MakeGRPCServer(ctx context.Context, endpoints endpoints.Endpoints) pb.{{.Fi
options := []grpctransport.ServerOption{} options := []grpctransport.ServerOption{}
return &grpcServer{ return &grpcServer{
{{range .Service.Method}} {{range .Service.Method}}
{{if not .ServerStreaming}} {{if or (.ClientStreaming) (.ServerStreaming)}}
{{if not .ClientStreaming}} {{.Name | lower}}: &server{
{{.Name | lower}}: grpctransport.NewServer( e: endpoints.{{.Name}}Endpoint,
ctx, },
endpoints.{{.Name}}Endpoint, {{else}}
decode{{.Name}}Request, {{.Name | lower}}: grpctransport.NewServer(
encode{{.Name}}Response, ctx,
options..., endpoints.{{.Name}}Endpoint,
), decodeRequest,
{{end}} encode{{.Name}}Response,
{{end}} options...,
),
{{end}}
{{end}} {{end}}
} }
} }
type grpcServer struct { type grpcServer struct {
{{range .Service.Method}} {{range .Service.Method}}
{{.Name | lower}} grpctransport.Handler {{if or (.ClientStreaming) (.ServerStreaming)}}
{{.Name | lower}} streamHandler
{{else}}
{{.Name | lower}} grpctransport.Handler
{{end}}
{{end}} {{end}}
} }
{{range .Service.Method}} {{range .Service.Method}}
func (s *grpcServer) {{.Name}}(ctx context.Context, req *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) { {{if .ClientStreaming}}
_, rep, err := s.{{.Name | lower}}.ServeGRPC(ctx, req) func (s *grpcServer) {{.Name}}(server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
if err != nil { return s.{{.Name | lower}}.Do(server, nil)
return nil, err }
} {{else if .ServerStreaming}}
return rep.(*pb.{{.OutputType | splitArray "." | last}}), nil func (s *grpcServer) {{.Name}}(req *pb.{{.Name}}Request, server pb.{{$file.Package | title}}Service_{{.Name}}Server) error {
} return s.{{.Name | lower}}.Do(server, req)
}
{{else}}
func (s *grpcServer) {{.Name}}(ctx context.Context, req *pb.{{.InputType | splitArray "." | last}}) (*pb.{{.OutputType | splitArray "." | last}}, error) {
_, rep, err := s.{{.Name | lower}}.ServeGRPC(ctx, req)
if err != nil {
return nil, err
}
return rep.(*pb.{{.OutputType | splitArray "." | last}}), nil
}
func decode{{.Name}}Request(ctx context.Context, grpcReq interface{}) (interface{}, error) { func encode{{.Name}}Response(ctx context.Context, response interface{}) (interface{}, error) {
resp := response.(*pb.{{.OutputType | splitArray "." | last}})
return resp, nil
}
{{end}}
{{end}}
func decodeRequest(ctx context.Context, grpcReq interface{}) (interface{}, error) {
return grpcReq, nil return grpcReq, nil
} }
func encode{{.Name}}Response(ctx context.Context, response interface{}) (interface{}, error) { type streamHandler interface{
resp := response.(*pb.{{.OutputType | splitArray "." | last}}) Do(server interface{}, req interface{}) (err error)
return resp, nil }
type server struct {
e endpoints.StreamEndpoint
}
func (s server) Do(server interface{}, req interface{}) (err error) {
if err := s.e(server, req); err != nil {
return err
}
return nil
} }
{{end}}

View File

@ -15,33 +15,37 @@ import (
) )
{{range .Service.Method}} {{range .Service.Method}}
func Make{{.Name}}Handler(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, endpoint gokit_endpoint.Endpoint) *httptransport.Server { {{if and (not .ServerStreaming) (not .ClientStreaming)}}
return httptransport.NewServer( func Make{{.Name}}Handler(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, endpoint gokit_endpoint.Endpoint) *httptransport.Server {
ctx, return httptransport.NewServer(
endpoint, ctx,
decode{{.Name}}Request, endpoint,
encode{{.Name}}Response, decode{{.Name}}Request,
[]httptransport.ServerOption{}..., encodeResponse,
) []httptransport.ServerOption{}...,
} )
}
func decode{{.Name}}Request(ctx context.Context, r *http.Request) (interface{}, error) { func decode{{.Name}}Request(ctx context.Context, r *http.Request) (interface{}, error) {
var req pb.{{.InputType | splitArray "." | last}} var req pb.{{.InputType | splitArray "." | last}}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err return nil, err
} }
return &req, nil return &req, nil
} }
{{end}}
{{end}}
func encode{{.Name}}Response(ctx context.Context, w http.ResponseWriter, response interface{}) error { func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
return json.NewEncoder(w).Encode(response) return json.NewEncoder(w).Encode(response)
} }
{{end}}
func RegisterHandlers(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, mux *http.ServeMux, endpoints endpoints.Endpoints) error { func RegisterHandlers(ctx context.Context, svc pb.{{$file.Package | title}}ServiceServer, mux *http.ServeMux, endpoints endpoints.Endpoints) error {
{{range .Service.Method}} {{range .Service.Method}}
log.Println("new HTTP endpoint: \"/{{.Name}}\" (service={{$file.Package | title}})") {{if and (not .ServerStreaming) (not .ClientStreaming)}}
mux.Handle("/{{.Name}}", Make{{.Name}}Handler(ctx, svc, endpoints.{{.Name}}Endpoint)) log.Println("new HTTP endpoint: \"/{{.Name}}\" (service={{$file.Package | title}})")
mux.Handle("/{{.Name}}", Make{{.Name}}Handler(ctx, svc, endpoints.{{.Name}}Endpoint))
{{end}}
{{end}} {{end}}
return nil return nil
} }