Compare commits
4 Commits
v3.10.3
...
b978e58cf9
| Author | SHA1 | Date | |
|---|---|---|---|
| b978e58cf9 | |||
| a793983ed2 | |||
| d646deb468 | |||
| 468819f0a0 |
60
grpc.go
60
grpc.go
@@ -718,31 +718,6 @@ func (g *Server) Register() error {
|
|||||||
g.Lock()
|
g.Lock()
|
||||||
defer g.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
for sb := range g.subscribers {
|
|
||||||
handler := g.createSubHandler(sb, config)
|
|
||||||
var opts []broker.SubscribeOption
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
subCtx := config.Context
|
|
||||||
if cx := sb.Options().Context; cx != nil {
|
|
||||||
subCtx = cx
|
|
||||||
}
|
|
||||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
|
||||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
|
||||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
g.registered = true
|
g.registered = true
|
||||||
g.rsvc = service
|
g.rsvc = service
|
||||||
|
|
||||||
@@ -876,6 +851,10 @@ func (g *Server) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = g.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// micro: go ts.Accept(s.accept)
|
// micro: go ts.Accept(s.accept)
|
||||||
go func() {
|
go func() {
|
||||||
if err = g.srv.Serve(ts); err != nil {
|
if err = g.srv.Serve(ts); err != nil {
|
||||||
@@ -987,6 +966,37 @@ func (g *Server) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Server) subscribe() error {
|
||||||
|
config := g.opts
|
||||||
|
|
||||||
|
for sb := range g.subscribers {
|
||||||
|
handler := g.createSubHandler(sb, config)
|
||||||
|
var opts []broker.SubscribeOption
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
subCtx := config.Context
|
||||||
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||||
|
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||||
|
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g *Server) Stop() error {
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
|
|||||||
166
quic/quic_net.go
Normal file
166
quic/quic_net.go
Normal file
@@ -0,0 +1,166 @@
|
|||||||
|
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
|
||||||
|
// copyright sssgun with MIT license
|
||||||
|
package grpcquic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
quic "github.com/quic-go/quic-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Connection
|
||||||
|
|
||||||
|
var _ net.Conn = (*Conn)(nil)
|
||||||
|
|
||||||
|
type Conn struct {
|
||||||
|
conn quic.Connection
|
||||||
|
stream quic.Stream
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConn(conn quic.Connection) (net.Conn, error) {
|
||||||
|
stream, err := conn.OpenStreamSync(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Conn{conn, stream}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read reads data from the connection.
|
||||||
|
// Read can be made to time out and return an Error with Timeout() == true
|
||||||
|
// after a fixed time limit; see SetDeadline and SetReadDeadline.
|
||||||
|
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||||
|
return c.stream.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write writes data to the connection.
|
||||||
|
// Write can be made to time out and return an Error with Timeout() == true
|
||||||
|
// after a fixed time limit; see SetDeadline and SetWriteDeadline.
|
||||||
|
func (c *Conn) Write(b []byte) (n int, err error) {
|
||||||
|
return c.stream.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the connection.
|
||||||
|
// Any blocked Read or Write operations will be unblocked and return errors.
|
||||||
|
func (c *Conn) Close() error {
|
||||||
|
// @TODO: log this
|
||||||
|
c.stream.Close()
|
||||||
|
|
||||||
|
return c.conn.CloseWithError(0, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
// LocalAddr returns the local network address.
|
||||||
|
func (c *Conn) LocalAddr() net.Addr {
|
||||||
|
return c.conn.LocalAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoteAddr returns the remote network address.
|
||||||
|
func (c *Conn) RemoteAddr() net.Addr {
|
||||||
|
return c.conn.RemoteAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetDeadline sets the read and write deadlines associated
|
||||||
|
// with the connection. It is equivalent to calling both
|
||||||
|
// SetReadDeadline and SetWriteDeadline.
|
||||||
|
//
|
||||||
|
// A deadline is an absolute time after which I/O operations
|
||||||
|
// fail with a timeout (see type Error) instead of
|
||||||
|
// blocking. The deadline applies to all future and pending
|
||||||
|
// I/O, not just the immediately following call to Read or
|
||||||
|
// Write. After a deadline has been exceeded, the connection
|
||||||
|
// can be refreshed by setting a deadline in the future.
|
||||||
|
//
|
||||||
|
// An idle timeout can be implemented by repeatedly extending
|
||||||
|
// the deadline after successful Read or Write calls.
|
||||||
|
//
|
||||||
|
// A zero value for t means I/O operations will not time out.
|
||||||
|
func (c *Conn) SetDeadline(t time.Time) error {
|
||||||
|
return c.stream.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetReadDeadline sets the deadline for future Read calls
|
||||||
|
// and any currently-blocked Read call.
|
||||||
|
// A zero value for t means Read will not time out.
|
||||||
|
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||||
|
return c.stream.SetReadDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetWriteDeadline sets the deadline for future Write calls
|
||||||
|
// and any currently-blocked Write call.
|
||||||
|
// Even if write times out, it may return n > 0, indicating that
|
||||||
|
// some of the data was successfully written.
|
||||||
|
// A zero value for t means Write will not time out.
|
||||||
|
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||||||
|
return c.stream.SetWriteDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Listener
|
||||||
|
|
||||||
|
var _ net.Listener = (*Listener)(nil)
|
||||||
|
|
||||||
|
type Listener struct {
|
||||||
|
ql quic.Listener
|
||||||
|
}
|
||||||
|
|
||||||
|
func Listen(ql quic.Listener) net.Listener {
|
||||||
|
return &Listener{ql}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept waits for and returns the next connection to the listener.
|
||||||
|
func (l *Listener) Accept() (net.Conn, error) {
|
||||||
|
sess, err := l.ql.Accept(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := sess.AcceptStream(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Conn{sess, stream}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the listener.
|
||||||
|
// Any blocked Accept operations will be unblocked and return errors.
|
||||||
|
func (l *Listener) Close() error {
|
||||||
|
return l.ql.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Addr returns the listener's network address.
|
||||||
|
func (l *Listener) Addr() net.Addr {
|
||||||
|
return l.ql.Addr()
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Dialer
|
||||||
|
|
||||||
|
var QuicConfig = &quic.Config{
|
||||||
|
KeepAlivePeriod: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPacketConn(addr string) (net.PacketConn, error) {
|
||||||
|
// create a packet conn for outgoing connections
|
||||||
|
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return net.ListenUDP("udp", udpAddr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewQuicDialer(tlsConf *tls.Config) func(context.Context, string) (net.Conn, error) {
|
||||||
|
return func(ctx context.Context, target string) (net.Conn, error) {
|
||||||
|
sess, err := quic.DialAddr(ctx, target, tlsConf, QuicConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return NewConn(sess)
|
||||||
|
}
|
||||||
|
}
|
||||||
117
quic/quic_transport.go
Normal file
117
quic/quic_transport.go
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
|
||||||
|
// copyright sssgun with MIT license
|
||||||
|
package grpcquic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ credentials.AuthInfo = (*Info)(nil)
|
||||||
|
|
||||||
|
// Info contains the auth information
|
||||||
|
type Info struct {
|
||||||
|
conn *Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInfo(c *Conn) *Info {
|
||||||
|
return &Info{c}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AuthType returns the type of Info as a string.
|
||||||
|
func (i *Info) AuthType() string {
|
||||||
|
return "quic-tls"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Info) Conn() net.Conn {
|
||||||
|
return i.conn
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ credentials.TransportCredentials = (*Credentials)(nil)
|
||||||
|
|
||||||
|
type Credentials struct {
|
||||||
|
tlsConfig *tls.Config
|
||||||
|
isQuicConnection bool
|
||||||
|
serverName string
|
||||||
|
|
||||||
|
grpcCreds credentials.TransportCredentials
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCredentials(tlsConfig *tls.Config) credentials.TransportCredentials {
|
||||||
|
grpcCreds := credentials.NewTLS(tlsConfig)
|
||||||
|
return &Credentials{
|
||||||
|
grpcCreds: grpcCreds,
|
||||||
|
tlsConfig: tlsConfig,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClientHandshake does the authentication handshake specified by the corresponding
|
||||||
|
// authentication protocol on rawConn for clients. It returns the authenticated
|
||||||
|
// connection and the corresponding auth information about the connection.
|
||||||
|
// Implementations must use the provided context to implement timely cancellation.
|
||||||
|
// gRPC will try to reconnect if the error returned is a temporary error
|
||||||
|
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
|
||||||
|
// If the returned error is a wrapper error, implementations should make sure that
|
||||||
|
// the error implements Temporary() to have the correct retry behaviors.
|
||||||
|
//
|
||||||
|
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
|
||||||
|
func (pt *Credentials) ClientHandshake(ctx context.Context, authority string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
|
||||||
|
if c, ok := conn.(*Conn); ok {
|
||||||
|
pt.isQuicConnection = true
|
||||||
|
return conn, NewInfo(c), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return pt.grpcCreds.ClientHandshake(ctx, authority, conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServerHandshake does the authentication handshake for servers. It returns
|
||||||
|
// the authenticated connection and the corresponding auth information about
|
||||||
|
// the connection.
|
||||||
|
//
|
||||||
|
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
|
||||||
|
func (pt *Credentials) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
|
||||||
|
if c, ok := conn.(*Conn); ok {
|
||||||
|
pt.isQuicConnection = true
|
||||||
|
ainfo := NewInfo(c)
|
||||||
|
return conn, ainfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return pt.grpcCreds.ServerHandshake(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Info provides the ProtocolInfo of this Credentials.
|
||||||
|
func (pt *Credentials) Info() credentials.ProtocolInfo {
|
||||||
|
if pt.isQuicConnection {
|
||||||
|
return credentials.ProtocolInfo{
|
||||||
|
// ProtocolVersion is the gRPC wire protocol version.
|
||||||
|
ProtocolVersion: "/quic/1.0.0",
|
||||||
|
// SecurityProtocol is the security protocol in use.
|
||||||
|
SecurityProtocol: "quic-tls",
|
||||||
|
// SecurityVersion is the security protocol version.
|
||||||
|
// SecurityVersion: "1.2.0",
|
||||||
|
// ServerName is the user-configured server name.
|
||||||
|
ServerName: pt.serverName,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pt.grpcCreds.Info()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clone makes a copy of this Credentials.
|
||||||
|
func (pt *Credentials) Clone() credentials.TransportCredentials {
|
||||||
|
return &Credentials{
|
||||||
|
tlsConfig: pt.tlsConfig.Clone(),
|
||||||
|
grpcCreds: pt.grpcCreds.Clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
|
||||||
|
// gRPC internals also use it to override the virtual hosting name if it is set.
|
||||||
|
// It must be called before dialing. Currently, this is only used by grpclb.
|
||||||
|
func (pt *Credentials) OverrideServerName(name string) error {
|
||||||
|
pt.serverName = name
|
||||||
|
return pt.grpcCreds.OverrideServerName(name)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user