引言
有小伙伴看了之前 Requests
自动重试的文章,在问有没有 aiohttp
自动重试的方法,今天,它来了。
aiohttp 特点
aiohttp
是一个异步的请求库,本身可以作为客户端也可以作为服务端对外提供服务。aiohttp
采用异步非阻塞模式,区别于传统同步库。发送多个 HTTP 请求时,不会因等待某个请求响应而阻塞整个程序,能同时处理多个请求,一旦有响应即可处理,极大提升效率与资源利用率。
重试
启用日志
首先导入日志包,开启所有的日志,但是我看了一下 aiohttp
好像没怎么打日志,开了日志也没有打印多少日志,代码见下面。
安装 aiohttp_retry
由于 aiohttp
本身没有自带重试相关的逻辑,所以这里需要安装专门为 aiohttp
设计的重试库,aiohttp_retry
。
使用 pip 安装,我这里使用 poetry 安装。
1 2
| pip install aiohttp_retry poetry add aiohttp_retry
|
重试逻辑
aiohttp_retry
自带了多种重试方法,从指数重试到随机重试可以随意选择。
指数重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import asyncio import logging
from aiohttp_retry import RetryClient, ExponentialRetry
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s-%(filename)s-%(funcName)s-[%(lineno)d]-%(levelname)s-%(message)s') logging.getLogger("aiohttp.access").setLevel(logging.DEBUG) logging.getLogger("aiohttp.client").setLevel(logging.DEBUG) logging.getLogger("aiohttp.internal").setLevel(logging.DEBUG) logging.getLogger("aiohttp.server").setLevel(logging.DEBUG) logging.getLogger("aiohttp.web").setLevel(logging.DEBUG) logging.getLogger("aiohttp.websocket").setLevel(logging.DEBUG) logging.getLogger('aiohttp_retry').setLevel(logging.DEBUG)
loop = asyncio.new_event_loop() loop.set_debug(True)
async def main(): retry_options = ExponentialRetry(attempts=3, start_timeout=1) retry_client = RetryClient(raise_for_status=True, retry_options=retry_options) try: async with retry_client.get('https://httpbin.org/status/503') as response: print((await response.text())[:100]) except Exception as exc: print(exc) finally: await retry_client.close()
asyncio.run(main())
|
使用 ExponentialRetry
指定重试次数,默认参数是 0.1 秒进行指数重试,使用 RetryClient
中的 raise_for_status
可以指定是否在指定的状态码进行重试,这里设置为 True
,所以默认会重试三次。为了明显看出效果,这里将 start_timeout
设置成 1 秒,运行结果:
1 2 3 4 5 6 7 8 9
| 2024-11-21 21:17:04,644-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector 2024-11-21 21:17:04,644-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector 2024-11-21 21:17:04,644-client.py-_do_request-[110]-DEBUG-Attempt 1 out of 3 2024-11-21 21:17:06,374-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503 2024-11-21 21:17:08,377-client.py-_do_request-[110]-DEBUG-Attempt 2 out of 3 2024-11-21 21:17:08,636-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503 2024-11-21 21:17:12,637-client.py-_do_request-[110]-DEBUG-Attempt 3 out of 3 503, message='SERVICE UNAVAILABLE', url=URL('https://httpbin.org/status/503') 2024-11-21 21:17:13,058-base_events.py-close-[672]-DEBUG-Close <_UnixSelectorEventLoop running=False closed=False debug=True>
|
可以看到,重试间隔从 1 秒到 2 秒到 4 秒成指数递增。
指定 session
注意,上面的代码中,RetryClient
会在没有传入 session
的时候使用默认的 session
,如果想要使用自定义的 session
的话,需要手动传入你自己的 session
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| loop = asyncio.new_event_loop() loop.set_debug(True)
session = ClientSession()
async def main(): retry_options = ExponentialRetry(attempts=3) retry_client = RetryClient(raise_for_status=True, retry_options=retry_options, client_session=session) try: async with retry_client.get('https://httpbin.org/status/503') as response: print((await response.text())[:100]) except Exception as exc: print(exc) finally: await retry_client.close() await session.close()
asyncio.run(main())
|
传输指定的 session
可以自定义 headers
和 cookie
等参数,这里就不演示了。
随机重试
1 2 3 4 5 6 7 8 9 10 11 12 13
| async def main(): retry_options = RandomRetry(attempts=3, random_func=random.random) retry_client = RetryClient(raise_for_status=True, retry_options=retry_options) try: async with retry_client.get('https://httpbin.org/status/503') as response: print((await response.text())[:100]) except Exception as exc: print(exc) finally: await retry_client.close()
asyncio.run(main())
|
可以通过 RandomRetry
来进行随机时间后重试,随机数函数可以自定义。
跟踪请求
可以在 http 请求在发送的整个生命周期中跟踪它,并且修改它的数据。此方法是 aiohttp
原生提供的,aiohttp_retry
只是对它进行了透传。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| loop = asyncio.new_event_loop() loop.set_debug(True)
retry_options = ExponentialRetry(attempts=3)
async def on_request_start( session: ClientSession, trace_config_ctx: SimpleNamespace, params: TraceRequestStartParams, ) -> None: logger.debug('on_request_start')
async def on_connection_create_end( session: ClientSession, trace_config_ctx: SimpleNamespace, params: TraceRequestStartParams, ) -> None: logger.debug('on_connection_create_end')
async def connection_reuseconn( session: ClientSession, trace_config_ctx: SimpleNamespace, params: TraceRequestStartParams, ) -> None: logger.debug('connection_reuseconn')
async def main(): trace_config = TraceConfig() trace_config.on_request_start.append(on_request_start) trace_config.on_connection_create_end.append(on_connection_create_end) trace_config.on_connection_reuseconn.append(connection_reuseconn) retry_client = RetryClient(retry_options=retry_options, trace_configs=[trace_config]) try: async with retry_client.get('https://httpbin.org/status/503') as response: print((await response.text())[:100]) except Exception as exc: print(exc) finally: await retry_client.close()
asyncio.run(main())
|
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 2024-11-21 21:43:58,362-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector 2024-11-21 21:43:58,362-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector 2024-11-21 21:43:58,362-client.py-_do_request-[110]-DEBUG-Attempt 1 out of 3 2024-11-21 21:43:58,363-test7.py-on_request_start-[32]-DEBUG-on_request_start 2024-11-21 21:43:59,268-test7.py-on_connection_create_end-[41]-DEBUG-on_connection_create_end 2024-11-21 21:43:59,764-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503 2024-11-21 21:43:59,965-client.py-_do_request-[110]-DEBUG-Attempt 2 out of 3 2024-11-21 21:43:59,966-test7.py-on_request_start-[32]-DEBUG-on_request_start 2024-11-21 21:43:59,966-test7.py-connection_reuseconn-[50]-DEBUG-connection_reuseconn 2024-11-21 21:44:00,209-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503 2024-11-21 21:44:00,611-client.py-_do_request-[110]-DEBUG-Attempt 3 out of 3 2024-11-21 21:44:00,620-test7.py-on_request_start-[32]-DEBUG-on_request_start 2024-11-21 21:44:00,621-test7.py-connection_reuseconn-[50]-DEBUG-connection_reuseconn
2024-11-21 21:44:00,869-base_events.py-close-[672]-DEBUG-Close <_UnixSelectorEventLoop running=False closed=False debug=True>
|
从日志上可以看出,请求开始,请求结束,连接复用日志都被打印出来了。当然,跟踪请求的方式不止这些,还有更多的可以查看文档。此方法可以用来调试请求,也可以用来在发生异常的时候修改 headers
、body
、params
等等,或许可以修改代理?先挖个坑哈哈,后面再填。
更换 params 重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| loop = asyncio.new_event_loop() loop.set_debug(True)
retry_options = ExponentialRetry(attempts=3)
async def on_request_start( session: ClientSession, trace_config_ctx: SimpleNamespace, params: TraceRequestStartParams, ) -> None: logger.debug('on_request_start') logger.debug(params.headers)
async def on_connection_create_end( session: ClientSession, trace_config_ctx: SimpleNamespace, params: TraceRequestStartParams, ) -> None: logger.debug('on_connection_create_end')
async def connection_reuseconn( session: ClientSession, trace_config_ctx: SimpleNamespace, params: TraceRequestStartParams, ) -> None: logger.debug('connection_reuseconn')
async def main(): trace_config = TraceConfig() trace_config.on_request_start.append(on_request_start) trace_config.on_connection_create_end.append(on_connection_create_end) trace_config.on_connection_reuseconn.append(connection_reuseconn) retry_client = RetryClient(retry_options=retry_options, trace_configs=[trace_config]) try: async with retry_client.requests( params_list=[ RequestParams(method="POST", url='http://httpbin.org/status/503', headers={'headers': 'headers1'}), RequestParams(method="POST", url='http://httpbin.org/status/503', headers={'headers': 'headers2'}) ] ) as response: print((await response.text())[:100]) except Exception as exc: print(exc) finally: await retry_client.close()
asyncio.run(main())
|
运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| 2024-11-21 22:05:26,582-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector 2024-11-21 22:05:26,582-selector_events.py-__init__-[54]-DEBUG-Using selector: KqueueSelector 2024-11-21 22:05:26,582-client.py-_do_request-[110]-DEBUG-Attempt 1 out of 3 2024-11-21 22:05:26,582-test7.py-on_request_start-[32]-DEBUG-on_request_start 2024-11-21 22:05:26,582-test7.py-on_request_start-[33]-DEBUG-<CIMultiDict('headers': 'headers1')> 2024-11-21 22:05:26,590-test7.py-on_connection_create_end-[42]-DEBUG-on_connection_create_end 2024-11-21 22:05:27,295-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503 2024-11-21 22:05:27,497-client.py-_do_request-[110]-DEBUG-Attempt 2 out of 3 2024-11-21 22:05:27,498-test7.py-on_request_start-[32]-DEBUG-on_request_start 2024-11-21 22:05:27,498-test7.py-on_request_start-[33]-DEBUG-<CIMultiDict('headers': 'headers2')> 2024-11-21 22:05:27,500-test7.py-connection_reuseconn-[51]-DEBUG-connection_reuseconn 2024-11-21 22:05:27,746-client.py-_do_request-[151]-DEBUG-Retrying after response code: 503 2024-11-21 22:05:28,147-client.py-_do_request-[110]-DEBUG-Attempt 3 out of 3 2024-11-21 22:05:28,148-test7.py-on_request_start-[32]-DEBUG-on_request_start 2024-11-21 22:05:28,148-test7.py-on_request_start-[33]-DEBUG-<CIMultiDict('headers': 'headers2')> 2024-11-21 22:05:28,148-test7.py-connection_reuseconn-[51]-DEBUG-connection_reuseconn
2024-11-21 22:05:29,326-base_events.py-close-[672]-DEBUG-Close <_UnixSelectorEventLoop running=False closed=False debug=True>
|
可以看到,在每次重试的时候自动切换了 header
,使用了提前传入的 header
。
总结
以上就是使用 aiohttp_retry
进行自动重试的内容了,当然上面只是举了一些例子,aiohttp_retry
还有更多的用法,大家可以根据自己的需求进行自定义或者查看官方文档。如果有无法满足的地方,可以尝试自己实现重试类,这种传入 retry_options
方式提供了极大的灵活度,我们可以根据自己的需求传入自定义的类,方便自定义逻辑。如果想要控制请求的流转流程,也可以使用 aiohttp
官方提供的 TraceConfig
,它可以在请求流转的各个时间对请求进行修改或者拦截。
本文章首发于个人博客 LLLibra146’s blog
本文作者:LLLibra146
更多文章请关注:
版权声明:本博客所有文章除特别声明外,均采用 © BY-NC-ND 许可协议。非商用转载请注明出处!严禁商业转载!
本文链接:
https://blog.d77.xyz/archives/3208f48f.html