SQLAlchemy多线程报错

最近的项目中用到了 SQLAlchemy,在压力测试过程中发现了在并发比较高的情况下,发生了大量报错。

项目代码

项目中所用到的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
from sqlalchemy import Column, Integer
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

driver = 'SQL Server'
address = ''
# 使用主机名连接时,还必须在URL的查询参数中指定驱动程序名称
engine = create_engine(f'mssql+pyodbc://{address}/MES?driver={driver}', # noqa
echo=False, # 不显示SQL语句
pool_recycle=3600,
pool_size=20, max_overflow=0 # 开启连接池,并设置最大连接数量
)
Base = declarative_base(engine)


class APPCommand(Base):
__tablename__ = 'APPCommand' # noqa

id = Column(Integer, autoincrement=True, primary_key=True, comment='主键id')


session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
session = Session()


def test():
result = session.query(APPCommand).all()
print(result)
session.query(APPCommand).all()


if __name__ == '__main__':
# 多线程模拟并发访问
threading.Thread(target=test, args=()).start()
threading.Thread(target=test, args=()).start()
threading.Thread(target=test, args=()).start()
threading.Thread(target=test, args=()).start()

错误代码

以上代码在执行过程中,会发生以下错误。

1
2
(pyodbc.Error) ('HY000', '[HY000] [Microsoft][ODBC SQL Server Driver]连接占线导致另一个 hstmt  -> Windows 上报错
Connection is busy with results for another command (0) (SQLExecDirectW)') -> Linux 上报错

猜测原因

从报错提示来看可能是因为连接正忙于查询另一个请求,由于是多线程共享一个连接,导致另一个线程使用同一个连接进行了一个新的查询导致了报错。

查询相关关键字,得到以下链接:

https://github.com/mkleehammer/pyodbc/issues/300

https://stackoverflow.com/questions/9017264/why-only-some-users-get-the-error-connection-is-busy-with-results-for-another

查询到的信息都是和 pyodbc 相关的问题,但是由于项目里并没有直接使用 pyodbc,所以解决方案并没有很大的参考性。

继续查询 SQLAlchemy 文档,发现我目前所使用的 session 是非线程安全的。

解决方法

这里先解释下 Session 这个对象,每个 Session 对象对应一个数据库连接,在上面的测试代码中,我们只是初始化了一个 session 然后在全局共享,所以只会和数据库建立一个连接,虽然设置了线程池,但是并没有用到。

这样肯定是不符合我们的要求的,正确的开启线程池的方法是使用上下文/线程本地会话的方式来优雅的使用 session。和 Flask 的实现方式类似,使用 threading.local() 的方式使每一个线程都有自己的 session 对象,从而让每个线程获取到的都是一个新的连接, 这种方式不需要自己实现,SQLAlchemy 已经做好了对应的实现,我们直接调用即可。

对应文档:

https://docs.sqlalchemy.org/en/13/orm/session_basics.html#is-the-session-thread-safe

https://docs.sqlalchemy.org/en/13/orm/contextual.html#unitofwork-contextual

https://docs.sqlalchemy.org/en/13/orm/contextual.html#thread-local-scope

了解了原理之后,根据文档来修改之前的测试代码,使用文档中提到的隐式方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import threading
from sqlalchemy import Column, Integer
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

driver = 'SQL Server'
address = ''
# 使用主机名连接时,还必须在URL的查询参数中指定驱动程序名称
engine = create_engine(f'mssql+pyodbc://{address}/MES?driver={driver}', # noqa
echo=False, # 不显示SQL语句
pool_recycle=3600,
pool_size=20, max_overflow=0 # 开启连接池,并设置最大连接数量
)
Base = declarative_base(engine)


class APPCommand(Base):
__tablename__ = 'APPCommand' # noqa

id = Column(Integer, autoincrement=True, primary_key=True, comment='主键id')


session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)


def test():
result = Session.query(APPCommand).all()
print(result)
Session.query(APPCommand).all()


if __name__ == '__main__':
threading.Thread(target=test, args=()).start()
threading.Thread(target=test, args=()).start()
threading.Thread(target=test, args=()).start()
threading.Thread(target=test, args=()).start()

再次运行,发现可以正常运行,使用系统监测软件也可以发现 python 和数据库建立了四个连接,完工!

参考链接:

https://docs.sqlalchemy.org/

本文章首发于个人博客 LLLibra146’s blog
本文作者:LLLibra146
版权声明:本博客所有文章除特别声明外,均采用 © BY-NC-ND 许可协议。非商用转载请注明出处!严禁商业转载!
本文链接https://blog.d77.xyz/archives/87a89514.html