minimize allocations in logger and tunnel code (#1323)

* logs alloc

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix allocs

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* fix allocs

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* tunnel allocs

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* try to fix tunnel

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* cache cipher for send

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>

* more logger

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
2020-03-11 20:55:39 +03:00
committed by GitHub
parent 4125ae8d53
commit 7b385bf163
47 changed files with 917 additions and 382 deletions

View File

@@ -8,7 +8,7 @@ import (
"time"
"github.com/google/uuid"
log "github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/logger"
"github.com/micro/go-micro/v2/transport"
)
@@ -120,7 +120,8 @@ func (t *tun) listChannels() []string {
}
// newSession creates a new session and saves it
func (t *tun) newSession(channel, sessionId string) (*session, bool) {
func (t *tun) newSession(channel, sessionId string) (*session, bool, error) {
// new session
s := &session{
tunnel: t.id,
@@ -133,6 +134,11 @@ func (t *tun) newSession(channel, sessionId string) (*session, bool) {
errChan: make(chan error, 1),
key: []byte(t.token + channel + sessionId),
}
gcm, err := newCipher(s.key)
if err != nil {
return nil, false, err
}
s.gcm = gcm
// save session
t.Lock()
@@ -140,14 +146,14 @@ func (t *tun) newSession(channel, sessionId string) (*session, bool) {
if ok {
// session already exists
t.Unlock()
return nil, false
return nil, false, nil
}
t.sessions[channel+sessionId] = s
t.Unlock()
// return session
return s, true
return s, true, nil
}
// TODO: use tunnel id as part of the session
@@ -193,11 +199,14 @@ func (t *tun) announce(channel, session string, link *link) {
}
}
log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel)
if logger.V(logger.TraceLevel, log) {
log.Debugf("Tunnel sending announce for discovery of channel(s) %s", channel)
}
// send back the announcement
if err := link.Send(msg); err != nil {
log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel failed to send announcement for channel(s) %s message: %v", channel, err)
}
}
}
@@ -241,18 +250,26 @@ func (t *tun) manageLink(link *link) {
wait(DiscoverTime)
// send a discovery message to the link
log.Debugf("Tunnel sending discover to link: %v", link.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel sending discover to link: %v", link.Remote())
}
if err := t.sendMsg("discover", link); err != nil {
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel failed to send discover to link %s: %v", link.Remote(), err)
}
}
case <-keepalive.C:
// wait half the keepalive time
wait(KeepAliveTime)
// send keepalive message
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel sending keepalive to link: %v", link.Remote())
}
if err := t.sendMsg("keepalive", link); err != nil {
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel error sending keepalive to link %v: %v", link.Remote(), err)
}
t.delLink(link.Remote())
return
}
@@ -301,8 +318,9 @@ func (t *tun) manageLinks() {
t.Lock()
for link, node := range delLinks {
log.Debugf("Tunnel deleting dead link for %s", node)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel deleting dead link for %s", node)
}
// check if the link exists
l, ok := t.links[node]
if ok {
@@ -335,7 +353,9 @@ func (t *tun) manageLinks() {
// if we're using quic it should be a max 10 second handshake period
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
}
return
}
@@ -384,7 +404,9 @@ func (t *tun) process() {
// if the link is not connected skip it
if !connected {
log.Debugf("Link for node %s not connected", id)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Link for node %s not connected", id)
}
err = ErrLinkDisconnected
continue
}
@@ -428,7 +450,9 @@ func (t *tun) process() {
// no links to send to
if len(sendTo) == 0 {
log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
if logger.V(logger.DebugLevel, log) {
log.Debugf("No links to send message type: %s channel: %s", msg.typ, msg.channel)
}
t.respond(msg, err)
continue
}
@@ -454,7 +478,9 @@ func (t *tun) sendTo(links []*link, msg *message) error {
// the function that sends the actual message
send := func(link *link, msg *transport.Message) error {
if err := link.Send(msg); err != nil {
log.Debugf("Tunnel error sending %+v to %s: %v", msg.Header, link.Remote(), err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel error sending %+v to %s: %v", msg.Header, link.Remote(), err)
}
t.delLink(link.Remote())
return err
}
@@ -493,7 +519,9 @@ func (t *tun) sendTo(links []*link, msg *message) error {
// send the message
for _, link := range links {
// send the message via the current link
log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote())
if logger.V(logger.TraceLevel, log) {
log.Tracef("Tunnel sending %+v to %s", newMsg.Header, link.Remote())
}
// blast it in a go routine since its multicast/broadcast
if msg.mode > Unicast {
@@ -552,7 +580,9 @@ func (t *tun) delLink(remote string) {
continue
}
// close and delete
log.Debugf("Tunnel deleting link node: %s remote: %s", id, link.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel deleting link node: %s remote: %s", id, link.Remote())
}
link.Close()
delete(t.links, id)
}
@@ -602,7 +632,9 @@ func (t *tun) listen(link *link) {
// if its not connected throw away the link
// the first message we process needs to be connect
if !connected && mtype != "connect" {
log.Debugf("Tunnel link %s not connected", link.id)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel link %s not connected", link.id)
}
return
}
@@ -611,7 +643,9 @@ func (t *tun) listen(link *link) {
// discover, announce, session, keepalive
switch mtype {
case "connect":
log.Debugf("Tunnel link %s received connect message", link.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel link %s received connect message", link.Remote())
}
link.Lock()
@@ -644,11 +678,15 @@ func (t *tun) listen(link *link) {
// if there is no channel then we close the link
// as its a signal from the other side to close the connection
if len(channel) == 0 {
log.Debugf("Tunnel link %s received close message", link.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel link %s received close message", link.Remote())
}
return
}
log.Debugf("Tunnel link %s received close message for %s", link.Remote(), channel)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel link %s received close message for %s", link.Remote(), channel)
}
// the entire listener was closed by the remote side so we need to
// remove the channel mapping for it. should we also close sessions?
if sessionId == "listener" {
@@ -673,13 +711,17 @@ func (t *tun) listen(link *link) {
}
// otherwise its a session mapping of sorts
case "keepalive":
log.Debugf("Tunnel link %s received keepalive", link.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel link %s received keepalive", link.Remote())
}
// save the keepalive
link.keepalive()
continue
// a new connection dialled outbound
case "open":
log.Debugf("Tunnel link %s received open %s %s", link.id, channel, sessionId)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel link %s received open %s %s", link.id, channel, sessionId)
}
// we just let it pass through to be processed
// an accept returned by the listener
case "accept":
@@ -697,11 +739,14 @@ func (t *tun) listen(link *link) {
// a continued session
case "session":
// process message
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
if logger.V(logger.TraceLevel, log) {
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
}
// an announcement of a channel listener
case "announce":
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
if logger.V(logger.TraceLevel, log) {
log.Tracef("Tunnel received %+v from %s", msg.Header, link.Remote())
}
// process the announcement
channels := strings.Split(channel, ",")
@@ -773,7 +818,9 @@ func (t *tun) listen(link *link) {
s, exists = t.getSession(channel, "listener")
// only return accept to the session
case mtype == "accept":
log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel received accept message for channel: %s session: %s", channel, sessionId)
}
s, exists = t.getSession(channel, sessionId)
if exists && s.accepted {
continue
@@ -793,7 +840,9 @@ func (t *tun) listen(link *link) {
// bail if no session or listener has been found
if !exists {
log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId)
if logger.V(logger.TraceLevel, log) {
log.Tracef("Tunnel skipping no channel: %s session: %s exists", channel, sessionId)
}
// drop it, we don't care about
// messages we don't know about
continue
@@ -808,9 +857,9 @@ func (t *tun) listen(link *link) {
default:
// otherwise process
}
log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype)
if logger.V(logger.TraceLevel, log) {
log.Tracef("Tunnel using channel: %s session: %s type: %s", s.channel, s.session, mtype)
}
// construct a new transport message
tmsg := &transport.Message{
Header: msg.Header,
@@ -851,16 +900,19 @@ func (t *tun) sendMsg(method string, link *link) error {
// setupLink connects to node and returns link if successful
// It returns error if the link failed to be established
func (t *tun) setupLink(node string) (*link, error) {
log.Debugf("Tunnel setting up link: %s", node)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel setting up link: %s", node)
}
c, err := t.options.Transport.Dial(node)
if err != nil {
log.Debugf("Tunnel failed to connect to %s: %v", node, err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel failed to connect to %s: %v", node, err)
}
return nil, err
}
log.Debugf("Tunnel connected to %s", node)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel connected to %s", node)
}
// create a new link
link := newLink(c)
@@ -905,7 +957,9 @@ func (t *tun) setupLinks() {
// create new link
link, err := t.setupLink(node)
if err != nil {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel failed to setup node link to %s: %v", node, err)
}
return
}
@@ -931,8 +985,9 @@ func (t *tun) connect() error {
go func() {
// accept inbound connections
err := l.Accept(func(sock transport.Socket) {
log.Debugf("Tunnel accepted connection from %s", sock.Remote())
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel accepted connection from %s", sock.Remote())
}
// create a new link
link := newLink(sock)
@@ -1089,7 +1144,9 @@ func (t *tun) Close() error {
return nil
}
log.Debug("Tunnel closing")
if logger.V(logger.DebugLevel, log) {
log.Debug("Tunnel closing")
}
select {
case <-t.closed:
@@ -1117,11 +1174,18 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
o(&options)
}
log.Debugf("Tunnel dialing %s", channel)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel dialing %s", channel)
}
// create a new session
c, ok := t.newSession(channel, t.newSessionId())
if !ok {
c, ok, err := t.newSession(channel, t.newSessionId())
if err != nil {
if logger.V(logger.DebugLevel, log) {
log.Error(err)
}
return nil, err
} else if !ok {
return nil, errors.New("error dialing " + channel)
}
@@ -1171,7 +1235,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
if len(links) == 0 {
// delete session and return error
t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, ErrLinkNotFound)
}
return nil, ErrLinkNotFound
}
@@ -1206,7 +1272,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
err := c.Discover()
if err != nil {
t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err)
}
return nil, err
}
@@ -1244,7 +1312,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
if err := c.Open(); err != nil {
// delete the session
t.delSession(c.channel, c.session)
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel deleting session %s %s: %v", c.session, c.channel, err)
}
return nil, err
}
@@ -1269,8 +1339,9 @@ func (t *tun) Dial(channel string, opts ...DialOption) (Session, error) {
// Accept a connection on the address
func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
log.Debugf("Tunnel listening on %s", channel)
if logger.V(logger.DebugLevel, log) {
log.Debugf("Tunnel listening on %s", channel)
}
options := ListenOptions{
// Read timeout defaults to never
Timeout: time.Duration(-1),
@@ -1281,8 +1352,13 @@ func (t *tun) Listen(channel string, opts ...ListenOption) (Listener, error) {
}
// create a new session by hashing the address
c, ok := t.newSession(channel, "listener")
if !ok {
c, ok, err := t.newSession(channel, "listener")
if err != nil {
if logger.V(logger.ErrorLevel, log) {
log.Error(err)
}
return nil, err
} else if !ok {
return nil, errors.New("already listening on " + channel)
}