Compare commits

..

4 Commits

Author SHA1 Message Date
1213bace75 Merge pull request 'quic fix' (#166) from quicfix into v3
Reviewed-on: #166
2023-06-10 17:09:26 +03:00
3844d9484c quic fix
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-10 17:09:01 +03:00
b978e58cf9 Merge pull request 'add quic connection transport support' (#165) from quicv3 into v3
Reviewed-on: #165
2023-06-10 17:06:00 +03:00
a793983ed2 add quic connection transport support
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2023-06-10 17:05:34 +03:00
9 changed files with 2062 additions and 557 deletions

19
go.mod
View File

@@ -1,17 +1,12 @@
module go.unistack.org/micro-server-grpc/v3
go 1.20
go 1.16
require (
github.com/golang/protobuf v1.5.3
go.unistack.org/micro/v3 v3.10.31
golang.org/x/net v0.17.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
)
require (
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
github.com/golang/protobuf v1.5.2
github.com/quic-go/quic-go v0.35.1
go.unistack.org/micro/v3 v3.10.14
golang.org/x/net v0.7.0
google.golang.org/grpc v1.52.3
google.golang.org/protobuf v1.28.1
)

1224
go.sum

File diff suppressed because it is too large Load Diff

101
grpc.go
View File

@@ -26,6 +26,7 @@ import (
"golang.org/x/net/netutil"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
gmetadata "google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
@@ -45,15 +46,14 @@ type ServerReflection struct {
*/
type Server struct {
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *sync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
unknownHandler grpc.StreamHandler
handlers map[string]server.Handler
srv *grpc.Server
exit chan chan error
wg *sync.WaitGroup
rsvc *register.Service
subscribers map[*subscriber][]broker.Subscriber
rpc *rServer
opts server.Options
sync.RWMutex
started bool
registered bool
@@ -140,6 +140,10 @@ func (g *Server) configure(opts ...server.Option) error {
grpc.UnknownServiceHandler(g.handler),
}
if creds := g.getCredentials(); creds != nil {
gopts = append(gopts, grpc.Creds(creds))
}
if opts := g.getGrpcOptions(); opts != nil {
gopts = append(opts, gopts...)
}
@@ -158,10 +162,6 @@ func (g *Server) configure(opts ...server.Option) error {
g.reflection = v
}
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
g.unknownHandler = h
}
if restart {
return g.Start()
}
@@ -180,6 +180,13 @@ func (g *Server) getMaxMsgSize() int {
return s
}
func (g *Server) getCredentials() credentials.TransportCredentials {
if g.opts.TLSConfig != nil {
return credentials.NewTLS(g.opts.TLSConfig)
}
return nil
}
func (g *Server) getGrpcOptions() []grpc.ServerOption {
if g.opts.Context == nil {
return nil
@@ -214,6 +221,13 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
config.Logger.Error(config.Context, string(debug.Stack()))
}
err = errors.InternalServerError(g.opts.Name, "panic in %s.%s recovered: %v", serviceName, methodName, r)
} else if err != nil {
g.RLock()
config := g.opts
g.RUnlock()
if config.Logger.V(logger.ErrorLevel) {
config.Logger.Errorf(config.Context, "grpc handler %s.%s got error: %s", serviceName, methodName, err)
}
}
}()
@@ -232,10 +246,6 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
for k, v := range gmd {
md.Set(k, strings.Join(v, ", "))
}
md.Set("Path", fullMethod)
md.Set("Micro-Server", "grpc")
md.Set(metadata.HeaderEndpoint, methodName)
md.Set(metadata.HeaderService, serviceName)
var td string
// timeout for server deadline
@@ -317,16 +327,20 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
*/
if svc == nil {
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
if g.opts.Context != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
return h(srv, stream)
}
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
}
mtype := svc.method[methodName]
if mtype == nil {
if g.unknownHandler != nil {
return g.unknownHandler(srv, stream)
if g.opts.Context != nil {
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
return h(srv, stream)
}
}
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
}
@@ -441,8 +455,53 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
}
return status.New(statusCode, statusDesc).Err()
// }
}
/*
type reflectStream struct {
stream server.Stream
}
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
return s.stream.Send(rsp)
}
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
req := &grpcreflect.ServerReflectionRequest{}
err := s.stream.Recv(req)
return req, err
}
func (s *reflectStream) SetHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SendHeader(gmetadata.MD) error {
return nil
}
func (s *reflectStream) SetTrailer(gmetadata.MD) {
}
func (s *reflectStream) Context() context.Context {
return s.stream.Context()
}
func (s *reflectStream) SendMsg(m interface{}) error {
return s.stream.Send(m)
}
func (s *reflectStream) RecvMsg(m interface{}) error {
return s.stream.Recv(m)
}
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
return g.s.ServerReflectionInfo(&reflectStream{stream})
}
*/
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
opts := g.opts

