diff --git a/api/handler/rpc/stream.go b/api/handler/rpc/stream.go index a4741769..87dc02ea 100644 --- a/api/handler/rpc/stream.go +++ b/api/handler/rpc/stream.go @@ -135,27 +135,38 @@ func serveWebsocket(ctx context.Context, w http.ResponseWriter, r *http.Request, // receive from stream and send to client for { - // read backend response body - buf, err := rsp.Read() - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } + select { + case <-ctx.Done(): return - } + case <-stream.Context().Done(): + return + default: + // read backend response body + buf, err := rsp.Read() + if err != nil { + // wants to avoid import grpc/status.Status + if strings.Contains(err.Error(), "context canceled") { + return + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + return + } - // write the response - if err := wsutil.WriteServerMessage(rw, op, buf); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) + // write the response + if err := wsutil.WriteServerMessage(rw, op, buf); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + return } - return - } - if err = rw.Flush(); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) + if err = rw.Flush(); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + return } - return } } } @@ -166,30 +177,40 @@ func writeLoop(rw io.ReadWriter, stream client.Stream) { defer stream.Close() for { - buf, op, err := wsutil.ReadClientData(rw) - if err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) - } + select { + case <-stream.Context().Done(): return - } - switch op { default: - // not relevant - continue - case ws.OpText, ws.OpBinary: - break - } - // send to backend - // default to trying json - // if the extracted payload isn't empty lets use it - request := &raw.Frame{Data: buf} - - if err := stream.Send(request); err != nil { - if logger.V(logger.ErrorLevel, logger.DefaultLogger) { - logger.Error(err) + buf, op, err := wsutil.ReadClientData(rw) + if err != nil { + if wserr, ok := err.(wsutil.ClosedError); ok { + switch wserr.Code { + case ws.StatusNormalClosure, ws.StatusNoStatusRcvd: + return + } + } + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + return + } + switch op { + default: + // not relevant + continue + case ws.OpText, ws.OpBinary: + break + } + // send to backend + // default to trying json + // if the extracted payload isn't empty lets use it + request := &raw.Frame{Data: buf} + if err := stream.Send(request); err != nil { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Error(err) + } + return } - return } } } diff --git a/web/options.go b/web/options.go index c675f4ea..16aa41c5 100644 --- a/web/options.go +++ b/web/options.go @@ -11,6 +11,7 @@ import ( "github.com/micro/go-micro/v2/registry" ) +//Options for web type Options struct { Name string Version string @@ -67,7 +68,9 @@ func newOptions(opts ...Option) Options { for _, o := range opts { o(&opt) } - + if opt.Registry == nil { + opt.Registry = registry.DefaultRegistry + } if opt.RegisterCheck == nil { opt.RegisterCheck = DefaultRegisterCheck } @@ -75,7 +78,7 @@ func newOptions(opts ...Option) Options { return opt } -// Server name +// Name of Web func Name(n string) Option { return func(o *Options) { o.Name = n @@ -92,7 +95,7 @@ func Icon(ico string) Option { } } -// Unique server id +//Id for Unique server id func Id(id string) Option { return func(o *Options) { o.Id = id @@ -120,7 +123,7 @@ func Address(a string) Option { } } -// The address to advertise for discovery - host:port +//Advertise The address to advertise for discovery - host:port func Advertise(a string) Option { return func(o *Options) { o.Advertise = a @@ -143,26 +146,28 @@ func Registry(r registry.Registry) Option { } } -// Register the service with a TTL +//RegisterTTL Register the service with a TTL func RegisterTTL(t time.Duration) Option { return func(o *Options) { o.RegisterTTL = t } } -// Register the service with at interval +//RegisterInterval Register the service with at interval func RegisterInterval(t time.Duration) Option { return func(o *Options) { o.RegisterInterval = t } } +//Handler for custom handler func Handler(h http.Handler) Option { return func(o *Options) { o.Handler = h } } +//Server for custom Server func Server(srv *http.Server) Option { return func(o *Options) { o.Server = srv diff --git a/web/service.go b/web/service.go index 0cd683a4..cff7e113 100644 --- a/web/service.go +++ b/web/service.go @@ -268,7 +268,7 @@ func (s *service) stop() error { func (s *service) Client() *http.Client { rt := mhttp.NewRoundTripper( - mhttp.WithRegistry(registry.DefaultRegistry), + mhttp.WithRegistry(s.opts.Registry), ) return &http.Client{ Transport: rt, diff --git a/web/web.go b/web/web.go index ae3eb7f1..1da83fd3 100644 --- a/web/web.go +++ b/web/web.go @@ -20,8 +20,10 @@ type Service interface { Run() error } +//Option for web type Option func(o *Options) +//Web basic Defaults var ( // For serving DefaultName = "go-web"