Pool not utilising available CPUs #104

Open
chrisk314 opened this issue Jul 9, 2021 · 2 comments

Comments

chrisk314 commented Jul 9, 2021

Description

Hi, I'm sure this will turn out to be more of a question than a bug, but here goes. I'm trying to run a very basic script based on the example provided in the repo. I'm using aiomultiprocess.Pool to execute a task that sleeps and then does some CPU-intensive work. I've timed the execution, and with aiomultiprocess.Pool it takes the same amount of time whether I run with 1, 2, 4, or 8 processes.

Further to this, I have added equivalent implementations using asyncio (which of course runs in a single process) and multiprocessing.Pool. The asyncio version takes the same amount of time as the aiomultiprocess.Pool version. The multiprocessing.Pool version takes less than half the time when running with 4 processes, which is in line with my expectations.
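Why the asyncio version cannot go faster can be sketched with the standard library alone. This is a scaled-down illustration, not the script from this issue: calc() uses the same arithmetic with fewer iterations, and the log parameter and overlaps() helper are hypothetical additions that record when each CPU-bound section runs. Because calc() never awaits, once a task resumes after its sleep the event loop cannot switch to another task until calc() returns, so the CPU-bound sections run one after another on a single core.

```python
import asyncio
import time
from typing import List, Tuple


def calc(iters: int) -> float:
    """CPU-bound loop using the same arithmetic as calc() in the issue's script."""
    x = 0.0
    for i in range(1, iters + 1):
        x = (x + i) / i
    return x


async def exec_and_sleep(iters: int, log: List[Tuple[float, float]]) -> float:
    """Await a sleep (yields to the loop), then run CPU-bound code (never yields)."""
    await asyncio.sleep(0.01)
    start = time.perf_counter()
    result = calc(iters)
    # Record the (start, end) of the CPU-bound section (hypothetical instrumentation).
    log.append((start, time.perf_counter()))
    return result


async def main(n_tasks: int = 4, iters: int = 200_000) -> List[Tuple[float, float]]:
    log: List[Tuple[float, float]] = []
    await asyncio.gather(*(exec_and_sleep(iters, log) for _ in range(n_tasks)))
    return sorted(log)


def overlaps(intervals: List[Tuple[float, float]]) -> bool:
    """True if any two (start, end) intervals, sorted by start, overlap in time."""
    return any(nxt[0] < prev[1] for prev, nxt in zip(intervals, intervals[1:]))


if __name__ == "__main__":
    intervals = asyncio.run(main())
    for start, end in intervals:
        print(f"calc ran from {start:.3f} to {end:.3f}")
    print("any overlap:", overlaps(intervals))  # prints "any overlap: False"
```

The sleeps overlap, but the calc() sections never do, so the total time is roughly the sum of the individual calc() times, which matches the asyncio timing reported in this issue.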

As well as the timing results, watching htop during the experiment shows that the aiomultiprocess and asyncio versions max out a single CPU, whilst the multiprocessing version uses around 80% on 4 of the 8 available logical cores on my machine at any given moment. As an aside, is there a better way to check the CPU utilisation of Python scripts than eyeballing htop?
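One way to answer the aside without htop, sketched under stated assumptions: compare CPU time with wall-clock time. The ratio cpu / wall is roughly the average number of cores kept busy during a call (about 1.0 for single-process code, higher when worker processes run in parallel). This sketch uses the standard resource module, which is Unix-only, and RUSAGE_CHILDREN only counts child processes that have exited and been reaped, so a pool must be fully shut down before the second reading. The helpers measure() and busy() are hypothetical names. From the shell, the time builtin (compare user + sys with real) or GNU /usr/bin/time -v ("Percent of CPU this job got") gives a similar whole-script figure.

```python
import resource
import time
from typing import Callable, Tuple


def _cpu_seconds() -> float:
    """User + system CPU time of this process plus its reaped child processes."""
    total = 0.0
    for who in (resource.RUSAGE_SELF, resource.RUSAGE_CHILDREN):
        usage = resource.getrusage(who)
        total += usage.ru_utime + usage.ru_stime
    return total


def measure(fn: Callable[[], object]) -> Tuple[float, float]:
    """Run fn() and return (wall_seconds, cpu_seconds) spent during the call."""
    wall0, cpu0 = time.monotonic(), _cpu_seconds()
    fn()
    return time.monotonic() - wall0, _cpu_seconds() - cpu0


def busy(iters: int = 2_000_000) -> float:
    """Single-process CPU-bound loop (same arithmetic as calc() in the script)."""
    x = 0.0
    for i in range(1, iters + 1):
        x = (x + i) / i
    return x


if __name__ == "__main__":
    wall, cpu = measure(busy)
    # Expect an average close to 1.0 here, since busy() runs in one process.
    print(f"wall={wall:.2f}s cpu={cpu:.2f}s average cores busy={cpu / wall:.2f}")
```

Wrapping each of the script's three main_* calls in measure() would put numbers on what htop shows: roughly one busy core for the asyncio and aiomultiprocess runs as reported, and more than one for multiprocessing.Pool.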

Here's an example script showing the three implementations. As far as I can tell I've done everything correctly, but clearly I must be missing something! I like the look of aiomultiprocess and would like to use it in a project at work, so any help getting it working would be much appreciated!

#!/usr/bin/env python3
"""A simple example of multiprocessing + asyncio with the aiomultiprocess library.
"""

import asyncio
import multiprocessing
import time
from typing import Any, Dict, List

import aiomultiprocess


SLEEP_TIME = 0.1
CALC_ITERS = 100000000


def calc(iters=CALC_ITERS) -> float:
    """Performs FLOPS in a loop with specified number of iterations."""
    x = 0.0
    for i in range(1, iters + 1):
        x = (x + i) / i
    return x


async def exec_and_sleep(*args: Any, **kwargs: Any) -> float:
    """Sleeps and then runs a CPU bound function."""
    await asyncio.sleep(SLEEP_TIME)
    return calc()


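def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
    """Return the current event loop, creating and installing one if none is set."""
    try:
        return asyncio.get_event_loop()
    except RuntimeError as ex:
        if "There is no current event loop in thread" not in str(ex):
            raise  # unrelated error: re-raise instead of silently returning None
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


def wrap_async(fn, args, kwargs) -> Any:
    """Run the coroutine fn(*args, **kwargs) to completion (used by multiprocessing.Pool workers)."""
    loop = get_or_create_event_loop()
    result = loop.run_until_complete(fn(*args, **kwargs))
    return result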


async def main_aiomultiprocess() -> None:
    async with aiomultiprocess.Pool(processes=4) as pool:
        results = await pool.map(exec_and_sleep, [() for _ in range(4)])
        print(results)


async def main_asyncio() -> None:
    results = await asyncio.gather(*[exec_and_sleep() for _ in range(4)])
    print(results)


def main_multiprocessing() -> None:
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(wrap_async, [(exec_and_sleep, (), {}) for _ in range(4)])
        print(results)


if __name__ == "__main__":
    t_start = time.monotonic()
    asyncio.run(main_aiomultiprocess())
    t_total = time.monotonic() - t_start
    print(f"Execution of {main_aiomultiprocess.__name__} took {t_total:.2f}s")

    t_start = time.monotonic()
    asyncio.run(main_asyncio())
    t_total = time.monotonic() - t_start
    print(f"Execution of {main_asyncio.__name__} took {t_total:.2f}s")

    t_start = time.monotonic()
    main_multiprocessing()
    t_total = time.monotonic() - t_start
    print(f"Execution of {main_multiprocessing.__name__} took {t_total:.2f}s")

Here are the results of running the script:

(.venv-dev) csk@aquila: PMF $ aiomultiprocess-simple-example.py
[1.0000000100000002, 1.0000000100000002, 1.0000000100000002, 1.0000000100000002]
Execution of main_aiomultiprocess took 23.48s
[1.0000000100000002, 1.0000000100000002, 1.0000000100000002, 1.0000000100000002]
Execution of main_asyncio took 23.36s
[1.0000000100000002, 1.0000000100000002, 1.0000000100000002, 1.0000000100000002]
Execution of main_multiprocessing took 10.52s

Details

  • OS: Ubuntu 20.04 LTS
  • Python version: 3.7.11
  • aiomultiprocess version: 0.9.0
  • Can you repro on master? Not checked
  • Can you repro in a clean virtualenv? Yes
@fz-gaojian

@jreese I have run into the same confusion and would be grateful if you could help resolve it.

@richardalanjones

@chrisk314 @fz-gaojian The default child_concurrency is 16; change it to 0 or 1 and the behavior should be as you expected.
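A toy model of the mechanism this suggestion points at, offered as an assumption rather than a reading of the aiomultiprocess source: each worker process pulls tasks from a shared queue until it holds as many as its child concurrency limit (the childconcurrency keyword of aiomultiprocess.Pool, default 16, if I have the spelling right; check the Pool signature for your version), then runs them as coroutines in its own event loop. If one worker reaches the queue first, it can take all 4 tasks, and since calc() never awaits, they run serially in that one process. The greedy_assign() helper is hypothetical and ignores task completion, so it only shows the initial distribution.

```python
from collections import deque
from typing import Dict, List


def greedy_assign(n_tasks: int, n_workers: int, concurrency: int) -> Dict[int, List[int]]:
    """Toy model (assumption, not library code) of workers sharing one task queue.

    Workers get a turn in order (worker 0 is assumed to reach the queue first);
    each pulls tasks until it holds `concurrency` of them or the queue is empty.
    """
    queue = deque(range(n_tasks))
    held: Dict[int, List[int]] = {w: [] for w in range(n_workers)}
    for worker in range(n_workers):
        while queue and len(held[worker]) < concurrency:
            held[worker].append(queue.popleft())
    return held


if __name__ == "__main__":
    # 4 tasks and 4 worker processes, as in the script in this issue.
    print(greedy_assign(4, 4, concurrency=16))  # {0: [0, 1, 2, 3], 1: [], 2: [], 3: []}
    print(greedy_assign(4, 4, concurrency=1))   # {0: [0], 1: [1], 2: [2], 3: [3]}
```

Under this model, the change to the issue's script would be something like aiomultiprocess.Pool(processes=4, childconcurrency=1), which leaves each worker at most one task at a time, so the four CPU-bound calls land in four different processes.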
