Compare commits
28 Commits
Author | SHA1 | Date | |
---|---|---|---|
46891c397f | |||
6856038abe | |||
786bbb7185 | |||
95207c9617 | |||
d646deb468 | |||
468819f0a0 | |||
832f1034a8 | |||
f0b6370ee1 | |||
3d522b094b | |||
92dcd1acd7 | |||
dc8a736e13 | |||
|
4219919c9e | ||
1f447ea747 | |||
|
30c0e01397 | ||
244f3def4d | |||
|
55cbc89e11 | ||
df00f718cf | |||
|
bc3369f3a6 | ||
8d4c661ce5 | |||
|
7b97212e26 | ||
|
9d5a2c1168 | ||
|
483c6bb801 | ||
|
7ba5fd5fee | ||
080705a5df | |||
|
79df512e5e | ||
|
3e893b78c8 | ||
8e9c64d78b | |||
|
d66aa424d2 |
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.3.1
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
|
2
.github/workflows/dependabot-automerge.yml
vendored
2
.github/workflows/dependabot-automerge.yml
vendored
@@ -15,7 +15,7 @@ jobs:
|
||||
steps:
|
||||
- name: metadata
|
||||
id: metadata
|
||||
uses: dependabot/fetch-metadata@v1.3.5
|
||||
uses: dependabot/fetch-metadata@v1.3.6
|
||||
with:
|
||||
github-token: "${{ secrets.TOKEN }}"
|
||||
- name: merge
|
||||
|
2
.github/workflows/pr.yml
vendored
2
.github/workflows/pr.yml
vendored
@@ -34,7 +34,7 @@ jobs:
|
||||
- name: checkout
|
||||
uses: actions/checkout@v3
|
||||
- name: lint
|
||||
uses: golangci/golangci-lint-action@v3.3.1
|
||||
uses: golangci/golangci-lint-action@v3.4.0
|
||||
continue-on-error: true
|
||||
with:
|
||||
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
|
||||
|
18
go.mod
18
go.mod
@@ -1,11 +1,17 @@
|
||||
module go.unistack.org/micro-server-grpc/v3
|
||||
|
||||
go 1.16
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
github.com/golang/protobuf v1.5.2
|
||||
go.unistack.org/micro/v3 v3.10.1
|
||||
golang.org/x/net v0.4.0
|
||||
google.golang.org/grpc v1.52.0
|
||||
google.golang.org/protobuf v1.28.1
|
||||
github.com/golang/protobuf v1.5.3
|
||||
go.unistack.org/micro/v3 v3.10.31
|
||||
golang.org/x/net v0.17.0
|
||||
google.golang.org/grpc v1.59.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
)
|
||||
|
||||
require (
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect
|
||||
)
|
||||
|
142
grpc.go
142
grpc.go
@@ -26,7 +26,6 @@ import (
|
||||
"golang.org/x/net/netutil"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/encoding"
|
||||
gmetadata "google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
@@ -39,13 +38,13 @@ const (
|
||||
)
|
||||
|
||||
/*
|
||||
type grpcServerReflection struct {
|
||||
type ServerReflection struct {
|
||||
srv *grpc.Server
|
||||
s *serverReflectionServer
|
||||
}
|
||||
*/
|
||||
|
||||
type grpcServer struct {
|
||||
type Server struct {
|
||||
handlers map[string]server.Handler
|
||||
srv *grpc.Server
|
||||
exit chan chan error
|
||||
@@ -60,9 +59,9 @@ type grpcServer struct {
|
||||
reflection bool
|
||||
}
|
||||
|
||||
func newGRPCServer(opts ...server.Option) server.Server {
|
||||
func newServer(opts ...server.Option) *Server {
|
||||
// create a grpc server
|
||||
g := &grpcServer{
|
||||
g := &Server{
|
||||
opts: server.NewOptions(opts...),
|
||||
rpc: &rServer{
|
||||
serviceMap: make(map[string]*service),
|
||||
@@ -91,7 +90,7 @@ func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp se
|
||||
|
||||
*/
|
||||
|
||||
func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
func (g *Server) configure(opts ...server.Option) error {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
@@ -128,6 +127,10 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
}
|
||||
}
|
||||
|
||||
for _, k := range g.opts.Codecs {
|
||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||
}
|
||||
|
||||
maxMsgSize := g.getMaxMsgSize()
|
||||
|
||||
gopts := []grpc.ServerOption{
|
||||
@@ -136,10 +139,6 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
grpc.UnknownServiceHandler(g.handler),
|
||||
}
|
||||
|
||||
if creds := g.getCredentials(); creds != nil {
|
||||
gopts = append(gopts, grpc.Creds(creds))
|
||||
}
|
||||
|
||||
if opts := g.getGrpcOptions(); opts != nil {
|
||||
gopts = append(opts, gopts...)
|
||||
}
|
||||
@@ -165,7 +164,7 @@ func (g *grpcServer) configure(opts ...server.Option) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) getMaxMsgSize() int {
|
||||
func (g *Server) getMaxMsgSize() int {
|
||||
if g.opts.Context == nil {
|
||||
return codec.DefaultMaxMsgSize
|
||||
}
|
||||
@@ -176,14 +175,7 @@ func (g *grpcServer) getMaxMsgSize() int {
|
||||
return s
|
||||
}
|
||||
|
||||
func (g *grpcServer) getCredentials() credentials.TransportCredentials {
|
||||
if g.opts.TLSConfig != nil {
|
||||
return credentials.NewTLS(g.opts.TLSConfig)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
||||
func (g *Server) getGrpcOptions() []grpc.ServerOption {
|
||||
if g.opts.Context == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -196,7 +188,7 @@ func (g *grpcServer) getGrpcOptions() []grpc.ServerOption {
|
||||
return opts
|
||||
}
|
||||
|
||||
func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||
func (g *Server) handler(srv interface{}, stream grpc.ServerStream) (err error) {
|
||||
fullMethod, ok := grpc.MethodFromServerStream(stream)
|
||||
if !ok {
|
||||
return status.Errorf(codes.Internal, "method does not exist in context")
|
||||
@@ -242,6 +234,10 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
for k, v := range gmd {
|
||||
md.Set(k, strings.Join(v, ", "))
|
||||
}
|
||||
md.Set("Path", fullMethod)
|
||||
md.Set("Micro-Server", "grpc")
|
||||
md.Set(metadata.HeaderEndpoint, methodName)
|
||||
md.Set(metadata.HeaderService, serviceName)
|
||||
|
||||
var td string
|
||||
// timeout for server deadline
|
||||
@@ -302,7 +298,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
|
||||
/*
|
||||
if svc == nil && g.reflection && methodName == "ServerReflectionInfo" {
|
||||
rfl := &grpcServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||
rfl := &ServerReflection{srv: g.srv, s: &serverReflectionServer{s: g.srv}}
|
||||
svc = &service{}
|
||||
svc.typ = reflect.TypeOf(rfl)
|
||||
svc.rcvr = reflect.ValueOf(rfl)
|
||||
@@ -350,7 +346,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) (err err
|
||||
return g.processStream(ctx, stream, svc, mtype, ct)
|
||||
}
|
||||
|
||||
func (g *grpcServer) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
func (g *Server) processRequest(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
// for {
|
||||
var err error
|
||||
var argv, replyv reflect.Value
|
||||
@@ -493,12 +489,12 @@ func (s *reflectStream) RecvMsg(m interface{}) error {
|
||||
return s.stream.Recv(m)
|
||||
}
|
||||
|
||||
func (g *grpcServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||
func (g *ServerReflection) ServerReflectionInfo(ctx context.Context, stream server.Stream) error {
|
||||
return g.s.ServerReflectionInfo(&reflectStream{stream})
|
||||
}
|
||||
*/
|
||||
|
||||
func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
func (g *Server) processStream(ctx context.Context, stream grpc.ServerStream, service *service, mtype *methodType, ct string) error {
|
||||
opts := g.opts
|
||||
|
||||
r := &rpcRequest{
|
||||
@@ -572,7 +568,7 @@ func (g *grpcServer) processStream(ctx context.Context, stream grpc.ServerStream
|
||||
return status.New(statusCode, statusDesc).Err()
|
||||
}
|
||||
|
||||
func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
||||
func (g *Server) newCodec(ct string) (codec.Codec, error) {
|
||||
g.RLock()
|
||||
defer g.RUnlock()
|
||||
|
||||
@@ -587,7 +583,7 @@ func (g *grpcServer) newCodec(ct string) (codec.Codec, error) {
|
||||
return nil, codec.ErrUnknownContentType
|
||||
}
|
||||
|
||||
func (g *grpcServer) Options() server.Options {
|
||||
func (g *Server) Options() server.Options {
|
||||
g.RLock()
|
||||
opts := g.opts
|
||||
g.RUnlock()
|
||||
@@ -595,15 +591,15 @@ func (g *grpcServer) Options() server.Options {
|
||||
return opts
|
||||
}
|
||||
|
||||
func (g *grpcServer) Init(opts ...server.Option) error {
|
||||
func (g *Server) Init(opts ...server.Option) error {
|
||||
return g.configure(opts...)
|
||||
}
|
||||
|
||||
func (g *grpcServer) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
func (g *Server) NewHandler(h interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
return newRPCHandler(h, opts...)
|
||||
}
|
||||
|
||||
func (g *grpcServer) Handle(h server.Handler) error {
|
||||
func (g *Server) Handle(h server.Handler) error {
|
||||
if err := g.rpc.register(h.Handler()); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -612,11 +608,11 @@ func (g *grpcServer) Handle(h server.Handler) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
func (g *Server) NewSubscriber(topic string, sb interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
return newSubscriber(topic, sb, opts...)
|
||||
}
|
||||
|
||||
func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
||||
func (g *Server) Subscribe(sb server.Subscriber) error {
|
||||
sub, ok := sb.(*subscriber)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid subscriber: expected *subscriber")
|
||||
@@ -640,7 +636,7 @@ func (g *grpcServer) Subscribe(sb server.Subscriber) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Register() error {
|
||||
func (g *Server) Register() error {
|
||||
g.RLock()
|
||||
rsvc := g.rsvc
|
||||
config := g.opts
|
||||
@@ -714,38 +710,13 @@ func (g *grpcServer) Register() error {
|
||||
g.Lock()
|
||||
defer g.Unlock()
|
||||
|
||||
for sb := range g.subscribers {
|
||||
handler := g.createSubHandler(sb, config)
|
||||
var opts []broker.SubscribeOption
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
subCtx := config.Context
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
g.registered = true
|
||||
g.rsvc = service
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Deregister() error {
|
||||
func (g *Server) Deregister() error {
|
||||
var err error
|
||||
|
||||
g.RLock()
|
||||
@@ -799,7 +770,7 @@ func (g *grpcServer) Deregister() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Start() error {
|
||||
func (g *Server) Start() error {
|
||||
g.RLock()
|
||||
if g.started {
|
||||
g.RUnlock()
|
||||
@@ -809,10 +780,6 @@ func (g *grpcServer) Start() error {
|
||||
|
||||
config := g.Options()
|
||||
|
||||
for _, k := range config.Codecs {
|
||||
encoding.RegisterCodec(&wrapMicroCodec{k})
|
||||
}
|
||||
|
||||
// micro: config.Transport.Listen(config.Address)
|
||||
var ts net.Listener
|
||||
var err error
|
||||
@@ -876,6 +843,10 @@ func (g *grpcServer) Start() error {
|
||||
}
|
||||
}
|
||||
|
||||
if err = g.subscribe(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// micro: go ts.Accept(s.accept)
|
||||
go func() {
|
||||
if err = g.srv.Serve(ts); err != nil {
|
||||
@@ -987,7 +958,38 @@ func (g *grpcServer) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *grpcServer) Stop() error {
|
||||
func (g *Server) subscribe() error {
|
||||
config := g.opts
|
||||
|
||||
for sb := range g.subscribers {
|
||||
handler := g.createSubHandler(sb, config)
|
||||
var opts []broker.SubscribeOption
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
opts = append(opts, broker.SubscribeGroup(queue))
|
||||
}
|
||||
|
||||
subCtx := config.Context
|
||||
if cx := sb.Options().Context; cx != nil {
|
||||
subCtx = cx
|
||||
}
|
||||
opts = append(opts, broker.SubscribeContext(subCtx))
|
||||
opts = append(opts, broker.SubscribeAutoAck(sb.Options().AutoAck))
|
||||
opts = append(opts, broker.SubscribeBodyOnly(sb.Options().BodyOnly))
|
||||
|
||||
if config.Logger.V(logger.InfoLevel) {
|
||||
config.Logger.Infof(config.Context, "Subscribing to topic: %s", sb.Topic())
|
||||
}
|
||||
sub, err := config.Broker.Subscribe(subCtx, sb.Topic(), handler, opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
g.subscribers[sb] = []broker.Subscriber{sub}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Server) Stop() error {
|
||||
g.RLock()
|
||||
if !g.started {
|
||||
g.RUnlock()
|
||||
@@ -1007,14 +1009,18 @@ func (g *grpcServer) Stop() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (g *grpcServer) String() string {
|
||||
func (g *Server) String() string {
|
||||
return "grpc"
|
||||
}
|
||||
|
||||
func (g *grpcServer) Name() string {
|
||||
func (g *Server) Name() string {
|
||||
return g.opts.Name
|
||||
}
|
||||
|
||||
func NewServer(opts ...server.Option) server.Server {
|
||||
return newGRPCServer(opts...)
|
||||
func (g *Server) GRPCServer() *grpc.Server {
|
||||
return g.srv
|
||||
}
|
||||
|
||||
func NewServer(opts ...server.Option) *Server {
|
||||
return newServer(opts...)
|
||||
}
|
||||
|
@@ -102,7 +102,7 @@ func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOptio
|
||||
}
|
||||
}
|
||||
|
||||
func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
func (g *Server) createSubHandler(sb *subscriber, opts server.Options) broker.Handler {
|
||||
return func(p broker.Event) (err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
|
Reference in New Issue
Block a user