App gets stuck: pool never closes and no exception is raised in Docker container #185

Open

itayB opened this issue Mar 19, 2023 · 0 comments

itayB commented Mar 19, 2023

Description

I have a simple module that downloads Avro data files (~50-200 files, each between 1 and 50 MB) from S3 and indexes the data into Elasticsearch.
The module runs in a Docker container (within Kubernetes).
I tried using aiomultiprocess to speed up the process by running it in parallel with more resources (4 cores).
I noticed that the module gets stuck far too often (it keeps running but does nothing), and after a long investigation I found that it's a memory issue (although I never got an Out Of Memory kill event from Kubernetes).
Is there a way to raise an exception in such a case? I want to be alerted when my app gets stuck so I can tune the memory and rerun the tasks.
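
One mitigation I'm experimenting with is a watchdog timeout around the whole batch (a minimal sketch, not a fix: asyncio.wait_for and Pool.terminate() are standard calls, the 600-second deadline is an arbitrary placeholder, and my_mem_task is the repro task defined below):

import asyncio

from aiomultiprocess import Pool


async def run_with_watchdog():
    async with Pool(processes=4, childconcurrency=3) as pool:
        try:
            # If the batch stalls past the deadline, stop waiting instead of
            # hanging forever; the timeout should exceed the worst normal run.
            await asyncio.wait_for(pool.map(my_mem_task, range(150)), timeout=600)
        except asyncio.TimeoutError:
            pool.terminate()  # drop the stuck workers so the container can exit
            raise RuntimeError("pool made no progress; workers may have been OOM-killed")

This only bounds total runtime; it doesn't tell you which worker died.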

Below is my attempt to reproduce this behavior with a simple task (just a dumb loop that fills memory) instead of downloading files and indexing them into a database.
Running the code below with a memory limit of 2g never finishes, while raising the limit to 3g finishes successfully.

docker build -t aiomultiprocess_mem .
docker run --rm --memory=2g --cpus=5 --name=mem aiomultiprocess_mem

Output (at some point only the heartbeat logs keep repeating indefinitely):

2023-03-19 15:31:20,457 1 INFO               creating pool [app.py:38]
2023-03-19 15:31:20,524 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:20,584 14 INFO               T000 started! [app.py:27]
2023-03-19 15:31:20,589 18 INFO               T003 started! [app.py:27]
2023-03-19 15:31:20,596 22 INFO               T006 started! [app.py:27]
2023-03-19 15:31:20,600 26 INFO               T009 started! [app.py:27]
2023-03-19 15:31:21,525 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:22,525 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:23,527 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:24,387 18 INFO               T003 100000 [app.py:31]
2023-03-19 15:31:24,386 26 INFO               T009 100000 [app.py:31]
2023-03-19 15:31:24,391 22 INFO               T006 100000 [app.py:31]
2023-03-19 15:31:24,527 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:24,610 31 INFO               T012 started! [app.py:27]
2023-03-19 15:31:25,528 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:31:25,695 31 INFO               T012 100000 [app.py:31]
2023-03-19 15:31:26,261 31 INFO               T013 started! [app.py:27]
...
...
2023-03-19 15:33:34,095 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:35,096 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:36,097 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:37,097 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:38,098 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:39,099 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:40,100 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:41,101 1 INFO               heartbeat alive 4 processes [app.py:22]
2023-03-19 15:33:42,101 1 INFO               heartbeat alive 4 processes [app.py:22]
...

Code:

├── Dockerfile
└── multi
    ├── __init__.py
    ├── __main__.py
    └── app.py

Dockerfile:

FROM python:3.11.2-slim-bullseye
WORKDIR /
RUN pip install --no-cache-dir --upgrade pip==23.0.1 \
 && pip install --no-cache-dir aiomultiprocess==0.9.0
COPY multi /multi
CMD ["python", "-m", "multi"]

__main__.py:

import asyncio

from multi.app import main


if __name__ == "__main__":
    asyncio.run(main())

app.py:

import asyncio
import logging
import os
import sys

from aiomultiprocess import Pool

logger = logging.getLogger(__name__)


def init_logger():
    logging.basicConfig(
        format="%(asctime)-15s %(process)d %(levelname)-18.18s %(message)s [%(filename)s:%(lineno)d]",
        stream=sys.stdout
    )
    logging.root.setLevel(logging.INFO)


async def is_alive(pool):
    # Heartbeat: log the live process count once per second so a hang stays visible.
    while True:
        if pool is not None:
            logger.info(f"heartbeat alive {len(pool.processes)} processes")
        await asyncio.sleep(1)


async def my_mem_task(task_id):
    # Stand-in for the real S3 download / Elasticsearch indexing: allocate
    # a large pile of nested lists to pressure the container memory limit.
    logger.info(f"T{task_id:03} started!")
    data = []
    for i in range(100_000):
        data.append([i] * 1_000)
    logger.info(f"T{task_id:03} {len(data)}")


async def main():
    init_logger()
    number_of_processes = int(os.getenv("NUMBER_OF_PROCESSES", "4"))
    number_of_async_tasks = int(os.getenv("NUMBER_OF_ASYNC_TASKS", "3"))
    logger.info("creating pool")
    async with Pool(
            processes=number_of_processes,
            childconcurrency=number_of_async_tasks,
            initializer=init_logger,
    ) as pool:
        # Keep a reference so the heartbeat task isn't garbage-collected mid-run.
        heartbeat = asyncio.create_task(is_alive(pool))
        task_ids = list(range(150))
        await pool.map(my_mem_task, task_ids)
        pool.close()
        logger.info("processes pool closed")
        await pool.join()
        logger.info("all processes are done")
        heartbeat.cancel()

UPDATE:
The same hang happens with the plain (non-aio) multiprocessing version :( (I added a relevant question on SO)
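
For reference, the plain-multiprocessing variant I tested looks roughly like this (a sketch with the same allocation pattern; the behavior under a tight cgroup limit is the point, not the exact numbers):

import multiprocessing


def my_mem_task(task_id):
    # Same allocation pattern as the asyncio repro: ~0.75 GiB of lists per task.
    data = [[i] * 1_000 for i in range(100_000)]
    return len(data)


if __name__ == "__main__":
    # If the kernel OOM-kills a worker, map() can wait forever for results
    # that will never arrive, matching the hang seen with aiomultiprocess.
    with multiprocessing.Pool(processes=4) as pool:
        pool.map(my_mem_task, range(150))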

Details

  • OS:
  • Python version: 3.11.2
  • aiomultiprocess version: 0.9.0
  • Can you repro on master? yes
  • Can you repro in a clean virtualenv? yes