1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| import asyncio import datetime import json import random
import aredis import websockets
async def producer(): conn = aredis.StrictRedis(host='192.168.233.128', decode_responses=True) await conn.flushdb()
async def write_data(): data = {'result': [{f'alarm': datetime.datetime.now().__str__()[:-7]}]} await conn.xadd(name='message', entry={'data': json.dumps(data)}, max_len=5, approximate=False)
await write_data() await conn.xgroup_create('message', 'alarm', stream_id=0) while True: await write_data() print(f'产生了一条数据,当前队列长度为:{await conn.xlen("message")}') await asyncio.sleep(random.randint(2, 5))
class PushMessage(object): def __init__(self): self.conn = aredis.StrictRedis(host='192.168.233.128', decode_responses=True) self.server = websockets.serve(self.handler, "0.0.0.0", 8765)
async def read_data(self): result = await self.conn.xreadgroup(group='alarm', consumer_id='alarm_read', count=1, message='>') if result and (result := result['message']): return result[0]
async def pending_data(self): result = await self.conn.xpending('message', 'alarm') if any(result): count, min_, max_, [[consumers, _]] = result r = await self.conn.xpending(name='message', group='alarm', start=min_, end=max_, count=count, consumer=consumers) for a, _, _, _ in r: await self.conn.xack('message', 'alarm', a) print(f'清除掉未完成数据:{a}')
async def ack(self, stream_id): await self.conn.xack('message', 'alarm', stream_id)
async def handler(self, websocket, path): print(f'客户端{websocket.local_address}连接{path}') while True: try: await self.pending_data() result = await self.read_data() if result: stream_id, data = result print(f'收到了数据:{data},stream_id为:{stream_id}') await websocket.send(data['data']) await self.ack(stream_id) await asyncio.sleep(0.1) except websockets.exceptions.ConnectionClosed as e: print(f'退出,{e}') break
if __name__ == '__main__': loop = asyncio.get_event_loop() loop.create_task(producer()) loop.run_until_complete(PushMessage().server) print('start server') loop.run_forever()
|