一次封装,解放双手:Requests如何实现0入侵请求与响应的智能加解密

引言

之前写了 Requests 自动重试的文章,突然想到,之前还用到过 Requests 自动加解密请求的逻辑,分享一下。之前在做逆向的时候,发现一般医院的小程序请求会这么玩,请求数据可能加密也可能不加密,但是返回的 json 数据是加密的,每次都要去写加解密的代码就比较麻烦,所以想了一个办法将加解密封装到 Requests 里面,下面开始。

session

session 是 Requests 中比较重要的一个类,基本上所有的请求都走 session 类的 send 方法,所以咱们这里从 send 方法入手。

服务端代码

这里为了方便演示,我开发了一个小小的 demo 服务端来演示服务端加密的场景。先看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import base64
import json
import uuid
from hashlib import md5

from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from flask import Flask, request, Response

app = Flask(__name__)


def get_nonce():
return uuid.uuid4().__str__()


def encrypt(k, data, mode=AES.MODE_ECB):
aes = AES.new(k.encode(), mode)
result = aes.encrypt(pad(data.encode(), block_size=16))
return base64.b64encode(result).decode()


def get_headers():
return {
'Content-Type': 'application/json',
'x-encrypt': 'true',
'nonce': get_nonce()
}


@app.post('/')
def index():
data = request.json
print(f"get timestamp is :{request.headers.get('x-timestamp')}")
print(f"get sign is :{request.headers.get('x-sign')}")
print(f'json data is:{data}')
assert md5(request.headers.get('x-timestamp').encode()).hexdigest() == request.headers.get('x-sign')
headers = get_headers()
data = {'encrypt': str(data['data'] * 2) + (request.headers.get('x-sign') or '')}
print(f'before encrypt data is:{data}')
d = encrypt(headers.get('nonce').replace('-', '')[:16], json.dumps(data))
print(f'after encrypt data is:{d}')
return Response(json.dumps({'data': d}), headers=headers, mimetype='application/json')


if __name__ == '__main__':
app.run(debug=True)

这里通过 flask 写了一个接口,代码不复杂,大家应该都能看懂。简单讲下逻辑:

  • 首先这是个一个 post 请求的接口,接受 json 数据和请求头
  • 接收到请求头后获取时间戳和 sign 字段
  • 使用时间戳验证 sign 字段是否正确,不正确抛出异常,为了演示这里就不处理异常的场景了
  • 接下来处理数据,将接收到的数据取出 data 字段,复制一遍,然后添加 sign 字段后作为初始数据
  • 然后将初始数据进行加密
  • 返回加密后的数据,并且在返回的响应头中添加 nonce 字段作为解密的密钥,并且将 x-encrypt 响应头设置为 true

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import base64
import time
from hashlib import md5

import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad


def decrypt(k, data, mode=AES.MODE_ECB):
aes = AES.new(k.encode(), mode)
result = aes.decrypt(base64.b64decode(data))
return unpad(result, block_size=16).decode()


class CustomSession(requests.Session):
def send(self, request, **kwargs):
tt = int(time.time() * 1000)
request.headers['x-timestamp'] = f'{tt}'
request.headers['x-sign'] = f'{md5(str(tt).encode()).hexdigest()}'
print(f'request.headers: {request.headers}')
print(f'request.body: {request.body.decode()}')
response = super().send(request, **kwargs)
print(f'response.headers: {response.headers}')
print(f'response.body: {response.text}')
if response.headers.get('x-encrypt') == 'true':
k = response.headers.get('nonce').replace('-', '')[:16]
enc = response.json().get('data')
print(f'encrypt data: {enc}')
r = decrypt(k, enc, AES.MODE_ECB)
print(f'decrypt data: {r}')
setattr(response, '_content', r.encode())
return response
return response


if __name__ == '__main__':
session = CustomSession()
res = session.post(url='http://127.0.0.1:5000/', json={'data': 'test123'})
print(f'response is:{res.json()}')

在客户端代码,我们继承了 session 类,并且重写了 send 方法,加入了自定义的逻辑,自定义的这块逻辑就是本篇文章的核心代码了,自定义的逻辑中总共做了几件事:

  • 首先获取了当前的时间戳,将时间戳添加到 x-timestamp 请求头中
  • 对时间戳字段取 md5 作为 sign 字段添加到 x-sign 请求头中
  • 执行正常的 send 方法逻辑
  • 判断是否存在 x-encrypt 字段并且字段的内容是否等于 true
  • 获取响应头中的 nonce 字段,并且去掉 - 后取前十六个字节作为密钥
  • 获取加密后的数据,使用上面获取到的密钥进行解密
  • 将解密后的 json 重新赋值给响应对象,返回响应

