Compare commits
46 Commits
Author | SHA1 | Date | |
---|---|---|---|
d646deb468 | |||
468819f0a0 | |||
832f1034a8 | |||
f0b6370ee1 | |||
3d522b094b | |||
92dcd1acd7 | |||
dc8a736e13 | |||
|
4219919c9e | ||
1f447ea747 | |||
|
30c0e01397 | ||
244f3def4d | |||
|
55cbc89e11 | ||
df00f718cf | |||
|
bc3369f3a6 | ||
8d4c661ce5 | |||
|
7b97212e26 | ||
|
9d5a2c1168 | ||
|
483c6bb801 | ||
|
7ba5fd5fee | ||
080705a5df | |||
|
79df512e5e | ||
|
3e893b78c8 | ||
8e9c64d78b | |||
|
d66aa424d2 | ||
|
f8d3695962 | ||
ae158ce5fc | |||
8125c9003c | |||
a6f6df257b | |||
6b19cb2fb7 | |||
|
db6fee9760 | ||
309f100532 | |||
|
22ae55f739 | ||
70700a3f86 | |||
7dd327086c | |||
a67efa39ae | |||
|
8ee91422cc | ||
f8ae500c5f | |||
|
7fcc042fbf | ||
3a22f3a900 | |||
|
452a124aee | ||
|
26d3adfe95 | ||
|
18d6584c8f | ||
f26dde5d63 | |||
|
66d3feb263 | ||
5c8effa23f | |||
|
c1e318d0b3 |
2
.github/workflows/autoapprove.yml
vendored
2
.github/workflows/autoapprove.yml
vendored
@@ -13,7 +13,7 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- name: approve
|
- name: approve
|
||||||
uses: hmarr/auto-approve-action@v2
|
uses: hmarr/auto-approve-action@v3
|
||||||
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
|
if: github.actor == 'vtolstov' || github.actor == 'dependabot[bot]'
|
||||||
id: approve
|
id: approve
|
||||||
with:
|
with:
|
||||||
|
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
|||||||
- name: checkout
|
- name: checkout
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
- name: lint
|
- name: lint
|
||||||
uses: golangci/golangci-lint-action@v3.3.1
|
uses: golangci/golangci-lint-action@v3.4.0
|
||||||
continue-on-error: true
|
continue-on-error: true
|
||||||
with:
|
with:
|
||||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||||
|
2
.github/workflows/dependabot-automerge.yml
vendored
2
.github/workflows/dependabot-automerge.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
|||||||
steps:
|
steps:
|
||||||
- name: metadata
|
- name: metadata
|
||||||
id: metadata
|
id: metadata
|
||||||
uses: dependabot/fetch-metadata@v1.3.5
|
uses: dependabot/fetch-metadata@v1.3.6
|
||||||
with:
|
with:
|
||||||
github-token: "${{ secrets.TOKEN }}"
|
github-token: "${{ secrets.TOKEN }}"
|
||||||
- name: merge
|
- name: merge
|
||||||
|
2
.github/workflows/pr.yml
vendored
2
.github/workflows/pr.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
|||||||
- name: checkout
|
- name: checkout
|
||||||
uses: actions/checkout@v3
|
uses: actions/checkout@v3
|
||||||
- name: lint
|
- name: lint
|
||||||
uses: golangci/golangci-lint-action@v3.3.1
|
uses: golangci/golangci-lint-action@v3.4.0
|
||||||
continue-on-error: true
|
continue-on-error: true
|
||||||
with:
|
with:
|
||||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||||
|
8
go.mod
8
go.mod
@@ -4,10 +4,8 @@ go 1.16
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/golang/protobuf v1.5.2
|
github.com/golang/protobuf v1.5.2
|
||||||
go.unistack.org/micro/v3 v3.9.11
|
go.unistack.org/micro/v3 v3.10.14
|
||||||
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
|
golang.org/x/net v0.5.0
|
||||||
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5 // indirect
|
google.golang.org/grpc v1.52.3
|
||||||
google.golang.org/genproto v0.0.0-20220310185008-1973136f34c6 // indirect
|
|
||||||
google.golang.org/grpc v1.50.1
|
|
||||||
google.golang.org/protobuf v1.28.1
|
google.golang.org/protobuf v1.28.1
|
||||||
)
|
)
|
||||||
|
132
grpc.go
132
grpc.go
@@ -35,17 +35,17 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultContentType = "application/grpc+proto"
|
DefaultContentType = "application/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
type grpcServerReflection struct {
|
type ServerReflection struct {
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
s *serverReflectionServer
|
s *serverReflectionServer
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
type grpcServer struct {
|
type Server struct {
|
||||||
handlers map[string]server.Handler
|
handlers map[string]server.Handler
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
exit chan chan error
|
exit chan chan error
|
||||||
@@ -60,9 +60,9 @@ type grpcServer struct {
|
|||||||
reflection bool
|
reflection bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newGRPCServer(opts ...server.Option) server.Server {
|
func newServer(opts ...server.Option) *Server {
|
||||||
// create a grpc server
|
// create a grpc server
|
||||||
g := &grpcServer{
|
g := &Server{
|
||||||
opts: server.NewOptions(opts...),
|
opts: server.NewOptions(opts...),
|
||||||
rpc: &rServer{
|
rpc: &rServer{
|
||||||
serviceMap: make(map[string]*service),
|
serviceMap: make(map[string]*service),
|
||||||
@@ -91,7 +91,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
func (g *grpcServer) configure(opts ...server.Option) error {
|
func (g *Server) configure(opts ...server.Option) error {
|
||||||
g.Lock()
|
g.Lock()
|
||||||
defer g.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
@@ -128,6 +128,10 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, k := range g.opts.Codecs {
|
||||||
|
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||||
|
}
|
||||||
|
|
||||||
maxMsgSize := g.getMaxMsgSize()
|
maxMsgSize := g.getMaxMsgSize()
|
||||||
|
|
||||||
gopts := []grpc.ServerOption{
|
gopts := []grpc.ServerOption{
|
||||||
@@ -165,7 +169,7 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) getMaxMsgSize() int {
|
func (g *Server) getMaxMsgSize() int {
|
||||||
if g.opts.Context == nil {
|
if g.opts.Context == nil {
|
||||||
return codec.DefaultMaxMsgSize
|
return codec.DefaultMaxMsgSize
|
||||||
}
|
}
|
||||||
@@ -176,14 +180,14 @@ func (g *grpcServer) getMaxMsgSize() int {
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) getCredentials() credentials.TransportCredentials {
|
func (g *Server) getCredentials() credentials.TransportCredentials {
|
||||||
if g.opts.TLSConfig != nil {
|
if g.opts.TLSConfig != nil {
|
||||||
return credentials.NewTLS(g.opts.TLSConfig)
|
return credentials.NewTLS(g.opts.TLSConfig)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
||||||
if g.opts.Context == nil {
|
if g.opts.Context == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -196,7 +200,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
|||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||||
if !ok {
|
if !ok {
|
||||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||||
@@ -302,7 +306,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
||||||
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||||
svc = &service{}
|
svc = &service{}
|
||||||
svc.typ = reflect.TypeOf(rfl)
|
svc.typ = reflect.TypeOf(rfl)
|
||||||
svc.rcvr = reflect.ValueOf(rfl)
|
svc.rcvr = reflect.ValueOf(rfl)
|
||||||
@@ -350,7 +354,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
|||||||
return g.processStream(ctx, stream, svc, mtype, ct)
|
return g.processStream(ctx, stream, svc, mtype, ct)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
// for {
|
// for {
|
||||||
var err error
|
var err error
|
||||||
var argv, replyv reflect.Value
|
var argv, replyv reflect.Value
|
||||||
@@ -384,6 +388,7 @@ func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStrea
|
|||||||
service: g.opts.Name,
|
service: g.opts.Name,
|
||||||
contentType: ct,
|
contentType: ct,
|
||||||
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||||
|
endpoint: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||||
payload: argv.Interface(),
|
payload: argv.Interface(),
|
||||||
}
|
}
|
||||||
// define the handler func
|
// define the handler func
|
||||||
@@ -492,18 +497,19 @@ func (s *reflectStream) RecvMsg(m interface{}) error {
|
|||||||
return s.stream.Recv(m)
|
return s.stream.Recv(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||||
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
|
|
||||||
r := &rpcRequest{
|
r := &rpcRequest{
|
||||||
service: opts.Name,
|
service: opts.Name,
|
||||||
contentType: ct,
|
contentType: ct,
|
||||||
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
method: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||||
|
endpoint: fmt.Sprintf("%s.%s", service.name, mtype.method.Name),
|
||||||
stream: true,
|
stream: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -570,7 +576,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
|
|||||||
return status.New(statusCode, statusDesc).Err()
|
return status.New(statusCode, statusDesc).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
func (g *Server) newCodec(ct string) (codec.Codec, error) {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
defer g.RUnlock()
|
defer g.RUnlock()
|
||||||
|
|
||||||
@@ -585,7 +591,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
|||||||
return nil, codec.ErrUnknownContentType
|
return nil, codec.ErrUnknownContentType
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Options() server.Options {
|
func (g *Server) Options() server.Options {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
opts := g.opts
|
opts := g.opts
|
||||||
g.RUnlock()
|
g.RUnlock()
|
||||||
@@ -593,15 +599,15 @@ func (g *grpcServer) Options() server.Options {
|
|||||||
return opts
|
return opts
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Init(opts ...server.Option) error {
|
func (g *Server) Init(opts ...server.Option) error {
|
||||||
return g.configure(opts...)
|
return g.configure(opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||||
return newRPCHandler(h, opts...)
|
return newRPCHandler(h, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Handle(h server.Handler) error {
|
func (g *Server) Handle(h server.Handler) error {
|
||||||
if err := g.rpc.register(h.Handler()); err != nil {
|
if err := g.rpc.register(h.Handler()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -610,11 +616,11 @@ func (g *grpcServer) Handle(h server.Handler) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||||
return newSubscriber(topic, sb, opts...)
|
return newSubscriber(topic, sb, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
func (g *Server) Subscribe(sb server.Subscriber) error {
|
||||||
sub, ok := sb.(*subscriber)
|
sub, ok := sb.(*subscriber)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
||||||
@@ -638,7 +644,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Register() error {
|
func (g *Server) Register() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
rsvc := g.rsvc
|
rsvc := g.rsvc
|
||||||
config := g.opts
|
config := g.opts
|
||||||
@@ -712,38 +718,13 @@ func (g *grpcServer) Register() error {
|
|||||||
g.Lock()
|
g.Lock()
|
||||||
defer g.Unlock()
|
defer g.Unlock()
|
||||||
|
|
||||||
for sb := range g.subscribers {
|
|
||||||
handler := g.createSubHandler(sb, config)
|
|
||||||
var opts []broker.SubscribeOption
|
|
||||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
||||||
opts = append(opts, broker.SubscribeGroup(queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
subCtx := config.Context
|
|
||||||
if cx := sb.Options().Context; cx != nil {
|
|
||||||
subCtx = cx
|
|
||||||
}
|
|
||||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
|
||||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
|
||||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
|
||||||
|
|
||||||
if config.Logger.V(logger.InfoLevel) {
|
|
||||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
|
||||||
}
|
|
||||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
|
||||||
}
|
|
||||||
|
|
||||||
g.registered = true
|
g.registered = true
|
||||||
g.rsvc = service
|
g.rsvc = service
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Deregister() error {
|
func (g *Server) Deregister() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
g.RLock()
|
g.RLock()
|
||||||
@@ -797,7 +778,7 @@ func (g *grpcServer) Deregister() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Start() error {
|
func (g *Server) Start() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if g.started {
|
if g.started {
|
||||||
g.RUnlock()
|
g.RUnlock()
|
||||||
@@ -807,10 +788,6 @@ func (g *grpcServer) Start() error {
|
|||||||
|
|
||||||
config := g.Options()
|
config := g.Options()
|
||||||
|
|
||||||
for _, k := range config.Codecs {
|
|
||||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
|
||||||
}
|
|
||||||
|
|
||||||
// micro: config.Transport.Listen(config.Address)
|
// micro: config.Transport.Listen(config.Address)
|
||||||
var ts net.Listener
|
var ts net.Listener
|
||||||
var err error
|
var err error
|
||||||
@@ -874,6 +851,10 @@ func (g *grpcServer) Start() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = g.subscribe(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// micro: go ts.Accept(s.accept)
|
// micro: go ts.Accept(s.accept)
|
||||||
go func() {
|
go func() {
|
||||||
if err = g.srv.Serve(ts); err != nil {
|
if err = g.srv.Serve(ts); err != nil {
|
||||||
@@ -985,7 +966,38 @@ func (g *grpcServer) Start() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Stop() error {
|
func (g *Server) subscribe() error {
|
||||||
|
config := g.opts
|
||||||
|
|
||||||
|
for sb := range g.subscribers {
|
||||||
|
handler := g.createSubHandler(sb, config)
|
||||||
|
var opts []broker.SubscribeOption
|
||||||
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||||
|
opts = append(opts, broker.SubscribeGroup(queue))
|
||||||
|
}
|
||||||
|
|
||||||
|
subCtx := config.Context
|
||||||
|
if cx := sb.Options().Context; cx != nil {
|
||||||
|
subCtx = cx
|
||||||
|
}
|
||||||
|
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||||
|
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||||
|
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||||
|
|
||||||
|
if config.Logger.V(logger.InfoLevel) {
|
||||||
|
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||||
|
}
|
||||||
|
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Server) Stop() error {
|
||||||
g.RLock()
|
g.RLock()
|
||||||
if !g.started {
|
if !g.started {
|
||||||
g.RUnlock()
|
g.RUnlock()
|
||||||
@@ -1005,14 +1017,18 @@ func (g *grpcServer) Stop() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) String() string {
|
func (g *Server) String() string {
|
||||||
return "grpc"
|
return "grpc"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) Name() string {
|
func (g *Server) Name() string {
|
||||||
return g.opts.Name
|
return g.opts.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(opts ...server.Option) server.Server {
|
func (g *Server) GRPCServer() *grpc.Server {
|
||||||
return newGRPCServer(opts...)
|
return g.srv
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(opts ...server.Option) *Server {
|
||||||
|
return newServer(opts...)
|
||||||
}
|
}
|
||||||
|
@@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||||
return func(p broker.Event) (err error) {
|
return func(p broker.Event) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
Reference in New Issue
Block a user