resurrect service stuff

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-08-28 10:57:42 +03:00
parent aa99378adc
commit 0f19355621
12 changed files with 967 additions and 332 deletions

View File

@@ -1,74 +0,0 @@
// Package bytes provides a bytes codec which does not encode or decode anything
package bytes
import (
"fmt"
"io"
"io/ioutil"
"github.com/unistack-org/micro/v3/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
}
// Frame gives us the ability to define raw data to send over the pipes
type Frame struct {
Data []byte
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
// read bytes
buf, err := ioutil.ReadAll(c.Conn)
if err != nil {
return err
}
switch v := b.(type) {
case *[]byte:
*v = buf
case *Frame:
v.Data = buf
default:
return fmt.Errorf("failed to read body: %v is not type of *[]byte", b)
}
return nil
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
var v []byte
switch vb := b.(type) {
case nil:
return nil
case *Frame:
v = vb.Data
case *[]byte:
v = *vb
case []byte:
v = vb
default:
return fmt.Errorf("failed to write: %v is not type of *[]byte or []byte", b)
}
_, err := c.Conn.Write(v)
return err
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "bytes"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
}
}

View File

@@ -1,38 +0,0 @@
package bytes
import (
"github.com/unistack-org/micro/v3/codec"
)
type Marshaler struct{}
type Message struct {
Header map[string]string
Body []byte
}
func (n Marshaler) Marshal(v interface{}) ([]byte, error) {
switch ve := v.(type) {
case *[]byte:
return *ve, nil
case []byte:
return ve, nil
case *Message:
return ve.Body, nil
}
return nil, codec.ErrInvalidMessage
}
func (n Marshaler) Unmarshal(d []byte, v interface{}) error {
switch ve := v.(type) {
case *[]byte:
*ve = d
case *Message:
ve.Body = d
}
return codec.ErrInvalidMessage
}
func (n Marshaler) String() string {
return "bytes"
}

View File

@@ -1,149 +0,0 @@
// Package grpc provides a grpc codec
package grpc
import (
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"github.com/golang/protobuf/proto"
"github.com/unistack-org/micro/v3/codec"
)
type Codec struct {
Conn io.ReadWriteCloser
ContentType string
}
func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error {
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
// service method
path := m.Header[":path"]
if len(path) == 0 || path[0] != '/' {
m.Target = m.Header["Micro-Service"]
m.Endpoint = m.Header["Micro-Endpoint"]
} else {
// [ , a.package.Foo, Bar]
parts := strings.Split(path, "/")
if len(parts) != 3 {
return errors.New("Unknown request path")
}
service := strings.Split(parts[1], ".")
m.Endpoint = strings.Join([]string{service[len(service)-1], parts[2]}, ".")
m.Target = strings.Join(service[:len(service)-1], ".")
}
return nil
}
func (c *Codec) ReadBody(b interface{}) error {
// no body
if b == nil {
return nil
}
_, buf, err := decode(c.Conn)
if err != nil {
return err
}
switch c.ContentType {
case "application/grpc+json":
return json.Unmarshal(buf, b)
case "application/grpc+proto", "application/grpc":
return proto.Unmarshal(buf, b.(proto.Message))
}
return errors.New("Unsupported Content-Type")
}
func (c *Codec) Write(m *codec.Message, b interface{}) error {
var buf []byte
var err error
if ct := m.Header["Content-Type"]; len(ct) > 0 {
c.ContentType = ct
}
if ct := m.Header["content-type"]; len(ct) > 0 {
c.ContentType = ct
}
switch m.Type {
case codec.Request:
parts := strings.Split(m.Endpoint, ".")
m.Header[":method"] = "POST"
m.Header[":path"] = fmt.Sprintf("/%s.%s/%s", m.Target, parts[0], parts[1])
m.Header[":proto"] = "HTTP/2.0"
m.Header["te"] = "trailers"
m.Header["user-agent"] = "grpc-go/1.0.0"
m.Header[":authority"] = m.Target
m.Header["content-type"] = c.ContentType
case codec.Response:
m.Header["Trailer"] = "grpc-status" //, grpc-message"
m.Header["content-type"] = c.ContentType
m.Header[":status"] = "200"
m.Header["grpc-status"] = "0"
// m.Header["grpc-message"] = ""
case codec.Error:
m.Header["Trailer"] = "grpc-status, grpc-message"
// micro end of stream
if m.Error == "EOS" {
m.Header["grpc-status"] = "0"
} else {
m.Header["grpc-message"] = m.Error
m.Header["grpc-status"] = "13"
}
return nil
}
// marshal content
switch c.ContentType {
case "application/grpc+json":
buf, err = json.Marshal(b)
case "application/grpc+proto", "application/grpc":
pb, ok := b.(proto.Message)
if ok {
buf, err = proto.Marshal(pb)
}
default:
err = errors.New("Unsupported Content-Type")
}
// check error
if err != nil {
m.Header["grpc-status"] = "8"
m.Header["grpc-message"] = err.Error()
return err
}
if len(buf) == 0 {
return nil
}
return encode(0, buf, c.Conn)
}
func (c *Codec) Close() error {
return c.Conn.Close()
}
func (c *Codec) String() string {
return "grpc"
}
func NewCodec(c io.ReadWriteCloser) codec.Codec {
return &Codec{
Conn: c,
ContentType: "application/grpc",
}
}

View File

@@ -1,70 +0,0 @@
package grpc
import (
"encoding/binary"
"fmt"
"io"
)
var (
MaxMessageSize = 1024 * 1024 * 4 // 4Mb
maxInt = int(^uint(0) >> 1)
)
func decode(r io.Reader) (uint8, []byte, error) {
header := make([]byte, 5)
// read the header
if _, err := r.Read(header[:]); err != nil {
return uint8(0), nil, err
}
// get encoding format e.g compressed
cf := uint8(header[0])
// get message length
length := binary.BigEndian.Uint32(header[1:])
// no encoding format
if length == 0 {
return cf, nil, nil
}
//
if int64(length) > int64(maxInt) {
return cf, nil, fmt.Errorf("grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
}
if int(length) > MaxMessageSize {
return cf, nil, fmt.Errorf("grpc: received message larger than max (%d vs. %d)", length, MaxMessageSize)
}
msg := make([]byte, int(length))
if _, err := r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return cf, nil, err
}
return cf, msg, nil
}
func encode(cf uint8, buf []byte, w io.Writer) error {
header := make([]byte, 5)
// set compression
header[0] = byte(cf)
// write length as header
binary.BigEndian.PutUint32(header[1:], uint32(len(buf)))
// read the header
if _, err := w.Write(header[:]); err != nil {
return err
}
// write the buffer
_, err := w.Write(buf)
return err
}