Compare commits
13 Commits
Author | SHA1 | Date | |
---|---|---|---|
241146cb6f | |||
3941a76333 | |||
941bce3ee4 | |||
|
da7980dbee | ||
00b616a35d | |||
9907ff90f5 | |||
4003d714ba | |||
c23433a49c | |||
fbb9e63df7 | |||
1213bace75 | |||
3844d9484c | |||
b978e58cf9 | |||
a793983ed2 |
24
.gitignore
vendored
Normal file
24
.gitignore
vendored
Normal file
@ -0,0 +1,24 @@
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
*.exe~
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
bin
|
||||
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
||||
# Dependency directories (remove the comment below to include it)
|
||||
# vendor/
|
||||
|
||||
# Go workspace file
|
||||
go.work
|
||||
|
||||
# General
|
||||
.DS_Store
|
||||
.idea
|
||||
.vscode
|
29
go.mod
29
go.mod
@ -1,11 +1,28 @@
|
||||
module go.unistack.org/micro-server-grpc/v3
|
||||
|
||||
go 1.16
|
||||
go 1.20
|
||||
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2
|
||||
go.unistack.org/micro/v3 v3.10.14
|
||||
golang.org/x/net v0.5.0
|
||||
google.golang.org/grpc v1.52.3
|
||||
google.golang.org/protobuf v1.28.1
|
||||
github.com/golang/protobuf v1.5.4
|
||||
github.com/quic-go/quic-go v0.41.0
|
||||
go.unistack.org/micro/v3 v3.10.48
|
||||
golang.org/x/net v0.22.0
|
||||
google.golang.org/grpc v1.62.1
|
||||
google.golang.org/protobuf v1.33.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
|
||||
github.com/onsi/ginkgo/v2 v2.16.0 // indirect
|
||||
github.com/stretchr/testify v1.8.3 // indirect
|
||||
go.uber.org/mock v0.4.0 // indirect
|
||||
golang.org/x/crypto v0.21.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 // indirect
|
||||
golang.org/x/mod v0.16.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
golang.org/x/text v0.14.0 // indirect
|
||||
golang.org/x/tools v0.19.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240304212257-790db918fca8 // indirect
|
||||
)
|
||||
|
126
grpc.go
126
grpc.go
@ -7,7 +7,6 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -20,9 +19,11 @@ import (
|
||||
"go.unistack.org/micro/v3/codec"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
metadata "go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/semconv"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
msync "go.unistack.org/micro/v3/sync"
|
||||
"golang.org/x/net/netutil"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -45,14 +46,15 @@ type ServerReflection struct {
|
||||
*/
|
||||
|
||||
type Server struct {
|
||||
handlers map[string]server.Handler
|
||||
srv *grpc.Server
|
||||
exit chan chan error
|
||||
wg *sync.WaitGroup
|
||||
rsvc *register.Service
|
||||
subscribers map[*subscriber][]broker.Subscriber
|
||||
rpc *rServer
|
||||
opts server.Options
|
||||
handlers map[string]server.Handler
|
||||
srv *grpc.Server
|
||||
exit chan chan error
|
||||
wg *msync.WaitGroup
|
||||
rsvc *register.Service
|
||||
subscribers map[*subscriber][]broker.Subscriber
|
||||
rpc *rServer
|
||||
opts server.Options
|
||||
unknownHandler grpc.StreamHandler
|
||||
sync.RWMutex
|
||||
started bool
|
||||
registered bool
|
||||
@ -157,6 +159,10 @@ func (g *Server) configure(opts ...server.Option) error {
|
||||
g.reflection = v
|
||||
}
|
||||
|
||||
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
|
||||
g.unknownHandler = h
|
||||
}
|
||||
|
||||
if restart {
|
||||
return g.Start()
|
||||
}
|
||||
@ -188,37 +194,35 @@ func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
||||
return opts
|
||||
}
|
||||
|
||||
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
var err error
|
||||
|
||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||
if !ok {
|
||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||
}
|
||||
|
||||
ts := time.Now()
|
||||
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Inc()
|
||||
defer func() {
|
||||
te := time.Since(ts)
|
||||
g.opts.Meter.Summary(semconv.ServerRequestLatencyMicroseconds, "endpoint", fullMethod).Update(te.Seconds())
|
||||
g.opts.Meter.Histogram(semconv.ServerRequestDurationSeconds, "endpoint", fullMethod).Update(te.Seconds())
|
||||
g.opts.Meter.Counter(semconv.ServerRequestInflight, "endpoint", fullMethod).Dec()
|
||||
|
||||
st := status.Convert(err)
|
||||
if st == nil || st.Code() == codes.OK {
|
||||
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "success", "code", strconv.Itoa(int(codes.OK))).Inc()
|
||||
} else {
|
||||
g.opts.Meter.Counter(semconv.ServerRequestTotal, "endpoint", fullMethod, "status", "failure", "code", strconv.Itoa(int(st.Code()))).Inc()
|
||||
}
|
||||
}()
|
||||
|
||||
serviceName, methodName, err := serviceMethod(fullMethod)
|
||||
if err != nil {
|
||||
return status.New(codes.InvalidArgument, err.Error()).Err()
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
g.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "panic in %s.%s recovered: %v", serviceName, methodName, r)
|
||||
config.Logger.Error(config.Context, string(debug.Stack()))
|
||||
}
|
||||
err = errors.InternalServerError(g.opts.Name, "panic in %s.%s recovered: %v", serviceName, methodName, r)
|
||||
} else if err != nil {
|
||||
g.RLock()
|
||||
config := g.opts
|
||||
g.RUnlock()
|
||||
if config.Logger.V(logger.ErrorLevel) {
|
||||
config.Logger.Errorf(config.Context, "grpc handler %s.%s got error: %s", serviceName, methodName, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if g.wg != nil {
|
||||
g.wg.Add(1)
|
||||
defer g.wg.Done()
|
||||
@ -234,6 +238,7 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
|
||||
for k, v := range gmd {
|
||||
md.Set(k, strings.Join(v, ", "))
|
||||
}
|
||||
md.Set("Path", fullMethod)
|
||||
|
||||
var td string
|
||||
// timeout for server deadline
|
||||
@ -315,20 +320,16 @@ func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error)
|
||||
*/
|
||||
|
||||
if svc == nil {
|
||||
if g.opts.Context != nil {
|
||||
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
|
||||
return h(srv, stream)
|
||||
}
|
||||
if g.unknownHandler != nil {
|
||||
return g.unknownHandler(srv, stream)
|
||||
}
|
||||
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service %s", serviceName)).Err()
|
||||
}
|
||||
|
||||
mtype := svc.method[methodName]
|
||||
if mtype == nil {
|
||||
if g.opts.Context != nil {
|
||||
if h, ok := g.opts.Context.Value(unknownServiceHandlerKey{}).(grpc.StreamHandler); ok {
|
||||
return h(srv, stream)
|
||||
}
|
||||
if g.unknownHandler != nil {
|
||||
return g.unknownHandler(srv, stream)
|
||||
}
|
||||
return status.New(codes.Unimplemented, fmt.Sprintf("unknown service method %s.%s", serviceName, methodName)).Err()
|
||||
}
|
||||
@ -443,53 +444,8 @@ func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, s
|
||||
}
|
||||
|
||||
return status.New(statusCode, statusDesc).Err()
|
||||
// }
|
||||
}
|
||||
|
||||
/*
|
||||
type reflectStream struct {
|
||||
stream server.Stream
|
||||
}
|
||||
|
||||
func (s *reflectStream) Send(rsp *grpcreflect.ServerReflectionResponse) error {
|
||||
return s.stream.Send(rsp)
|
||||
}
|
||||
|
||||
func (s *reflectStream) Recv() (*grpcreflect.ServerReflectionRequest, error) {
|
||||
req := &grpcreflect.ServerReflectionRequest{}
|
||||
err := s.stream.Recv(req)
|
||||
return req, err
|
||||
}
|
||||
|
||||
func (s *reflectStream) SetHeader(gmetadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *reflectStream) SendHeader(gmetadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *reflectStream) SetTrailer(gmetadata.MD) {
|
||||
|
||||
}
|
||||
|
||||
func (s *reflectStream) Context() context.Context {
|
||||
return s.stream.Context()
|
||||
}
|
||||
|
||||
func (s *reflectStream) SendMsg(m interface{}) error {
|
||||
return s.stream.Send(m)
|
||||
}
|
||||
|
||||
func (s *reflectStream) RecvMsg(m interface{}) error {
|
||||
return s.stream.Recv(m)
|
||||
}
|
||||
|
||||
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
||||
}
|
||||
*/
|
||||
|
||||
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
opts := g.opts
|
||||
|
||||
@ -928,7 +884,7 @@ func (g *Server) Start() error {
|
||||
|
||||
select {
|
||||
case <-exit:
|
||||
case <-time.After(time.Second):
|
||||
case <-time.After(g.opts.GracefulTimeout):
|
||||
g.srv.Stop()
|
||||
}
|
||||
|
||||
|
166
quic/quic_net.go
Normal file
166
quic/quic_net.go
Normal file
@ -0,0 +1,166 @@
|
||||
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
|
||||
// copyright sssgun with MIT license
|
||||
package grpcquic // import "go.unistack.org/micro-server-grpc/v3/quic"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
quic "github.com/quic-go/quic-go"
|
||||
)
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Connection
|
||||
|
||||
var _ net.Conn = (*Conn)(nil)
|
||||
|
||||
type Conn struct {
|
||||
conn quic.Connection
|
||||
stream quic.Stream
|
||||
}
|
||||
|
||||
func NewConn(conn quic.Connection) (net.Conn, error) {
|
||||
stream, err := conn.OpenStreamSync(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Conn{conn, stream}, nil
|
||||
}
|
||||
|
||||
// Read reads data from the connection.
|
||||
// Read can be made to time out and return an Error with Timeout() == true
|
||||
// after a fixed time limit; see SetDeadline and SetReadDeadline.
|
||||
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||
return c.stream.Read(b)
|
||||
}
|
||||
|
||||
// Write writes data to the connection.
|
||||
// Write can be made to time out and return an Error with Timeout() == true
|
||||
// after a fixed time limit; see SetDeadline and SetWriteDeadline.
|
||||
func (c *Conn) Write(b []byte) (n int, err error) {
|
||||
return c.stream.Write(b)
|
||||
}
|
||||
|
||||
// Close closes the connection.
|
||||
// Any blocked Read or Write operations will be unblocked and return errors.
|
||||
func (c *Conn) Close() error {
|
||||
// @TODO: log this
|
||||
c.stream.Close()
|
||||
|
||||
return c.conn.CloseWithError(0, "")
|
||||
}
|
||||
|
||||
// LocalAddr returns the local network address.
|
||||
func (c *Conn) LocalAddr() net.Addr {
|
||||
return c.conn.LocalAddr()
|
||||
}
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
// SetDeadline sets the read and write deadlines associated
|
||||
// with the connection. It is equivalent to calling both
|
||||
// SetReadDeadline and SetWriteDeadline.
|
||||
//
|
||||
// A deadline is an absolute time after which I/O operations
|
||||
// fail with a timeout (see type Error) instead of
|
||||
// blocking. The deadline applies to all future and pending
|
||||
// I/O, not just the immediately following call to Read or
|
||||
// Write. After a deadline has been exceeded, the connection
|
||||
// can be refreshed by setting a deadline in the future.
|
||||
//
|
||||
// An idle timeout can be implemented by repeatedly extending
|
||||
// the deadline after successful Read or Write calls.
|
||||
//
|
||||
// A zero value for t means I/O operations will not time out.
|
||||
func (c *Conn) SetDeadline(t time.Time) error {
|
||||
return c.stream.SetDeadline(t)
|
||||
}
|
||||
|
||||
// SetReadDeadline sets the deadline for future Read calls
|
||||
// and any currently-blocked Read call.
|
||||
// A zero value for t means Read will not time out.
|
||||
func (c *Conn) SetReadDeadline(t time.Time) error {
|
||||
return c.stream.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
// SetWriteDeadline sets the deadline for future Write calls
|
||||
// and any currently-blocked Write call.
|
||||
// Even if write times out, it may return n > 0, indicating that
|
||||
// some of the data was successfully written.
|
||||
// A zero value for t means Write will not time out.
|
||||
func (c *Conn) SetWriteDeadline(t time.Time) error {
|
||||
return c.stream.SetWriteDeadline(t)
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Listener
|
||||
|
||||
var _ net.Listener = (*Listener)(nil)
|
||||
|
||||
type Listener struct {
|
||||
ql quic.Listener
|
||||
}
|
||||
|
||||
func Listen(ql quic.Listener) net.Listener {
|
||||
return &Listener{ql}
|
||||
}
|
||||
|
||||
// Accept waits for and returns the next connection to the listener.
|
||||
func (l *Listener) Accept() (net.Conn, error) {
|
||||
sess, err := l.ql.Accept(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stream, err := sess.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Conn{sess, stream}, nil
|
||||
}
|
||||
|
||||
// Close closes the listener.
|
||||
// Any blocked Accept operations will be unblocked and return errors.
|
||||
func (l *Listener) Close() error {
|
||||
return l.ql.Close()
|
||||
}
|
||||
|
||||
// Addr returns the listener's network address.
|
||||
func (l *Listener) Addr() net.Addr {
|
||||
return l.ql.Addr()
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Dialer
|
||||
|
||||
var QuicConfig = &quic.Config{
|
||||
KeepAlivePeriod: 10 * time.Second,
|
||||
}
|
||||
|
||||
func NewPacketConn(addr string) (net.PacketConn, error) {
|
||||
// create a packet conn for outgoing connections
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return net.ListenUDP("udp", udpAddr)
|
||||
}
|
||||
|
||||
func NewQuicDialer(tlsConf *tls.Config) func(context.Context, string) (net.Conn, error) {
|
||||
return func(ctx context.Context, target string) (net.Conn, error) {
|
||||
sess, err := quic.DialAddr(ctx, target, tlsConf, QuicConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewConn(sess)
|
||||
}
|
||||
}
|
117
quic/quic_transport.go
Normal file
117
quic/quic_transport.go
Normal file
@ -0,0 +1,117 @@
|
||||
// grpc over quic mostly based on https://github.com/sssgun/grpc-quic
|
||||
// copyright sssgun with MIT license
|
||||
package grpcquic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"net"
|
||||
|
||||
"google.golang.org/grpc/credentials"
|
||||
)
|
||||
|
||||
var _ credentials.AuthInfo = (*Info)(nil)
|
||||
|
||||
// Info contains the auth information
|
||||
type Info struct {
|
||||
conn *Conn
|
||||
}
|
||||
|
||||
func NewInfo(c *Conn) *Info {
|
||||
return &Info{c}
|
||||
}
|
||||
|
||||
// AuthType returns the type of Info as a string.
|
||||
func (i *Info) AuthType() string {
|
||||
return "quic-tls"
|
||||
}
|
||||
|
||||
func (i *Info) Conn() net.Conn {
|
||||
return i.conn
|
||||
}
|
||||
|
||||
var _ credentials.TransportCredentials = (*Credentials)(nil)
|
||||
|
||||
type Credentials struct {
|
||||
tlsConfig *tls.Config
|
||||
isQuicConnection bool
|
||||
serverName string
|
||||
|
||||
grpcCreds credentials.TransportCredentials
|
||||
}
|
||||
|
||||
func NewCredentials(tlsConfig *tls.Config) credentials.TransportCredentials {
|
||||
grpcCreds := credentials.NewTLS(tlsConfig)
|
||||
return &Credentials{
|
||||
grpcCreds: grpcCreds,
|
||||
tlsConfig: tlsConfig,
|
||||
}
|
||||
}
|
||||
|
||||
// ClientHandshake does the authentication handshake specified by the corresponding
|
||||
// authentication protocol on rawConn for clients. It returns the authenticated
|
||||
// connection and the corresponding auth information about the connection.
|
||||
// Implementations must use the provided context to implement timely cancellation.
|
||||
// gRPC will try to reconnect if the error returned is a temporary error
|
||||
// (io.EOF, context.DeadlineExceeded or err.Temporary() == true).
|
||||
// If the returned error is a wrapper error, implementations should make sure that
|
||||
// the error implements Temporary() to have the correct retry behaviors.
|
||||
//
|
||||
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
|
||||
func (pt *Credentials) ClientHandshake(ctx context.Context, authority string, conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
|
||||
if c, ok := conn.(*Conn); ok {
|
||||
pt.isQuicConnection = true
|
||||
return conn, NewInfo(c), nil
|
||||
}
|
||||
|
||||
return pt.grpcCreds.ClientHandshake(ctx, authority, conn)
|
||||
}
|
||||
|
||||
// ServerHandshake does the authentication handshake for servers. It returns
|
||||
// the authenticated connection and the corresponding auth information about
|
||||
// the connection.
|
||||
//
|
||||
// If the returned net.Conn is closed, it MUST close the net.Conn provided.
|
||||
func (pt *Credentials) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error) {
|
||||
if c, ok := conn.(*Conn); ok {
|
||||
pt.isQuicConnection = true
|
||||
ainfo := NewInfo(c)
|
||||
return conn, ainfo, nil
|
||||
}
|
||||
|
||||
return pt.grpcCreds.ServerHandshake(conn)
|
||||
}
|
||||
|
||||
// Info provides the ProtocolInfo of this Credentials.
|
||||
func (pt *Credentials) Info() credentials.ProtocolInfo {
|
||||
if pt.isQuicConnection {
|
||||
return credentials.ProtocolInfo{
|
||||
// ProtocolVersion is the gRPC wire protocol version.
|
||||
ProtocolVersion: "/quic/1.0.0",
|
||||
// SecurityProtocol is the security protocol in use.
|
||||
SecurityProtocol: "quic-tls",
|
||||
// SecurityVersion is the security protocol version.
|
||||
// SecurityVersion: "1.2.0",
|
||||
// ServerName is the user-configured server name.
|
||||
ServerName: pt.serverName,
|
||||
}
|
||||
}
|
||||
|
||||
return pt.grpcCreds.Info()
|
||||
}
|
||||
|
||||
// Clone makes a copy of this Credentials.
|
||||
func (pt *Credentials) Clone() credentials.TransportCredentials {
|
||||
return &Credentials{
|
||||
tlsConfig: pt.tlsConfig.Clone(),
|
||||
grpcCreds: pt.grpcCreds.Clone(),
|
||||
}
|
||||
}
|
||||
|
||||
// OverrideServerName overrides the server name used to verify the hostname on the returned certificates from the server.
|
||||
// gRPC internals also use it to override the virtual hosting name if it is set.
|
||||
// It must be called before dialing. Currently, this is only used by grpclb.
|
||||
func (pt *Credentials) OverrideServerName(name string) error {
|
||||
pt.serverName = name
|
||||
return pt.grpcCreds.OverrideServerName(name)
|
||||
}
|
@ -4,12 +4,9 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
|
||||
"go.unistack.org/micro/v3/broker"
|
||||
"go.unistack.org/micro/v3/errors"
|
||||
"go.unistack.org/micro/v3/logger"
|
||||
"go.unistack.org/micro/v3/metadata"
|
||||
"go.unistack.org/micro/v3/register"
|
||||
"go.unistack.org/micro/v3/server"
|
||||
@ -104,15 +101,6 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
||||
|
||||
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
if g.opts.Logger.V(logger.ErrorLevel) {
|
||||
g.opts.Logger.Error(g.opts.Context, "panic recovered: ", r)
|
||||
g.opts.Logger.Error(g.opts.Context, string(debug.Stack()))
|
||||
}
|
||||
err = errors.InternalServerError(g.opts.Name+".subscriber", "panic recovered: %v", r)
|
||||
}
|
||||
}()
|
||||
|
||||
msg := p.Message()
|
||||
// if we don't have headers, create empty map
|
||||
|
Loading…
Reference in New Issue
Block a user