// Package stream encapsulates streams within streams
package stream

import (
	"context"
	"sync"

	"go.unistack.org/micro/v3/client"
	"go.unistack.org/micro/v3/codec"
	"go.unistack.org/micro/v3/metadata"
	"go.unistack.org/micro/v3/server"
)

// Stream is the minimal message-stream contract that New accepts and wraps
// into a server.Stream.
type Stream interface {
	// Context returns the context associated with the stream. New stores
	// this value on the wrapped request.
	Context() context.Context
	// SendMsg sends msg on the stream. The wrapper's Send delegates to it.
	SendMsg(msg interface{}) error
	// RecvMsg receives the next message into msg. The wrapper's Recv
	// delegates to it.
	RecvMsg(msg interface{}) error
	// Close closes the stream.
	Close() error
}

// stream wraps a Stream and adds the pieces New needs to return it as a
// server.Stream: the request it was created with and the last error seen.
//
// The embedded Stream provides the promoted Context, SendMsg, RecvMsg and
// Close methods. err holds the most recent error recorded by Send or Recv
// and is read and written under the embedded RWMutex. request is the value
// handed out by Request.
type stream struct {
	Stream
	err     error
	request *request

	sync.RWMutex
}

// request wraps a client.Request together with the context of the stream it
// belongs to. The embedded client.Request supplies the promoted request
// methods; context is the source Header reads metadata from.
type request struct {
	client.Request
	context context.Context
}

// Codec returns the codec reported by the embedded client.Request.
func (r *request) Codec() codec.Codec {
	// Call through the embedded field explicitly; r.Codec() would recurse.
	cdc := r.Request.Codec()
	return cdc
}

// Header returns the metadata that metadata.FromIncomingContext yields for
// the context stored on the request.
func (r *request) Header() metadata.Metadata {
	// The presence flag is intentionally dropped: callers receive whatever
	// metadata value came back, regardless of whether any was found.
	header, _ := metadata.FromIncomingContext(r.context)
	return header
}

// Read is a stub: it always returns a nil byte slice and a nil error.
func (r *request) Read() ([]byte, error) {
	var body []byte // stays nil; no data is ever read here
	return body, nil
}

// Request returns the request this stream was created with, exposed as a
// server.Request.
func (s *stream) Request() server.Request {
	req := s.request
	return req
}

// Send forwards v to the wrapped stream's SendMsg. When SendMsg fails, the
// error is stored on the stream (readable through Error) and returned;
// otherwise Send returns nil.
func (s *stream) Send(v interface{}) error {
	if sendErr := s.Stream.SendMsg(v); sendErr != nil {
		// Record the failure under the write lock so Error observes it.
		s.Lock()
		s.err = sendErr
		s.Unlock()
		return sendErr
	}
	return nil
}

// Recv asks the wrapped stream's RecvMsg to fill v. When RecvMsg fails, the
// error is stored on the stream (readable through Error) and returned;
// otherwise Recv returns nil.
func (s *stream) Recv(v interface{}) error {
	if recvErr := s.Stream.RecvMsg(v); recvErr != nil {
		// Record the failure under the write lock so Error observes it.
		s.Lock()
		s.err = recvErr
		s.Unlock()
		return recvErr
	}
	return nil
}

// Error returns the error most recently recorded by Send or Recv, or nil if
// neither has recorded one. The value is read under the read lock.
func (s *stream) Error() error {
	s.RLock()
	recorded := s.err
	s.RUnlock()
	return recorded
}

// New wraps s in a server.Stream.
//
// The returned stream delegates to s and carries a request built with
// c.NewRequest(service, endpoint, req) whose header metadata is read from
// s.Context().
func New(service, endpoint string, req interface{}, c client.Client, s Stream) server.Stream {
	// Fetch the context before building the client request, matching the
	// order in which the two calls have always been made.
	streamCtx := s.Context()
	clientReq := c.NewRequest(service, endpoint, req)

	wrapped := &request{
		Request: clientReq,
		context: streamCtx,
	}
	return &stream{Stream: s, request: wrapped}
}