commit
57dacf1831
@ -74,7 +74,7 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
|
|||||||
// get next nodes from the selector
|
// get next nodes from the selector
|
||||||
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
|
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
|
||||||
if err != nil && err == selector.ErrNotFound {
|
if err != nil && err == selector.ErrNotFound {
|
||||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
return nil, errors.NotFound("go.micro.client", "service %s not found: %v", service, err.Error())
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
@ -351,7 +351,7 @@ func (g *grpcClient) Call(ctx context.Context, req client.Request, rsp interface
|
|||||||
// select next node
|
// select next node
|
||||||
node, err := next()
|
node, err := next()
|
||||||
if err != nil && err == selector.ErrNotFound {
|
if err != nil && err == selector.ErrNotFound {
|
||||||
return errors.NotFound("go.micro.client", err.Error())
|
return errors.NotFound("go.micro.client", "service %s not found: %v", req.Service(), err.Error())
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return errors.InternalServerError("go.micro.client", err.Error())
|
return errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
@ -430,7 +430,7 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
|
|||||||
|
|
||||||
node, err := next()
|
node, err := next()
|
||||||
if err != nil && err == selector.ErrNotFound {
|
if err != nil && err == selector.ErrNotFound {
|
||||||
return nil, errors.NotFound("go.micro.client", err.Error())
|
return nil, errors.NotFound("go.micro.client", "service %s not found: %v", req.Service(), err.Error())
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
return nil, errors.InternalServerError("go.micro.client", err.Error())
|
||||||
}
|
}
|
||||||
|
@ -111,25 +111,26 @@ func (t *tun) process() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-t.send:
|
case msg := <-t.send:
|
||||||
nmsg := &transport.Message{
|
newMsg := &transport.Message{
|
||||||
Header: msg.data.Header,
|
Header: make(map[string]string),
|
||||||
Body: msg.data.Body,
|
Body: msg.data.Body,
|
||||||
}
|
}
|
||||||
|
|
||||||
if nmsg.Header == nil {
|
for k, v := range msg.data.Header {
|
||||||
nmsg.Header = make(map[string]string)
|
newMsg.Header[k] = v
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the tunnel id on the outgoing message
|
// set the tunnel id on the outgoing message
|
||||||
nmsg.Header["Micro-Tunnel-Id"] = msg.id
|
newMsg.Header["Micro-Tunnel-Id"] = msg.id
|
||||||
|
|
||||||
// set the session id
|
// set the session id
|
||||||
nmsg.Header["Micro-Tunnel-Session"] = msg.session
|
newMsg.Header["Micro-Tunnel-Session"] = msg.session
|
||||||
|
|
||||||
// send the message via the interface
|
// send the message via the interface
|
||||||
t.RLock()
|
t.RLock()
|
||||||
for _, link := range t.links {
|
for _, link := range t.links {
|
||||||
link.Send(nmsg)
|
log.Debugf("Sending %+v to %s", newMsg, link.Remote())
|
||||||
|
link.Send(newMsg)
|
||||||
}
|
}
|
||||||
t.RUnlock()
|
t.RUnlock()
|
||||||
case <-t.closed:
|
case <-t.closed:
|
||||||
@ -170,29 +171,26 @@ func (t *tun) listen(link transport.Socket, listener bool) {
|
|||||||
var s *socket
|
var s *socket
|
||||||
var exists bool
|
var exists bool
|
||||||
|
|
||||||
// if its a local listener then we use that as the session id
|
log.Debugf("Received %+v from %s", msg, link.Remote())
|
||||||
// e.g we're using a loopback connecting to ourselves
|
// get the socket based on the tunnel id and session
|
||||||
if listener {
|
// this could be something we dialed in which case
|
||||||
|
// we have a session for it otherwise its a listener
|
||||||
|
s, exists = t.getSocket(id, session)
|
||||||
|
if !exists {
|
||||||
|
// try get it based on just the tunnel id
|
||||||
|
// the assumption here is that a listener
|
||||||
|
// has no session but its set a listener session
|
||||||
s, exists = t.getSocket(id, "listener")
|
s, exists = t.getSocket(id, "listener")
|
||||||
} else {
|
|
||||||
// get the socket based on the tunnel id and session
|
|
||||||
// this could be something we dialed in which case
|
|
||||||
// we have a session for it otherwise its a listener
|
|
||||||
s, exists = t.getSocket(id, session)
|
|
||||||
if !exists {
|
|
||||||
// try get it based on just the tunnel id
|
|
||||||
// the assumption here is that a listener
|
|
||||||
// has no session but its set a listener session
|
|
||||||
s, exists = t.getSocket(id, "listener")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// no socket in existence
|
// no socket in existence
|
||||||
if !exists {
|
if !exists {
|
||||||
|
log.Debugf("Skipping")
|
||||||
// drop it, we don't care about
|
// drop it, we don't care about
|
||||||
// messages we don't know about
|
// messages we don't know about
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
log.Debugf("Using socket %s %s", s.id, s.session)
|
||||||
|
|
||||||
// is the socket closed?
|
// is the socket closed?
|
||||||
select {
|
select {
|
||||||
@ -398,6 +396,7 @@ func (t *tun) Init(opts ...Option) error {
|
|||||||
|
|
||||||
// Dial an address
|
// Dial an address
|
||||||
func (t *tun) Dial(addr string) (Conn, error) {
|
func (t *tun) Dial(addr string) (Conn, error) {
|
||||||
|
log.Debugf("Tunnel dialing %s", addr)
|
||||||
c, ok := t.newSocket(addr, t.newSession())
|
c, ok := t.newSocket(addr, t.newSession())
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("error dialing " + addr)
|
return nil, errors.New("error dialing " + addr)
|
||||||
@ -413,6 +412,7 @@ func (t *tun) Dial(addr string) (Conn, error) {
|
|||||||
|
|
||||||
// Accept a connection on the address
|
// Accept a connection on the address
|
||||||
func (t *tun) Listen(addr string) (Listener, error) {
|
func (t *tun) Listen(addr string) (Listener, error) {
|
||||||
|
log.Debugf("Tunnel listening on %s", addr)
|
||||||
// create a new socket by hashing the address
|
// create a new socket by hashing the address
|
||||||
c, ok := t.newSocket(addr, "listener")
|
c, ok := t.newSocket(addr, "listener")
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -48,9 +48,6 @@ func (t *tunListener) process() {
|
|||||||
wait: make(chan bool),
|
wait: make(chan bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
// first message
|
|
||||||
sock.recv <- m
|
|
||||||
|
|
||||||
// save the socket
|
// save the socket
|
||||||
conns[m.session] = sock
|
conns[m.session] = sock
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user