101
http.go
101
http.go
@@ -10,14 +10,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/micro/go-log"
|
||||
"github.com/micro/go-micro/broker"
|
||||
"github.com/micro/go-micro/cmd"
|
||||
"github.com/micro/go-micro/codec"
|
||||
"github.com/micro/go-micro/codec/jsonrpc"
|
||||
"github.com/micro/go-micro/codec/protorpc"
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/server"
|
||||
"github.com/unistack-org/micro/v3/broker"
|
||||
"github.com/unistack-org/micro/v3/codec"
|
||||
jsonrpc "github.com/unistack-org/micro/v3/codec/jsonrpc"
|
||||
protorpc "github.com/unistack-org/micro/v3/codec/protorpc"
|
||||
"github.com/unistack-org/micro/v3/logger"
|
||||
"github.com/unistack-org/micro/v3/registry"
|
||||
"github.com/unistack-org/micro/v3/server"
|
||||
regutil "github.com/unistack-org/micro/v3/util/registry"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -31,7 +31,7 @@ var (
|
||||
)
|
||||
|
||||
type httpServer struct {
|
||||
sync.Mutex
|
||||
sync.RWMutex
|
||||
opts server.Options
|
||||
hd server.Handler
|
||||
exit chan chan error
|
||||
@@ -41,10 +41,6 @@ type httpServer struct {
|
||||
registered bool
|
||||
}
|
||||
|
||||
func init() {
|
||||
cmd.DefaultServers["http"] = NewServer
|
||||
}
|
||||
|
||||
func (h *httpServer) newCodec(contentType string) (codec.NewCodec, error) {
|
||||
if cf, ok := h.opts.Codecs[contentType]; ok {
|
||||
return cf, nil
|
||||
@@ -121,7 +117,7 @@ func (h *httpServer) Subscribe(sb server.Subscriber) error {
|
||||
return fmt.Errorf("invalid subscriber: no handler functions")
|
||||
}
|
||||
|
||||
if err := validateSubscriber(sb); err != nil {
|
||||
if err := server.ValidateSubscriber(sb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -141,7 +137,11 @@ func (h *httpServer) Register() error {
|
||||
eps := h.hd.Endpoints()
|
||||
h.Unlock()
|
||||
|
||||
service := serviceDef(opts)
|
||||
service, err := regutil.NewService(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
service.Metadata["protocol"] = "http"
|
||||
service.Endpoints = eps
|
||||
|
||||
h.Lock()
|
||||
@@ -165,7 +165,7 @@ func (h *httpServer) Register() error {
|
||||
}
|
||||
|
||||
h.registerOnce.Do(func() {
|
||||
log.Logf("Registering node: %s", opts.Name+"-"+opts.Id)
|
||||
logger.Infof("Registering node: %s", opts.Name+"-"+opts.Id)
|
||||
})
|
||||
|
||||
if err := opts.Registry.Register(service, rOpts...); err != nil {
|
||||
@@ -180,12 +180,17 @@ func (h *httpServer) Register() error {
|
||||
}
|
||||
h.registered = true
|
||||
|
||||
for sb, _ := range h.subscribers {
|
||||
for sb := range h.subscribers {
|
||||
handler := h.createSubHandler(sb, opts)
|
||||
var subOpts []broker.SubscribeOption
|
||||
if queue := sb.Options().Queue; len(queue) > 0 {
|
||||
subOpts = append(subOpts, broker.Queue(queue))
|
||||
}
|
||||
|
||||
if !sb.Options().AutoAck {
|
||||
subOpts = append(subOpts, broker.DisableAutoAck())
|
||||
}
|
||||
|
||||
sub, err := opts.Broker.Subscribe(sb.Topic(), handler, subOpts...)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -200,9 +205,13 @@ func (h *httpServer) Deregister() error {
|
||||
opts := h.opts
|
||||
h.Unlock()
|
||||
|
||||
log.Logf("Deregistering node: %s", opts.Name+"-"+opts.Id)
|
||||
logger.Infof("Deregistering node: %s", opts.Name+"-"+opts.Id)
|
||||
|
||||
service, err := regutil.NewService(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
service := serviceDef(opts)
|
||||
if err := opts.Registry.Deregister(service); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -216,7 +225,7 @@ func (h *httpServer) Deregister() error {
|
||||
|
||||
for sb, subs := range h.subscribers {
|
||||
for _, sub := range subs {
|
||||
log.Logf("Unsubscribing from topic: %s", sub.Topic())
|
||||
logger.Infof("Unsubscribing from topic: %s", sub.Topic())
|
||||
sub.Unsubscribe()
|
||||
}
|
||||
h.subscribers[sb] = nil
|
||||
@@ -231,12 +240,14 @@ func (h *httpServer) Start() error {
|
||||
hd := h.hd
|
||||
h.Unlock()
|
||||
|
||||
config := h.Options()
|
||||
|
||||
ln, err := net.Listen("tcp", opts.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Logf("Listening on %s", ln.Addr().String())
|
||||
logger.Infof("Listening on %s", ln.Addr().String())
|
||||
|
||||
h.Lock()
|
||||
h.opts.Address = ln.Addr().String()
|
||||
@@ -251,9 +262,14 @@ func (h *httpServer) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// register
|
||||
if err = h.Register(); err != nil {
|
||||
return err
|
||||
if err = h.opts.RegisterCheck(h.opts.Context); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, err)
|
||||
}
|
||||
} else {
|
||||
if err = h.Register(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
go http.Serve(ln, handler)
|
||||
@@ -275,8 +291,34 @@ func (h *httpServer) Start() error {
|
||||
select {
|
||||
// register self on interval
|
||||
case <-t.C:
|
||||
h.RLock()
|
||||
registered := h.registered
|
||||
h.RUnlock()
|
||||
rerr := h.opts.RegisterCheck(h.opts.Context)
|
||||
if rerr != nil && registered {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("Server %s-%s register check error: %s, deregister it", config.Name, config.Id, rerr)
|
||||
}
|
||||
// deregister self in case of error
|
||||
if err := h.Deregister(); err != nil {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("Server %s-%s deregister error: %s", config.Name, config.Id, err)
|
||||
}
|
||||
}
|
||||
} else if rerr != nil && !registered {
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("Server %s-%s register check error: %s", config.Name, config.Id, rerr)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := h.Register(); err != nil {
|
||||
log.Log("Server register error: ", err)
|
||||
if logger.V(logger.ErrorLevel) {
|
||||
logger.Errorf("Server %s-%s register error: %s", config.Name, config.Id, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.Register(); err != nil {
|
||||
logger.Error("Server register error: ", err)
|
||||
}
|
||||
// wait for exit
|
||||
case ch = <-h.exit:
|
||||
@@ -305,14 +347,11 @@ func (h *httpServer) String() string {
|
||||
return "http"
|
||||
}
|
||||
|
||||
func newServer(opts ...server.Option) server.Server {
|
||||
func NewServer(opts ...server.Option) server.Server {
|
||||
options := server.NewOptions(opts...)
|
||||
return &httpServer{
|
||||
opts: newOptions(opts...),
|
||||
opts: options,
|
||||
exit: make(chan chan error),
|
||||
subscribers: make(map[*httpSubscriber][]broker.Subscriber),
|
||||
}
|
||||
}
|
||||
|
||||
func NewServer(opts ...server.Option) server.Server {
|
||||
return newServer(opts...)
|
||||
}
|
||||
|
Reference in New Issue
Block a user