fixes for safe conversation and avoid panics (#1213)
* fixes for safe convertation Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * fix client publish panic If broker connect returns error we dont check it status and use it later to publish message, mostly this is unexpected because broker connection failed and we cant use it. Also proposed solution have benefit - we flag connection status only when we have succeseful broker connection Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * api/handler/broker: fix possible broker publish panic Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
parent
5b60299d47
commit
b00167520c
15
grpc.go
15
grpc.go
@ -6,7 +6,7 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/broker"
|
"github.com/micro/go-micro/v2/broker"
|
||||||
@ -24,9 +24,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type grpcClient struct {
|
type grpcClient struct {
|
||||||
once sync.Once
|
|
||||||
opts client.Options
|
opts client.Options
|
||||||
pool *pool
|
pool *pool
|
||||||
|
once atomic.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -570,9 +570,12 @@ func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...clie
|
|||||||
body = b
|
body = b
|
||||||
}
|
}
|
||||||
|
|
||||||
g.once.Do(func() {
|
if !g.once.Load().(bool) {
|
||||||
g.opts.Broker.Connect()
|
if err = g.opts.Broker.Connect(); err != nil {
|
||||||
})
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
|
}
|
||||||
|
g.once.Store(true)
|
||||||
|
}
|
||||||
|
|
||||||
topic := p.Topic()
|
topic := p.Topic()
|
||||||
|
|
||||||
@ -641,9 +644,9 @@ func newClient(opts ...client.Option) client.Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rc := &grpcClient{
|
rc := &grpcClient{
|
||||||
once: sync.Once{},
|
|
||||||
opts: options,
|
opts: options,
|
||||||
}
|
}
|
||||||
|
rc.once.Store(false)
|
||||||
|
|
||||||
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
|
rc.pool = newPool(options.PoolSize, options.PoolTTL, rc.poolMaxIdle(), rc.poolMaxStreams())
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user