96 lines
1.7 KiB
Go
96 lines
1.7 KiB
Go
// Package stream encapsulates streams within streams
|
|
package stream
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"github.com/unistack-org/micro/v3/client"
|
|
"github.com/unistack-org/micro/v3/codec"
|
|
"github.com/unistack-org/micro/v3/metadata"
|
|
"github.com/unistack-org/micro/v3/server"
|
|
)
|
|
|
|
// Stream interface
|
|
type Stream interface {
|
|
Context() context.Context
|
|
SendMsg(msg interface{}) error
|
|
RecvMsg(msg interface{}) error
|
|
Close() error
|
|
}
|
|
|
|
type stream struct {
|
|
Stream
|
|
|
|
sync.RWMutex
|
|
err error
|
|
request *request
|
|
}
|
|
|
|
type request struct {
|
|
client.Request
|
|
context context.Context
|
|
}
|
|
|
|
// Codec returns codec.Codec
|
|
func (r *request) Codec() codec.Codec {
|
|
return r.Request.Codec()
|
|
}
|
|
|
|
// Header returns metadata header
|
|
func (r *request) Header() metadata.Metadata {
|
|
md, _ := metadata.FromIncomingContext(r.context)
|
|
return md
|
|
}
|
|
|
|
// Read returns stream data
|
|
func (r *request) Read() ([]byte, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
// Request returns server.Request
|
|
func (s *stream) Request() server.Request {
|
|
return s.request
|
|
}
|
|
|
|
// Send sends message
|
|
func (s *stream) Send(v interface{}) error {
|
|
err := s.Stream.SendMsg(v)
|
|
if err != nil {
|
|
s.Lock()
|
|
s.err = err
|
|
s.Unlock()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Recv receives data
|
|
func (s *stream) Recv(v interface{}) error {
|
|
err := s.Stream.RecvMsg(v)
|
|
if err != nil {
|
|
s.Lock()
|
|
s.err = err
|
|
s.Unlock()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Error returns error that stream holds
|
|
func (s *stream) Error() error {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.err
|
|
}
|
|
|
|
// New returns a new encapsulated stream
|
|
// Proto stream within a server.Stream
|
|
func New(service, endpoint string, req interface{}, c client.Client, s Stream) server.Stream {
|
|
return &stream{
|
|
Stream: s,
|
|
request: &request{
|
|
context: s.Context(),
|
|
Request: c.NewRequest(service, endpoint, req),
|
|
},
|
|
}
|
|
}
|