micro/server/rpc_server.go

426 lines
8.6 KiB
Go
Raw Normal View History

2015-01-13 23:31:27 +00:00
package server
import (
2018-03-03 11:53:52 +00:00
"context"
"fmt"
2015-12-21 17:18:04 +00:00
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
2016-05-12 23:32:58 +01:00
"time"
2017-05-11 20:43:42 +01:00
"github.com/micro/go-log"
2015-11-20 16:17:33 +00:00
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
2016-01-28 17:55:28 +00:00
"github.com/micro/go-micro/metadata"
2015-11-20 16:17:33 +00:00
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
2015-05-23 11:53:40 +01:00
2017-01-12 13:20:34 +00:00
"github.com/micro/misc/lib/addr"
2015-01-13 23:31:27 +00:00
)
2015-05-23 17:40:53 +01:00
type rpcServer struct {
rpc *server
2015-05-26 22:39:48 +01:00
exit chan chan error
sync.RWMutex
opts Options
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
// used for first registration
registered bool
// graceful exit
wg sync.WaitGroup
2015-01-13 23:31:27 +00:00
}
2015-05-26 22:39:48 +01:00
func newRpcServer(opts ...Option) Server {
options := newOptions(opts...)
2015-05-23 17:40:53 +01:00
return &rpcServer{
opts: options,
rpc: &server{
name: options.Name,
serviceMap: make(map[string]*service),
hdlrWrappers: options.HdlrWrappers,
},
handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
2015-05-23 17:40:53 +01:00
}
}
2015-01-13 23:31:27 +00:00
2015-05-23 17:40:53 +01:00
func (s *rpcServer) accept(sock transport.Socket) {
2015-12-21 17:18:04 +00:00
defer func() {
2016-05-10 10:55:18 +01:00
// close socket
sock.Close()
2015-12-21 17:18:04 +00:00
if r := recover(); r != nil {
2017-05-16 19:14:00 +01:00
log.Log("panic recovered: ", r)
log.Log(string(debug.Stack()))
2015-12-21 17:18:04 +00:00
}
}()
2016-05-13 15:58:53 +01:00
for {
var msg transport.Message
if err := sock.Recv(&msg); err != nil {
return
}
2015-01-13 23:31:27 +00:00
2016-05-13 15:58:53 +01:00
// we use this Timeout header to set a server deadline
to := msg.Header["Timeout"]
// we use this Content-Type header to identify the codec needed
ct := msg.Header["Content-Type"]
cf, err := s.newCodec(ct)
// TODO: needs better error handling
if err != nil {
sock.Send(&transport.Message{
Header: map[string]string{
"Content-Type": "text/plain",
},
Body: []byte(err.Error()),
})
return
}
2015-11-25 19:50:05 +00:00
2016-05-13 15:58:53 +01:00
codec := newRpcPlusCodec(&msg, sock, cf)
2015-01-13 23:31:27 +00:00
2016-05-13 15:58:53 +01:00
// strip our headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
delete(hdr, "Timeout")
2015-01-13 23:31:27 +00:00
2016-05-13 15:58:53 +01:00
ctx := metadata.NewContext(context.Background(), hdr)
2015-12-02 20:56:50 +00:00
2016-05-13 15:58:53 +01:00
// set the timeout if we have it
if len(to) > 0 {
if n, err := strconv.ParseUint(to, 10, 64); err == nil {
ctx, _ = context.WithTimeout(ctx, time.Duration(n))
}
2016-05-12 23:32:58 +01:00
}
// add to wait group
s.wg.Add(1)
defer s.wg.Done()
2016-05-13 15:58:53 +01:00
// TODO: needs better error handling
if err := s.rpc.serveRequest(ctx, codec, ct); err != nil {
2017-05-11 20:43:42 +01:00
log.Logf("Unexpected error serving request, closing socket: %v", err)
2016-05-13 15:58:53 +01:00
return
}
}
2015-05-20 22:57:19 +01:00
}
func (s *rpcServer) newCodec(contentType string) (codec.NewCodec, error) {
if cf, ok := s.opts.Codecs[contentType]; ok {
return cf, nil
2015-11-25 19:50:05 +00:00
}
if cf, ok := defaultCodecs[contentType]; ok {
return cf, nil
}
return nil, fmt.Errorf("Unsupported Content-Type: %s", contentType)
}
func (s *rpcServer) Options() Options {
s.RLock()
opts := s.opts
s.RUnlock()
return opts
2015-01-13 23:31:27 +00:00
}
2016-01-02 19:12:17 +00:00
func (s *rpcServer) Init(opts ...Option) error {
s.Lock()
2015-05-26 22:39:48 +01:00
for _, opt := range opts {
opt(&s.opts)
}
// update internal server
s.rpc = &server{
name: s.opts.Name,
serviceMap: s.rpc.serviceMap,
hdlrWrappers: s.opts.HdlrWrappers,
}
s.Unlock()
2016-01-02 19:12:17 +00:00
return nil
2015-01-13 23:31:27 +00:00
}
func (s *rpcServer) NewHandler(h interface{}, opts ...HandlerOption) Handler {
return newRpcHandler(h, opts...)
2015-01-13 23:31:27 +00:00
}
func (s *rpcServer) Handle(h Handler) error {
s.Lock()
defer s.Unlock()
2015-12-02 01:18:32 +00:00
if err := s.rpc.register(h.Handler()); err != nil {
return err
}
s.handlers[h.Name()] = h
return nil
2015-01-13 23:31:27 +00:00
}
// NewSubscriber builds a Subscriber for topic from sb via newSubscriber,
// forwarding any subscriber options. It does not add the subscriber to the
// server; see Subscribe for that.
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
	created := newSubscriber(topic, sb, opts...)
	return created
}
// Subscribe adds sb to the server's subscriber table. The subscriber must be
// a *subscriber with at least one handler function and must pass
// validateSubscriber. Adding the same subscriber twice is an error. The entry
// is stored with a nil subscription list; the broker subscription itself is
// created later by Register.
func (s *rpcServer) Subscribe(sb Subscriber) error {
	sub, ok := sb.(*subscriber)
	if !ok {
		return fmt.Errorf("invalid subscriber: expected *subscriber")
	}
	if len(sub.handlers) == 0 {
		return fmt.Errorf("invalid subscriber: no handler functions")
	}

	if err := validateSubscriber(sb); err != nil {
		return err
	}

	s.Lock()
	defer s.Unlock()

	if _, exists := s.subscribers[sub]; exists {
		// report the offending subscriber (by topic), not the server itself
		return fmt.Errorf("subscriber %v already exists", sub.Topic())
	}
	s.subscribers[sub] = nil
	return nil
}
2016-01-27 12:23:18 +00:00
func (s *rpcServer) Register() error {
// parse address for host, port
config := s.Options()
2015-11-11 18:22:04 +00:00
var advt, host string
var port int
2015-11-11 18:22:04 +00:00
// check the advertise address first
// if it exists then use it, otherwise
// use the address
if len(config.Advertise) > 0 {
advt = config.Advertise
2015-11-11 18:22:04 +00:00
} else {
advt = config.Address
2015-11-11 18:22:04 +00:00
}
parts := strings.Split(advt, ":")
if len(parts) > 1 {
host = strings.Join(parts[:len(parts)-1], ":")
port, _ = strconv.Atoi(parts[len(parts)-1])
} else {
host = parts[0]
2015-01-13 23:31:27 +00:00
}
2017-01-12 13:20:34 +00:00
addr, err := addr.Extract(host)
if err != nil {
return err
}
// register service
node := &registry.Node{
2016-01-02 00:38:57 +00:00
Id: config.Name + "-" + config.Id,
Address: addr,
Port: port,
Metadata: config.Metadata,
}
node.Metadata["transport"] = config.Transport.String()
2016-01-04 01:27:05 +00:00
node.Metadata["broker"] = config.Broker.String()
node.Metadata["server"] = s.String()
2016-01-04 23:27:44 +00:00
node.Metadata["registry"] = config.Registry.String()
2015-12-19 21:56:14 +00:00
s.RLock()
// Maps are ordered randomly, sort the keys for consistency
var handlerList []string
for n, e := range s.handlers {
// Only advertise non internal handlers
if !e.Options().Internal {
handlerList = append(handlerList, n)
}
}
sort.Strings(handlerList)
var subscriberList []*subscriber
for e := range s.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
subscriberList = append(subscriberList, e)
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
})
var endpoints []*registry.Endpoint
for _, n := range handlerList {
endpoints = append(endpoints, s.handlers[n].Endpoints()...)
}
for _, e := range subscriberList {
endpoints = append(endpoints, e.Endpoints()...)
}
s.RUnlock()
service := &registry.Service{
Name: config.Name,
Version: config.Version,
Nodes: []*registry.Node{node},
Endpoints: endpoints,
}
2016-02-27 22:14:25 +00:00
s.Lock()
registered := s.registered
s.Unlock()
if !registered {
2017-05-11 20:43:42 +01:00
log.Logf("Registering node: %s", node.Id)
2016-02-27 22:14:25 +00:00
}
2016-01-27 12:23:18 +00:00
// create registry options
Add option to enable TCP check with Consul registry One disadvantage of using TTL based health check is the high network traffic between Consul agent (either between servers, or between server and client). In order for the services considered alive by Consul, microservices must send an update TTL to Consul every n seconds (currently 30 seconds). Here is the explanation about TTL check from Consul documentation [1] Time to Live (TTL) - These checks retain their last known state for a given TTL. The state of the check must be updated periodically over the HTTP interface. If an external system fails to update the status within a given TTL, the check is set to the failed state. This mechanism, conceptually similar to a dead man's switch, relies on the application to directly report its health. For example, a healthy app can periodically PUT a status update to the HTTP endpoint; if the app fails, the TTL will expire and the health check enters a critical state. The endpoints used to update health information for a given check are the pass endpoint and the fail endpoint. TTL checks also persist their last known status to disk. This allows the Consul agent to restore the last known status of the check across restarts. Persisted check status is valid through the end of the TTL from the time of the last check. Hint: TTL checks also persist their last known status to disk. This allows the Consul agent to restore the last known status of the check across restarts. When microservices update the TTL, Consul will write to disk. Writing to disk means all other slaves need to replicate it, which means master need to inform other standby Consul to pull the new catalog. Hence, the increased traffic. More information about this issue can be viewed at Consul mailing list [2]. [1] https://www.consul.io/docs/agent/checks.html [2] https://groups.google.com/forum/#!topic/consul-tool/84h7qmCCpjg
2018-03-14 18:51:38 +07:00
rOpts := []registry.RegisterOption{
registry.RegisterTTL(config.RegisterTTL),
registry.RegisterTCPCheck(config.RegisterInterval),
}
2016-01-27 12:23:18 +00:00
2016-01-26 23:32:27 +00:00
if err := config.Registry.Register(service, rOpts...); err != nil {
return err
}
2016-02-27 22:14:25 +00:00
// already registered? don't need to register subscribers
if registered {
return nil
}
2016-02-27 22:14:25 +00:00
s.Lock()
defer s.Unlock()
s.registered = true
for sb, _ := range s.subscribers {
2015-12-02 19:56:57 +00:00
handler := s.createSubHandler(sb, s.opts)
2016-01-22 21:48:43 +00:00
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
2016-05-10 10:55:18 +01:00
opts = append(opts, broker.Queue(queue))
2016-01-22 21:48:43 +00:00
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
if err != nil {
return err
}
s.subscribers[sb] = []broker.Subscriber{sub}
}
return nil
}
func (s *rpcServer) Deregister() error {
config := s.Options()
2015-11-11 18:22:04 +00:00
var advt, host string
var port int
2015-11-11 18:22:04 +00:00
// check the advertise address first
// if it exists then use it, otherwise
// use the address
if len(config.Advertise) > 0 {
advt = config.Advertise
2015-11-11 18:22:04 +00:00
} else {
advt = config.Address
2015-11-11 18:22:04 +00:00
}
parts := strings.Split(advt, ":")
if len(parts) > 1 {
host = strings.Join(parts[:len(parts)-1], ":")
port, _ = strconv.Atoi(parts[len(parts)-1])
} else {
host = parts[0]
}
2017-01-12 13:20:34 +00:00
addr, err := addr.Extract(host)
if err != nil {
return err
}
node := &registry.Node{
2016-01-02 00:38:57 +00:00
Id: config.Name + "-" + config.Id,
Address: addr,
Port: port,
}
service := &registry.Service{
Name: config.Name,
Version: config.Version,
Nodes: []*registry.Node{node},
}
2017-05-11 20:43:42 +01:00
log.Logf("Deregistering node: %s", node.Id)
if err := config.Registry.Deregister(service); err != nil {
return err
}
s.Lock()
if !s.registered {
s.Unlock()
return nil
}
s.registered = false
for sb, subs := range s.subscribers {
for _, sub := range subs {
2017-05-11 20:43:42 +01:00
log.Logf("Unsubscribing from topic: %s", sub.Topic())
sub.Unsubscribe()
}
s.subscribers[sb] = nil
}
s.Unlock()
return nil
2015-01-13 23:31:27 +00:00
}
2015-05-23 17:40:53 +01:00
func (s *rpcServer) Start() error {
registerDebugHandler(s)
config := s.Options()
2015-01-13 23:31:27 +00:00
ts, err := config.Transport.Listen(config.Address)
2015-01-13 23:31:27 +00:00
if err != nil {
return err
}
2017-05-11 20:43:42 +01:00
log.Logf("Listening on %s", ts.Addr())
s.Lock()
s.opts.Address = ts.Addr()
s.Unlock()
2015-05-21 21:08:19 +01:00
go ts.Accept(s.accept)
2015-01-13 23:31:27 +00:00
go func() {
// wait for exit
2015-01-13 23:31:27 +00:00
ch := <-s.exit
// wait for requests to finish
if wait(s.opts.Context) {
s.wg.Wait()
}
// close transport listener
2015-05-20 22:57:19 +01:00
ch <- ts.Close()
// disconnect the broker
config.Broker.Disconnect()
2015-01-13 23:31:27 +00:00
}()
// TODO: subscribe to cruft
return config.Broker.Connect()
2015-01-13 23:31:27 +00:00
}
2015-05-23 17:40:53 +01:00
func (s *rpcServer) Stop() error {
2015-01-13 23:31:27 +00:00
ch := make(chan error)
s.exit <- ch
return <-ch
}
2015-12-19 21:56:14 +00:00
// String returns the fixed name of this server implementation; Register
// publishes it as the node's "server" metadata value.
func (s *rpcServer) String() string {
	const name = "rpc"
	return name
}