# tst.py
import asyncio
import signal
# Number of concurrent client tasks that main() creates.
MAX_SOCKETS = 400
# Seconds passed to asyncio.sleep() between consecutive writes on a connection.
DELAY = 0.01
async def connect_socket(i):
    """Open one client connection and send messages on it until cancelled.

    Connects to 127.0.0.1:6669, sends an IRC-style registration sequence
    (PASS, NICK, USER, JOIN) with a ``DELAY`` pause after each line, then
    loops forever sending a PRIVMSG to ``#general`` every ``DELAY`` seconds.

    Args:
        i: Index of this client; embedded in the nick, user name and message
            so every connection is distinguishable.

    Cancellation and any other exception are reported with ``print`` and
    swallowed, so one failing client does not abort the others. The
    connection is always closed on the way out.
    """
    writer = None
    try:
        # The reader half is intentionally unused: server replies are never
        # consumed by this client.
        _reader, writer = await asyncio.open_connection("127.0.0.1", 6669)
        print("Connected!")

        registration = (
            "PASS ss\r\n",
            f"NICK users{i}\r\n",
            f"USER a{i} 0 * a\r\n",
            "JOIN #general\r\n",
        )
        for line in registration:
            writer.write(line.encode())
            await asyncio.sleep(DELAY)
        await writer.drain()

        # Encode once; the payload is the same on every iteration.
        message = f"PRIVMSG #general :A7san Server Fl3alam{i}\r\n".encode()
        while True:
            await asyncio.sleep(DELAY)
            writer.write(message)
            await writer.drain()
    except asyncio.CancelledError:
        print(f"Task {i} was cancelled.")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Release the socket on every exit path (error, cancellation);
        # writer is still None when the connection attempt itself failed.
        if writer is not None:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # The peer may already have dropped the connection.
                pass
async def main():
    """Start ``MAX_SOCKETS`` client tasks and wait for all of them.

    Each task runs ``connect_socket`` with its own index. If this coroutine
    is cancelled while waiting, a message is printed and the cancellation is
    not propagated further.
    """
    clients = [
        asyncio.create_task(connect_socket(index))
        for index in range(MAX_SOCKETS)
    ]
    try:
        await asyncio.gather(*clients)
    except asyncio.CancelledError:
        print("Main task was cancelled.")
async def run_main():
    """Top-level coroutine handed to ``asyncio.run``.

    Awaits ``main()`` and, if a cancellation reaches this level, prints a
    message instead of letting it escape.
    """
    entry_point = main()
    try:
        await entry_point
    except asyncio.CancelledError:
        print("Main task was cancelled.")
def signal_handler(signal, frame):
    """SIGINT handler: request cancellation of every running asyncio task.

    Args:
        signal: Signal number delivered by the interpreter. The name shadows
            the ``signal`` module inside this function; it is kept for
            interface compatibility and the module is not needed here.
        frame: Stack frame that was executing when the signal arrived
            (unused).

    If no event loop is running when the signal arrives (for example before
    ``asyncio.run`` has started or after it has returned), there is nothing
    to cancel and the handler returns quietly instead of raising.
    """
    print("Ctrl+C pressed. Cleaning up...")
    try:
        pending = asyncio.all_tasks()
    except RuntimeError:
        # asyncio.all_tasks() needs a running loop in the current thread.
        return
    for task in pending:
        task.cancel()
if __name__ == "__main__":
    # Register the handler before the loop starts so that Ctrl+C cancels the
    # running tasks through signal_handler.
    signal.signal(signal.SIGINT, signal_handler)
    asyncio.run(run_main())