diff --git a/client/rpc_client.go b/client/rpc_client.go index e930ab61..47aedb94 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -19,16 +19,16 @@ import ( type rpcClient struct { once sync.Once opts Options + pool *pool } func newRpcClient(opt ...Option) Client { - var once sync.Once - opts := newOptions(opt...) rc := &rpcClient{ - once: once, + once: sync.Once{}, opts: opts, + pool: newPool(), } c := Client(rc) @@ -73,10 +73,15 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp return errors.InternalServerError("go.micro.client", err.Error()) } - c, err := r.opts.Transport.Dial(address, transport.WithTimeout(opts.DialTimeout)) + var grr error + c, err := r.pool.getConn(address, r.opts.Transport, transport.WithTimeout(opts.DialTimeout)) if err != nil { return errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } + defer func() { + // defer execution of release + r.pool.release(address, c, grr) + }() stream := &rpcStream{ context: ctx, @@ -107,8 +112,10 @@ func (r *rpcClient) call(ctx context.Context, address string, req Request, resp select { case err := <-ch: + grr = err return err case <-ctx.Done(): + grr = ctx.Err() return errors.New("go.micro.client", fmt.Sprintf("%v", ctx.Err()), 408) } } diff --git a/client/rpc_pool.go b/client/rpc_pool.go new file mode 100644 index 00000000..7fa0e720 --- /dev/null +++ b/client/rpc_pool.go @@ -0,0 +1,71 @@ +package client + +import ( + "sync" + + "github.com/micro/go-micro/transport" +) + +type pool struct { + tr transport.Transport + + sync.Mutex + conns map[string][]*poolConn +} + +type poolConn struct { + transport.Client +} + +var ( + maxIdleConn = 2 +) + +func newPool() *pool { + return &pool{ + conns: make(map[string][]*poolConn), + } +} + +// NoOp the Close since we manage it +func (p *poolConn) Close() error { + return nil +} + +func (p *pool) getConn(addr string, tr transport.Transport, opts ...transport.DialOption) (*poolConn, error) { + p.Lock() + conns, ok := p.conns[addr] + // no free conn + if !ok || len(conns) == 0 { + p.Unlock() + // create new conn + c, err := tr.Dial(addr, opts...) + if err != nil { + return nil, err + } + return &poolConn{c}, nil + } + + conn := conns[len(conns)-1] + p.conns[addr] = conns[:len(conns)-1] + p.Unlock() + return conn, nil +} + +func (p *pool) release(addr string, conn *poolConn, err error) { + // don't store the conn + if err != nil { + conn.Client.Close() + return + } + + // otherwise put it back + p.Lock() + conns := p.conns[addr] + if len(conns) >= maxIdleConn { + conn.Client.Close() + return + } + p.conns[addr] = append(conns, conn) + p.Unlock() +} diff --git a/server/rpc_server.go b/server/rpc_server.go index 3a70d54d..d6ca68aa 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -55,50 +55,53 @@ func (s *rpcServer) accept(sock transport.Socket) { } }() - var msg transport.Message - if err := sock.Recv(&msg); err != nil { - return - } - - // we use this Timeout header to set a server deadline - to := msg.Header["Timeout"] - // we use this Content-Type header to identify the codec needed - ct := msg.Header["Content-Type"] - - cf, err := s.newCodec(ct) - // TODO: needs better error handling - if err != nil { - sock.Send(&transport.Message{ - Header: map[string]string{ - "Content-Type": "text/plain", - }, - Body: []byte(err.Error()), - }) - return - } - - codec := newRpcPlusCodec(&msg, sock, cf) - - // strip our headers - hdr := make(map[string]string) - for k, v := range msg.Header { - hdr[k] = v - } - delete(hdr, "Content-Type") - delete(hdr, "Timeout") - - ctx := metadata.NewContext(context.Background(), hdr) - - // set the timeout if we have it - if len(to) > 0 { - if n, err := strconv.ParseUint(to, 10, 64); err == nil { - ctx, _ = context.WithTimeout(ctx, time.Duration(n)) + for { + var msg transport.Message + if err := sock.Recv(&msg); err != nil { + return } - } - // TODO: needs better error handling - if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { - log.Printf("Unexpected error serving request, closing socket: %v", err) + // we use this Timeout header to set a server deadline + to := msg.Header["Timeout"] + // we use this Content-Type header to identify the codec needed + ct := msg.Header["Content-Type"] + + cf, err := s.newCodec(ct) + // TODO: needs better error handling + if err != nil { + sock.Send(&transport.Message{ + Header: map[string]string{ + "Content-Type": "text/plain", + }, + Body: []byte(err.Error()), + }) + return + } + + codec := newRpcPlusCodec(&msg, sock, cf) + + // strip our headers + hdr := make(map[string]string) + for k, v := range msg.Header { + hdr[k] = v + } + delete(hdr, "Content-Type") + delete(hdr, "Timeout") + + ctx := metadata.NewContext(context.Background(), hdr) + + // set the timeout if we have it + if len(to) > 0 { + if n, err := strconv.ParseUint(to, 10, 64); err == nil { + ctx, _ = context.WithTimeout(ctx, time.Duration(n)) + } + } + + // TODO: needs better error handling + if err := s.rpc.serveRequest(ctx, codec, ct); err != nil { + log.Printf("Unexpected error serving request, closing socket: %v", err) + return + } } }