Skip to content

Commit

Permalink
Merge pull request #438 from kytos-ng/feat/event_buffer_config
Browse files Browse the repository at this point in the history
Add in configuration for event buffers
  • Loading branch information
Ktmi authored Jan 24, 2024
2 parents 2482a2f + 6e2fab7 commit 9b5c3c9
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 92 deletions.
4 changes: 4 additions & 0 deletions kytos/core/buffers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Module for kytos buffers"""

from .buffers import KytosEventBuffer
from .manager import KytosBuffers
65 changes: 3 additions & 62 deletions kytos/core/buffers.py → kytos/core/buffers/buffers.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging

from janus import PriorityQueue, Queue

from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

__all__ = ('KytosBuffers', )
from janus import Queue

LOG = logging.getLogger(__name__)


class KytosEventBuffer:
"""KytosEventBuffer represents a queue to store a set of KytosEvents."""

def __init__(self, name, event_base_class=None, maxsize=0,
queue_cls=Queue):
def __init__(self, name, queue: Queue = None):
"""Contructor of KytosEventBuffer receive the parameters below.
Args:
Expand All @@ -25,8 +19,7 @@ def __init__(self, name, event_base_class=None, maxsize=0,
queue_cls (class): queue class from janus
"""
self.name = name
self._event_base_class = event_base_class
self._queue = queue_cls(maxsize=maxsize)
self._queue = queue if queue is not None else Queue()
self._reject_new_events = False

def put(self, event):
Expand Down Expand Up @@ -121,55 +114,3 @@ def empty(self):
def full(self):
"""Return True if KytosEventBuffer is full of KytosEvent."""
return self._queue.sync_q.full()


class KytosBuffers:
"""Set of KytosEventBuffer used in Kytos."""

def __init__(self):
"""Build four KytosEventBuffers.
:attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from connection events.
:attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from network.
:attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be received.
:attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be sent.
:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
sent to NApps.
"""
self._pool_max_workers = get_thread_pool_max_workers()
self.conn = KytosEventBuffer("conn")
self.raw = KytosEventBuffer("raw", maxsize=self._get_maxsize("sb"))
self.msg_in = KytosEventBuffer("msg_in",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue)
self.msg_out = KytosEventBuffer("msg_out",
maxsize=self._get_maxsize("sb"),
queue_cls=PriorityQueue)
self.app = KytosEventBuffer("app", maxsize=self._get_maxsize("app"))

def get_all_buffers(self):
"""Get all KytosEventBuffer instances."""
return [
event_buffer for event_buffer in self.__dict__.values()
if isinstance(event_buffer, KytosEventBuffer)
]

def _get_maxsize(self, queue_name):
"""Get queue maxsize if it's been set."""
return self._pool_max_workers.get(queue_name, 0)

def send_stop_signal(self):
"""Send a ``kytos/core.shutdown`` event to each buffer."""
LOG.info('Stop signal received by Kytos buffers.')
LOG.info('Sending KytosShutdownEvent to all apps.')
event = KytosEvent(name='kytos/core.shutdown')
for buffer in self.get_all_buffers():
buffer.put(event)
54 changes: 54 additions & 0 deletions kytos/core/buffers/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Utilities for composing KytosEventBuffers"""

from janus import PriorityQueue, Queue

from kytos.core.helpers import get_thread_pool_max_workers

from .buffers import KytosEventBuffer

queue_classes = {
'queue': Queue,
'priority': PriorityQueue,
}


def process_queue(config: dict) -> Queue:
"""
Create a janus queue from a given config dict
"""
queue_type = queue_classes[config.get('type', 'queue')]
queue_size = config.get('maxsize', 0)
if isinstance(queue_size, str):
if queue_size.startswith('threadpool_'):
threadpool = queue_size[len('threadpool_'):]
queue_size = get_thread_pool_max_workers().get(threadpool, 0)
else:
raise TypeError(
'Expected int or str formatted '
'as "threadpool_{threadpool_name}"'
)
return queue_type(maxsize=queue_size)


extension_processors = {}


def buffer_from_config(name: str, config: dict) -> KytosEventBuffer:
"""
Create a KytosEventBuffer from a given config dict
"""
buffer_cls = KytosEventBuffer
args = {}
# Process Queue Config
args['queue'] = process_queue(config.get('queue', {}))

buffer = buffer_cls(name, **args)

# Process Mixins
extensions: dict = config.get('extensions', [])
for extension in extensions:
extension_type = extension['type']
extension_args = extension.get('args', {})
buffer = extension_processors[extension_type](buffer, extension_args)

return buffer
80 changes: 80 additions & 0 deletions kytos/core/buffers/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Kytos Buffer Classes, based on Python Queue."""
import logging

from janus import PriorityQueue, Queue

from kytos.core.config import KytosConfig
from kytos.core.events import KytosEvent
from kytos.core.helpers import get_thread_pool_max_workers

from .buffers import KytosEventBuffer
from .factory import buffer_from_config

LOG = logging.getLogger(__name__)


class KytosBuffers:
"""Set of KytosEventBuffer used in Kytos."""

def __init__(self):
"""Build four KytosEventBuffers.
:attr:`conn`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from connection events.
:attr:`raw`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
received from network.
:attr:`msg_in`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be received.
:attr:`msg_out`: :class:`~kytos.core.buffers.KytosEventBuffer` with
events to be sent.
:attr:`app`: :class:`~kytos.core.buffers.KytosEventBuffer` with events
sent to NApps.
"""

self._pool_max_workers = get_thread_pool_max_workers()

self.conn = KytosEventBuffer("conn")
self.raw = KytosEventBuffer(
"raw",
queue=Queue(maxsize=self._get_maxsize("sb"))
)
self.msg_in = KytosEventBuffer(
"msg_in",
queue=PriorityQueue(maxsize=self._get_maxsize("sb")),
)
self.msg_out = KytosEventBuffer(
"msg_out",
queue=PriorityQueue(maxsize=self._get_maxsize("sb")),
)
self.app = KytosEventBuffer(
"app",
queue=Queue(maxsize=self._get_maxsize("app")),
)

buffer_conf = KytosConfig().options['daemon'].event_buffer_conf

for name, config in buffer_conf.items():
setattr(self, name, buffer_from_config(name, config))

def get_all_buffers(self):
"""Get all KytosEventBuffer instances."""
return [
event_buffer for event_buffer in self.__dict__.values()
if isinstance(event_buffer, KytosEventBuffer)
]

def _get_maxsize(self, queue_name):
"""Get queue maxsize if it's been set."""
return self._pool_max_workers.get(queue_name, 0)

def send_stop_signal(self):
"""Send a ``kytos/core.shutdown`` event to each buffer."""
LOG.info('Stop signal received by Kytos buffers.')
LOG.info('Sending KytosShutdownEvent to all apps.')
event = KytosEvent(name='kytos/core.shutdown')
for buffer in self.get_all_buffers():
buffer.put(event)
95 changes: 66 additions & 29 deletions kytos/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,32 +125,65 @@ def parse_args(self):
'debug': False}
"""
defaults = {'pidfile': os.path.join(BASE_ENV,
'var/run/kytos/kytosd.pid'),
'workdir': os.path.join(BASE_ENV, 'var/lib/kytos'),
'napps': os.path.join(BASE_ENV, 'var/lib/kytos/napps/'),
'napps_repositories': "['https://napps.kytos.io/repo/']",
'installed_napps': os.path.join(BASE_ENV,
'var/lib/kytos/napps/',
'.installed'),
'conf': os.path.join(BASE_ENV, 'etc/kytos/kytos.conf'),
'logging': os.path.join(BASE_ENV, 'etc/kytos/logging.ini'),
'logger_decorators':
["kytos.core.logger_decorators.queue_decorator"],
'listen': '0.0.0.0',
'port': 6653,
'api_traceback_on_500': True,
'foreground': False,
'protocol_name': '',
'enable_entities_by_default': False,
'napps_pre_installed': [],
'authenticate_urls': [],
'token_expiration_minutes': 180,
'thread_pool_max_workers': {},
'database': '',
'apm': '',
'connection_timeout': 130,
'debug': False}
defaults = {
'pidfile': os.path.join(
BASE_ENV,
'var/run/kytos/kytosd.pid'
),
'workdir': os.path.join(
BASE_ENV,
'var/lib/kytos'
),
'napps': os.path.join(
BASE_ENV,
'var/lib/kytos/napps/'
),
'napps_repositories': ['https://napps.kytos.io/repo/'],
'installed_napps': os.path.join(
BASE_ENV,
'var/lib/kytos/napps/',
'.installed'
),
'conf': os.path.join(
BASE_ENV,
'etc/kytos/kytos.conf'
),
'logging': os.path.join(
BASE_ENV,
'etc/kytos/logging.ini'
),
'logger_decorators': [
"kytos.core.logger_decorators.queue_decorator",
],
'listen': '0.0.0.0',
'port': 6653,
'api_traceback_on_500': True,
'foreground': False,
'protocol_name': '',
'enable_entities_by_default': False,
'napps_pre_installed': [],
'authenticate_urls': [],
'token_expiration_minutes': 180,
'thread_pool_max_workers': {},
'database': '',
'apm': '',
'connection_timeout': 130,
'debug': False,
'event_buffer_conf': {
'msg_out': {
'queue': {
'type': 'priority',
'maxsize': 'threadpool_sb',
},
},
'msg_in': {
'queue': {
'type': 'priority',
'maxsize': 'threadpool_sb',
},
},
},
}

options, argv = self.conf_parser.parse_known_args()

Expand Down Expand Up @@ -184,7 +217,6 @@ def _parse_options(self, argv):
options, unknown = self.parser.parse_known_args(argv)
if unknown:
warnings.warn(f"Unknown arguments: {unknown}")
options.napps_repositories = json.loads(options.napps_repositories)
options.debug = options.debug in ['True', True]
options.daemon = options.daemon in ['True', True]
options.port = int(options.port)
Expand All @@ -203,11 +235,16 @@ def _parse_json(value):
return json.loads(value)
return value

options.napps_repositories = _parse_json(options.napps_repositories)
options.logger_decorators = _parse_json(options.logger_decorators)
options.napps_pre_installed = _parse_json(options.napps_pre_installed)
options.authenticate_urls = _parse_json(options.authenticate_urls)
thread_pool_max_workers = options.thread_pool_max_workers
options.thread_pool_max_workers = _parse_json(thread_pool_max_workers)
options.thread_pool_max_workers = _parse_json(
options.thread_pool_max_workers
)
options.event_buffer_conf = _parse_json(
options.event_buffer_conf
)

return options

Expand Down
22 changes: 22 additions & 0 deletions kytos/templates/kytos.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,28 @@ jwt_secret = {{ jwt_secret }}
# - db: it can be used by for higher priority db related tasks (need to be parametrized on decorator)
thread_pool_max_workers = {"sb": 256, "db": 256, "app": 512}

# Configuration for KytosEventBuffers
# Valid event buffers are "msg_in", "msg_out", "app", "conn", and "raw".
# Valid queue types are:
# - queue: Default queue class provided by janus
# - priority: PriorityQueue class provided by janus

event_buffer_conf =
{
"msg_out": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb"
}
},
"msg_in": {
"queue": {
"type": "priority",
"maxsize": "threadpool_sb"
}
}
}

# Time to expire authentication token in minutes
token_expiration_minutes = 180

Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_core/test_buffers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Test kytos buffers functionalities."""
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import MagicMock

import pytest
from janus import Queue

from kytos.core.buffers import KytosBuffers, KytosEventBuffer
from kytos.core.events import KytosEvent
Expand Down Expand Up @@ -103,7 +104,7 @@ async def test_empty(self):

async def test_full(self):
"""Test full method to full."""
kytos_event_buffer = KytosEventBuffer("name", maxsize=1)
kytos_event_buffer = KytosEventBuffer("name", queue=Queue(maxsize=1))
assert not kytos_event_buffer.full()
assert kytos_event_buffer.empty()

Expand Down
Loading

0 comments on commit 9b5c3c9

Please sign in to comment.