View File

@@ -36,6 +36,7 @@ func Options(opts ...grpc.ServerOption) server.Option {
return server.SetOption(grpcOptions{}, opts)
}
//
// MaxMsgSize set the maximum message in bytes the server can receive and
// send. Default maximum message size is 4 MB.
func MaxMsgSize(s int) server.Option {
@@ -43,8 +44,8 @@ func MaxMsgSize(s int) server.Option {
}
// Reflection enables reflection support in grpc server
func Reflection(r Reflector) server.Option {
return server.SetOption(reflectionKey{}, r)
func Reflection(b bool) server.Option {
return server.SetOption(reflectionKey{}, b)
}
// UnknownServiceHandler enables support for all services

166
quic/quic_net.go Normal file
View File

@@ -0,0 +1,166 @@
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
// copyright sssgun with MIT license
package grpcquic // import "go.unistack.org/micro-server-grpc/v3/quic"
import (
"context"
"crypto/tls"
"net"
"time"
quic "github.com/quic-go/quic-go"
)
///////////////////////////////////////////////////////////////////////////////
// Connection
var _ net.Conn = (*Conn)(nil)
type Conn struct {
conn quic.Connection
stream quic.Stream
}
func NewConn(conn quic.Connection) (net.Conn, error) {
stream, err := conn.OpenStreamSync(context.Background())
if err != nil {
return nil, err
}
return &Conn{conn, stream}, nil
}
// Read reads data from the connection.
// Read can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetReadDeadline.
func (c *Conn) Read(b []byte) (n int, err error) {
return c.stream.Read(b)
}
// Write writes data to the connection.
// Write can be made to time out and return an Error with Timeout() == true
// after a fixed time limit; see SetDeadline and SetWriteDeadline.
func (c *Conn) Write(b []byte) (n int, err error) {
return c.stream.Write(b)
}
// Close closes the connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (c *Conn) Close() error {
// @TODO: log this
c.stream.Close()
return c.conn.CloseWithError(0, "")
}
// LocalAddr returns the local network address.
func (c *Conn) LocalAddr() net.Addr {
return c.conn.LocalAddr()
}
// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
// SetDeadline sets the read and write deadlines associated
// with the connection. It is equivalent to calling both
// SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations
// fail with a timeout (see type Error) instead of
// blocking. The deadline applies to all future and pending
// I/O, not just the immediately following call to Read or
// Write. After a deadline has been exceeded, the connection
// can be refreshed by setting a deadline in the future.
//
// An idle timeout can be implemented by repeatedly extending
// the deadline after successful Read or Write calls.
//
// A zero value for t means I/O operations will not time out.
func (c *Conn) SetDeadline(t time.Time) error {
return c.stream.SetDeadline(t)
}
// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
return c.stream.SetReadDeadline(t)
}
// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some of the data was successfully written.
// A zero value for t means Write will not time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
return c.stream.SetWriteDeadline(t)
}
///////////////////////////////////////////////////////////////////////////////
// Listener
var _ net.Listener = (*Listener)(nil)
type Listener struct {
ql quic.Listener
}
func Listen(ql quic.Listener) net.Listener {
return &Listener{ql}
}
// Accept waits for and returns the next connection to the listener.
func (l *Listener) Accept() (net.Conn, error) {
sess, err := l.ql.Accept(context.Background())
if err != nil {
return nil, err
}
stream, err := sess.AcceptStream(context.Background())
if err != nil {
return nil, err
}
return &Conn{sess, stream}, nil
}
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
func (l *Listener) Close() error {
return l.ql.Close()
}
// Addr returns the listener's network address.
func (l *Listener) Addr() net.Addr {
return l.ql.Addr()
}
///////////////////////////////////////////////////////////////////////////////
// Dialer
var QuicConfig = &quic.Config{
KeepAlivePeriod: 10 * time.Second,
}
func NewPacketConn(addr string) (net.PacketConn, error) {
// create a packet conn for outgoing connections
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
return net.ListenUDP("udp", udpAddr)
}
func NewQuicDialer(tlsConf *tls.Config) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, target string) (net.Conn, error) {
sess, err := quic.DialAddr(ctx, target, tlsConf, QuicConfig)
if err != nil {
return nil, err
}
return NewConn(sess)
}
}

