连接池是一种标准技术,用于维护内存中长时间运行的连接以实现高效的重用,并提供对应用程序可能同时使用的连接总数的管理。
特别是对于服务器端Web应用程序,连接池是在内存中维护活动数据库连接的“池”的标准方式,这些连接可以跨请求重用。
SQLAlchemy包含几个与Engine
集成的连接池实现。它们也可以直接用于想要将池化添加到其他普通DBAPI方法的应用程序。
The Engine
returned by the create_engine()
function in most cases has a QueuePool
integrated, pre-configured with reasonable pooling defaults. 如果您只是阅读本节以了解如何启用共享池 - 恭喜!你已经完成了。
The most common QueuePool
tuning parameters can be passed directly to create_engine()
as keyword arguments: pool_size
, max_overflow
, pool_recycle
and pool_timeout
. 例如:
engine = create_engine('postgresql://me@localhost/mydb',
pool_size=20, max_overflow=0)
在SQLite的情况下,方言选择SingletonThreadPool
或NullPool
,以提供与SQLite的线程和锁定模型的更大兼容性,并提供合理的默认行为SQLite“内存”数据库,它们将整个数据集保存在单个连接的范围内。
所有的SQLAlchemy池实现都有共同之处:它们都没有“预创建”连接 - 所有实现都等到创建连接之前首次使用。此时,如果没有额外的并发结算请求进行更多连接,则不会创建其他连接。这就是为什么create_engine()
默认使用大小为5的QueuePool
而不考虑应用程序是否真的需要5个连接排队的原因 - 池只有当应用程序实际同时使用5个连接时才会增长到这个大小,在这种情况下,小池的使用是完全适当的默认行为。
使用create_engine()
不同类型池的常用方法是使用poolclass
参数。该参数接受从sqlalchemy.pool
模块导入的类,并处理为您构建池的详细信息。通用选项包括用SQLite指定QueuePool
:
from sqlalchemy.pool import QueuePool
engine = create_engine('sqlite:///file.db', poolclass=QueuePool)
使用NullPool
:
from sqlalchemy.pool import NullPool
engine = create_engine(
'postgresql+psycopg2://scott:tiger@localhost/test',
poolclass=NullPool)
所有Pool
类接受一个参数creator
,它是一个可调用的参数,用于创建新的连接。create_engine()
接受这个函数通过一个同名的参数传递给池:
import sqlalchemy.pool as pool
import psycopg2
def getconn():
c = psycopg2.connect(username='ed', host='127.0.0.1', dbname='test')
# do things with 'c' to set up
return c
engine = create_engine('postgresql+psycopg2://', creator=getconn)
对于大多数“初始化连接”例程,使用PoolEvents
事件挂接更方便,因此create_engine()
的常用URL参数仍然可用。creator
is there as a last resort for when a DBAPI has some form of connect
that is not at all supported by SQLAlchemy.
要单独使用Pool
,creator
函数是必需的唯一参数,并首先传递,然后是任何其他选项:
import sqlalchemy.pool as pool
import psycopg2
def getconn():
c = psycopg2.connect(username='ed', host='127.0.0.1', dbname='test')
return c
mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5)
然后可以使用Pool.connect()
函数从池中获取DBAPI连接。此方法的返回值是包含在透明代理中的DBAPI连接:
# get a connection
conn = mypool.connect()
# use it
cursor = conn.cursor()
cursor.execute("select foo")
透明代理的目的是拦截close()
调用,这样就不会关闭DBAPI连接,而是返回到池:
# "close" the connection. Returns
# it to the pool.
conn.close()
代理还会在垃圾收集时将其包含的DBAPI连接返回到池中,尽管它在Python中并不是确定性的,它会立即发生(尽管它通常与cPython一起使用)。
close()
步骤还执行调用DBAPI连接的rollback()
方法的重要步骤。这样就可以删除连接上的任何现有事务,不仅可以确保在下次使用时不会保留现有状态,还可以释放表和行锁以及删除任何隔离的数据快照。可以使用Pool
的reset_on_return
选项禁用此行为。
通过将一个特定的预创建的Pool
传递给create_engine()
的pool
参数,可以与一个或多个引擎共享:
e = create_engine('postgresql://', pool=mypool)
连接池支持一个事件接口,该接口允许在第一次连接时,每次新建连接时以及在检出和检入连接时执行钩子。有关详细信息,请参见PoolEvents
。
连接池可以刷新单个连接以及其整个连接集,将之前池连接设置为“无效”。一个常见的用例是允许连接池在数据库服务器重新启动时正常恢复,并且所有以前建立的连接都不再起作用。有两种方法来解决这个问题。
最常见的方法是让SQLAlchemy在发生时断开连接,此时会刷新池。这假定Pool
与Engine
结合使用。Engine
具有可以检测断开事件并自动刷新池的逻辑。
当Connection
尝试使用DBAPI连接,并且引发与“断开”事件相对应的异常时,连接将失效。然后,Connection
调用Pool.recreate()
方法,有效地使所有当前未检出的连接失效,以便在下次检出时将其替换为新的连接:
from sqlalchemy import create_engine, exc
e = create_engine(...)
c = e.connect()
try:
# suppose the database has been restarted.
c.execute("SELECT * FROM table")
c.close()
except exc.DBAPIError, e:
# an exception is raised, Connection is invalidated.
if e.connection_invalidated:
print("Connection was invalidated!")
# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute("SELECT * FROM table")
上面的例子说明不需要特别干预,在检测到断开连接事件后,池正常继续。但是,引发了一个例外。在使用ORM Session的典型Web应用程序中,上述条件将对应于单个请求失败并出现500错误,然后Web应用程序正常继续执行。因此这种方法是“乐观的”,因为频繁的数据库重启是不可预料的。
可以增加“乐观”方法的其他设置是设置池回收参数。此参数可防止池使用特定时间的特定连接,并适用于数据库后端(如MySQL),该后端可自动关闭在特定时间段后过时的连接:
from sqlalchemy import create_engine
e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)
以上,任何已打开超过一小时的DBAPI连接将在下次结帐时失效并被替换。请注意,仅在结帐时发生失效,而不是处于签出状态的任何连接。pool_recycle
是Pool
本身的函数,与是否使用Engine
无关。
以从池中检出的每个连接发出的额外SQL为代价,由checkout事件处理程序建立的“ping”操作可以在使用前检测到无效连接。在现代SQLAlchemy中,最好的方法是使用ConnectionEvents.engine_connect()
事件,假设使用Engine
,而不仅仅是一个原始Pool
对象:
from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select
some_engine = create_engine(...)
@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
if branch:
# "branch" refers to a sub-connection of a connection,
# we don't want to bother pinging on these.
return
# turn off "close with result". This flag is only used with
# "connectionless" execution, otherwise will be False in any case
save_should_close_with_result = connection.should_close_with_result
connection.should_close_with_result = False
try:
# run a SELECT 1. use a core select() so that
# the SELECT of a scalar value without a table is
# appropriately formatted for the backend
connection.scalar(select([1]))
except exc.DBAPIError as err:
# catch SQLAlchemy's DBAPIError, which is a wrapper
# for the DBAPI's exception. It includes a .connection_invalidated
# attribute which specifies if this connection is a "disconnect"
# condition, which is based on inspection of the original exception
# by the dialect in use.
if err.connection_invalidated:
# run the same SELECT again - the connection will re-validate
# itself and establish a new connection. The disconnect detection
# here also causes the whole connection pool to be invalidated
# so that all stale connections are discarded.
connection.scalar(select([1]))
else:
raise
finally:
# restore "close with result"
connection.should_close_with_result = save_should_close_with_result
上述配方的优点是,我们利用SQLAlchemy的工具来检测那些已知指示“断开”情况的DBAPI异常,以及Engine
对象正确地使当前连接无效的能力当出现这种情况时允许当前的Connection
重新验证到新的DBAPI连接。
对于不使用Engine
的情况下使用Pool
的常见情况,可以使用较老的方法,如下所示:
from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy.pool import Pool
@event.listens_for(Pool, "checkout")
def ping_connection(dbapi_connection, connection_record, connection_proxy):
cursor = dbapi_connection.cursor()
try:
cursor.execute("SELECT 1")
except:
# raise DisconnectionError - pool will try
# connecting again up to three times before raising.
raise exc.DisconnectionError()
cursor.close()
以上,Pool
对象专门捕获DisconnectionError
,并尝试在放弃然后提升InvalidRequestError
之前创建新的DBAPI连接,最多三次。 ,连接失败。上述方法的缺点是我们没有任何简单的方法来确定引发的异常是否是“断开”的情况,因为没有Engine
或Dialect
Pool
提供了“连接无效”服务,它允许连接的显式失效以及响应于确定使连接不可用的条件的自动失效。
“无效”意味着特定的DBAPI连接将从池中删除并丢弃。如果不清楚连接本身可能未关闭,则在此连接上调用.close()
方法,但如果此方法失败,则将记录异常,但操作仍在继续。
当使用Engine
时,Connection.invalidate()
方法是通常显式失效的入口点。DBAPI连接可能失效的其他条件包括:
connection.execute()
之类的方法时引发的诸如OperationalError
的DBAPI异常被检测为指示所谓的“断开”条件。由于Python DBAPI没有提供用于确定异常性质的标准系统,因此所有SQLAlchemy方言都包含称为is_disconnect()
的系统,该系统将检查异常对象的内容,包括字符串消息和任何潜在的包含的错误代码,以确定此异常是否表示连接不再可用。如果是这种情况,则调用_ConnectionFairy.invalidate()
方法,然后丢弃DBAPI连接。connection.rollback()
或connection.commit()
方法时,由池的“reset on return”行为,抛出一个异常。在连接上调用.close()
的最后一次尝试会被放弃,然后被放弃。PoolEvents.checkout()
的侦听器引发DisconnectionError
异常时,表明连接将不可用,并且需要进行新的连接尝试。所有发生的失效都将调用PoolEvents.invalidate()
事件。
在使用连接池时,以及在使用通过create_engine()
创建的Engine
时扩展至关重要的是,共享连接不会共享到分叉进程 T6>。TCP连接表示为文件描述符,通常跨进程边界工作,这意味着这将导致代表两个或多个完全独立的Python解释器状态并发访问文件描述符。
有两种方法可以解决这个问题。
首先是在子进程内或在现有的Engine
内创建一个新的Engine
,在子进程之前调用Engine.dispose()
进程使用任何连接。这将从池中删除所有现有的连接,以便它可以创建所有新的连接。下面是一个使用multiprocessing.Process
的简单版本,但是这个想法应该适应使用中的分叉风格:
eng = create_engine("...")
def run_in_process():
eng.dispose()
with eng.connect() as conn:
conn.execute("...")
p = Process(target=run_in_process)
The next approach is to instrument the Pool
itself with events so that connections are automatically invalidated in the subprocess. 这有点神奇,但可能更加万无一失:
from sqlalchemy import event
from sqlalchemy import exc
import os
eng = create_engine("...")
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
connection_record.info['pid'] = os.getpid()
@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
(connection_record.info['pid'], pid)
)
在上面,我们使用类似于Disconnect Handling - Pessimistic中描述的方法来将源自不同父进程的DBAPI连接视为“无效”连接,强制池将连接记录回收为建立新的连接。
sqlalchemy.pool.
Pool
(creator, recycle=-1, echo=None, use_threadlocal=False, logging_name=None, reset_on_return=True, listeners=None, events=None, _dispatch=None, _dialect=None)¶连接池的抽象基类。
__init__
(creator, recycle=-1, echo=None, use_threadlocal=False, logging_name=None, reset_on_return=True, listeners=None, events=None, _dispatch=None, _dialect=None)¶构建一个池。
参数: |
|
---|
连接 T0> ( T1> ) T2> ¶ T3>
从池中返回一个DBAPI连接。
这个连接被调用,当它的close()
方法被调用时,连接将被返回到池中。
处置 T0> ( T1> ) T2> ¶ T3>
处置这个池。
这种方法使检出连接保持开放的可能性,因为它只影响在池中空闲的连接。
另请参阅Pool.recreate()
方法。
unique_connection T0> ( T1> ) T2> ¶ T3>
生成一个没有被任何线程本地上下文引用的DBAPI连接。
当Pool.use_threadlocal
标志未设置为True时,此方法等同于Pool.connect()
。当Pool.use_threadlocal
为True时,Pool.unique_connection()
方法提供绕过threadlocal上下文的方法。
sqlalchemy.pool.
QueuePool
(creator, pool_size=5, max_overflow=10, timeout=30, **kw)¶一个Pool
,对打开的连接数量施加限制。
QueuePool
is the default pooling implementation used for all Engine
objects, unless the SQLite dialect is in use.
__init__
(creator, pool_size=5, max_overflow=10, timeout=30, **kw)¶构建一个QueuePool。
参数: |
|
---|
连接 T0> ( T1> ) T2> ¶ T3>
从池中返回一个DBAPI连接。
这个连接被调用,当它的close()
方法被调用时,连接将被返回到池中。
unique_connection T0> ( T1> ) T2> ¶ T3>
unique_connection()
method of Pool
生成一个没有被任何线程本地上下文引用的DBAPI连接。
当Pool.use_threadlocal
标志未设置为True时,此方法等同于Pool.connect()
。当Pool.use_threadlocal
为True时,Pool.unique_connection()
方法提供绕过threadlocal上下文的方法。
sqlalchemy.pool。
SingletonThreadPool
( creator,pool_size = 5 ,** kw ) ¶每个线程维护一个连接的池。
每个线程维护一个连接,永远不会将连接移动到除创建它之外的线程。
警告
SingletonThreadPool
将在存在超过pool_size
大小设置的任意连接上调用.close()
。如果更多的唯一线程标识比使用pool_size
状态。这种清理是非确定性的,并且对链接到这些线程标识的连接是否当前正在使用不敏感。
SingletonThreadPool
may be improved in a future release, however in its current status it is generally used only for test scenarios using a SQLite :memory:
database and is not recommended for production use.
选项与Pool
的选项相同,以及:
参数: | pool_size ¶ - 一次维护连接的线程数。默认为五。 |
---|
SingletonThreadPool
is used by the SQLite dialect automatically when a memory-based database is used. 请参阅SQLite。
__init__
(creator, pool_size=5, **kw)¶sqlalchemy.pool。
AssertionPool
( * args,**千瓦 T5> ) T6> ¶ T7>一个Pool
,允许在任何给定的时间最多检出一个连接。
如果同时检出多个连接,则会引发异常。用于调试使用比期望更多连接的代码。
Changed in version 0.7: AssertionPool
also logs a traceback of where the original connection was checked out, and reports this in the assertion error raised.
sqlalchemy.pool。
NullPool
( creator,recycle = 1,echo = None,use_threadlocal = False,logging_name = None,reset_on_return = True, listeners = None,events = None,_dispatch = None,_dialect = None ) t14 > ¶ T15>不共享连接的池。
相反,它打开并关闭每个打开/关闭连接的底层DB-API连接。
此池实现不支持重新连接相关函数,如recycle
和连接失效,因为没有连接持久存在。
sqlalchemy.pool。
StaticPool
( creator,recycle = 1,echo = None,use_threadlocal = False,logging_name = None,reset_on_return = True, listeners = None,events = None,_dispatch = None,_dialect = None ) t14 > ¶ T15>完全由一个连接组成的池,用于所有请求。
此池实施当前不支持重新连接相关的功能,如recycle
和连接失效(也用于支持自动重新连接),但可以在未来版本中实施。
sqlalchemy.pool.
_ConnectionFairy
(dbapi_connection, connection_record, echo)¶代理一个DBAPI连接并提供返回解除引用支持。
这是Pool
实现用于为由Pool
提供的DBAPI连接提供上下文管理的内部对象。
“fairy”这个名字的灵感来自于_ConnectionFairy
对象的生命周期是暂时的,因为它只持续从池中检出特定DBAPI连接的长度,另外还有一个透明代理,它大部分是看不见的。
也可以看看
_connection_record
=无 ¶对与DBAPI连接关联的_ConnectionRecord
对象的引用。
目前这是一个可能会改变的内部存取器。
连接
=无 ¶对正在跟踪的实际DBAPI连接的引用。
游标
( * args,** kwargs ) T5>为基础连接返回一个新的DBAPI游标。
此方法是connection.cursor()
DBAPI方法的代理。
分离 T0> ( T1> ) T2> ¶ T3>
将此连接与其池分开。
这意味着关闭时连接将不再返回到池中,而是直接关闭。包含的ConnectionRecord与DB-API连接分开,并在下次使用时创建一个新连接。
请注意,由于分离后的连接已从池的知识和控制中移除,因此可能会在分离之后违反池实施施加的任何总体连接限制约束。
信息 T0> ¶ T1>
信息字典与该ConnectionFairy
引用的底层DBAPI连接关联,允许用户定义的数据与连接相关联。
这里的数据将与DBAPI连接一起进行,包括返回到连接池之后,并在_ConnectionFairy
的后续实例中再次使用。它与_ConnectionRecord.info
和Connection.info
访问器共享。
invalidate
(e=None, soft=False)¶将此连接标记为无效。
此方法可以直接调用,也可以作为Connection.invalidate()
方法的结果调用。当被调用时,DBAPI连接会立即关闭并被池中的进一步使用丢弃。失效机制通过_ConnectionRecord.invalidate()
内部方法进行。
参数: |
---|
也可以看看
is_valid T0> ¶ T1>
如果_ConnectionFairy
仍然指向活动的DBAPI连接,则返回True。
sqlalchemy.pool。
_ConnectionRecord
( pool ) t5 > ¶ T6>内部对象,它维护由Pool
引用的单个DBAPI连接。
对于任何特定的DBAPI连接,_ConnectionRecord
对象总是存在,而不管该DBAPI连接是否已“检出”。这与_ConnectionFairy
形成鲜明对比,它仅在检出时才是DBAPI连接的公共外观。
一个_ConnectionRecord
的存在时间可能比单个DBAPI连接的时间长。例如,如果调用_ConnectionRecord.invalidate()
方法,则与此_ConnectionRecord
关联的DBAPI连接将被丢弃,但_ConnectionRecord
可以再次使用,在这种情况下,当Pool
下一次使用此记录时会生成新的DBAPI连接。
_ConnectionRecord
与连接池事件一起交付,包括PoolEvents.connect()
和PoolEvents.checkout()
,但_ConnectionRecord
也可以看看
连接
=无 ¶对正在跟踪的实际DBAPI连接的引用。
如果_ConnectionRecord
已被标记为无效,则可能None
;如果拥有的池调用此_ConnectionRecord
重新连接,则新的DBAPI连接可能会替换它。
信息 T0> ¶ T1>
与DBAPI连接关联的.info
字典。
该字典在_ConnectionFairy.info
和Connection.info
访问器中共享。
invalidate
(e=None, soft=False)¶使此_ConnectionRecord
持有的DBAPI连接失效。
此方法针对所有连接失效而被调用,包括调用_ConnectionFairy.invalidate()
或Connection.invalidate()
方法时,以及何时调用任何所谓的“自动失效“情况发生。
参数: |
---|
也可以看看
任何 PEP 249 DB-API模块都可透明地通过连接池进行“代理”。除了connect()
方法将查询池之外,DB-API的用法与以前完全相同。下面我们用psycopg2
来说明这一点:
import sqlalchemy.pool as pool
import psycopg2 as psycopg
psycopg = pool.manage(psycopg)
# then connect normally
connection = psycopg.connect(database='test', username='scott',
password='tiger')
这产生一个_DBProxy
对象,它支持与原始DB-API模块相同的connect()
函数。连接时,返回一个连接代理对象,它将其调用委托给一个真实的DB-API连接对象。这个连接对象被持久地存储在连接池(Pool
的一个实例)中,该连接池对应于发送给connect()
函数的确切连接参数。
连接代理支持原始连接对象上的所有方法,其中大部分通过__getattr__()
进行代理。close()
方法将返回到池的连接,并且cursor()
方法将返回一个代理游标对象。Both the connection proxy and the cursor proxy will also return the underlying connection to the pool after they have both been garbage collected, which is detected via weakref callbacks (__del__
is not used).
此外,当连接返回到池时,无条件地在连接上发出rollback()
。这是释放可能由正常活动导致的连接仍然存在的任何锁定。
默认情况下,connect()
方法将返回已经在当前线程中检出的相同连接。这允许在给定的线程中使用特定的连接,而不需要在功能之间传递它。要禁用此行为,请为manage()
函数指定use_threadlocal=False
。
sqlalchemy.pool。
manage
( module,** params ) T5> ¶ T6>返回一个DB-API模块的代理,该模块自动将连接集中在一起。
给定一个DB-API 2.0模块和池管理参数,为模块返回一个代理,该模块将自动汇集连接,为发送到装饰模块的connect()函数的每个不同的连接参数集创建新的连接池。
参数: |
---|
sqlalchemy.pool。 T0> clear_managers T1> ( T2> ) T3> ¶ T4>
删除所有当前的DB-API 2.0管理器。
所有游泳池和连接都被丢弃。