212 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			212 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Package registry is a go-micro/registry handler
 | 
						|
package registry
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"io/ioutil"
 | 
						|
	"net/http"
 | 
						|
	"strconv"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/gorilla/websocket"
 | 
						|
	"github.com/micro/go-micro/v2/api/handler"
 | 
						|
	"github.com/micro/go-micro/v2/registry"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	Handler = "registry"
 | 
						|
 | 
						|
	pingTime      = (readDeadline * 9) / 10
 | 
						|
	readLimit     = 16384
 | 
						|
	readDeadline  = 60 * time.Second
 | 
						|
	writeDeadline = 10 * time.Second
 | 
						|
)
 | 
						|
 | 
						|
type registryHandler struct {
 | 
						|
	opts handler.Options
 | 
						|
	reg  registry.Registry
 | 
						|
}
 | 
						|
 | 
						|
func (rh *registryHandler) add(w http.ResponseWriter, r *http.Request) {
 | 
						|
	r.ParseForm()
 | 
						|
	b, err := ioutil.ReadAll(r.Body)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer r.Body.Close()
 | 
						|
 | 
						|
	var opts []registry.RegisterOption
 | 
						|
 | 
						|
	// parse ttl
 | 
						|
	if ttl := r.Form.Get("ttl"); len(ttl) > 0 {
 | 
						|
		d, err := time.ParseDuration(ttl)
 | 
						|
		if err == nil {
 | 
						|
			opts = append(opts, registry.RegisterTTL(d))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	var service *registry.Service
 | 
						|
	err = json.Unmarshal(b, &service)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = rh.reg.Register(service, opts...)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (rh *registryHandler) del(w http.ResponseWriter, r *http.Request) {
 | 
						|
	r.ParseForm()
 | 
						|
	b, err := ioutil.ReadAll(r.Body)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer r.Body.Close()
 | 
						|
 | 
						|
	var service *registry.Service
 | 
						|
	err = json.Unmarshal(b, &service)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = rh.reg.Deregister(service)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (rh *registryHandler) get(w http.ResponseWriter, r *http.Request) {
 | 
						|
	r.ParseForm()
 | 
						|
	service := r.Form.Get("service")
 | 
						|
 | 
						|
	var s []*registry.Service
 | 
						|
	var err error
 | 
						|
 | 
						|
	if len(service) == 0 {
 | 
						|
		//
 | 
						|
		upgrade := r.Header.Get("Upgrade")
 | 
						|
		connect := r.Header.Get("Connection")
 | 
						|
 | 
						|
		// watch if websockets
 | 
						|
		if upgrade == "websocket" && connect == "Upgrade" {
 | 
						|
			rw, err := rh.reg.Watch()
 | 
						|
			if err != nil {
 | 
						|
				http.Error(w, err.Error(), 500)
 | 
						|
				return
 | 
						|
			}
 | 
						|
			watch(rw, w, r)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// otherwise list services
 | 
						|
		s, err = rh.reg.ListServices()
 | 
						|
	} else {
 | 
						|
		s, err = rh.reg.GetService(service)
 | 
						|
	}
 | 
						|
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if s == nil || (len(service) > 0 && (len(s) == 0 || len(s[0].Name) == 0)) {
 | 
						|
		http.Error(w, "Service not found", 404)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	b, err := json.Marshal(s)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	w.Header().Set("Content-Type", "application/json")
 | 
						|
	w.Header().Set("Content-Length", strconv.Itoa(len(b)))
 | 
						|
	w.Write(b)
 | 
						|
}
 | 
						|
 | 
						|
func ping(ws *websocket.Conn, exit chan bool) {
 | 
						|
	ticker := time.NewTicker(pingTime)
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ticker.C:
 | 
						|
			ws.SetWriteDeadline(time.Now().Add(writeDeadline))
 | 
						|
			err := ws.WriteMessage(websocket.PingMessage, []byte{})
 | 
						|
			if err != nil {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		case <-exit:
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func watch(rw registry.Watcher, w http.ResponseWriter, r *http.Request) {
 | 
						|
	upgrader := websocket.Upgrader{
 | 
						|
		ReadBufferSize:  1024,
 | 
						|
		WriteBufferSize: 1024,
 | 
						|
	}
 | 
						|
 | 
						|
	ws, err := upgrader.Upgrade(w, r, nil)
 | 
						|
	if err != nil {
 | 
						|
		http.Error(w, err.Error(), 500)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	// we need an exit chan
 | 
						|
	exit := make(chan bool)
 | 
						|
 | 
						|
	defer func() {
 | 
						|
		close(exit)
 | 
						|
	}()
 | 
						|
 | 
						|
	// ping the socket
 | 
						|
	go ping(ws, exit)
 | 
						|
 | 
						|
	for {
 | 
						|
		// get next result
 | 
						|
		r, err := rw.Next()
 | 
						|
		if err != nil {
 | 
						|
			http.Error(w, err.Error(), 500)
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// write to client
 | 
						|
		ws.SetWriteDeadline(time.Now().Add(writeDeadline))
 | 
						|
		if err := ws.WriteJSON(r); err != nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (rh *registryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | 
						|
	switch r.Method {
 | 
						|
	case "GET":
 | 
						|
		rh.get(w, r)
 | 
						|
	case "POST":
 | 
						|
		rh.add(w, r)
 | 
						|
	case "DELETE":
 | 
						|
		rh.del(w, r)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (rh *registryHandler) String() string {
 | 
						|
	return "registry"
 | 
						|
}
 | 
						|
 | 
						|
func NewHandler(opts ...handler.Option) handler.Handler {
 | 
						|
	options := handler.NewOptions(opts...)
 | 
						|
 | 
						|
	return ®istryHandler{
 | 
						|
		opts: options,
 | 
						|
		reg:  options.Service.Client().Options().Registry,
 | 
						|
	}
 | 
						|
}
 |