All checks were successful
		
		
	
	test / test (push) Successful in 42s
				
			## Pull Request template Please, go through these steps before clicking submit on this PR. 1. Give a descriptive title to your PR. 2. Provide a description of your changes. 3. Make sure you have some relevant tests. 4. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable). **PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING** Reviewed-on: #369 Co-authored-by: Evstigneev Denis <danteevstigneev@yandex.ru> Co-committed-by: Evstigneev Denis <danteevstigneev@yandex.ru>
		
			
				
	
	
		
			96 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			96 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Package stream wraps a generic message Stream so that it can be used as a server.Stream
 | |
| package stream
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"sync"
 | |
| 
 | |
| 	"go.unistack.org/micro/v3/client"
 | |
| 	"go.unistack.org/micro/v3/codec"
 | |
| 	"go.unistack.org/micro/v3/metadata"
 | |
| 	"go.unistack.org/micro/v3/server"
 | |
| )
 | |
| 
 | |
// Stream is the set of operations this package needs from the stream it
// wraps: message exchange, access to the stream context, and shutdown.
type Stream interface {
	// SendMsg sends msg on the stream.
	SendMsg(msg interface{}) error
	// RecvMsg receives the next message from the stream into msg.
	RecvMsg(msg interface{}) error

	// Context returns the context associated with the stream.
	Context() context.Context
	// Close closes the stream.
	Close() error
}
 | |
| 
 | |
| type stream struct {
 | |
| 	Stream
 | |
| 	err     error
 | |
| 	request *request
 | |
| 
 | |
| 	sync.RWMutex
 | |
| }
 | |
| 
 | |
| type request struct {
 | |
| 	client.Request
 | |
| 	context context.Context
 | |
| }
 | |
| 
 | |
| // Codec returns codec.Codec
 | |
| func (r *request) Codec() codec.Codec {
 | |
| 	return r.Request.Codec()
 | |
| }
 | |
| 
 | |
| // Header returns metadata header
 | |
| func (r *request) Header() metadata.Metadata {
 | |
| 	md, _ := metadata.FromIncomingContext(r.context)
 | |
| 	return md
 | |
| }
 | |
| 
 | |
| // Read returns stream data
 | |
| func (r *request) Read() ([]byte, error) {
 | |
| 	return nil, nil
 | |
| }
 | |
| 
 | |
| // Request returns server.Request
 | |
| func (s *stream) Request() server.Request {
 | |
| 	return s.request
 | |
| }
 | |
| 
 | |
| // Send sends message
 | |
| func (s *stream) Send(v interface{}) error {
 | |
| 	err := s.Stream.SendMsg(v)
 | |
| 	if err != nil {
 | |
| 		s.Lock()
 | |
| 		s.err = err
 | |
| 		s.Unlock()
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Recv receives data
 | |
| func (s *stream) Recv(v interface{}) error {
 | |
| 	err := s.Stream.RecvMsg(v)
 | |
| 	if err != nil {
 | |
| 		s.Lock()
 | |
| 		s.err = err
 | |
| 		s.Unlock()
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Error returns error that stream holds
 | |
| func (s *stream) Error() error {
 | |
| 	s.RLock()
 | |
| 	defer s.RUnlock()
 | |
| 	return s.err
 | |
| }
 | |
| 
 | |
| // New returns a new encapsulated stream
 | |
| // Proto stream within a server.Stream
 | |
| func New(service, endpoint string, req interface{}, c client.Client, s Stream) server.Stream {
 | |
| 	return &stream{
 | |
| 		Stream: s,
 | |
| 		request: &request{
 | |
| 			context: s.Context(),
 | |
| 			Request: c.NewRequest(service, endpoint, req),
 | |
| 		},
 | |
| 	}
 | |
| }
 |