Remove unused handlers
This commit is contained in:
parent
c5d085cff8
commit
9a73828782
@ -1,292 +0,0 @@
|
|||||||
// Package broker provides a go-micro/broker handler
|
|
||||||
package broker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"github.com/micro/go-micro/v2/api/handler"
|
|
||||||
"github.com/micro/go-micro/v2/broker"
|
|
||||||
"github.com/micro/go-micro/v2/logger"
|
|
||||||
"github.com/oxtoacart/bpool"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
bufferPool = bpool.NewSizedBufferPool(1024, 8)
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
Handler = "broker"
|
|
||||||
|
|
||||||
pingTime = (readDeadline * 9) / 10
|
|
||||||
readLimit = 16384
|
|
||||||
readDeadline = 60 * time.Second
|
|
||||||
writeDeadline = 10 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
type brokerHandler struct {
|
|
||||||
once atomic.Value
|
|
||||||
opts handler.Options
|
|
||||||
u websocket.Upgrader
|
|
||||||
}
|
|
||||||
|
|
||||||
type conn struct {
|
|
||||||
b broker.Broker
|
|
||||||
cType string
|
|
||||||
topic string
|
|
||||||
queue string
|
|
||||||
exit chan bool
|
|
||||||
|
|
||||||
sync.Mutex
|
|
||||||
ws *websocket.Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
contentType = "text/plain"
|
|
||||||
)
|
|
||||||
|
|
||||||
func checkOrigin(r *http.Request) bool {
|
|
||||||
origin := r.Header["Origin"]
|
|
||||||
if len(origin) == 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
u, err := url.Parse(origin[0])
|
|
||||||
if err != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return u.Host == r.Host
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conn) close() {
|
|
||||||
select {
|
|
||||||
case <-c.exit:
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
close(c.exit)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conn) readLoop() {
|
|
||||||
defer func() {
|
|
||||||
c.close()
|
|
||||||
c.ws.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// set read limit/deadline
|
|
||||||
c.ws.SetReadLimit(readLimit)
|
|
||||||
c.ws.SetReadDeadline(time.Now().Add(readDeadline))
|
|
||||||
|
|
||||||
// set close handler
|
|
||||||
ch := c.ws.CloseHandler()
|
|
||||||
c.ws.SetCloseHandler(func(code int, text string) error {
|
|
||||||
err := ch(code, text)
|
|
||||||
c.close()
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
|
|
||||||
// set pong handler
|
|
||||||
c.ws.SetPongHandler(func(string) error {
|
|
||||||
c.ws.SetReadDeadline(time.Now().Add(readDeadline))
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
for {
|
|
||||||
_, message, err := c.ws.ReadMessage()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.b.Publish(c.topic, &broker.Message{
|
|
||||||
Header: map[string]string{"Content-Type": c.cType},
|
|
||||||
Body: message,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conn) write(mType int, data []byte) error {
|
|
||||||
c.Lock()
|
|
||||||
c.ws.SetWriteDeadline(time.Now().Add(writeDeadline))
|
|
||||||
err := c.ws.WriteMessage(mType, data)
|
|
||||||
c.Unlock()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *conn) writeLoop() {
|
|
||||||
ticker := time.NewTicker(pingTime)
|
|
||||||
|
|
||||||
var opts []broker.SubscribeOption
|
|
||||||
|
|
||||||
if len(c.queue) > 0 {
|
|
||||||
opts = append(opts, broker.Queue(c.queue))
|
|
||||||
}
|
|
||||||
|
|
||||||
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
|
|
||||||
b, err := json.Marshal(p.Message())
|
|
||||||
if err != nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return c.write(websocket.TextMessage, b)
|
|
||||||
}, opts...)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
subscriber.Unsubscribe()
|
|
||||||
ticker.Stop()
|
|
||||||
c.ws.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
||||||
logger.Error(err.Error())
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-c.exit:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
bsize := handler.DefaultMaxRecvSize
|
|
||||||
if b.opts.MaxRecvSize > 0 {
|
|
||||||
bsize = b.opts.MaxRecvSize
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Body = http.MaxBytesReader(w, r.Body, bsize)
|
|
||||||
|
|
||||||
br := b.opts.Service.Client().Options().Broker
|
|
||||||
|
|
||||||
// Setup the broker
|
|
||||||
if !b.once.Load().(bool) {
|
|
||||||
if err := br.Init(); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
}
|
|
||||||
if err := br.Connect(); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
}
|
|
||||||
b.once.Store(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse
|
|
||||||
r.ParseForm()
|
|
||||||
topic := r.Form.Get("topic")
|
|
||||||
|
|
||||||
// Can't do anything without a topic
|
|
||||||
if len(topic) == 0 {
|
|
||||||
http.Error(w, "Topic not specified", 400)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Post assumed to be Publish
|
|
||||||
if r.Method == "POST" {
|
|
||||||
// Create a broker message
|
|
||||||
msg := &broker.Message{
|
|
||||||
Header: make(map[string]string),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set header
|
|
||||||
for k, v := range r.Header {
|
|
||||||
msg.Header[k] = strings.Join(v, ", ")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read body
|
|
||||||
buf := bufferPool.Get()
|
|
||||||
defer bufferPool.Put(buf)
|
|
||||||
if _, err := buf.ReadFrom(r.Body); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Set body
|
|
||||||
msg.Body = buf.Bytes()
|
|
||||||
// Set body
|
|
||||||
|
|
||||||
// Publish
|
|
||||||
br.Publish(topic, msg)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// now back to our regularly scheduled programming
|
|
||||||
|
|
||||||
if r.Method != "GET" {
|
|
||||||
http.Error(w, "Method not allowed", 405)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
queue := r.Form.Get("queue")
|
|
||||||
|
|
||||||
ws, err := b.u.Upgrade(w, r, nil)
|
|
||||||
if err != nil {
|
|
||||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
|
||||||
logger.Error(err.Error())
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
cType := r.Header.Get("Content-Type")
|
|
||||||
if len(cType) == 0 {
|
|
||||||
cType = contentType
|
|
||||||
}
|
|
||||||
|
|
||||||
c := &conn{
|
|
||||||
b: br,
|
|
||||||
cType: cType,
|
|
||||||
topic: topic,
|
|
||||||
queue: queue,
|
|
||||||
exit: make(chan bool),
|
|
||||||
ws: ws,
|
|
||||||
}
|
|
||||||
|
|
||||||
go c.writeLoop()
|
|
||||||
c.readLoop()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *brokerHandler) String() string {
|
|
||||||
return "broker"
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHandler(opts ...handler.Option) handler.Handler {
|
|
||||||
h := &brokerHandler{
|
|
||||||
u: websocket.Upgrader{
|
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
|
||||||
return true
|
|
||||||
},
|
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
},
|
|
||||||
opts: handler.NewOptions(opts...),
|
|
||||||
}
|
|
||||||
h.once.Store(true)
|
|
||||||
return h
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithCors(cors map[string]bool, opts ...handler.Option) handler.Handler {
|
|
||||||
return &brokerHandler{
|
|
||||||
u: websocket.Upgrader{
|
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
|
||||||
if origin := r.Header.Get("Origin"); cors[origin] {
|
|
||||||
return true
|
|
||||||
} else if len(origin) > 0 && cors["*"] {
|
|
||||||
return true
|
|
||||||
} else if checkOrigin(r) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
},
|
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
},
|
|
||||||
opts: handler.NewOptions(opts...),
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,101 +0,0 @@
|
|||||||
// Package cloudevents provides a cloudevents handler publishing the event using the go-micro/client
|
|
||||||
package cloudevents
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
"path"
|
|
||||||
"regexp"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/micro/go-micro/v2/api/handler"
|
|
||||||
"github.com/micro/go-micro/v2/util/ctx"
|
|
||||||
)
|
|
||||||
|
|
||||||
type event struct {
|
|
||||||
opts handler.Options
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
Handler = "cloudevents"
|
|
||||||
versionRe = regexp.MustCompilePOSIX("^v[0-9]+$")
|
|
||||||
)
|
|
||||||
|
|
||||||
func eventName(parts []string) string {
|
|
||||||
return strings.Join(parts, ".")
|
|
||||||
}
|
|
||||||
|
|
||||||
func evRoute(ns, p string) (string, string) {
|
|
||||||
p = path.Clean(p)
|
|
||||||
p = strings.TrimPrefix(p, "/")
|
|
||||||
|
|
||||||
if len(p) == 0 {
|
|
||||||
return ns, "event"
|
|
||||||
}
|
|
||||||
|
|
||||||
parts := strings.Split(p, "/")
|
|
||||||
|
|
||||||
// no path
|
|
||||||
if len(parts) == 0 {
|
|
||||||
// topic: namespace
|
|
||||||
// action: event
|
|
||||||
return strings.Trim(ns, "."), "event"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Treat /v[0-9]+ as versioning
|
|
||||||
// /v1/foo/bar => topic: v1.foo action: bar
|
|
||||||
if len(parts) >= 2 && versionRe.Match([]byte(parts[0])) {
|
|
||||||
topic := ns + "." + strings.Join(parts[:2], ".")
|
|
||||||
action := eventName(parts[1:])
|
|
||||||
return topic, action
|
|
||||||
}
|
|
||||||
|
|
||||||
// /foo => topic: ns.foo action: foo
|
|
||||||
// /foo/bar => topic: ns.foo action: bar
|
|
||||||
topic := ns + "." + strings.Join(parts[:1], ".")
|
|
||||||
action := eventName(parts[1:])
|
|
||||||
|
|
||||||
return topic, action
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *event) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
bsize := handler.DefaultMaxRecvSize
|
|
||||||
if e.opts.MaxRecvSize > 0 {
|
|
||||||
bsize = e.opts.MaxRecvSize
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Body = http.MaxBytesReader(w, r.Body, bsize)
|
|
||||||
|
|
||||||
// request to topic:event
|
|
||||||
// create event
|
|
||||||
// publish to topic
|
|
||||||
topic, _ := evRoute(e.opts.Namespace, r.URL.Path)
|
|
||||||
|
|
||||||
// create event
|
|
||||||
ev, err := FromRequest(r)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// get client
|
|
||||||
c := e.opts.Service.Client()
|
|
||||||
|
|
||||||
// create publication
|
|
||||||
p := c.NewMessage(topic, ev)
|
|
||||||
|
|
||||||
// publish event
|
|
||||||
if err := c.Publish(ctx.FromRequest(r), p); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *event) String() string {
|
|
||||||
return "cloudevents"
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHandler(opts ...handler.Option) handler.Handler {
|
|
||||||
return &event{
|
|
||||||
opts: handler.NewOptions(opts...),
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,288 +0,0 @@
|
|||||||
/*
|
|
||||||
* From: https://github.com/serverless/event-gateway/blob/master/event/event.go
|
|
||||||
* Modified: Strip to handler requirements
|
|
||||||
*
|
|
||||||
* Copyright 2017 Serverless, Inc.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
|
|
||||||
package cloudevents
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"mime"
|
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
"unicode"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/oxtoacart/bpool"
|
|
||||||
validator "gopkg.in/go-playground/validator.v9"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
bufferPool = bpool.NewSizedBufferPool(1024, 8)
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// TransformationVersion is indicative of the revision of how Event Gateway transforms a request into CloudEvents format.
|
|
||||||
TransformationVersion = "0.1"
|
|
||||||
|
|
||||||
// CloudEventsVersion currently supported by Event Gateway
|
|
||||||
CloudEventsVersion = "0.1"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Event is a default event structure. All data that passes through the Event Gateway
|
|
||||||
// is formatted to a format defined CloudEvents v0.1 spec.
|
|
||||||
type Event struct {
|
|
||||||
EventType string `json:"eventType" validate:"required"`
|
|
||||||
EventTypeVersion string `json:"eventTypeVersion,omitempty"`
|
|
||||||
CloudEventsVersion string `json:"cloudEventsVersion" validate:"required"`
|
|
||||||
Source string `json:"source" validate:"uri,required"`
|
|
||||||
EventID string `json:"eventID" validate:"required"`
|
|
||||||
EventTime *time.Time `json:"eventTime,omitempty"`
|
|
||||||
SchemaURL string `json:"schemaURL,omitempty"`
|
|
||||||
Extensions map[string]interface{} `json:"extensions,omitempty"`
|
|
||||||
ContentType string `json:"contentType,omitempty"`
|
|
||||||
Data interface{} `json:"data"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// New return new instance of Event.
|
|
||||||
func New(eventType string, mimeType string, payload interface{}) *Event {
|
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
event := &Event{
|
|
||||||
EventType: eventType,
|
|
||||||
CloudEventsVersion: CloudEventsVersion,
|
|
||||||
Source: "https://micro.mu",
|
|
||||||
EventID: uuid.New().String(),
|
|
||||||
EventTime: &now,
|
|
||||||
ContentType: mimeType,
|
|
||||||
Data: payload,
|
|
||||||
Extensions: map[string]interface{}{
|
|
||||||
"eventgateway": map[string]interface{}{
|
|
||||||
"transformed": "true",
|
|
||||||
"transformation-version": TransformationVersion,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
event.Data = normalizePayload(event.Data, event.ContentType)
|
|
||||||
return event
|
|
||||||
}
|
|
||||||
|
|
||||||
// FromRequest takes an HTTP request and returns an Event along with path. Most of the implementation
|
|
||||||
// is based on https://github.com/cloudevents/spec/blob/master/http-transport-binding.md.
|
|
||||||
// This function also supports legacy mode where event type is sent in Event header.
|
|
||||||
func FromRequest(r *http.Request) (*Event, error) {
|
|
||||||
contentType := r.Header.Get("Content-Type")
|
|
||||||
mimeType, _, err := mime.ParseMediaType(contentType)
|
|
||||||
if err != nil {
|
|
||||||
if err.Error() != "mime: no media type" {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
mimeType = "application/octet-stream"
|
|
||||||
}
|
|
||||||
// Read request body
|
|
||||||
body := []byte{}
|
|
||||||
if r.Body != nil {
|
|
||||||
buf := bufferPool.Get()
|
|
||||||
defer bufferPool.Put(buf)
|
|
||||||
if _, err := buf.ReadFrom(r.Body); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
body = buf.Bytes()
|
|
||||||
}
|
|
||||||
|
|
||||||
var event *Event
|
|
||||||
if mimeType == mimeCloudEventsJSON { // CloudEvents Structured Content Mode
|
|
||||||
return parseAsCloudEvent(mimeType, body)
|
|
||||||
} else if isCloudEventsBinaryContentMode(r.Header) { // CloudEvents Binary Content Mode
|
|
||||||
return parseAsCloudEventBinary(r.Header, body)
|
|
||||||
} else if isLegacyMode(r.Header) {
|
|
||||||
if mimeType == mimeJSON { // CloudEvent in Legacy Mode
|
|
||||||
event, err = parseAsCloudEvent(mimeType, body)
|
|
||||||
if err != nil {
|
|
||||||
return New(string(r.Header.Get("event")), mimeType, body), nil
|
|
||||||
}
|
|
||||||
return event, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return New(string(r.Header.Get("event")), mimeType, body), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return New("http.request", mimeJSON, newHTTPRequestData(r, body)), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Validate Event struct
|
|
||||||
func (e *Event) Validate() error {
|
|
||||||
validate := validator.New()
|
|
||||||
err := validate.Struct(e)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("CloudEvent not valid: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func isLegacyMode(headers http.Header) bool {
|
|
||||||
if headers.Get("Event") != "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func isCloudEventsBinaryContentMode(headers http.Header) bool {
|
|
||||||
if headers.Get("CE-EventType") != "" &&
|
|
||||||
headers.Get("CE-CloudEventsVersion") != "" &&
|
|
||||||
headers.Get("CE-Source") != "" &&
|
|
||||||
headers.Get("CE-EventID") != "" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseAsCloudEventBinary(headers http.Header, payload interface{}) (*Event, error) {
|
|
||||||
event := &Event{
|
|
||||||
EventType: headers.Get("CE-EventType"),
|
|
||||||
EventTypeVersion: headers.Get("CE-EventTypeVersion"),
|
|
||||||
CloudEventsVersion: headers.Get("CE-CloudEventsVersion"),
|
|
||||||
Source: headers.Get("CE-Source"),
|
|
||||||
EventID: headers.Get("CE-EventID"),
|
|
||||||
ContentType: headers.Get("Content-Type"),
|
|
||||||
Data: payload,
|
|
||||||
}
|
|
||||||
|
|
||||||
err := event.Validate()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if headers.Get("CE-EventTime") != "" {
|
|
||||||
val, err := time.Parse(time.RFC3339, headers.Get("CE-EventTime"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
event.EventTime = &val
|
|
||||||
}
|
|
||||||
|
|
||||||
if val := headers.Get("CE-SchemaURL"); len(val) > 0 {
|
|
||||||
event.SchemaURL = val
|
|
||||||
}
|
|
||||||
|
|
||||||
event.Extensions = map[string]interface{}{}
|
|
||||||
for key, val := range flatten(headers) {
|
|
||||||
if strings.HasPrefix(key, "Ce-X-") {
|
|
||||||
key = strings.TrimLeft(key, "Ce-X-")
|
|
||||||
// Make first character lowercase
|
|
||||||
runes := []rune(key)
|
|
||||||
runes[0] = unicode.ToLower(runes[0])
|
|
||||||
event.Extensions[string(runes)] = val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
event.Data = normalizePayload(event.Data, event.ContentType)
|
|
||||||
return event, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func flatten(h http.Header) map[string]string {
|
|
||||||
headers := map[string]string{}
|
|
||||||
for key, header := range h {
|
|
||||||
headers[key] = header[0]
|
|
||||||
if len(header) > 1 {
|
|
||||||
headers[key] = strings.Join(header, ", ")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return headers
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseAsCloudEvent(mime string, payload interface{}) (*Event, error) {
|
|
||||||
body, ok := payload.([]byte)
|
|
||||||
if ok {
|
|
||||||
event := &Event{}
|
|
||||||
err := json.Unmarshal(body, event)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = event.Validate()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
event.Data = normalizePayload(event.Data, event.ContentType)
|
|
||||||
return event, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, errors.New("couldn't cast to []byte")
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
mimeJSON = "application/json"
|
|
||||||
mimeFormMultipart = "multipart/form-data"
|
|
||||||
mimeFormURLEncoded = "application/x-www-form-urlencoded"
|
|
||||||
mimeCloudEventsJSON = "application/cloudevents+json"
|
|
||||||
)
|
|
||||||
|
|
||||||
// normalizePayload takes anything, checks if it's []byte array and depending on provided mime
|
|
||||||
// type converts it to either string or map[string]interface to avoid having base64 string after
|
|
||||||
// JSON marshaling.
|
|
||||||
func normalizePayload(payload interface{}, mime string) interface{} {
|
|
||||||
if bytePayload, ok := payload.([]byte); ok && len(bytePayload) > 0 {
|
|
||||||
switch {
|
|
||||||
case mime == mimeJSON || strings.HasSuffix(mime, "+json"):
|
|
||||||
var result map[string]interface{}
|
|
||||||
err := json.Unmarshal(bytePayload, &result)
|
|
||||||
if err != nil {
|
|
||||||
return payload
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
case strings.HasPrefix(mime, mimeFormMultipart), mime == mimeFormURLEncoded:
|
|
||||||
return string(bytePayload)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return payload
|
|
||||||
}
|
|
||||||
|
|
||||||
// HTTPRequestData is a event schema used for sending events to HTTP subscriptions.
|
|
||||||
type HTTPRequestData struct {
|
|
||||||
Headers map[string]string `json:"headers"`
|
|
||||||
Query map[string][]string `json:"query"`
|
|
||||||
Body interface{} `json:"body"`
|
|
||||||
Host string `json:"host"`
|
|
||||||
Path string `json:"path"`
|
|
||||||
Method string `json:"method"`
|
|
||||||
Params map[string]string `json:"params"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHTTPRequestData returns a new instance of HTTPRequestData
|
|
||||||
func newHTTPRequestData(r *http.Request, eventData interface{}) *HTTPRequestData {
|
|
||||||
req := &HTTPRequestData{
|
|
||||||
Headers: flatten(r.Header),
|
|
||||||
Query: r.URL.Query(),
|
|
||||||
Body: eventData,
|
|
||||||
Host: r.Host,
|
|
||||||
Path: r.URL.Path,
|
|
||||||
Method: r.Method,
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Body = normalizePayload(req.Body, r.Header.Get("content-type"))
|
|
||||||
return req
|
|
||||||
}
|
|
@ -1,16 +0,0 @@
|
|||||||
// Package file serves file relative to the current directory
|
|
||||||
package file
|
|
||||||
|
|
||||||
import (
|
|
||||||
"net/http"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Handler struct{}
|
|
||||||
|
|
||||||
func (h *Handler) Serve(w http.ResponseWriter, r *http.Request) {
|
|
||||||
http.ServeFile(w, r, "."+r.URL.Path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) String() string {
|
|
||||||
return "file"
|
|
||||||
}
|
|
@ -1,224 +0,0 @@
|
|||||||
// Package registry is a go-micro/registry handler
|
|
||||||
package registry
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"net/http"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
|
||||||
"github.com/micro/go-micro/v2/api/handler"
|
|
||||||
"github.com/micro/go-micro/v2/registry"
|
|
||||||
"github.com/oxtoacart/bpool"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
bufferPool = bpool.NewSizedBufferPool(1024, 8)
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
Handler = "registry"
|
|
||||||
|
|
||||||
pingTime = (readDeadline * 9) / 10
|
|
||||||
readLimit = 16384
|
|
||||||
readDeadline = 60 * time.Second
|
|
||||||
writeDeadline = 10 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
type registryHandler struct {
|
|
||||||
opts handler.Options
|
|
||||||
reg registry.Registry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rh *registryHandler) add(w http.ResponseWriter, r *http.Request) {
|
|
||||||
r.ParseForm()
|
|
||||||
defer r.Body.Close()
|
|
||||||
|
|
||||||
// Read body
|
|
||||||
buf := bufferPool.Get()
|
|
||||||
defer bufferPool.Put(buf)
|
|
||||||
if _, err := buf.ReadFrom(r.Body); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var opts []registry.RegisterOption
|
|
||||||
|
|
||||||
// parse ttl
|
|
||||||
if ttl := r.Form.Get("ttl"); len(ttl) > 0 {
|
|
||||||
d, err := time.ParseDuration(ttl)
|
|
||||||
if err == nil {
|
|
||||||
opts = append(opts, registry.RegisterTTL(d))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var service *registry.Service
|
|
||||||
if err := json.NewDecoder(buf).Decode(&service); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := rh.reg.Register(service, opts...); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rh *registryHandler) del(w http.ResponseWriter, r *http.Request) {
|
|
||||||
r.ParseForm()
|
|
||||||
defer r.Body.Close()
|
|
||||||
|
|
||||||
// Read body
|
|
||||||
buf := bufferPool.Get()
|
|
||||||
defer bufferPool.Put(buf)
|
|
||||||
if _, err := buf.ReadFrom(r.Body); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var service *registry.Service
|
|
||||||
if err := json.NewDecoder(buf).Decode(&service); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := rh.reg.Deregister(service); err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rh *registryHandler) get(w http.ResponseWriter, r *http.Request) {
|
|
||||||
r.ParseForm()
|
|
||||||
service := r.Form.Get("service")
|
|
||||||
|
|
||||||
var s []*registry.Service
|
|
||||||
var err error
|
|
||||||
|
|
||||||
if len(service) == 0 {
|
|
||||||
//
|
|
||||||
upgrade := r.Header.Get("Upgrade")
|
|
||||||
connect := r.Header.Get("Connection")
|
|
||||||
|
|
||||||
// watch if websockets
|
|
||||||
if upgrade == "websocket" && connect == "Upgrade" {
|
|
||||||
rw, err := rh.reg.Watch()
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
watch(rw, w, r)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise list services
|
|
||||||
s, err = rh.reg.ListServices()
|
|
||||||
} else {
|
|
||||||
s, err = rh.reg.GetService(service)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if s == nil || (len(service) > 0 && (len(s) == 0 || len(s[0].Name) == 0)) {
|
|
||||||
http.Error(w, "Service not found", 404)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := json.Marshal(s)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json")
|
|
||||||
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
|
|
||||||
w.Write(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func ping(ws *websocket.Conn, exit chan bool) {
|
|
||||||
ticker := time.NewTicker(pingTime)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ticker.C:
|
|
||||||
ws.SetWriteDeadline(time.Now().Add(writeDeadline))
|
|
||||||
err := ws.WriteMessage(websocket.PingMessage, []byte{})
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-exit:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func watch(rw registry.Watcher, w http.ResponseWriter, r *http.Request) {
|
|
||||||
upgrader := websocket.Upgrader{
|
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
}
|
|
||||||
|
|
||||||
ws, err := upgrader.Upgrade(w, r, nil)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// we need an exit chan
|
|
||||||
exit := make(chan bool)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
close(exit)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// ping the socket
|
|
||||||
go ping(ws, exit)
|
|
||||||
|
|
||||||
for {
|
|
||||||
// get next result
|
|
||||||
r, err := rw.Next()
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// write to client
|
|
||||||
ws.SetWriteDeadline(time.Now().Add(writeDeadline))
|
|
||||||
if err := ws.WriteJSON(r); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rh *registryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
bsize := handler.DefaultMaxRecvSize
|
|
||||||
if rh.opts.MaxRecvSize > 0 {
|
|
||||||
bsize = rh.opts.MaxRecvSize
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Body = http.MaxBytesReader(w, r.Body, bsize)
|
|
||||||
|
|
||||||
switch r.Method {
|
|
||||||
case "GET":
|
|
||||||
rh.get(w, r)
|
|
||||||
case "POST":
|
|
||||||
rh.add(w, r)
|
|
||||||
case "DELETE":
|
|
||||||
rh.del(w, r)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rh *registryHandler) String() string {
|
|
||||||
return "registry"
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewHandler(opts ...handler.Option) handler.Handler {
|
|
||||||
options := handler.NewOptions(opts...)
|
|
||||||
|
|
||||||
return ®istryHandler{
|
|
||||||
opts: options,
|
|
||||||
reg: options.Service.Client().Options().Registry,
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,25 +0,0 @@
|
|||||||
// Package udp reads and write from a udp connection
|
|
||||||
package udp
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Handler struct{}
|
|
||||||
|
|
||||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
c, err := net.Dial("udp", r.Host)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
go io.Copy(c, r.Body)
|
|
||||||
// write response
|
|
||||||
io.Copy(w, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) String() string {
|
|
||||||
return "udp"
|
|
||||||
}
|
|
@ -1,30 +0,0 @@
|
|||||||
// Package unix reads from a unix socket expecting it to be in /tmp/path
|
|
||||||
package unix
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
"path/filepath"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Handler struct{}
|
|
||||||
|
|
||||||
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
||||||
sock := fmt.Sprintf("%s.sock", filepath.Clean(r.URL.Path))
|
|
||||||
path := filepath.Join("/tmp", sock)
|
|
||||||
|
|
||||||
c, err := net.Dial("unix", path)
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, err.Error(), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
go io.Copy(c, r.Body)
|
|
||||||
// write response
|
|
||||||
io.Copy(w, c)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) String() string {
|
|
||||||
return "unix"
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user