Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core) implement client request rate limiting #683

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ def __init__(
ca=None,
use_ssl=False,
verify_certs=True,
concurrent_request_limit=0,
**kwargs,
):
"""Create a :class:`KazooClient` instance. All time arguments
Expand Down Expand Up @@ -241,6 +242,18 @@ def __init__(
self.keyfile = keyfile
self.keyfile_password = keyfile_password
self.ca = ca
if concurrent_request_limit > 0:
self.logger.info(
"Zookeeper client rate-limited to %d concurrent requests",
concurrent_request_limit,
)
self.rate_limiting_sem = self.handler.semaphore_impl(
concurrent_request_limit
)

else:
self.rate_limiting_sem = None

# Curator like simplified state tracking, and listeners for
# state transitions
self._state = KeeperState.CLOSED
Expand Down Expand Up @@ -635,6 +648,16 @@ def _call(self, request, async_object):
async_object.set_exception(SessionExpiredError())
return False

if self.rate_limiting_sem:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can a test be added to check that we get blocked and then get released?

if not self.rate_limiting_sem.acquire(blocking=False):
self.logger.info(
"Limiting concurrent requests. Waiting for completion."
)
# Actually block on the sempahore here
self.rate_limiting_sem.acquire(blocking=True)
# Register the release of the semaphore on async request completion
async_object.rawlink(lambda _res: self.rate_limiting_sem.release())
ceache marked this conversation as resolved.
Show resolved Hide resolved

self._queue.append((request, async_object))

# wake the connection, guarding against a race with close()
Expand Down
2 changes: 2 additions & 0 deletions kazoo/handlers/eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from eventlet.green import threading as green_threading
from eventlet.green import selectors as green_selectors
from eventlet import queue as green_queue
from eventlet import semaphore as green_semaphore

from kazoo.handlers import utils
from kazoo.handlers.utils import selector_select
Expand Down Expand Up @@ -80,6 +81,7 @@ class SequentialEventletHandler(object):
name = "sequential_eventlet_handler"
queue_impl = green_queue.LightQueue
queue_empty = green_queue.Empty
semaphore_impl = green_semaphore.BoundedSemaphore

def __init__(self):
"""Create a :class:`SequentialEventletHandler` instance"""
Expand Down
7 changes: 6 additions & 1 deletion kazoo/handlers/gevent.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""A gevent based handler."""

from __future__ import absolute_import

import atexit
Expand All @@ -14,7 +15,10 @@

from kazoo.handlers.utils import selector_select

from gevent.lock import Semaphore, RLock
from gevent.lock import (
BoundedSemaphore as Semaphore,
RLock as RLock,
)

from kazoo.handlers import utils

Expand Down Expand Up @@ -52,6 +56,7 @@ class SequentialGeventHandler(object):
queue_impl = gevent.queue.Queue
queue_empty = gevent.queue.Empty
sleep_func = staticmethod(gevent.sleep)
semaphore_impl = Semaphore

def __init__(self):
"""Create a :class:`SequentialGeventHandler` instance"""
Expand Down
2 changes: 2 additions & 0 deletions kazoo/handlers/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
:class:`~kazoo.handlers.gevent.SequentialGeventHandler` instead.

"""

from __future__ import absolute_import

import atexit
Expand Down Expand Up @@ -95,6 +96,7 @@ class SequentialThreadingHandler(object):
sleep_func = staticmethod(time.sleep)
queue_impl = queue.Queue
queue_empty = queue.Empty
semaphore_impl = threading.BoundedSemaphore

def __init__(self):
"""Create a :class:`SequentialThreadingHandler` instance"""
Expand Down