117
quic/quic_transport.go Normal file
View File

@@ -0,0 +1,117 @@
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
// copyright sssgun with MIT license
package grpcquic
import (
"context"
"crypto/tls"
"net"
"google.golang.org/grpc/credentials"
)
var _ credentials.AuthInfo = (*Info)(nil)
// Info contains the auth information
type Info struct {
conn *Conn
}
func NewInfo(c *Conn) *Info {
return &Info{c}
}
// AuthType returns the type of Info as a string.
func (i *Info) AuthType() string {
return "quic-tls"
}
func (i *Info) Conn() net.Conn {
return i.conn
}
var _ credentials.TransportCredentials = (*Credentials)(nil)
type Credentials struct {
tlsConfig *tls.Config
isQuicConnection bool
serverName string
grpcCreds credentials.TransportCredentials
}
func NewCredentials(tlsConfig *tls.Config) credentials.TransportCredentials {
grpcCreds := credentials.NewTLS(tlsConfig)
return &Credentials{
grpcCreds: grpcCreds,
tlsConfig: tlsConfig,
}
}
// ClientHandshake does the authentication handshake specified by the corresponding
// authentication protocol on rawConn for clients. It returns the authenticated
// connection and the corresponding auth information about the connection.
// Implementations must use the provided context to implement timely cancellation.
// gRPC will try to reconnect if the error returned is a temporary error
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
// If the returned error is a wrapper error, implementations should make sure that
// the error implements Temporary() to have the correct retry behaviors.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
func (pt *Credentials) ClientHandshake(ctx context.Context, authority string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if c, ok := conn.(*Conn); ok {
pt.isQuicConnection = true
return conn, NewInfo(c), nil
}
return pt.grpcCreds.ClientHandshake(ctx, authority, conn)
}
// ServerHandshake does the authentication handshake for servers. It returns
// the authenticated connection and the corresponding auth information about
// the connection.
//
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
func (pt *Credentials) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if c, ok := conn.(*Conn); ok {
pt.isQuicConnection = true
ainfo := NewInfo(c)
return conn, ainfo, nil
}
return pt.grpcCreds.ServerHandshake(conn)
}
// Info provides the ProtocolInfo of this Credentials.
func (pt *Credentials) Info() credentials.ProtocolInfo {
if pt.isQuicConnection {
return credentials.ProtocolInfo{
// ProtocolVersion is the gRPC wire protocol version.
ProtocolVersion: "/quic/1.0.0",
// SecurityProtocol is the security protocol in use.
SecurityProtocol: "quic-tls",
// SecurityVersion is the security protocol version.
// SecurityVersion: "1.2.0",
// ServerName is the user-configured server name.
ServerName: pt.serverName,
}
}
return pt.grpcCreds.Info()
}
// Clone makes a copy of this Credentials.
func (pt *Credentials) Clone() credentials.TransportCredentials {
return &Credentials{
tlsConfig: pt.tlsConfig.Clone(),
grpcCreds: pt.grpcCreds.Clone(),
}
}
// OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
// gRPC internals also use it to override the virtual hosting name if it is set.
// It must be called before dialing. Currently, this is only used by grpclb.
func (pt *Credentials) OverrideServerName(name string) error {
pt.serverName = name
return pt.grpcCreds.OverrideServerName(name)
}

View File

