// Package stream encapsulates streams within streams package stream // import "go.unistack.org/micro/v3/util/stream" import ( "context" "sync" "go.unistack.org/micro/v3/client" "go.unistack.org/micro/v3/codec" "go.unistack.org/micro/v3/metadata" "go.unistack.org/micro/v3/server" ) // Stream interface type Stream interface { Context() context.Context SendMsg(msg interface{}) error RecvMsg(msg interface{}) error Close() error } type stream struct { sync.RWMutex Stream err error request *request } type request struct { client.Request context context.Context } // Codec returns codec.Codec func (r *request) Codec() codec.Codec { return r.Request.Codec() } // Header returns metadata header func (r *request) Header() metadata.Metadata { md, _ := metadata.FromIncomingContext(r.context) return md } // Read returns stream data func (r *request) Read() ([]byte, error) { return nil, nil } // Request returns server.Request func (s *stream) Request() server.Request { return s.request } // Send sends message func (s *stream) Send(v interface{}) error { err := s.Stream.SendMsg(v) if err != nil { s.Lock() s.err = err s.Unlock() } return err } // Recv receives data func (s *stream) Recv(v interface{}) error { err := s.Stream.RecvMsg(v) if err != nil { s.Lock() s.err = err s.Unlock() } return err } // Error returns error that stream holds func (s *stream) Error() error { s.RLock() defer s.RUnlock() return s.err } // New returns a new encapsulated stream // Proto stream within a server.Stream func New(service, endpoint string, req interface{}, c client.Client, s Stream) server.Stream { return &stream{ Stream: s, request: &request{ context: s.Context(), Request: c.NewRequest(service, endpoint, req), }, } }