以上就是客户端和服务端的代码和执行逻辑。

当然了,这篇文章想要说的是自动加解密的逻辑,这里为了演示方便只用到了响应加密,并没有添加请求加密的逻辑,有需求的小伙伴可以自行动手添加。主要是的思路就是重写 send 方法,在 send 方法的前后,添加自定义的请求加密和响应解密的逻辑。

先看下运行结果:

服务端日志:

1
2
3
4
5
6
127.0.0.1 - - [22/Nov/2024 21:11:23] "POST / HTTP/1.1" 200 -
get timestamp is :1732281083065
get sign is :a981a12b2bdd5bd795c5323c516e2407
json data is:{'data': 'test123'}
before encrypt data is:{'encrypt': 'test123test123a981a12b2bdd5bd795c5323c516e2407'}
after encrypt data is:5qzFCwzBoTi4eA3NLW7Z8ZA/4jh5QLBRwwn8uPCYncZowxrRhqjrmQnfnC2keyN6q8Z0CrqrNej5s69A075XkA==

客户端日志:

1
2
3
4
5
6
7
request.headers: {'User-Agent': 'python-requests/2.32.3', 'Accept-Encoding': 'gzip, deflate', 'Accept': '*/*', 'Connection': 'keep-alive', 'Content-Length': '19', 'Content-Type': 'application/json', 'x-timestamp': '1732281083065', 'x-sign': 'a981a12b2bdd5bd795c5323c516e2407'}
request.body: {"data": "test123"}
response.headers: {'Server': 'Werkzeug/3.1.3 Python/3.10.9', 'Date': 'Fri, 22 Nov 2024 13:11:23 GMT', 'Content-Type': 'application/json', 'x-encrypt': 'true', 'nonce': '3d783f6f-b0d3-43dd-8f96-14db015b8f68', 'Content-Length': '100', 'Connection': 'close'}
response.body: {"data": "5qzFCwzBoTi4eA3NLW7Z8ZA/4jh5QLBRwwn8uPCYncZowxrRhqjrmQnfnC2keyN6q8Z0CrqrNej5s69A075XkA=="}
encrypt data: 5qzFCwzBoTi4eA3NLW7Z8ZA/4jh5QLBRwwn8uPCYncZowxrRhqjrmQnfnC2keyN6q8Z0CrqrNej5s69A075XkA==
decrypt data: {"encrypt": "test123test123a981a12b2bdd5bd795c5323c516e2407"}
response is:{'encrypt': 'test123test123a981a12b2bdd5bd795c5323c516e2407'}

从客户端日志中,可以看到,请求的 body 是明文的值,请求头中包含一个时间戳字段和 sign 字段用来给服务端做校验,被发送出去后接收到的是加密后的数据,并且响应头中多了一个 nonce 字段,作为密钥用来解密响应数据,将响应数据解密后可以得到两个我们需要的响应。

从服务端日志中,可以看到,显示获取了时间戳字段和 sign 字段,并且经过处理后,将数据加密返回给客户端。

抓包验证

接下来抓包验证一下,是否如上文所说的响应是加密的,请求是明文的

响应

image-20241122212749150

请求

image-20241122212951719

从截图中可以看到,在发送请求时,请求是明文的,在接收到响应时,响应是加密的,这里其实可以拓宽一下,如果我们将请求加密,那么整个请求和响应就都是加密的了,但是因为重写了 sessionsend 方法,其实并不会对整体的业务代码有很大的入侵,可以更加方便的更改或者判断是否需要加解密,应该如何加密和解密。这里提供了极大的灵活性,并且可以做到对业务 0 入侵。

总结

以上就是自动加解密的所有逻辑了,希望对大家有所帮助。

本文章首发于个人博客 LLLibra146’s blog
本文作者:LLLibra146
更多文章请关注:qrcode
版权声明:本博客所有文章除特别声明外,均采用 © BY-NC-ND 许可协议。非商用转载请注明出处!严禁商业转载!
本文链接
https://blog.d77.xyz/archives/b513fd4f.html