@@ -1,23 +1,488 @@
// +build ignore
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
Package reflection implements server reflection service.
The service implemented is defined in:
https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto.
To register server reflection on a gRPC server:
import "google.golang.org/grpc/reflection"
s := grpc.NewServer()
pb.RegisterYourOwnServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
s.Serve(lis)
*/
package grpc
import (
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/reflect/protodesc"
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"reflect"
"sort"
"sync"
"github.com/golang/protobuf/proto"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)
type Reflector interface {
protodesc.Resolver
reflection.ServiceInfoProvider
reflection.ExtensionResolver
type serverReflectionServer struct {
rpb.UnimplementedServerReflectionServer
s *grpc.Server
initSymbols sync.Once
serviceNames []string
symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files
}
const (
// ReflectV1ServiceName is the fully-qualified name of the v1 version of the reflection service.
ReflectV1ServiceName = "grpc.reflection.v1.ServerReflection"
// ReflectV1AlphaServiceName is the fully-qualified name of the v1alpha version of the reflection service.
ReflectV1AlphaServiceName = "grpc.reflection.v1alpha.ServerReflection"
// Register registers the server reflection service on the given gRPC server.
func Register(s *grpc.Server) {
rpb.RegisterServerReflectionServer(s, &serverReflectionServer{
s: s,
})
}
ReflectServiceURLPathV1 = "/" + ReflectV1ServiceName + "/"
ReflectServiceURLPathV1Alpha = "/" + ReflectV1AlphaServiceName + "/"
ReflectMethodName = "ServerReflectionInfo"
)
// protoMessage is used for type assertion on proto messages.
// Generated proto message implements function Descriptor(), but Descriptor()
// is not part of interface proto.Message. This interface is needed to
// call Descriptor().
type protoMessage interface {
Descriptor() ([]byte, []int)
}
func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
s.initSymbols.Do(func() {
serviceInfo := s.s.GetServiceInfo()
s.symbols = map[string]*dpb.FileDescriptorProto{}
s.serviceNames = make([]string, 0, len(serviceInfo))
processed := map[string]struct{}{}
for svc, info := range serviceInfo {
s.serviceNames = append(s.serviceNames, svc)
fdenc, ok := parseMetadata(info.Metadata)
if !ok {
continue
}
fd, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
s.processFile(fd, processed)
}
sort.Strings(s.serviceNames)
})
return s.serviceNames, s.symbols
}
func (s *serverReflectionServer) processFile(fd *dpb.FileDescriptorProto, processed map[string]struct{}) {
filename := fd.GetName()
if _, ok := processed[filename]; ok {
return
}
processed[filename] = struct{}{}
prefix := fd.GetPackage()
for _, msg := range fd.MessageType {
s.processMessage(fd, prefix, msg)
}
for _, en := range fd.EnumType {
s.processEnum(fd, prefix, en)
}
for _, ext := range fd.Extension {
s.processField(fd, prefix, ext)
}
for _, svc := range fd.Service {
svcName := fqn(prefix, svc.GetName())
s.symbols[svcName] = fd
for _, meth := range svc.Method {
name := fqn(svcName, meth.GetName())
s.symbols[name] = fd
}
}
for _, dep := range fd.Dependency {
fdenc := proto.FileDescriptor(dep)
fdDep, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
s.processFile(fdDep, processed)
}
}
func (s *serverReflectionServer) processMessage(fd *dpb.FileDescriptorProto, prefix string, msg *dpb.DescriptorProto) {
msgName := fqn(prefix, msg.GetName())
s.symbols[msgName] = fd
for _, nested := range msg.NestedType {
s.processMessage(fd, msgName, nested)
}
for _, en := range msg.EnumType {
s.processEnum(fd, msgName, en)
}
for _, ext := range msg.Extension {
s.processField(fd, msgName, ext)
}
for _, fld := range msg.Field {
s.processField(fd, msgName, fld)
}
for _, oneof := range msg.OneofDecl {
oneofName := fqn(msgName, oneof.GetName())
s.symbols[oneofName] = fd
}
}
func (s *serverReflectionServer) processEnum(fd *dpb.FileDescriptorProto, prefix string, en *dpb.EnumDescriptorProto) {
enName := fqn(prefix, en.GetName())
s.symbols[enName] = fd
for _, val := range en.Value {
valName := fqn(enName, val.GetName())
s.symbols[valName] = fd
}
}
func (s *serverReflectionServer) processField(fd *dpb.FileDescriptorProto, prefix string, fld *dpb.FieldDescriptorProto) {
fldName := fqn(prefix, fld.GetName())
s.symbols[fldName] = fd
}
func fqn(prefix, name string) string {
if prefix == "" {
return name
}
return prefix + "." + name
}
// fileDescForType gets the file descriptor for the given type.
// The given type should be a proto message.
func (s *serverReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(protoMessage)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
enc, _ := m.Descriptor()
return decodeFileDesc(enc)
}
// decodeFileDesc does decompression and unmarshalling on the given
// file descriptor byte slice.
func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) {
raw, err := decompress(enc)
if err != nil {
return nil, fmt.Errorf("failed to decompress enc: %v", err)
}
fd := new(dpb.FileDescriptorProto)
if err := proto.Unmarshal(raw, fd); err != nil {
return nil, fmt.Errorf("bad descriptor: %v", err)
}
return fd, nil
}
// decompress does gzip decompression.
func decompress(b []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
}
return out, nil
}
func typeForName(name string) (reflect.Type, error) {
pt := proto.MessageType(name)
if pt == nil {
return nil, fmt.Errorf("unknown type: %q", name)
}
st := pt.Elem()
return st, nil
}
func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
var extDesc *proto.ExtensionDesc
for id, desc := range proto.RegisteredExtensions(m) {
if id == ext {
extDesc = desc
break
}
}
if extDesc == nil {
return nil, fmt.Errorf("failed to find registered extension for extension number %v", ext)
}
return decodeFileDesc(proto.FileDescriptor(extDesc.Filename))
}
func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
exts := proto.RegisteredExtensions(m)
out := make([]int32, 0, len(exts))
for id := range exts {
out = append(out, id)
}
return out, nil
}
// fileDescWithDependencies returns a slice of serialized fileDescriptors in
// wire format ([]byte). The fileDescriptors will include fd and all the
// transitive dependencies of fd with names not in sentFileDescriptors.
func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors map[string]bool) ([][]byte, error) {
r := [][]byte{}
queue := []*dpb.FileDescriptorProto{fd}
for len(queue) > 0 {
currentfd := queue[0]
queue = queue[1:]
if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.GetName()] = true
currentfdEncoded, err := proto.Marshal(currentfd)
if err != nil {
return nil, err
}
r = append(r, currentfdEncoded)
}
for _, dep := range currentfd.Dependency {
fdenc := proto.FileDescriptor(dep)
fdDep, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
queue = append(queue, fdDep)
}
}
return r, nil
}
// fileDescEncodingByFilename finds the file descriptor for given filename,
// finds all of its previously unsent transitive dependencies, does marshalling
// on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
enc := proto.FileDescriptor(name)
if enc == nil {
return nil, fmt.Errorf("unknown file: %v", name)
}
fd, err := decodeFileDesc(enc)
if err != nil {
return nil, err
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// parseMetadata finds the file descriptor bytes specified meta.
// For SupportPackageIsVersion4, m is the name of the proto file, we
// call proto.FileDescriptor to get the byte slice.
// For SupportPackageIsVersion3, m is a byte slice itself.
func parseMetadata(meta interface{}) ([]byte, bool) {
// Check if meta is the file name.
if fileNameForMeta, ok := meta.(string); ok {
return proto.FileDescriptor(fileNameForMeta), true
}
// Check if meta is the byte slice.
if enc, ok := meta.([]byte); ok {
return enc, true
}
return nil, false
}
// fileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result. The given symbol
// can be a type, a service or a method.
func (s *serverReflectionServer) fileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
_, symbols := s.getSymbols()
fd := symbols[name]
if fd == nil {
// Check if it's a type name that was not present in the
// transitive dependencies of the registered services.
if st, err := typeForName(name); err == nil {
fd, err = s.fileDescForType(st)
if err != nil {
return nil, err
}
}
}
if fd == nil {
return nil, fmt.Errorf("unknown symbol: %v", name)
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// fileDescEncodingContainingExtension finds the file descriptor containing
// given extension, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
st, err := typeForName(typeName)
if err != nil {
return nil, err
}
fd, err := fileDescContainingExtension(st, extNum)
if err != nil {
return nil, err
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// allExtensionNumbersForTypeName returns all extension numbers for the given type.
func (s *serverReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) {
st, err := typeForName(name)
if err != nil {
return nil, err
}
extNums, err := s.allExtensionNumbersForType(st)
if err != nil {
return nil, err
}
return extNums, nil
}
// ServerReflectionInfo is the reflection service handler.
func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflection_ServerReflectionInfoServer) error {
sentFileDescriptors := make(map[string]bool)
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
out := &rpb.ServerReflectionResponse{
ValidHost: in.Host,
OriginalRequest: in,
}
switch req := in.MessageRequest.(type) {
case *rpb.ServerReflectionRequest_FileByFilename:
b, err := s.fileDescEncodingByFilename(req.FileByFilename, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_FileContainingSymbol:
b, err := s.fileDescEncodingContainingSymbol(req.FileContainingSymbol, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_FileContainingExtension:
typeName := req.FileContainingExtension.ContainingType
extNum := req.FileContainingExtension.ExtensionNumber
b, err := s.fileDescEncodingContainingExtension(typeName, extNum, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_AllExtensionNumbersOfType:
extNums, err := s.allExtensionNumbersForTypeName(req.AllExtensionNumbersOfType)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{
BaseTypeName: req.AllExtensionNumbersOfType,
ExtensionNumber: extNums,
},
}
}
case *rpb.ServerReflectionRequest_ListServices:
svcNames, _ := s.getSymbols()
serviceResponses := make([]*rpb.ServiceResponse, len(svcNames))
for i, n := range svcNames {
serviceResponses[i] = &rpb.ServiceResponse{
Name: n,
}
}
out.MessageResponse = &rpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &rpb.ListServiceResponse{
Service: serviceResponses,
},
}
default:
return status.Errorf(codes.InvalidArgument, "invalid MessageRequest: %v", in.MessageRequest)
}
if err := stream.Send(out); err != nil {
return err
}
}
}

View File

@@ -1,489 +0,0 @@
//go:build ignore
// +build ignore
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
/*
Package reflection implements server reflection service.
The service implemented is defined in:
https://github.com/grpc/grpc/blob/master/src/proto/grpc/reflection/v1alpha/reflection.proto.
To register server reflection on a gRPC server:
import "google.golang.org/grpc/reflection"
s := grpc.NewServer()
pb.RegisterYourOwnServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
s.Serve(lis)
*/
package grpc
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"reflect"
"sort"
"sync"
"github.com/golang/protobuf/proto"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)
type serverReflectionServer struct {
rpb.UnimplementedServerReflectionServer
s *grpc.Server
initSymbols sync.Once
serviceNames []string
symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files
}
// Register registers the server reflection service on the given gRPC server.
func Register(s *grpc.Server) {
rpb.RegisterServerReflectionServer(s, &serverReflectionServer{
s: s,
})
}
// protoMessage is used for type assertion on proto messages.
// Generated proto message implements function Descriptor(), but Descriptor()
// is not part of interface proto.Message. This interface is needed to
// call Descriptor().
type protoMessage interface {
Descriptor() ([]byte, []int)
}
func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
s.initSymbols.Do(func() {
serviceInfo := s.s.GetServiceInfo()
s.symbols = map[string]*dpb.FileDescriptorProto{}
s.serviceNames = make([]string, 0, len(serviceInfo))
processed := map[string]struct{}{}
for svc, info := range serviceInfo {
s.serviceNames = append(s.serviceNames, svc)
fdenc, ok := parseMetadata(info.Metadata)
if !ok {
continue
}
fd, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
s.processFile(fd, processed)
}
sort.Strings(s.serviceNames)
})
return s.serviceNames, s.symbols
}
func (s *serverReflectionServer) processFile(fd *dpb.FileDescriptorProto, processed map[string]struct{}) {
filename := fd.GetName()
if _, ok := processed[filename]; ok {
return
}
processed[filename] = struct{}{}
prefix := fd.GetPackage()
for _, msg := range fd.MessageType {
s.processMessage(fd, prefix, msg)
}
for _, en := range fd.EnumType {
s.processEnum(fd, prefix, en)
}
for _, ext := range fd.Extension {
s.processField(fd, prefix, ext)
}
for _, svc := range fd.Service {
svcName := fqn(prefix, svc.GetName())
s.symbols[svcName] = fd
for _, meth := range svc.Method {
name := fqn(svcName, meth.GetName())
s.symbols[name] = fd
}
}
for _, dep := range fd.Dependency {
fdenc := proto.FileDescriptor(dep)
fdDep, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
s.processFile(fdDep, processed)
}
}
func (s *serverReflectionServer) processMessage(fd *dpb.FileDescriptorProto, prefix string, msg *dpb.DescriptorProto) {
msgName := fqn(prefix, msg.GetName())
s.symbols[msgName] = fd
for _, nested := range msg.NestedType {
s.processMessage(fd, msgName, nested)
}
for _, en := range msg.EnumType {
s.processEnum(fd, msgName, en)
}
for _, ext := range msg.Extension {
s.processField(fd, msgName, ext)
}
for _, fld := range msg.Field {
s.processField(fd, msgName, fld)
}
for _, oneof := range msg.OneofDecl {
oneofName := fqn(msgName, oneof.GetName())
s.symbols[oneofName] = fd
}
}
func (s *serverReflectionServer) processEnum(fd *dpb.FileDescriptorProto, prefix string, en *dpb.EnumDescriptorProto) {
enName := fqn(prefix, en.GetName())
s.symbols[enName] = fd
for _, val := range en.Value {
valName := fqn(enName, val.GetName())
s.symbols[valName] = fd
}
}
func (s *serverReflectionServer) processField(fd *dpb.FileDescriptorProto, prefix string, fld *dpb.FieldDescriptorProto) {
fldName := fqn(prefix, fld.GetName())
s.symbols[fldName] = fd
}
func fqn(prefix, name string) string {
if prefix == "" {
return name
}
return prefix + "." + name
}
// fileDescForType gets the file descriptor for the given type.
// The given type should be a proto message.
func (s *serverReflectionServer) fileDescForType(st reflect.Type) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(protoMessage)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
enc, _ := m.Descriptor()
return decodeFileDesc(enc)
}
// decodeFileDesc does decompression and unmarshalling on the given
// file descriptor byte slice.
func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) {
raw, err := decompress(enc)
if err != nil {
return nil, fmt.Errorf("failed to decompress enc: %v", err)
}
fd := new(dpb.FileDescriptorProto)
if err := proto.Unmarshal(raw, fd); err != nil {
return nil, fmt.Errorf("bad descriptor: %v", err)
}
return fd, nil
}
// decompress does gzip decompression.
func decompress(b []byte) ([]byte, error) {
r, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
}
out, err := ioutil.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("bad gzipped descriptor: %v", err)
}
return out, nil
}
func typeForName(name string) (reflect.Type, error) {
pt := proto.MessageType(name)
if pt == nil {
return nil, fmt.Errorf("unknown type: %q", name)
}
st := pt.Elem()
return st, nil
}
func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
var extDesc *proto.ExtensionDesc
for id, desc := range proto.RegisteredExtensions(m) {
if id == ext {
extDesc = desc
break
}
}
if extDesc == nil {
return nil, fmt.Errorf("failed to find registered extension for extension number %v", ext)
}
return decodeFileDesc(proto.FileDescriptor(extDesc.Filename))
}
func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
exts := proto.RegisteredExtensions(m)
out := make([]int32, 0, len(exts))
for id := range exts {
out = append(out, id)
}
return out, nil
}
// fileDescWithDependencies returns a slice of serialized fileDescriptors in
// wire format ([]byte). The fileDescriptors will include fd and all the
// transitive dependencies of fd with names not in sentFileDescriptors.
func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors map[string]bool) ([][]byte, error) {
r := [][]byte{}
queue := []*dpb.FileDescriptorProto{fd}
for len(queue) > 0 {
currentfd := queue[0]
queue = queue[1:]
if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.GetName()] = true
currentfdEncoded, err := proto.Marshal(currentfd)
if err != nil {
return nil, err
}
r = append(r, currentfdEncoded)
}
for _, dep := range currentfd.Dependency {
fdenc := proto.FileDescriptor(dep)
fdDep, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
queue = append(queue, fdDep)
}
}
return r, nil
}
// fileDescEncodingByFilename finds the file descriptor for given filename,
// finds all of its previously unsent transitive dependencies, does marshalling
// on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
enc := proto.FileDescriptor(name)
if enc == nil {
return nil, fmt.Errorf("unknown file: %v", name)
}
fd, err := decodeFileDesc(enc)
if err != nil {
return nil, err
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// parseMetadata finds the file descriptor bytes specified meta.
// For SupportPackageIsVersion4, m is the name of the proto file, we
// call proto.FileDescriptor to get the byte slice.
// For SupportPackageIsVersion3, m is a byte slice itself.
func parseMetadata(meta interface{}) ([]byte, bool) {
// Check if meta is the file name.
if fileNameForMeta, ok := meta.(string); ok {
return proto.FileDescriptor(fileNameForMeta), true
}
// Check if meta is the byte slice.
if enc, ok := meta.([]byte); ok {
return enc, true
}
return nil, false
}
// fileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result. The given symbol
// can be a type, a service or a method.
func (s *serverReflectionServer) fileDescEncodingContainingSymbol(name string, sentFileDescriptors map[string]bool) ([][]byte, error) {
_, symbols := s.getSymbols()
fd := symbols[name]
if fd == nil {
// Check if it's a type name that was not present in the
// transitive dependencies of the registered services.
if st, err := typeForName(name); err == nil {
fd, err = s.fileDescForType(st)
if err != nil {
return nil, err
}
}
}
if fd == nil {
return nil, fmt.Errorf("unknown symbol: %v", name)
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// fileDescEncodingContainingExtension finds the file descriptor containing
// given extension, finds all of its previously unsent transitive dependencies,
// does marshalling on them, and returns the marshalled result.
func (s *serverReflectionServer) fileDescEncodingContainingExtension(typeName string, extNum int32, sentFileDescriptors map[string]bool) ([][]byte, error) {
st, err := typeForName(typeName)
if err != nil {
return nil, err
}
fd, err := fileDescContainingExtension(st, extNum)
if err != nil {
return nil, err
}
return fileDescWithDependencies(fd, sentFileDescriptors)
}
// allExtensionNumbersForTypeName returns all extension numbers for the given type.
func (s *serverReflectionServer) allExtensionNumbersForTypeName(name string) ([]int32, error) {
st, err := typeForName(name)
if err != nil {
return nil, err
}
extNums, err := s.allExtensionNumbersForType(st)
if err != nil {
return nil, err
}
return extNums, nil
}
// ServerReflectionInfo is the reflection service handler.
func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflection_ServerReflectionInfoServer) error {
sentFileDescriptors := make(map[string]bool)
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
out := &rpb.ServerReflectionResponse{
ValidHost: in.Host,
OriginalRequest: in,
}
switch req := in.MessageRequest.(type) {
case *rpb.ServerReflectionRequest_FileByFilename:
b, err := s.fileDescEncodingByFilename(req.FileByFilename, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_FileContainingSymbol:
b, err := s.fileDescEncodingContainingSymbol(req.FileContainingSymbol, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_FileContainingExtension:
typeName := req.FileContainingExtension.ContainingType
extNum := req.FileContainingExtension.ExtensionNumber
b, err := s.fileDescEncodingContainingExtension(typeName, extNum, sentFileDescriptors)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &rpb.FileDescriptorResponse{FileDescriptorProto: b},
}
}
case *rpb.ServerReflectionRequest_AllExtensionNumbersOfType:
extNums, err := s.allExtensionNumbersForTypeName(req.AllExtensionNumbersOfType)
if err != nil {
out.MessageResponse = &rpb.ServerReflectionResponse_ErrorResponse{
ErrorResponse: &rpb.ErrorResponse{
ErrorCode: int32(codes.NotFound),
ErrorMessage: err.Error(),
},
}
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{
BaseTypeName: req.AllExtensionNumbersOfType,
ExtensionNumber: extNums,
},
}
}
case *rpb.ServerReflectionRequest_ListServices:
svcNames, _ := s.getSymbols()
serviceResponses := make([]*rpb.ServiceResponse, len(svcNames))
for i, n := range svcNames {
serviceResponses[i] = &rpb.ServiceResponse{
Name: n,
}
}
out.MessageResponse = &rpb.ServerReflectionResponse_ListServicesResponse{
ListServicesResponse: &rpb.ListServiceResponse{
Service: serviceResponses,
},
}
default:
return status.Errorf(codes.InvalidArgument, "invalid MessageRequest: %v", in.MessageRequest)
}
if err := stream.Send(out); err != nil {
return err
}
}
}

View File

@@ -132,6 +132,9 @@ func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Ha
hdr := make(map[string]string, len(msg.Header))
for k, v := range msg.Header {
if k == "Content-Type" {
continue
}
hdr[k] = v
}