最近的项目中用到了 SQLAlchemy,在压力测试过程中发现了在并发比较高的情况下,发生了大量报错。
项目代码 项目中所用到的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import threadingfrom sqlalchemy import Column, Integerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, scoped_sessiondriver = 'SQL Server' address = '' engine = create_engine(f'mssql+pyodbc://{address} /MES?driver={driver} ' , echo=False , pool_recycle=3600 , pool_size=20 , max_overflow=0 ) Base = declarative_base(engine) class APPCommand (Base ): __tablename__ = 'APPCommand' id = Column(Integer, autoincrement=True , primary_key=True , comment='主键id' ) session_factory = sessionmaker(bind=engine) Session = scoped_session(session_factory) session = Session() def test (): result = session.query(APPCommand).all () print (result) session.query(APPCommand).all () if __name__ == '__main__' : threading.Thread(target=test, args=()).start() threading.Thread(target=test, args=()).start() threading.Thread(target=test, args=()).start() threading.Thread(target=test, args=()).start()
错误代码 以上代码在执行过程中,会发生以下错误。
1 2 (pyodbc.Error) ('HY000' , '[HY000] [Microsoft][ODBC SQL Server Driver]连接占线导致另一个 hstmt -> Windows 上报错 Connection is busy with results for another command (0) (SQLExecDirectW)' ) -> Linux 上报错
猜测原因 从报错提示来看可能是因为连接正忙于查询另一个请求,由于是多线程共享一个连接,导致另一个线程使用同一个连接进行了一个新的查询导致了报错。
查询相关关键字,得到以下链接:
https://github.com/mkleehammer/pyodbc/issues/300
https://stackoverflow.com/questions/9017264/why-only-some-users-get-the-error-connection-is-busy-with-results-for-another
查询到的信息都是和 pyodbc 相关的问题,但是由于项目里并没有直接使用 pyodbc,所以解决方案并没有很大的参考性。
继续查询 SQLAlchemy 文档,发现我目前所使用的 session 是非线程安全的。
解决方法 这里先解释下 Session 这个对象,每个 Session 对象对应一个数据库连接,在上面的测试代码中,我们只是初始化了一个 session 然后在全局共享,所以只会和数据库建立一个连接,虽然设置了线程池,但是并没有用到。
这样肯定是不符合我们的要求的,正确的开启线程池的方法是使用上下文/线程本地会话的方式来优雅的使用 session。和 Flask 的实现方式类似,使用 threading.local()
的方式使每一个线程都有自己的 session 对象,从而让每个线程获取到的都是一个新的连接, 这种方式不需要自己实现,SQLAlchemy 已经做好了对应的实现,我们直接调用即可。
对应文档:
https://docs.sqlalchemy.org/en/13/orm/session_basics.html#is-the-session-thread-safe
https://docs.sqlalchemy.org/en/13/orm/contextual.html#unitofwork-contextual
https://docs.sqlalchemy.org/en/13/orm/contextual.html#thread-local-scope
了解了原理之后,根据文档来修改之前的测试代码,使用文档中提到的隐式方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import threadingfrom sqlalchemy import Column, Integerfrom sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmaker, scoped_sessiondriver = 'SQL Server' address = '' engine = create_engine(f'mssql+pyodbc://{address} /MES?driver={driver} ' , echo=False , pool_recycle=3600 , pool_size=20 , max_overflow=0 ) Base = declarative_base(engine) class APPCommand (Base ): __tablename__ = 'APPCommand' id = Column(Integer, autoincrement=True , primary_key=True , comment='主键id' ) session_factory = sessionmaker(bind=engine) Session = scoped_session(session_factory) def test (): result = Session.query(APPCommand).all () print (result) Session.query(APPCommand).all () if __name__ == '__main__' : threading.Thread(target=test, args=()).start() threading.Thread(target=test, args=()).start() threading.Thread(target=test, args=()).start() threading.Thread(target=test, args=()).start()
再次运行,发现可以正常运行,使用系统监测软件也可以发现 python 和数据库建立了四个连接,完工!
参考链接:
https://docs.sqlalchemy.org/
本文章首发于个人博客 LLLibra146’s blog 本文作者 :LLLibra146 更多文章请关注:版权声明 :本博客所有文章除特别声明外,均采用 © BY-NC-ND 许可协议。非商用转载请注明出处!严禁商业转载!本文链接 :https://blog.d77.xyz/archives/87a89514.html