123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- """Connection pooling for psycopg2
- This module implements thread-safe (and not) connection pools.
- """
- # psycopg/pool.py - pooling code for psycopg
- #
- # Copyright (C) 2003-2019 Federico Di Gregorio <fog@debian.org>
- # Copyright (C) 2020-2021 The Psycopg Team
- #
- # psycopg2 is free software: you can redistribute it and/or modify it
- # under the terms of the GNU Lesser General Public License as published
- # by the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # In addition, as a special exception, the copyright holders give
- # permission to link this program with the OpenSSL library (or with
- # modified versions of OpenSSL that use the same license as OpenSSL),
- # and distribute linked combinations including the two.
- #
- # You must obey the GNU Lesser General Public License in all respects for
- # all of the code used other than OpenSSL.
- #
- # psycopg2 is distributed in the hope that it will be useful, but WITHOUT
- # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
- # License for more details.
- import psycopg2
- from psycopg2 import extensions as _ext
class PoolError(psycopg2.Error):
    """Error raised by the connection pools defined in this module.

    The pools raise it when they are used after being closed, when no more
    connections can be handed out, and when a connection that does not
    belong to the pool is put back.
    """
class AbstractConnectionPool:
    """Generic key-based pooling code.

    Connections are either idle (kept in ``_pool``) or checked out under a
    key (kept in ``_used``).  The underscore-prefixed methods implement the
    pool logic without any locking; concrete subclasses expose them as
    ``getconn()``, ``putconn()`` and ``closeall()``.
    """

    def __init__(self, minconn, maxconn, *args, **kwargs):
        """Initialize the connection pool.

        'minconn' connections are created immediately by calling
        psycopg2.connect() with the remaining positional and keyword
        arguments.  The pool refuses to check out more than 'maxconn'
        connections at the same time.
        """
        self.minconn = int(minconn)
        self.maxconn = int(maxconn)
        self.closed = False

        self._args = args
        self._kwargs = kwargs

        self._pool = []   # idle connections, ready to be handed out
        self._used = {}   # key -> connection currently checked out
        self._rused = {}  # id(conn) -> key map
        self._keys = 0    # last key generated by _getkey()

        for _ in range(self.minconn):
            self._connect()

    def _connect(self, key=None):
        """Create a new connection and assign it to 'key' if not None.

        Without a key the new connection is added to the idle list.
        The connection is returned in both cases.
        """
        conn = psycopg2.connect(*self._args, **self._kwargs)
        if key is not None:
            self._used[key] = conn
            self._rused[id(conn)] = key
        else:
            self._pool.append(conn)
        return conn

    def _getkey(self):
        """Return a new unique key."""
        self._keys += 1
        return self._keys

    def _getconn(self, key=None):
        """Get a free connection and assign it to 'key' if not None.

        If 'key' is None a new key is generated.  If 'key' already owns a
        connection, that same connection is returned again.

        Raises PoolError if the pool is closed or if 'maxconn' connections
        are already checked out and no idle connection is available.
        """
        if self.closed:
            raise PoolError("connection pool is closed")
        if key is None:
            key = self._getkey()

        if key in self._used:
            return self._used[key]

        if self._pool:
            self._used[key] = conn = self._pool.pop()
            self._rused[id(conn)] = key
            return conn

        # '>=' rather than '==': if the number of checked-out connections
        # ever ends up above the limit, the pool must keep refusing.
        if len(self._used) >= self.maxconn:
            raise PoolError("connection pool exhausted")
        return self._connect(key)

    def _putconn(self, conn, key=None, close=False):
        """Put away a connection.

        The connection may be identified by 'conn', by 'key', or by both;
        when both are given they must refer to the same checked-out
        connection.  The connection goes back to the idle list unless
        'close' is true or the idle list already holds 'minconn'
        connections, in which case it is closed.  A connection that is
        already closed, or whose server link is lost, is discarded.

        Raises PoolError if the pool is closed or if the connection/key
        pair is not currently checked out from this pool (for instance
        because it has already been put back).
        """
        if self.closed:
            raise PoolError("connection pool is closed")

        if key is None:
            key = self._rused.get(id(conn))
        elif conn is None:
            # Only the key was given: look up the connection it owns.
            conn = self._used.get(key)

        # Validate before touching any state, so a double put or a
        # mismatched key cannot add the same connection to the idle list
        # twice or release a key that belongs to another connection.
        if key is None or conn is None or self._used.get(key) is not conn:
            raise PoolError("trying to put unkeyed connection")

        # Release the key first: whatever happens to the connection below,
        # it must not keep occupying one of the 'maxconn' slots.
        del self._used[key]
        self._rused.pop(id(conn), None)

        if close or len(self._pool) >= self.minconn:
            conn.close()
            return

        if conn.closed:
            # If the connection is closed, we just discard it.
            return

        # Return the connection into a consistent state before putting
        # it back into the pool
        status = conn.info.transaction_status
        if status == _ext.TRANSACTION_STATUS_UNKNOWN:
            # server connection lost
            conn.close()
            return
        if status != _ext.TRANSACTION_STATUS_IDLE:
            # connection in error or in transaction
            try:
                conn.rollback()
            except Exception:
                # The connection cannot be reset: do not reuse it.
                conn.close()
                raise
        self._pool.append(conn)

    def _closeall(self):
        """Close all connections.

        Note that this can lead to some code fail badly when trying to use
        an already closed connection. If you call .closeall() make sure
        your code can deal with it.

        Raises PoolError if the pool is already closed.
        """
        if self.closed:
            raise PoolError("connection pool is closed")
        for conn in self._pool + list(self._used.values()):
            try:
                conn.close()
            except Exception:
                # Best effort: one connection failing to close must not
                # prevent the remaining ones from being closed.
                pass
        self.closed = True
class SimpleConnectionPool(AbstractConnectionPool):
    """A connection pool that can't be shared across different threads."""

    # The public methods are the base class implementations bound under
    # their public names, with no locking added around them.
    getconn = AbstractConnectionPool._getconn
    putconn = AbstractConnectionPool._putconn
    closeall = AbstractConnectionPool._closeall
class ThreadedConnectionPool(AbstractConnectionPool):
    """A connection pool that works with the threading module.

    Each public method runs the matching base class implementation while
    holding a single lock owned by the pool.
    """

    def __init__(self, minconn, maxconn, *args, **kwargs):
        """Set up the underlying pool, then create the lock guarding it."""
        import threading

        AbstractConnectionPool.__init__(
            self, minconn, maxconn, *args, **kwargs
        )
        self._lock = threading.Lock()

    def getconn(self, key=None):
        """Get a free connection and assign it to 'key' if not None."""
        with self._lock:
            return self._getconn(key)

    def putconn(self, conn=None, key=None, close=False):
        """Put away an unused connection."""
        with self._lock:
            self._putconn(conn, key, close)

    def closeall(self):
        """Close all connections (even the one currently in use.)"""
        with self._lock:
            self._closeall()
|