micro/util/stream/stream.go
Evstigneev Denis 38c5fe8b5a
All checks were successful
test / test (push) Successful in 42s
fixed struct alignment && refactor linter (#369)
## Pull Request template
Please, go through these steps before clicking submit on this PR.

1. Give a descriptive title to your PR.
2. Provide a description of your changes.
3. Make sure you have some relevant tests.
4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).

**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**

Reviewed-on: #369
Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru>
Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
2024-12-09 16:23:25 +03:00

96 lines
1.7 KiB
Go

// Package stream encapsulates streams within streams
package stream
import (
"context"
"sync"
"go.unistack.org/micro/v3/client"
"go.unistack.org/micro/v3/codec"
"go.unistack.org/micro/v3/metadata"
"go.unistack.org/micro/v3/server"
)
// Stream interface
type Stream interface {
Context() context.Context
SendMsg(msg interface{}) error
RecvMsg(msg interface{}) error
Close() error
}
type stream struct {
Stream
err error
request *request
sync.RWMutex
}
type request struct {
client.Request
context context.Context
}
// Codec returns codec.Codec
func (r *request) Codec() codec.Codec {
return r.Request.Codec()
}
// Header returns metadata header
func (r *request) Header() metadata.Metadata {
md, _ := metadata.FromIncomingContext(r.context)
return md
}
// Read returns stream data
func (r *request) Read() ([]byte, error) {
return nil, nil
}
// Request returns server.Request
func (s *stream) Request() server.Request {
return s.request
}
// Send sends message
func (s *stream) Send(v interface{}) error {
err := s.Stream.SendMsg(v)
if err != nil {
s.Lock()
s.err = err
s.Unlock()
}
return err
}
// Recv receives data
func (s *stream) Recv(v interface{}) error {
err := s.Stream.RecvMsg(v)
if err != nil {
s.Lock()
s.err = err
s.Unlock()
}
return err
}
// Error returns error that stream holds
func (s *stream) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
// New returns a new encapsulated stream
// Proto stream within a server.Stream
func New(service, endpoint string, req interface{}, c client.Client, s Stream) server.Stream {
return &stream{
Stream: s,
request: &request{
context: s.Context(),
Request: c.NewRequest(service, endpoint, req),
},
}
}