From 72399b22a9d2b8dd316e1e85be977b1e28b9f3ec Mon Sep 17 00:00:00 2001 From: zhujun98 Date: Wed, 17 Jun 2020 16:54:25 +0200 Subject: [PATCH 1/2] New ways of register/unregister and store selected source items. --- extra_foam/database/base_proxy.py | 3 ++ extra_foam/database/data_source.py | 6 ++- extra_foam/database/metadata.py | 26 ++++----- .../gui/ctrl_widgets/data_source_widget.py | 10 ++-- .../tests/test_data_source_widget.py | 40 +++++--------- extra_foam/pipeline/f_pipe.py | 53 ++++++++----------- 6 files changed, 55 insertions(+), 83 deletions(-) diff --git a/extra_foam/database/base_proxy.py b/extra_foam/database/base_proxy.py index 083e72f9e..b4ba4ab62 100644 --- a/extra_foam/database/base_proxy.py +++ b/extra_foam/database/base_proxy.py @@ -31,6 +31,9 @@ def reset(self): def pipeline(self): return self._db.pipeline() + def execute_command(self, *args, **kwargs): + return self._db.execute_command(*args, **kwargs) + @redis_except_handler def hset(self, name, key, value): """Set a key-value pair of a hash. diff --git a/extra_foam/database/data_source.py b/extra_foam/database/data_source.py index a1c7b770e..3edd2f918 100644 --- a/extra_foam/database/data_source.py +++ b/extra_foam/database/data_source.py @@ -89,7 +89,11 @@ def from_category(self, ctg): return self._categories.get(ctg, OrderedSet()) def add_item(self, *args, **kwargs): - """Add a source item to the catalog.""" + """Add a source item to the catalog. + + If the src already exists, the new item will overwrite + the old one. + """ if len(args) == 1: item = args[0] # SourceItem instance else: diff --git a/extra_foam/database/metadata.py b/extra_foam/database/metadata.py index b92076331..a09e8eb52 100644 --- a/extra_foam/database/metadata.py +++ b/extra_foam/database/metadata.py @@ -62,11 +62,7 @@ class Metadata(metaclass=MetaMetadata): FOM_FILTER_PROC = "meta:proc:fom_filter" DARK_RUN_PROC = "meta:proc:dark_run" - # The real key depends on the category of the data source. For example, - # 'XGM' has the key 'meta:sources:XGM' and 'DSSC' has the key - # 'meta:sources:DSSC'. - # The value is an unordered set for each source. - DATA_SOURCE = "meta:data_source" + DATA_SOURCE_ITEMS = "meta:data_source_items" class MetaProxy(_AbstractProxy): @@ -137,16 +133,12 @@ def add_data_source(self, item): :param tuple item: a tuple which can be used to construct a SourceItem. """ ctg, name, modules, ppt, slicer, vrange, ktype = item - key = f"{name} {ppt}" + item_key = Metadata.DATA_SOURCE_ITEMS + src = f"{name} {ppt}" + item = f"{ctg};{name};{modules};{ppt};{slicer};{vrange};{ktype}" return self._db.pipeline().execute_command( - 'HSET', key, 'category', ctg, - 'name', name, - 'modules', modules, - 'property', ppt, - 'slicer', slicer, - 'vrange', vrange, - 'ktype', ktype).execute_command( - 'PUBLISH', Metadata.DATA_SOURCE, key).execute() + 'HSET', item_key, src, item).execute_command( + 'SADD', f"{item_key}:updated", src).execute() @redis_except_handler def remove_data_source(self, src): @@ -154,9 +146,10 @@ def remove_data_source(self, src): :param str src: data source. """ + item_key = Metadata.DATA_SOURCE_ITEMS return self._db.pipeline().execute_command( - 'DEL', src).execute_command( - 'PUBLISH', Metadata.DATA_SOURCE, src).execute() + 'HDEL', item_key, src).execute_command( + 'SADD', f"{item_key}:updated", src).execute() @redis_except_handler def take_snapshot(self, name): @@ -226,6 +219,7 @@ def _read_analysis_setup(self, name=None): if name is not None: k = f"{k}:{name}" cfg[k] = self.hget_all(k) + return cfg def _write_analysis_setup(self, cfg, old, new): diff --git a/extra_foam/gui/ctrl_widgets/data_source_widget.py b/extra_foam/gui/ctrl_widgets/data_source_widget.py index 23863099e..d8170df0c 100644 --- a/extra_foam/gui/ctrl_widgets/data_source_widget.py +++ b/extra_foam/gui/ctrl_widgets/data_source_widget.py @@ -26,6 +26,7 @@ from ..misc_widgets import FColor from ..mediator import Mediator from ...database import MonProxy +from ...database import Metadata as mt from ...config import config, DataSource from ...geometries import module_indices from ...processes import list_foam_processes @@ -286,12 +287,7 @@ def setData(self, index, value, role=None) -> bool: item.setChecked(value) else: # role == Qt.EditRole - old_src_name = item.name() - old_ppt = item.ppt() item.setData(value, index.column()) - # remove registered item with the old device ID and property - self.source_item_toggled_sgn.emit( - False, f'{old_src_name} {old_ppt}') main_det = config["DETECTOR"] ctg = item.parent().name() @@ -836,11 +832,13 @@ def updateMetaData(self): except ValueError as e: logger.error(e) return False + return True def loadMetaData(self): """Override.""" - pass + selected = self._meta.execute_command( + 'SMEMBERS', mt.DATA_SOURCE_ITEMS) def updateAvailableSources(self): ret = self._mon.get_available_sources() diff --git a/extra_foam/gui/ctrl_widgets/tests/test_data_source_widget.py b/extra_foam/gui/ctrl_widgets/tests/test_data_source_widget.py index b9fcfaf1b..88897f6e7 100644 --- a/extra_foam/gui/ctrl_widgets/tests/test_data_source_widget.py +++ b/extra_foam/gui/ctrl_widgets/tests/test_data_source_widget.py @@ -212,35 +212,23 @@ def testDataSourceTreeModelPs(self, control_sources, pipeline_sources): spy = QtTest.QSignalSpy(model.source_item_toggled_sgn) # change device ID model.setData(model.index(0, 2, dssc_ctg), 'A+', Qt.EditRole) - self.assertEqual(2, len(spy)) - # check signal for deleting old source - self.assertFalse(spy[0][0]) - self.assertEqual('A a', spy[0][1]) - # check signal for adding new source - self.assertTrue(spy[1][0]) - self.assertTupleEqual(('DSSC', 'A+', '[]', 'a', '[None, None]', '', 1), spy[1][1]) + self.assertEqual(1, len(spy)) + self.assertTrue(spy[0][0]) + self.assertTupleEqual(('DSSC', 'A+', '[]', 'a', '[None, None]', '', 1), spy[0][1]) spy = QtTest.QSignalSpy(model.source_item_toggled_sgn) # change property model.setData(model.index(0, 3, dssc_ctg), 'a-', Qt.EditRole) - self.assertEqual(2, len(spy)) - # check signal for deleting old source - self.assertFalse(spy[0][0]) - self.assertEqual('A+ a', spy[0][1]) - # check signal for adding new source - self.assertTrue(spy[1][0]) - self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None]', '', 1), spy[1][1]) + self.assertEqual(1, len(spy)) + self.assertTrue(spy[0][0]) + self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None]', '', 1), spy[0][1]) spy = QtTest.QSignalSpy(model.source_item_toggled_sgn) # change slicer model.setData(model.index(0, 4, dssc_ctg), '::2', Qt.EditRole) - self.assertEqual(2, len(spy)) - # check signal for deleting old source - self.assertFalse(spy[0][0]) - # deleting does not check slicer - # check signal for adding new source - self.assertTrue(spy[1][0]) - self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None, 2]', '', 1), spy[1][1]) + self.assertEqual(1, len(spy)) + self.assertTrue(spy[0][0]) + self.assertTupleEqual(('DSSC', 'A+', '[]', 'a-', '[None, None, 2]', '', 1), spy[0][1]) spy = QtTest.QSignalSpy(model.source_item_toggled_sgn) # change a DSSC source @@ -325,13 +313,9 @@ def testDataSourceTreeModelPs(self, control_sources, pipeline_sources): spy = QtTest.QSignalSpy(model.source_item_toggled_sgn) # change slicer model.setData(model.index(1, 5, xgm_ctg), '-1, 1', Qt.EditRole) - self.assertEqual(2, len(spy)) - # delete old source - self.assertFalse(spy[0][0]) - # deleting does not check range - # add new source - self.assertTrue(spy[1][0]) - self.assertTupleEqual(('XGM', 'XA', '[]', 'flux', '', '(-1.0, 1.0)', 0), spy[1][1]) + self.assertEqual(1, len(spy)) + self.assertTrue(spy[0][0]) + self.assertTupleEqual(('XGM', 'XA', '[]', 'flux', '', '(-1.0, 1.0)', 0), spy[0][1]) @patch.dict(config._data, {"PULSE_RESOLVED": False}) @patch.object(ConfigWrapper, "pipeline_sources", new_callable=PropertyMock) diff --git a/extra_foam/pipeline/f_pipe.py b/extra_foam/pipeline/f_pipe.py index fc55c2b1c..b31a831b4 100644 --- a/extra_foam/pipeline/f_pipe.py +++ b/extra_foam/pipeline/f_pipe.py @@ -18,7 +18,6 @@ from .processors.base_processor import _RedisParserMixin from ..config import config, DataSource from ..utils import profiler, run_in_thread -from ..ipc import RedisSubscriber from ..ipc import process_logger as logger from ..database import ( MetaProxy, MonProxy, SourceCatalog @@ -115,8 +114,6 @@ def put_pop(self, item): class KaraboBridge(_PipeInBase, _RedisParserMixin): """Karabo bridge client which is an input pipe.""" - _sub = RedisSubscriber(mt.DATA_SOURCE) - def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -126,22 +123,25 @@ def __init__(self, *args, **kwargs): def _update_source_items(self): """Updated requested source items.""" - sub = self._sub - while True: - msg = sub.get_message(ignore_subscribe_messages=True) - if msg is None: - break - - src = msg['data'] - item = self._meta.hget_all(src) - if item: + item_key = mt.DATA_SOURCE_ITEMS + updated_key = f"{item_key}:updated" + items, updated, _ = self._meta.pipeline().execute_command( + 'HGETALL', item_key).execute_command( + 'SMEMBERS', updated_key).execute_command( + 'DEL', updated_key).execute() + + if updated: + new_srcs = [] + for src in updated: + if src not in items: + self._catalog.remove_item(src) + logger.debug(f"Source item unregistered: {src}") + else: + new_srcs.append(src) + + for item in items.values(): # add a new source item - ctg = item['category'] - name = item['name'] - modules = item['modules'] - ppt = item['property'] - slicer = item['slicer'] - vrange = item['vrange'] + ctg, name, modules, ppt, slicer, vrange, ktype = item.split(";") self._catalog.add_item( ctg, name, @@ -150,20 +150,10 @@ def _update_source_items(self): ppt, self.str2slice(slicer) if slicer else None, self.str2tuple(vrange) if vrange else None, - int(item['ktype']) + int(ktype) ) - logger.debug(f"Source item registered: {name} {ppt} ({ctg})") - else: - # remove a source item - if src not in self._catalog: - # Raised when there were two checked items in - # the data source tree with the same "device ID" - # and "property". The item has already been - # deleted when one of them was unchecked. - logger.error("Duplicated data source items") - continue - self._catalog.remove_item(src) - logger.debug(f"Source item unregistered: {src}") + logger.debug(f"Source item registered/updated: " + f"{name} {ppt} ({ctg})") def _update_connection(self, proxy): cons = self._meta.hget_all(mt.CONNECTION) @@ -196,7 +186,6 @@ def run(self): # this cannot be in a thread since SourceCatalog is not thread-safe self._update_source_items() - if self.running and proxy.client is not None: if not self._catalog.main_detector: # skip the pipeline if the main detector is not specified From aceb800cab13d57f110b76d38475405d4d987d1d Mon Sep 17 00:00:00 2001 From: zhujun98 Date: Thu, 18 Jun 2020 09:05:03 +0200 Subject: [PATCH 2/2] Allow save/load data source tree params in analysis setup --- extra_foam/database/metadata.py | 17 ++++++++++++++--- .../gui/ctrl_widgets/data_source_widget.py | 3 +-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/extra_foam/database/metadata.py b/extra_foam/database/metadata.py index a09e8eb52..4d0569628 100644 --- a/extra_foam/database/metadata.py +++ b/extra_foam/database/metadata.py @@ -194,6 +194,8 @@ def remove_snapshot(self, name): pipe = self.pipeline() for k in Metadata.processor_keys: pipe.execute_command("DEL", f"{k}:{name}") + item_key = Metadata.DATA_SOURCE_ITEMS + pipe.execute_command("DEL", f"{item_key}:{name}") pipe.execute() @redis_except_handler @@ -204,10 +206,19 @@ def rename_snapshot(self, old, new): :param str new: new analysis setup name. """ for k in Metadata.processor_keys: + # if a 'key' does not exist in "RENAME", redis raises + # ResponseError. In practice, the 'key' could not be there due to + # various reason, e.g., a version update. try: self._db.execute_command("RENAME", f"{k}:{old}", f"{k}:{new}") except redis.ResponseError: pass + item_key = Metadata.DATA_SOURCE_ITEMS + try: + self._db.execute_command( + "RENAME", f"{item_key}:{old}", f"{item_key}:{new}") + except redis.ResponseError: + pass def _read_analysis_setup(self, name=None): """Read a analysis setup from Redis. @@ -215,11 +226,10 @@ def _read_analysis_setup(self, name=None): :param str name: analysis setup name. """ cfg = dict() - for k in Metadata.processor_keys: + for k in (*Metadata.processor_keys, Metadata.DATA_SOURCE_ITEMS): if name is not None: k = f"{k}:{name}" cfg[k] = self.hget_all(k) - return cfg def _write_analysis_setup(self, cfg, old, new): @@ -241,7 +251,8 @@ def _write_analysis_setup(self, cfg, old, new): else: k_new = k_root - if k_root in Metadata.processor_keys: + if k_root in Metadata.processor_keys or \ + k_root == Metadata.DATA_SOURCE_ITEMS: if v: self._db.hset(k_new, mapping=v) else: diff --git a/extra_foam/gui/ctrl_widgets/data_source_widget.py b/extra_foam/gui/ctrl_widgets/data_source_widget.py index d8170df0c..a13a3bc06 100644 --- a/extra_foam/gui/ctrl_widgets/data_source_widget.py +++ b/extra_foam/gui/ctrl_widgets/data_source_widget.py @@ -837,8 +837,7 @@ def updateMetaData(self): def loadMetaData(self): """Override.""" - selected = self._meta.execute_command( - 'SMEMBERS', mt.DATA_SOURCE_ITEMS) + cfg = self._meta.hget_all(mt.DATA_SOURCE_ITEMS) def updateAvailableSources(self): ret = self._mon.get_available_sources()