Save data source tree params in analysis setup #229

Open · wants to merge 2 commits into dev
3 changes: 3 additions & 0 deletions extra_foam/database/base_proxy.py
@@ -31,6 +31,9 @@ def reset(self):
def pipeline(self):
return self._db.pipeline()

def execute_command(self, *args, **kwargs):
return self._db.execute_command(*args, **kwargs)

@redis_except_handler
def hset(self, name, key, value):
"""Set a key-value pair of a hash.
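The new `execute_command` pass-through lets callers of any proxy issue raw Redis commands, and chain them on a pipeline, without touching the underlying client directly. A minimal sketch of the intended usage, assuming a `MetaProxy` can be constructed with its defaults as elsewhere in the code base; the key and value come from the later changes in this PR and are otherwise illustrative:

```python
from extra_foam.database import MetaProxy

proxy = MetaProxy()

# A single raw command is forwarded to redis-py's Redis.execute_command().
proxy.execute_command(
    'HSET', 'meta:data_source_items', 'XA flux', 'XGM;XA;[];flux;;;0')

# Raw commands also chain on a pipeline, because redis-py's
# Pipeline.execute_command() buffers the command and returns the pipeline.
items, updated = proxy.pipeline().execute_command(
    'HGETALL', 'meta:data_source_items').execute_command(
    'SMEMBERS', 'meta:data_source_items:updated').execute()
```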
6 changes: 5 additions & 1 deletion extra_foam/database/data_source.py
@@ -89,7 +89,11 @@ def from_category(self, ctg):
return self._categories.get(ctg, OrderedSet())

def add_item(self, *args, **kwargs):
"""Add a source item to the catalog."""
"""Add a source item to the catalog.

If the src already exists, the new item will overwrite
the old one.
"""
if len(args) == 1:
item = args[0] # SourceItem instance
else:
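For reference, the documented overwrite behaviour means that re-adding a source with the same device ID and property simply replaces its settings in the catalog. A rough sketch, assuming `SourceCatalog()` can be created without arguments and using the positional form of `add_item()` that appears in `f_pipe.py` below; the concrete values are made up:

```python
from extra_foam.database import SourceCatalog

catalog = SourceCatalog()

# Register an XGM intensity source.
catalog.add_item('XGM', 'XA', None, 'flux', None, (-1.0, 1.0), 0)

# Re-adding the same "<device ID> <property>" pair replaces the previous
# entry (here, only the value range changes) instead of creating a duplicate.
catalog.add_item('XGM', 'XA', None, 'flux', None, (-2.0, 2.0), 0)
```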
41 changes: 23 additions & 18 deletions extra_foam/database/metadata.py
@@ -62,11 +62,7 @@ class Metadata(metaclass=MetaMetadata):
FOM_FILTER_PROC = "meta:proc:fom_filter"
DARK_RUN_PROC = "meta:proc:dark_run"

# The real key depends on the category of the data source. For example,
# 'XGM' has the key 'meta:sources:XGM' and 'DSSC' has the key
# 'meta:sources:DSSC'.
# The value is an unordered set for each source.
DATA_SOURCE = "meta:data_source"
DATA_SOURCE_ITEMS = "meta:data_source_items"


class MetaProxy(_AbstractProxy):
@@ -137,26 +133,23 @@ def add_data_source(self, item):
:param tuple item: a tuple which can be used to construct a SourceItem.
"""
ctg, name, modules, ppt, slicer, vrange, ktype = item
key = f"{name} {ppt}"
item_key = Metadata.DATA_SOURCE_ITEMS
src = f"{name} {ppt}"
item = f"{ctg};{name};{modules};{ppt};{slicer};{vrange};{ktype}"
return self._db.pipeline().execute_command(
'HSET', key, 'category', ctg,
'name', name,
'modules', modules,
'property', ppt,
'slicer', slicer,
'vrange', vrange,
'ktype', ktype).execute_command(
'PUBLISH', Metadata.DATA_SOURCE, key).execute()
'HSET', item_key, src, item).execute_command(
'SADD', f"{item_key}:updated", src).execute()

@redis_except_handler
def remove_data_source(self, src):
"""Remove a data source.

:param str src: data source.
"""
item_key = Metadata.DATA_SOURCE_ITEMS
return self._db.pipeline().execute_command(
'DEL', src).execute_command(
'PUBLISH', Metadata.DATA_SOURCE, src).execute()
'HDEL', item_key, src).execute_command(
'SADD', f"{item_key}:updated", src).execute()

@redis_except_handler
def take_snapshot(self, name):
@@ -201,6 +194,8 @@ def remove_snapshot(self, name):
pipe = self.pipeline()
for k in Metadata.processor_keys:
pipe.execute_command("DEL", f"{k}:{name}")
item_key = Metadata.DATA_SOURCE_ITEMS
pipe.execute_command("DEL", f"{item_key}:{name}")
pipe.execute()

@redis_except_handler
@@ -211,18 +206,27 @@ def rename_snapshot(self, old, new):
:param str new: new analysis setup name.
"""
for k in Metadata.processor_keys:
# If a 'key' does not exist, "RENAME" raises ResponseError.
# In practice, the 'key' may be missing for various reasons,
# e.g., after a version update.
try:
self._db.execute_command("RENAME", f"{k}:{old}", f"{k}:{new}")
except redis.ResponseError:
pass
item_key = Metadata.DATA_SOURCE_ITEMS
try:
self._db.execute_command(
"RENAME", f"{item_key}:{old}", f"{item_key}:{new}")
except redis.ResponseError:
pass

def _read_analysis_setup(self, name=None):
"""Read a analysis setup from Redis.

:param str name: analysis setup name.
"""
cfg = dict()
for k in Metadata.processor_keys:
for k in (*Metadata.processor_keys, Metadata.DATA_SOURCE_ITEMS):
if name is not None:
k = f"{k}:{name}"
cfg[k] = self.hget_all(k)
@@ -247,7 +251,8 @@ def _write_analysis_setup(self, cfg, old, new):
else:
k_new = k_root

if k_root in Metadata.processor_keys:
if k_root in Metadata.processor_keys or \
k_root == Metadata.DATA_SOURCE_ITEMS:
if v:
self._db.hset(k_new, mapping=v)
else:
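Taken together, the metadata changes replace the per-source hashes and the `meta:data_source` PUBLISH channel with a single hash of serialized items plus a companion "updated" set. A sketch of the resulting key layout using plain redis-py against a local Redis instance; the source values are illustrative:

```python
import redis

db = redis.Redis(decode_responses=True)

src = 'XA flux'                          # "<device ID> <property>"
item = 'XGM;XA;[];flux;;(-1.0, 1.0);0'   # "ctg;name;modules;ppt;slicer;vrange;ktype"

# add_data_source(): store the serialized item and flag it as updated.
db.hset('meta:data_source_items', src, item)
db.sadd('meta:data_source_items:updated', src)

# remove_data_source(): delete the item but still flag it as updated, so a
# consumer can distinguish a removal (src missing from the hash) from an
# addition or a change (src present in the hash).
db.hdel('meta:data_source_items', src)
db.sadd('meta:data_source_items:updated', src)

# Snapshots operate on a suffixed copy of the whole hash, e.g.
# 'meta:data_source_items:<setup name>', which is what remove_snapshot()
# and rename_snapshot() now clean up as well.
```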
9 changes: 3 additions & 6 deletions extra_foam/gui/ctrl_widgets/data_source_widget.py
@@ -26,6 +26,7 @@
from ..misc_widgets import FColor
from ..mediator import Mediator
from ...database import MonProxy
from ...database import Metadata as mt
from ...config import config, DataSource
from ...geometries import module_indices
from ...processes import list_foam_processes
@@ -286,12 +287,7 @@ def setData(self, index, value, role=None) -> bool:

item.setChecked(value)
else: # role == Qt.EditRole
old_src_name = item.name()
old_ppt = item.ppt()
item.setData(value, index.column())
# remove registered item with the old device ID and property
self.source_item_toggled_sgn.emit(
False, f'{old_src_name} {old_ppt}')

main_det = config["DETECTOR"]
ctg = item.parent().name()
@@ -836,11 +832,12 @@ def updateMetaData(self):
except ValueError as e:
logger.error(e)
return False

return True

def loadMetaData(self):
"""Override."""
pass
cfg = self._meta.hget_all(mt.DATA_SOURCE_ITEMS)

def updateAvailableSources(self):
ret = self._mon.get_available_sources()
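In this commit `loadMetaData()` only fetches the stored hash; how the entries are pushed back into the source tree is not part of the diff. A guess at the intended follow-up, mirroring the parsing done in `f_pipe.py`: the `hget_all` call and the seven-field unpacking are grounded in the PR, the rest is hypothetical.

```python
def loadMetaData(self):
    """Override."""
    cfg = self._meta.hget_all(mt.DATA_SOURCE_ITEMS)
    for src, item in cfg.items():
        ctg, name, modules, ppt, slicer, vrange, ktype = item.split(";")
        # ... locate the matching row in the tree model and restore its
        #     checked state and editable fields (hypothetical step) ...
```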
40 changes: 12 additions & 28 deletions extra_foam/gui/ctrl_widgets/tests/test_data_source_widget.py
@@ -212,35 +212,23 @@ def testDataSourceTreeModelPs(self, control_sources, pipeline_sources):
spy = QtTest.QSignalSpy(model.source_item_toggled_sgn)
# change device ID
model.setData(model.index(0, 2, dssc_ctg), 'A+', Qt.EditRole)
self.assertEqual(2, len(spy))
# check signal for deleting old source
self.assertFalse(spy[0][0])
self.assertEqual('A a', spy[0][1])
# check signal for adding new source
self.assertTrue(spy[1][0])
self.assertTupleEqual(('DSSC', 'A+', '[]', 'a', '[None, None]', '', 1), spy[1][1])
self.assertEqual(1, len(spy))
self.assertTrue(spy[0][0])
self.assertTupleEqual(('DSSC', 'A+', '[]', 'a', '[None, None]', '', 1), spy[0][1])

spy = QtTest.QSignalSpy(model.source_item_toggled_sgn)
# change property
model.setData(model.index(0, 3, dssc_ctg), 'a-', Qt.EditRole)
self.assertEqual(2, len(spy))
# check signal for deleting old source
self.assertFalse(spy[0][0])
self.assertEqual('A+ a', spy[0][1])
# check signal for adding new source
self.assertTrue(spy[1][0])
self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None]', '', 1), spy[1][1])
self.assertEqual(1, len(spy))
self.assertTrue(spy[0][0])
self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None]', '', 1), spy[0][1])

spy = QtTest.QSignalSpy(model.source_item_toggled_sgn)
# change slicer
model.setData(model.index(0, 4, dssc_ctg), '::2', Qt.EditRole)
self.assertEqual(2, len(spy))
# check signal for deleting old source
self.assertFalse(spy[0][0])
# deleting does not check slicer
# check signal for adding new source
self.assertTrue(spy[1][0])
self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None, 2]', '', 1), spy[1][1])
self.assertEqual(1, len(spy))
self.assertTrue(spy[0][0])
self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None, 2]', '', 1), spy[0][1])

spy = QtTest.QSignalSpy(model.source_item_toggled_sgn)
# change a DSSC source
@@ -325,13 +313,9 @@ def testDataSourceTreeModelPs(self, control_sources, pipeline_sources):
spy = QtTest.QSignalSpy(model.source_item_toggled_sgn)
# change slicer
model.setData(model.index(1, 5, xgm_ctg), '-1, 1', Qt.EditRole)
self.assertEqual(2, len(spy))
# delete old source
self.assertFalse(spy[0][0])
# deleting does not check range
# add new source
self.assertTrue(spy[1][0])
self.assertTupleEqual(('XGM', 'XA', '[]', 'flux', '', '(-1.0, 1.0)', 0), spy[1][1])
self.assertEqual(1, len(spy))
self.assertTrue(spy[0][0])
self.assertTupleEqual(('XGM', 'XA', '[]', 'flux', '', '(-1.0, 1.0)', 0), spy[0][1])

@patch.dict(config._data, {"PULSE_RESOLVED": False})
@patch.object(ConfigWrapper, "pipeline_sources", new_callable=PropertyMock)
53 changes: 21 additions & 32 deletions extra_foam/pipeline/f_pipe.py
@@ -18,7 +18,6 @@
from .processors.base_processor import _RedisParserMixin
from ..config import config, DataSource
from ..utils import profiler, run_in_thread
from ..ipc import RedisSubscriber
from ..ipc import process_logger as logger
from ..database import (
MetaProxy, MonProxy, SourceCatalog
@@ -115,8 +114,6 @@ def put_pop(self, item):
class KaraboBridge(_PipeInBase, _RedisParserMixin):
"""Karabo bridge client which is an input pipe."""

_sub = RedisSubscriber(mt.DATA_SOURCE)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

@@ -126,22 +123,25 @@ def __init__(self, *args, **kwargs):

def _update_source_items(self):
"""Updated requested source items."""
sub = self._sub
while True:
msg = sub.get_message(ignore_subscribe_messages=True)
if msg is None:
break

src = msg['data']
item = self._meta.hget_all(src)
if item:
item_key = mt.DATA_SOURCE_ITEMS
updated_key = f"{item_key}:updated"
items, updated, _ = self._meta.pipeline().execute_command(
'HGETALL', item_key).execute_command(
'SMEMBERS', updated_key).execute_command(
'DEL', updated_key).execute()

if updated:
new_srcs = []
for src in updated:
if src not in items:
self._catalog.remove_item(src)
logger.debug(f"Source item unregistered: {src}")
else:
new_srcs.append(src)

for item in items.values():
# add a new source item
ctg = item['category']
name = item['name']
modules = item['modules']
ppt = item['property']
slicer = item['slicer']
vrange = item['vrange']
ctg, name, modules, ppt, slicer, vrange, ktype = item.split(";")
self._catalog.add_item(
ctg,
name,
@@ -150,20 +150,10 @@
ppt,
self.str2slice(slicer) if slicer else None,
self.str2tuple(vrange) if vrange else None,
int(item['ktype'])
int(ktype)
)
logger.debug(f"Source item registered: {name} {ppt} ({ctg})")
else:
# remove a source item
if src not in self._catalog:
# Raised when there were two checked items in
# the data source tree with the same "device ID"
# and "property". The item has already been
# deleted when one of them was unchecked.
logger.error("Duplicated data source items")
continue
self._catalog.remove_item(src)
logger.debug(f"Source item unregistered: {src}")
logger.debug(f"Source item registered/updated: "
f"{name} {ppt} ({ctg})")

def _update_connection(self, proxy):
cons = self._meta.hget_all(mt.CONNECTION)
@@ -196,7 +186,6 @@ def run(self):

# this cannot be in a thread since SourceCatalog is not thread-safe
self._update_source_items()

if self.running and proxy.client is not None:
if not self._catalog.main_detector:
# skip the pipeline if the main detector is not specified
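The rewritten `_update_source_items()` trades the Redis PUBLISH/SUBSCRIBE channel for a poll of the item hash plus the "updated" set, fetched and cleared in one pipeline so no change can slip between reads. A standalone sketch of that read pattern in plain redis-py, with the catalog calls replaced by prints; the key names come from the PR, everything else is illustrative:

```python
import redis

db = redis.Redis(decode_responses=True)

# Fetch the current items and the set of changed sources, and clear the
# change set, in a single pipeline (one round trip, executed atomically).
items, updated, _ = db.pipeline().execute_command(
    'HGETALL', 'meta:data_source_items').execute_command(
    'SMEMBERS', 'meta:data_source_items:updated').execute_command(
    'DEL', 'meta:data_source_items:updated').execute()

# Sources flagged as updated but gone from the hash were removed.
for src in updated:
    if src not in items:
        print(f"unregistered: {src}")

# Everything still in the hash is (re-)registered with the catalog.
for src, item in items.items():
    ctg, name, modules, ppt, slicer, vrange, ktype = item.split(";")
    print(f"registered/updated: {name} {ppt} ({ctg})")
```

Because the whole hash is re-read on every poll, a consumer that starts late or misses a cycle still converges to the current source list, which the old PUBLISH-based notification could not guarantee.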