基于redis的stream实现简单的websocket推送

前言

上篇文章写了 redis 的 stream 数据类型笔记,这篇文章使用学到的知识来搭建一个 websocket 信息推送系统。

环境搭建

需要一个 redis 服务器,端口默认就好。

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import asyncio
import datetime
import json
import random

import aredis
import websockets


async def producer():
conn = aredis.StrictRedis(host='192.168.233.128', decode_responses=True)
# 启动前清除数据库
await conn.flushdb()

async def write_data():
data = {'result': [{f'alarm': datetime.datetime.now().__str__()[:-7]}]}
# 将消息队列长度限制在5,测试用
await conn.xadd(name='message', entry={'data': json.dumps(data)}, max_len=5,
approximate=False)

# 先写入一条数据,会自动创建message
await write_data()
# 然后创建消费者组,防止下面读取报错,从头开始读取数据
await conn.xgroup_create('message', 'alarm', stream_id=0)
# 循环读取
while True:
await write_data()
print(f'产生了一条数据,当前队列长度为:{await conn.xlen("message")}')
await asyncio.sleep(random.randint(2, 5))


class PushMessage(object):
def __init__(self):
self.conn = aredis.StrictRedis(host='192.168.233.128', decode_responses=True)
self.server = websockets.serve(self.handler, "0.0.0.0", 8765)

async def read_data(self):
result = await self.conn.xreadgroup(group='alarm', consumer_id='alarm_read', count=1, message='>')
# 读取数据,如果数据不为空返回
if result and (result := result['message']):
return result[0]

async def pending_data(self):
result = await self.conn.xpending('message', 'alarm')
if any(result):
count, min_, max_, [[consumers, _]] = result
r = await self.conn.xpending(name='message', group='alarm', start=min_, end=max_, count=count,
consumer=consumers)
for a, _, _, _ in r:
# 未确认的消息可以处理,也可以直接确认忽略掉,这里忽略了
await self.conn.xack('message', 'alarm', a)
print(f'清除掉未完成数据:{a}')

async def ack(self, stream_id):
await self.conn.xack('message', 'alarm', stream_id)

async def handler(self, websocket, path):
print(f'客户端{websocket.local_address}连接{path}')
# 可以根据路径来判断给它推送哪个流的消息
# if path=='/':
# pass
while True:
try:
await self.pending_data()
result = await self.read_data()
if result:
stream_id, data = result
print(f'收到了数据:{data},stream_id为:{stream_id}')
await websocket.send(data['data'])
await self.ack(stream_id)
await asyncio.sleep(0.1)
except websockets.exceptions.ConnectionClosed as e:
print(f'退出,{e}')
break


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.create_task(producer())
loop.run_until_complete(PushMessage().server)
print('start server')
loop.run_forever()

思路

简单叙述下思路:

  • 首先创建一个事件循环,将生产者任务放到事件循环去运行,之后启动 websocket 服务器,开始提供服务。
  • 生产者任务用来产生消息,因为只是演示,所以并没有实际的消息生产者,这里使用自己的函数来产生数据。
  • 这里是一个简单的推送系统,所以没有添加认证功能,如果有需要也可以在 handler 开头添加认证功能。
  • 定时轮询是否有新消息到来,将新消息发送给对应的客户端。
  • 如果发送失败了证明客户端已经断开了连接,退出循环。

image-20201110102238106

这里要注意顺序,先要获取数据,判断数据是否为空,先发送数据,然后再发送 axk,因为如果消息发送失败的话不会执行 ack 的代码,确保了消息不会丢失,未被 ack 的消息会在下一次客户端连接时首先被确认。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!DOCTYPE html>
<html>
<head>
<title>WebSocket demo</title>
</head>
<body>
<script>
var ws = new WebSocket("ws://127.0.0.1:8765/"),
messages = document.createElement('ul');
ws.onmessage = function (event) {
var messages = document.getElementsByTagName('ul')[0],
message = document.createElement('li'),
content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
document.body.appendChild(messages);
</script>
</body>
</html>

开启服务器后打开测试网页,可以看到网页上不断的刷新消息。

image-20201110102749923

总结

基于 websockets 库和 aredis 库实现了全异步的 websocket 推送系统,可以根据需要在此基础上添加其他功能。如认证功能,如未确认的消息重新消费功能等等。

参考链接

https://websockets.readthedocs.io/en/stable/index.html

https://aredis.readthedocs.io/en/latest/

https://redis.io/commands

本文章首发于个人博客 LLLibra146’s blog
本文作者:LLLibra146
版权声明:本博客所有文章除特别声明外,均采用 © BY-NC-ND 许可协议。非商用转载请注明出处!严禁商业转载!
本文链接https://blog.d77.xyz/archives/a730105d.html