Add http server which implements go-micro.Server
This commit is contained in:
148
http.go
Normal file
148
http.go
Normal file
@@ -0,0 +1,148 @@
|
||||
// Package http implements a go-micro.Server
|
||||
package http
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/server"
|
||||
)
|
||||
|
||||
type httpServer struct {
|
||||
sync.Mutex
|
||||
opts server.Options
|
||||
hd server.Handler
|
||||
exit chan chan error
|
||||
}
|
||||
|
||||
func (h *httpServer) Options() server.Options {
|
||||
h.Lock()
|
||||
opts := h.opts
|
||||
h.Unlock()
|
||||
return opts
|
||||
}
|
||||
|
||||
func (h *httpServer) Init(opts ...server.Option) error {
|
||||
h.Lock()
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
h.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpServer) Handle(handler server.Handler) error {
|
||||
if _, ok := handler.Handler().(http.Handler); !ok {
|
||||
return errors.New("Handle requires http.Handler")
|
||||
}
|
||||
h.Lock()
|
||||
h.hd = handler
|
||||
h.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
|
||||
var options server.HandlerOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return &httpHandler{
|
||||
opts: options,
|
||||
hd: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
||||
var options server.SubscriberOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return &httpSubscriber{
|
||||
opts: options,
|
||||
topic: topic,
|
||||
hd: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *httpServer) Subscribe(s server.Subscriber) error {
|
||||
return errors.New("subscribe is not supported")
|
||||
}
|
||||
|
||||
func (h *httpServer) Register() error {
|
||||
h.Lock()
|
||||
opts := h.opts
|
||||
h.Unlock()
|
||||
|
||||
service := serviceDef(opts)
|
||||
|
||||
rOpts := []registry.RegisterOption{
|
||||
registry.RegisterTTL(opts.RegisterTTL),
|
||||
}
|
||||
|
||||
return opts.Registry.Register(service, rOpts...)
|
||||
}
|
||||
|
||||
func (h *httpServer) Deregister() error {
|
||||
h.Lock()
|
||||
opts := h.opts
|
||||
h.Unlock()
|
||||
|
||||
service := serviceDef(opts)
|
||||
return opts.Registry.Deregister(service)
|
||||
}
|
||||
|
||||
func (h *httpServer) Start() error {
|
||||
h.Lock()
|
||||
opts := h.opts
|
||||
hd := h.hd
|
||||
h.Unlock()
|
||||
|
||||
ln, err := net.Listen("tcp", opts.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
h.opts.Address = ln.Addr().String()
|
||||
h.Unlock()
|
||||
|
||||
handler, ok := hd.Handler().(http.Handler)
|
||||
if !ok {
|
||||
return errors.New("Server required http.Handler")
|
||||
}
|
||||
|
||||
go http.Serve(ln, handler)
|
||||
|
||||
go func() {
|
||||
ch := <-h.exit
|
||||
ch <- ln.Close()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpServer) Stop() error {
|
||||
ch := make(chan error)
|
||||
h.exit <- ch
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (h *httpServer) String() string {
|
||||
return "http"
|
||||
}
|
||||
|
||||
func newServer(opts ...server.Option) server.Server {
|
||||
return &httpServer{
|
||||
opts: newOptions(opts...),
|
||||
exit: make(chan chan error),
|
||||
}
|
||||
}
|
||||
|
||||
func NewServer(opts ...server.Option) server.Server {
|
||||
return newServer(opts...)
|
||||
}
|
Reference in New Issue
Block a user