diff --git a/kytos/core/buffers/__init__.py b/kytos/core/buffers/__init__.py new file mode 100644 index 00000000..cd69e93e --- /dev/null +++ b/kytos/core/buffers/__init__.py @@ -0,0 +1,4 @@ +"""Module for kytos buffers""" + +from .buffers import KytosEventBuffer +from .manager import KytosBuffers diff --git a/kytos/core/buffers.py b/kytos/core/buffers/buffers.py similarity index 60% rename from kytos/core/buffers.py rename to kytos/core/buffers/buffers.py index 27cc3bbc..68e53bce 100644 --- a/kytos/core/buffers.py +++ b/kytos/core/buffers/buffers.py @@ -1,12 +1,7 @@ """Kytos Buffer Classes, based on Python Queue.""" import logging -from janus import PriorityQueue, Queue - -from kytos.core.events import KytosEvent -from kytos.core.helpers import get_thread_pool_max_workers - -__all__ = ('KytosBuffers', ) +from janus import Queue LOG = logging.getLogger(__name__) @@ -14,8 +9,7 @@ class KytosEventBuffer: """KytosEventBuffer represents a queue to store a set of KytosEvents.""" - def __init__(self, name, event_base_class=None, maxsize=0, - queue_cls=Queue): + def __init__(self, name, queue: Queue = None): """Contructor of KytosEventBuffer receive the parameters below. Args: @@ -25,8 +19,7 @@ def __init__(self, name, event_base_class=None, maxsize=0, queue_cls (class): queue class from janus """ self.name = name - self._event_base_class = event_base_class - self._queue = queue_cls(maxsize=maxsize) + self._queue = queue if queue is not None else Queue() self._reject_new_events = False def put(self, event): @@ -121,55 +114,3 @@ def empty(self): def full(self): """Return True if KytosEventBuffer is full of KytosEvent.""" return self._queue.sync_q.full() - - -class KytosBuffers: - """Set of KytosEventBuffer used in Kytos.""" - - def __init__(self): - """Build four KytosEventBuffers. - - :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events - received from connection events. - - :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events - received from network. - - :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with - events to be received. - - :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with - events to be sent. - - :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events - sent to NApps. - """ - self._pool_max_workers = get_thread_pool_max_workers() - self.conn = KytosEventBuffer("conn") - self.raw = KytosEventBuffer("raw", maxsize=self._get_maxsize("sb")) - self.msg_in = KytosEventBuffer("msg_in", - maxsize=self._get_maxsize("sb"), - queue_cls=PriorityQueue) - self.msg_out = KytosEventBuffer("msg_out", - maxsize=self._get_maxsize("sb"), - queue_cls=PriorityQueue) - self.app = KytosEventBuffer("app", maxsize=self._get_maxsize("app")) - - def get_all_buffers(self): - """Get all KytosEventBuffer instances.""" - return [ - event_buffer for event_buffer in self.__dict__.values() - if isinstance(event_buffer, KytosEventBuffer) - ] - - def _get_maxsize(self, queue_name): - """Get queue maxsize if it's been set.""" - return self._pool_max_workers.get(queue_name, 0) - - def send_stop_signal(self): - """Send a ``kytos/core.shutdown`` event to each buffer.""" - LOG.info('Stop signal received by Kytos buffers.') - LOG.info('Sending KytosShutdownEvent to all apps.') - event = KytosEvent(name='kytos/core.shutdown') - for buffer in self.get_all_buffers(): - buffer.put(event) diff --git a/kytos/core/buffers/factory.py b/kytos/core/buffers/factory.py new file mode 100644 index 00000000..ba305a56 --- /dev/null +++ b/kytos/core/buffers/factory.py @@ -0,0 +1,54 @@ +"""Utilities for composing KytosEventBuffers""" + +from janus import PriorityQueue, Queue + +from kytos.core.helpers import get_thread_pool_max_workers + +from .buffers import KytosEventBuffer + +queue_classes = { + 'queue': Queue, + 'priority': PriorityQueue, +} + + +def process_queue(config: dict) -> Queue: + """ + Create a janus queue from a given config dict + """ + queue_type = queue_classes[config.get('type', 'queue')] + queue_size = config.get('maxsize', 0) + if isinstance(queue_size, str): + if queue_size.startswith('threadpool_'): + threadpool = queue_size[len('threadpool_'):] + queue_size = get_thread_pool_max_workers().get(threadpool, 0) + else: + raise TypeError( + 'Expected int or str formatted ' + 'as "threadpool_{threadpool_name}"' + ) + return queue_type(maxsize=queue_size) + + +extension_processors = {} + + +def buffer_from_config(name: str, config: dict) -> KytosEventBuffer: + """ + Create a KytosEventBuffer from a given config dict + """ + buffer_cls = KytosEventBuffer + args = {} + # Process Queue Config + args['queue'] = process_queue(config.get('queue', {})) + + buffer = buffer_cls(name, **args) + + # Process Mixins + extensions: dict = config.get('extensions', []) + for extension in extensions: + extension_type = extension['type'] + extension_args = extension.get('args', {}) + buffer = extension_processors[extension_type](buffer, extension_args) + + return buffer diff --git a/kytos/core/buffers/manager.py b/kytos/core/buffers/manager.py new file mode 100644 index 00000000..9436703c --- /dev/null +++ b/kytos/core/buffers/manager.py @@ -0,0 +1,80 @@ +"""Kytos Buffer Classes, based on Python Queue.""" +import logging + +from janus import PriorityQueue, Queue + +from kytos.core.config import KytosConfig +from kytos.core.events import KytosEvent +from kytos.core.helpers import get_thread_pool_max_workers + +from .buffers import KytosEventBuffer +from .factory import buffer_from_config + +LOG = logging.getLogger(__name__) + + +class KytosBuffers: + """Set of KytosEventBuffer used in Kytos.""" + + def __init__(self): + """Build four KytosEventBuffers. + + :attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events + received from connection events. + + :attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events + received from network. + + :attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with + events to be received. + + :attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with + events to be sent. + + :attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events + sent to NApps. + """ + + self._pool_max_workers = get_thread_pool_max_workers() + + self.conn = KytosEventBuffer("conn") + self.raw = KytosEventBuffer( + "raw", + queue=Queue(maxsize=self._get_maxsize("sb")) + ) + self.msg_in = KytosEventBuffer( + "msg_in", + queue=PriorityQueue(maxsize=self._get_maxsize("sb")), + ) + self.msg_out = KytosEventBuffer( + "msg_out", + queue=PriorityQueue(maxsize=self._get_maxsize("sb")), + ) + self.app = KytosEventBuffer( + "app", + queue=Queue(maxsize=self._get_maxsize("app")), + ) + + buffer_conf = KytosConfig().options['daemon'].event_buffer_conf + + for name, config in buffer_conf.items(): + setattr(self, name, buffer_from_config(name, config)) + + def get_all_buffers(self): + """Get all KytosEventBuffer instances.""" + return [ + event_buffer for event_buffer in self.__dict__.values() + if isinstance(event_buffer, KytosEventBuffer) + ] + + def _get_maxsize(self, queue_name): + """Get queue maxsize if it's been set.""" + return self._pool_max_workers.get(queue_name, 0) + + def send_stop_signal(self): + """Send a ``kytos/core.shutdown`` event to each buffer.""" + LOG.info('Stop signal received by Kytos buffers.') + LOG.info('Sending KytosShutdownEvent to all apps.') + event = KytosEvent(name='kytos/core.shutdown') + for buffer in self.get_all_buffers(): + buffer.put(event) diff --git a/kytos/core/config.py b/kytos/core/config.py index d8fb124f..600637f3 100644 --- a/kytos/core/config.py +++ b/kytos/core/config.py @@ -125,32 +125,65 @@ def parse_args(self): 'debug': False} """ - defaults = {'pidfile': os.path.join(BASE_ENV, - 'var/run/kytos/kytosd.pid'), - 'workdir': os.path.join(BASE_ENV, 'var/lib/kytos'), - 'napps': os.path.join(BASE_ENV, 'var/lib/kytos/napps/'), - 'napps_repositories': "['https://napps.kytos.io/repo/']", - 'installed_napps': os.path.join(BASE_ENV, - 'var/lib/kytos/napps/', - '.installed'), - 'conf': os.path.join(BASE_ENV, 'etc/kytos/kytos.conf'), - 'logging': os.path.join(BASE_ENV, 'etc/kytos/logging.ini'), - 'logger_decorators': - ["kytos.core.logger_decorators.queue_decorator"], - 'listen': '0.0.0.0', - 'port': 6653, - 'api_traceback_on_500': True, - 'foreground': False, - 'protocol_name': '', - 'enable_entities_by_default': False, - 'napps_pre_installed': [], - 'authenticate_urls': [], - 'token_expiration_minutes': 180, - 'thread_pool_max_workers': {}, - 'database': '', - 'apm': '', - 'connection_timeout': 130, - 'debug': False} + defaults = { + 'pidfile': os.path.join( + BASE_ENV, + 'var/run/kytos/kytosd.pid' + ), + 'workdir': os.path.join( + BASE_ENV, + 'var/lib/kytos' + ), + 'napps': os.path.join( + BASE_ENV, + 'var/lib/kytos/napps/' + ), + 'napps_repositories': ['https://napps.kytos.io/repo/'], + 'installed_napps': os.path.join( + BASE_ENV, + 'var/lib/kytos/napps/', + '.installed' + ), + 'conf': os.path.join( + BASE_ENV, + 'etc/kytos/kytos.conf' + ), + 'logging': os.path.join( + BASE_ENV, + 'etc/kytos/logging.ini' + ), + 'logger_decorators': [ + "kytos.core.logger_decorators.queue_decorator", + ], + 'listen': '0.0.0.0', + 'port': 6653, + 'api_traceback_on_500': True, + 'foreground': False, + 'protocol_name': '', + 'enable_entities_by_default': False, + 'napps_pre_installed': [], + 'authenticate_urls': [], + 'token_expiration_minutes': 180, + 'thread_pool_max_workers': {}, + 'database': '', + 'apm': '', + 'connection_timeout': 130, + 'debug': False, + 'event_buffer_conf': { + 'msg_out': { + 'queue': { + 'type': 'priority', + 'maxsize': 'threadpool_sb', + }, + }, + 'msg_in': { + 'queue': { + 'type': 'priority', + 'maxsize': 'threadpool_sb', + }, + }, + }, + } options, argv = self.conf_parser.parse_known_args() @@ -184,7 +217,6 @@ def _parse_options(self, argv): options, unknown = self.parser.parse_known_args(argv) if unknown: warnings.warn(f"Unknown arguments: {unknown}") - options.napps_repositories = json.loads(options.napps_repositories) options.debug = options.debug in ['True', True] options.daemon = options.daemon in ['True', True] options.port = int(options.port) @@ -203,11 +235,16 @@ def _parse_json(value): return json.loads(value) return value + options.napps_repositories = _parse_json(options.napps_repositories) options.logger_decorators = _parse_json(options.logger_decorators) options.napps_pre_installed = _parse_json(options.napps_pre_installed) options.authenticate_urls = _parse_json(options.authenticate_urls) - thread_pool_max_workers = options.thread_pool_max_workers - options.thread_pool_max_workers = _parse_json(thread_pool_max_workers) + options.thread_pool_max_workers = _parse_json( + options.thread_pool_max_workers + ) + options.event_buffer_conf = _parse_json( + options.event_buffer_conf + ) return options diff --git a/kytos/templates/kytos.conf.template b/kytos/templates/kytos.conf.template index dec7198c..e3795cfc 100644 --- a/kytos/templates/kytos.conf.template +++ b/kytos/templates/kytos.conf.template @@ -74,6 +74,28 @@ jwt_secret = {{ jwt_secret }} # - db: it can be used by for higher priority db related tasks (need to be parametrized on decorator) thread_pool_max_workers = {"sb": 256, "db": 256, "app": 512} +# Configuration for KytosEventBuffers +# Valid event buffers are "msg_in", "msg_out", "app", "conn", and "raw". +# Valid queue types are: +# - queue: Default queue class provided by janus +# - priority: PriorityQueue class provided by janus + +event_buffer_conf = + { + "msg_out": { + "queue": { + "type": "priority", + "maxsize": "threadpool_sb" + } + }, + "msg_in": { + "queue": { + "type": "priority", + "maxsize": "threadpool_sb" + } + } + } + # Time to expire authentication token in minutes token_expiration_minutes = 180 diff --git a/tests/unit/test_core/test_buffers/__init__.py b/tests/unit/test_core/test_buffers/__init__.py new file mode 100644 index 00000000..f24e51ba --- /dev/null +++ b/tests/unit/test_core/test_buffers/__init__.py @@ -0,0 +1 @@ +"""Test kytos buffers functionalities.""" diff --git a/tests/unit/test_core/test_buffers.py b/tests/unit/test_core/test_buffers/test_buffers.py similarity index 97% rename from tests/unit/test_core/test_buffers.py rename to tests/unit/test_core/test_buffers/test_buffers.py index 913f028a..2f6f85af 100644 --- a/tests/unit/test_core/test_buffers.py +++ b/tests/unit/test_core/test_buffers/test_buffers.py @@ -3,6 +3,7 @@ from unittest.mock import MagicMock import pytest +from janus import Queue from kytos.core.buffers import KytosBuffers, KytosEventBuffer from kytos.core.events import KytosEvent @@ -103,7 +104,7 @@ async def test_empty(self): async def test_full(self): """Test full method to full.""" - kytos_event_buffer = KytosEventBuffer("name", maxsize=1) + kytos_event_buffer = KytosEventBuffer("name", queue=Queue(maxsize=1)) assert not kytos_event_buffer.full() assert kytos_event_buffer.empty() diff --git a/tests/unit/test_core/test_buffers/test_factory.py b/tests/unit/test_core/test_buffers/test_factory.py new file mode 100644 index 00000000..1f4c75ca --- /dev/null +++ b/tests/unit/test_core/test_buffers/test_factory.py @@ -0,0 +1,376 @@ +"""Test kytos.core.buffers.factory module.""" +from unittest.mock import MagicMock + +import pytest + +from kytos.core.buffers import factory + + +class TestBufferFromConfig: + """Tests for from_buffer_config""" + + async def test_defaults(self, monkeypatch): + """Tests creating a buffer from default values""" + name = 'Test Unit 1' + input_config = {} + + buffer_cls_mock = MagicMock() + buffer_mock = MagicMock() + buffer_cls_mock.return_value = buffer_mock + + process_queue_mock = MagicMock() + queue_mock = MagicMock() + process_queue_mock.return_value = queue_mock + + monkeypatch.setattr( + "kytos.core.buffers.factory.KytosEventBuffer", + buffer_cls_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.process_queue", + process_queue_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.extension_processors", + {} + ) + + result_buffer = factory.buffer_from_config(name, input_config) + + assert result_buffer == buffer_mock + buffer_cls_mock.assert_called_once_with( + name, + queue=queue_mock, + ) + process_queue_mock.assert_called_once_with( + {} + ) + + async def test_bad_extension(self, monkeypatch): + """Tests creating a buffer with an invalid extension""" + + name = 'Test Unit 2' + input_config = { + 'extensions': [ + { + 'type': 'Nonexistant', + }, + ] + } + + buffer_cls_mock = MagicMock() + buffer_mock = MagicMock() + buffer_cls_mock.return_value = buffer_mock + + process_queue_mock = MagicMock() + queue_mock = MagicMock() + process_queue_mock.return_value = queue_mock + + monkeypatch.setattr( + "kytos.core.buffers.factory.KytosEventBuffer", + buffer_cls_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.process_queue", + process_queue_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.extension_processors", + {} + ) + + with pytest.raises(KeyError): + factory.buffer_from_config(name, input_config) + + async def test_good_extension(self, monkeypatch): + """Tests creating a buffer with an valid extension""" + + name = 'Test Unit 3' + input_config = { + 'extensions': [ + { + 'type': 'existant', + 'args': 'Unique value' + }, + ] + } + + buffer_cls_mock = MagicMock() + buffer_mock = MagicMock() + buffer_cls_mock.return_value = buffer_mock + + process_queue_mock = MagicMock() + queue_mock = MagicMock() + process_queue_mock.return_value = queue_mock + + extension_processor_mock = MagicMock() + extended_instance_mock = MagicMock + extension_processor_mock.return_value = extended_instance_mock + + monkeypatch.setattr( + "kytos.core.buffers.factory.KytosEventBuffer", + buffer_cls_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.process_queue", + process_queue_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.extension_processors", + {'existant': extension_processor_mock} + ) + + result_buffer = factory.buffer_from_config(name, input_config) + + assert result_buffer == extended_instance_mock + extension_processor_mock.assert_called_once_with( + buffer_mock, + 'Unique value' + ) + buffer_cls_mock.assert_called_once_with( + name, + queue=queue_mock, + ) + process_queue_mock.assert_called_once_with( + {} + ) + + async def test_queue_config(self, monkeypatch): + """Tests creating a buffer with queue configurations""" + name = 'Test Unit 4' + input_config = { + 'queue': 'Unique value 2' + } + + buffer_cls_mock = MagicMock() + buffer_mock = MagicMock() + buffer_cls_mock.return_value = buffer_mock + + process_queue_mock = MagicMock() + queue_mock = MagicMock() + process_queue_mock.return_value = queue_mock + + monkeypatch.setattr( + "kytos.core.buffers.factory.KytosEventBuffer", + buffer_cls_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.process_queue", + process_queue_mock + ) + monkeypatch.setattr( + "kytos.core.buffers.factory.extension_processors", + {} + ) + + result_buffer = factory.buffer_from_config(name, input_config) + + assert result_buffer == buffer_mock + buffer_cls_mock.assert_called_once_with( + name, + queue=queue_mock, + ) + process_queue_mock.assert_called_once_with( + 'Unique value 2' + ) + + +class TestProcessQueue: + """Tests for process_queue""" + + def test_defaults(self, monkeypatch): + """Tests creating a queue from default values""" + input_config = {} + + mock_queue_cls = MagicMock() + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_priority_queue_cls = MagicMock() + mock_priority_queue = MagicMock() + mock_priority_queue_cls.return_value = mock_priority_queue + + monkeypatch.setattr( + "kytos.core.buffers.factory.queue_classes", + { + "queue": mock_queue_cls, + "priority": mock_priority_queue_cls, + } + ) + + result_queue = factory.process_queue(input_config) + + assert result_queue == mock_queue + + mock_queue_cls.assert_called_once_with(maxsize=0) + + def test_set_type(self, monkeypatch): + """Tests creating a queue with a specified type""" + input_config = { + 'type': 'priority' + } + + mock_queue_cls = MagicMock() + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_priority_queue_cls = MagicMock() + mock_priority_queue = MagicMock() + mock_priority_queue_cls.return_value = mock_priority_queue + + monkeypatch.setattr( + "kytos.core.buffers.factory.queue_classes", + { + "queue": mock_queue_cls, + "priority": mock_priority_queue_cls, + } + ) + + result_queue = factory.process_queue(input_config) + + assert result_queue == mock_priority_queue + + mock_priority_queue_cls.assert_called_once_with(maxsize=0) + + def test_set_size(self, monkeypatch): + """Tests creating a queue with a given maxsize""" + maxsize = 39 + input_config = { + 'maxsize': maxsize, + } + + mock_queue_cls = MagicMock() + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_priority_queue_cls = MagicMock() + mock_priority_queue = MagicMock() + mock_priority_queue_cls.return_value = mock_priority_queue + + monkeypatch.setattr( + "kytos.core.buffers.factory.queue_classes", + { + "queue": mock_queue_cls, + "priority": mock_priority_queue_cls, + } + ) + + result_queue = factory.process_queue(input_config) + + assert result_queue == mock_queue + + mock_queue_cls.assert_called_once_with(maxsize=maxsize) + + def test_set_size_threadpool(self, monkeypatch): + """Tests creating a queue with a threadpool determined maxsize""" + maxsize = 'threadpool_test' + expected_size = 39 + input_config = { + 'maxsize': maxsize, + } + + mock_queue_cls = MagicMock() + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_priority_queue_cls = MagicMock() + mock_priority_queue = MagicMock() + mock_priority_queue_cls.return_value = mock_priority_queue + + mock_get_threadpool_max = MagicMock() + mock_get_threadpool_max.return_value = { + 'test': expected_size, + } + + monkeypatch.setattr( + "kytos.core.buffers.factory.queue_classes", + { + "queue": mock_queue_cls, + "priority": mock_priority_queue_cls, + } + ) + + monkeypatch.setattr( + "kytos.core.buffers.factory.get_thread_pool_max_workers", + mock_get_threadpool_max + ) + + result_queue = factory.process_queue(input_config) + + assert result_queue == mock_queue + + mock_queue_cls.assert_called_once_with(maxsize=expected_size) + + def test_set_size_bad_threadpool(self, monkeypatch): + """Tests creating a queue with a non existant threadpool for maxsize""" + maxsize = 'threadpool_test2' + not_expected_size = 39 + input_config = { + 'maxsize': maxsize, + } + + mock_queue_cls = MagicMock() + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_priority_queue_cls = MagicMock() + mock_priority_queue = MagicMock() + mock_priority_queue_cls.return_value = mock_priority_queue + + mock_get_threadpool_max = MagicMock() + mock_get_threadpool_max.return_value = { + 'test': not_expected_size, + } + + monkeypatch.setattr( + "kytos.core.buffers.factory.queue_classes", + { + "queue": mock_queue_cls, + "priority": mock_priority_queue_cls, + } + ) + + monkeypatch.setattr( + "kytos.core.buffers.factory.get_thread_pool_max_workers", + mock_get_threadpool_max + ) + + result_queue = factory.process_queue(input_config) + + assert result_queue == mock_queue + + mock_queue_cls.assert_called_once_with(maxsize=0) + + def test_set_size_bad_str(self, monkeypatch): + """Tests creating a queue with an invalid string""" + maxsize = 'bad string' + input_config = { + 'maxsize': maxsize, + } + + mock_queue_cls = MagicMock() + mock_queue = MagicMock() + mock_queue_cls.return_value = mock_queue + + mock_priority_queue_cls = MagicMock() + mock_priority_queue = MagicMock() + mock_priority_queue_cls.return_value = mock_priority_queue + + mock_get_threadpool_max = MagicMock() + mock_get_threadpool_max.return_value = {} + + monkeypatch.setattr( + "kytos.core.buffers.factory.queue_classes", + { + "queue": mock_queue_cls, + "priority": mock_priority_queue_cls, + } + ) + + monkeypatch.setattr( + "kytos.core.buffers.factory.get_thread_pool_max_workers", + mock_get_threadpool_max + ) + + with pytest.raises(TypeError): + factory.process_queue(input_config)