2016-06-30 22:21:57 +03:00
|
|
|
// Package http implements a go-micro.Server
|
|
|
|
package http
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
2018-12-19 10:33:23 +03:00
|
|
|
"fmt"
|
2016-06-30 22:21:57 +03:00
|
|
|
"net"
|
|
|
|
"net/http"
|
2018-12-19 10:33:23 +03:00
|
|
|
"sort"
|
2016-06-30 22:21:57 +03:00
|
|
|
"sync"
|
2019-02-01 12:11:25 +03:00
|
|
|
"time"
|
2016-06-30 22:21:57 +03:00
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
"github.com/unistack-org/micro/v3/broker"
|
|
|
|
"github.com/unistack-org/micro/v3/codec"
|
|
|
|
jsonrpc "github.com/unistack-org/micro/v3/codec/jsonrpc"
|
|
|
|
protorpc "github.com/unistack-org/micro/v3/codec/protorpc"
|
|
|
|
"github.com/unistack-org/micro/v3/logger"
|
|
|
|
"github.com/unistack-org/micro/v3/registry"
|
|
|
|
"github.com/unistack-org/micro/v3/server"
|
2016-06-30 22:21:57 +03:00
|
|
|
)
|
|
|
|
|
2018-12-19 10:33:23 +03:00
|
|
|
var (
|
|
|
|
defaultCodecs = map[string]codec.NewCodec{
|
|
|
|
"application/json": jsonrpc.NewCodec,
|
|
|
|
"application/json-rpc": jsonrpc.NewCodec,
|
|
|
|
"application/protobuf": protorpc.NewCodec,
|
|
|
|
"application/proto-rpc": protorpc.NewCodec,
|
|
|
|
"application/octet-stream": protorpc.NewCodec,
|
|
|
|
}
|
|
|
|
)
|
2018-12-19 13:22:27 +03:00
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
type httpServer struct {
|
2020-10-10 00:38:35 +03:00
|
|
|
sync.RWMutex
|
2018-08-09 13:31:22 +03:00
|
|
|
opts server.Options
|
|
|
|
hd server.Handler
|
|
|
|
exit chan chan error
|
|
|
|
registerOnce sync.Once
|
2018-12-19 12:47:03 +03:00
|
|
|
subscribers map[*httpSubscriber][]broker.Subscriber
|
2018-12-19 13:22:27 +03:00
|
|
|
// used for first registration
|
|
|
|
registered bool
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
2018-12-19 10:33:23 +03:00
|
|
|
func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) {
|
|
|
|
if cf, ok := h.opts.Codecs[contentType]; ok {
|
|
|
|
return cf, nil
|
|
|
|
}
|
|
|
|
if cf, ok := defaultCodecs[contentType]; ok {
|
|
|
|
return cf, nil
|
|
|
|
}
|
|
|
|
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
|
|
|
|
}
|
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
func (h *httpServer) Options() server.Options {
|
|
|
|
h.Lock()
|
|
|
|
opts := h.opts
|
|
|
|
h.Unlock()
|
|
|
|
return opts
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Init(opts ...server.Option) error {
|
|
|
|
h.Lock()
|
|
|
|
for _, o := range opts {
|
|
|
|
o(&h.opts)
|
|
|
|
}
|
|
|
|
h.Unlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Handle(handler server.Handler) error {
|
|
|
|
if _, ok := handler.Handler().(http.Handler); !ok {
|
|
|
|
return errors.New("Handle requires http.Handler")
|
|
|
|
}
|
|
|
|
h.Lock()
|
|
|
|
h.hd = handler
|
|
|
|
h.Unlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) NewHandler(handler interface{}, opts ...server.HandlerOption) server.Handler {
|
2017-04-03 17:03:46 +03:00
|
|
|
options := server.HandlerOptions{
|
|
|
|
Metadata: make(map[string]map[string]string),
|
|
|
|
}
|
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
for _, o := range opts {
|
|
|
|
o(&options)
|
|
|
|
}
|
|
|
|
|
2017-04-03 17:03:46 +03:00
|
|
|
var eps []*registry.Endpoint
|
|
|
|
|
|
|
|
if !options.Internal {
|
|
|
|
for name, metadata := range options.Metadata {
|
|
|
|
eps = append(eps, ®istry.Endpoint{
|
|
|
|
Name: name,
|
|
|
|
Metadata: metadata,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
return &httpHandler{
|
2017-04-03 17:03:46 +03:00
|
|
|
eps: eps,
|
2016-06-30 22:21:57 +03:00
|
|
|
hd: handler,
|
2017-04-03 17:03:46 +03:00
|
|
|
opts: options,
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) NewSubscriber(topic string, handler interface{}, opts ...server.SubscriberOption) server.Subscriber {
|
2018-12-19 10:33:23 +03:00
|
|
|
return newSubscriber(topic, handler, opts...)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Subscribe(sb server.Subscriber) error {
|
2018-12-19 12:47:03 +03:00
|
|
|
sub, ok := sb.(*httpSubscriber)
|
2018-12-19 10:33:23 +03:00
|
|
|
if !ok {
|
2018-12-19 12:47:03 +03:00
|
|
|
return fmt.Errorf("invalid subscriber: expected *httpSubscriber")
|
2018-12-19 10:33:23 +03:00
|
|
|
}
|
|
|
|
if len(sub.handlers) == 0 {
|
|
|
|
return fmt.Errorf("invalid subscriber: no handler functions")
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
if err := server.ValidateSubscriber(sb); err != nil {
|
2018-12-19 10:33:23 +03:00
|
|
|
return err
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
2018-12-19 10:33:23 +03:00
|
|
|
h.Lock()
|
|
|
|
defer h.Unlock()
|
|
|
|
_, ok = h.subscribers[sub]
|
|
|
|
if ok {
|
|
|
|
return fmt.Errorf("subscriber %v already exists", h)
|
|
|
|
}
|
|
|
|
h.subscribers[sub] = nil
|
|
|
|
return nil
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Register() error {
|
|
|
|
h.Lock()
|
|
|
|
opts := h.opts
|
2017-04-03 17:03:46 +03:00
|
|
|
eps := h.hd.Endpoints()
|
2016-06-30 22:21:57 +03:00
|
|
|
h.Unlock()
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
service, err := server.NewRegistryService(h)
|
2020-10-10 00:38:35 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-10-16 16:20:19 +03:00
|
|
|
service.Nodes[0].Metadata["protocol"] = "http"
|
2017-04-03 17:03:46 +03:00
|
|
|
service.Endpoints = eps
|
2016-06-30 22:21:57 +03:00
|
|
|
|
2018-12-19 10:33:23 +03:00
|
|
|
h.Lock()
|
2018-12-19 12:47:03 +03:00
|
|
|
var subscriberList []*httpSubscriber
|
2018-12-19 10:33:23 +03:00
|
|
|
for e := range h.subscribers {
|
|
|
|
// Only advertise non internal subscribers
|
|
|
|
if !e.Options().Internal {
|
|
|
|
subscriberList = append(subscriberList, e)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sort.Slice(subscriberList, func(i, j int) bool {
|
|
|
|
return subscriberList[i].topic > subscriberList[j].topic
|
|
|
|
})
|
|
|
|
for _, e := range subscriberList {
|
|
|
|
service.Endpoints = append(service.Endpoints, e.Endpoints()...)
|
|
|
|
}
|
|
|
|
h.Unlock()
|
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
rOpts := []registry.RegisterOption{
|
|
|
|
registry.RegisterTTL(opts.RegisterTTL),
|
|
|
|
}
|
|
|
|
|
2018-08-09 13:31:22 +03:00
|
|
|
h.registerOnce.Do(func() {
|
2020-10-10 00:38:35 +03:00
|
|
|
logger.Infof("Registering node: %s", opts.Name+"-"+opts.Id)
|
2018-08-09 13:31:22 +03:00
|
|
|
})
|
|
|
|
|
2018-12-19 13:22:27 +03:00
|
|
|
if err := opts.Registry.Register(service, rOpts...); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
h.Lock()
|
|
|
|
defer h.Unlock()
|
|
|
|
|
|
|
|
if h.registered {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
h.registered = true
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
subCtx := h.opts.Context
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
for sb := range h.subscribers {
|
2018-12-19 13:22:27 +03:00
|
|
|
handler := h.createSubHandler(sb, opts)
|
|
|
|
var subOpts []broker.SubscribeOption
|
|
|
|
if queue := sb.Options().Queue; len(queue) > 0 {
|
|
|
|
subOpts = append(subOpts, broker.Queue(queue))
|
|
|
|
}
|
2020-10-10 00:38:35 +03:00
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
if cx := sb.Options().Context; cx != nil {
|
|
|
|
subCtx = cx
|
|
|
|
}
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
if !sb.Options().AutoAck {
|
|
|
|
subOpts = append(subOpts, broker.DisableAutoAck())
|
|
|
|
}
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
sub, err := opts.Broker.Subscribe(subCtx, sb.Topic(), handler, subOpts...)
|
2018-12-19 13:22:27 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
h.subscribers[sb] = []broker.Subscriber{sub}
|
|
|
|
}
|
|
|
|
return nil
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Deregister() error {
|
|
|
|
h.Lock()
|
|
|
|
opts := h.opts
|
|
|
|
h.Unlock()
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
service, err := server.NewRegistryService(h)
|
2020-10-10 00:38:35 +03:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-08-09 13:31:22 +03:00
|
|
|
|
2018-12-19 12:56:52 +03:00
|
|
|
if err := opts.Registry.Deregister(service); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
h.Lock()
|
2018-12-19 13:22:27 +03:00
|
|
|
if !h.registered {
|
|
|
|
h.Unlock()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
h.registered = false
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
subCtx := h.opts.Context
|
2018-12-19 12:56:52 +03:00
|
|
|
for sb, subs := range h.subscribers {
|
2020-10-16 16:46:39 +03:00
|
|
|
if cx := sb.Options().Context; cx != nil {
|
|
|
|
subCtx = cx
|
|
|
|
}
|
|
|
|
|
2018-12-19 12:56:52 +03:00
|
|
|
for _, sub := range subs {
|
2020-10-10 00:38:35 +03:00
|
|
|
logger.Infof("Unsubscribing from topic: %s", sub.Topic())
|
2020-10-16 16:20:19 +03:00
|
|
|
if err := sub.Unsubscribe(subCtx); err != nil {
|
|
|
|
logger.Errorf("failed to unsubscribe topic: %s, error: %v", sb.Topic(), err)
|
|
|
|
return err
|
|
|
|
}
|
2018-12-19 12:56:52 +03:00
|
|
|
}
|
|
|
|
h.subscribers[sb] = nil
|
|
|
|
}
|
|
|
|
h.Unlock()
|
|
|
|
return nil
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Start() error {
|
|
|
|
h.Lock()
|
|
|
|
opts := h.opts
|
|
|
|
hd := h.hd
|
|
|
|
h.Unlock()
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
config := h.Options()
|
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
ln, err := net.Listen("tcp", opts.Address)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
logger.Infof("Listening on %s", ln.Addr().String())
|
2018-08-09 13:31:22 +03:00
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
h.Lock()
|
|
|
|
h.opts.Address = ln.Addr().String()
|
|
|
|
h.Unlock()
|
|
|
|
|
|
|
|
handler, ok := hd.Handler().(http.Handler)
|
|
|
|
if !ok {
|
|
|
|
return errors.New("Server required http.Handler")
|
|
|
|
}
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
if err = opts.Broker.Connect(h.opts.Context); err != nil {
|
2019-02-27 07:28:03 +03:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
if err = h.opts.RegisterCheck(h.opts.Context); err != nil {
|
|
|
|
if logger.V(logger.ErrorLevel) {
|
|
|
|
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
if err = h.Register(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2019-02-27 07:28:03 +03:00
|
|
|
}
|
2019-01-24 16:58:56 +03:00
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
go http.Serve(ln, handler)
|
|
|
|
|
|
|
|
go func() {
|
2019-02-01 12:11:25 +03:00
|
|
|
t := new(time.Ticker)
|
|
|
|
|
|
|
|
// only process if it exists
|
|
|
|
if opts.RegisterInterval > time.Duration(0) {
|
|
|
|
// new ticker
|
|
|
|
t = time.NewTicker(opts.RegisterInterval)
|
|
|
|
}
|
|
|
|
|
|
|
|
// return error chan
|
|
|
|
var ch chan error
|
|
|
|
|
|
|
|
Loop:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
// register self on interval
|
|
|
|
case <-t.C:
|
2020-10-10 00:38:35 +03:00
|
|
|
h.RLock()
|
|
|
|
registered := h.registered
|
|
|
|
h.RUnlock()
|
|
|
|
rerr := h.opts.RegisterCheck(h.opts.Context)
|
|
|
|
if rerr != nil && registered {
|
|
|
|
if logger.V(logger.ErrorLevel) {
|
|
|
|
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
|
|
|
|
}
|
|
|
|
// deregister self in case of error
|
|
|
|
if err := h.Deregister(); err != nil {
|
|
|
|
if logger.V(logger.ErrorLevel) {
|
|
|
|
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if rerr != nil && !registered {
|
|
|
|
if logger.V(logger.ErrorLevel) {
|
|
|
|
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if err := h.Register(); err != nil {
|
|
|
|
if logger.V(logger.ErrorLevel) {
|
|
|
|
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-01 12:11:25 +03:00
|
|
|
if err := h.Register(); err != nil {
|
2020-10-10 00:38:35 +03:00
|
|
|
logger.Error("Server register error: ", err)
|
2019-02-01 12:11:25 +03:00
|
|
|
}
|
|
|
|
// wait for exit
|
|
|
|
case ch = <-h.exit:
|
|
|
|
break Loop
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-30 22:21:57 +03:00
|
|
|
ch <- ln.Close()
|
2018-12-20 06:31:36 +03:00
|
|
|
|
2019-01-24 16:58:56 +03:00
|
|
|
// deregister
|
|
|
|
h.Deregister()
|
|
|
|
|
2020-10-16 16:20:19 +03:00
|
|
|
opts.Broker.Disconnect(h.opts.Context)
|
2016-06-30 22:21:57 +03:00
|
|
|
}()
|
|
|
|
|
2019-02-27 07:28:03 +03:00
|
|
|
return nil
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) Stop() error {
|
|
|
|
ch := make(chan error)
|
|
|
|
h.exit <- ch
|
|
|
|
return <-ch
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *httpServer) String() string {
|
|
|
|
return "http"
|
|
|
|
}
|
|
|
|
|
2020-10-10 00:38:35 +03:00
|
|
|
func NewServer(opts ...server.Option) server.Server {
|
|
|
|
options := server.NewOptions(opts...)
|
2016-06-30 22:21:57 +03:00
|
|
|
return &httpServer{
|
2020-10-10 00:38:35 +03:00
|
|
|
opts: options,
|
2018-12-19 13:22:27 +03:00
|
|
|
exit: make(chan chan error),
|
2018-12-19 12:53:34 +03:00
|
|
|
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
|
2016-06-30 22:21:57 +03:00
|
|
|
}
|
|
|
|
}
|