From 30de9e9c6d33bfc98e86cccc04d0dbd18a101915 Mon Sep 17 00:00:00 2001 From: Andres Correa Casablanca Date: Wed, 29 May 2019 17:13:39 +0200 Subject: [PATCH] Isolate workaround nodes from experiment network Signed-off-by: Andres Correa Casablanca --- experiments/forking_simulation.py | 203 +++++++++++++++++++----------- network/nodes_hub.py | 21 ++-- 2 files changed, 142 insertions(+), 82 deletions(-) diff --git a/experiments/forking_simulation.py b/experiments/forking_simulation.py index 8160ae1..322c485 100755 --- a/experiments/forking_simulation.py +++ b/experiments/forking_simulation.py @@ -38,7 +38,10 @@ getLogger ) from math import floor -from os import environ +from os import ( + environ, + mkdir +) from os.path import ( dirname, exists as path_exists, @@ -51,6 +54,8 @@ from tempfile import mkdtemp from time import time as _time from typing import ( + Dict, + Iterable, List, Optional, Set, @@ -132,7 +137,7 @@ def __init__( # Required to interact with the network & the nodes self.loop = loop - self.nodes: List[TestNode] = [] + self.nodes: Dict[int, TestNode] = {} self.nodes_hub: Optional[NodesHub] = None self.proposer_node_ids: List[int] = [] self.validator_node_ids: List[int] = [] @@ -145,11 +150,18 @@ def run(self) -> bool: self.setup_chain() self.setup_nodes() - try: - if self.num_validator_nodes > 0: + if self.num_validator_nodes > 0: + try: self.autofinalization_workaround() + except BaseException as e: + self.logger.critical( + 'Workaround execution failure', exc_info=e + ) + return False + try: self.start_nodes() - except (OSError, AssertionError): + except (OSError, AssertionError) as e: + self.logger.critical('Unable to start nodes', exc_info=e) return False # Early shutdown self.nodes_hub = NodesHub( @@ -163,14 +175,19 @@ def run(self) -> bool: self.nodes_hub.sync_start_proxies() self.nodes_hub.sync_connect_nodes_graph(self.graph_edges) - # Notice that the validators have already loaded their wallets self.logger.info('Importing wallets') - for idx, proposer_id in enumerate(self.proposer_node_ids): - if idx > 0: - self.nodes[proposer_id].createwallet(f'n{proposer_id}') + for node_id, node in self.nodes.items(): + node.createwallet(f'n{node_id}') + tmp_wallet = node.get_wallet_rpc(f'n{node_id}') - tmp_wallet = self.nodes[proposer_id].get_wallet_rpc(f'n{proposer_id}') - tmp_wallet.importwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet')) + if self.num_validator_nodes > 0: + tmp_wallet.importwallet( + normpath(self.tmp_dir + f'/n{node_id}.wallet') + ) + else: + tmp_wallet.importmasterkey( + regtest_mnemonics[node_id]['mnemonics'] + ) self.loop.run_until_complete(self.trigger_simulation_stop()) return True @@ -183,10 +200,20 @@ def autofinalization_workaround(self): self.logger.info('Running auto-finalization workaround') lucky_proposer_id = self.proposer_node_ids[0] - validators = [self.nodes[i] for i in self.validator_node_ids] + lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids - self.start_node(lucky_proposer_id) - self.start_nodes(validators) + # We'll start nodes isolated from the experiment's network, and reload + # their wallets later once the experiment starts after the workaround. + if not path_exists(self.tmp_dir + '/workaround'): + mkdir(self.tmp_dir + '/workaround') + for node_id in lucky_node_ids: + initialize_datadir(self.tmp_dir + '/workaround', node_id) + + workaround_nodes = self.build_nodes_instances( + base_dir=self.tmp_dir + '/workaround', + node_ids=lucky_node_ids + ) + self.start_nodes(workaround_nodes) # Although we don't need to collect data during this initialization # phase, we'll connect the nodes through a NodesHub instance to ensure @@ -195,16 +222,15 @@ def autofinalization_workaround(self): tmp_hub = NodesHub( loop=self.loop, latency_policy=StaticLatencyPolicy(0), - nodes=self.nodes, + nodes=workaround_nodes, network_stats_collector=NullNetworkStatsCollector() ) - lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids tmp_hub.sync_start_proxies(lucky_node_ids) dense_graph = create_simple_dense_graph(node_ids=lucky_node_ids) tmp_hub.sync_connect_nodes_graph(dense_graph) # We have to load some money into the nodes - lucky_proposer = self.nodes[lucky_proposer_id] + lucky_proposer = workaround_nodes[lucky_proposer_id] for proposer_id in self.proposer_node_ids: lucky_proposer.createwallet(f'n{proposer_id}') tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}') @@ -212,30 +238,46 @@ def autofinalization_workaround(self): regtest_mnemonics[proposer_id]['mnemonics'] ) for validator_id in self.validator_node_ids: - self.nodes[validator_id].createwallet(f'n{validator_id}') - tmp_wallet = self.nodes[validator_id].get_wallet_rpc(f'n{validator_id}') + workaround_nodes[validator_id].createwallet(f'n{validator_id}') + tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}') tmp_wallet.importmasterkey( regtest_mnemonics[validator_id]['mnemonics'] ) - self.loop.run_until_complete(self.ensure_autofinalization_is_off()) + self.logger.info('Imported mnemonics into workaround nodes') - # Unloading the wallets that don't belong to the lucky proposer + self.loop.run_until_complete(self.ensure_autofinalization_is_off( + workaround_nodes + )) + + # Dumping wallets to be loaded later for proposer_id in self.proposer_node_ids: tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}') tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet')) - if proposer_id != lucky_proposer_id: - lucky_proposer.unloadwallet(f'n{proposer_id}') + lucky_proposer.unloadwallet(f'n{proposer_id}') + for validator_id in self.validator_node_ids: + tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}') + tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{validator_id}.wallet')) + + self.logger.info('Dumped workaround wallets to be reused later') + + # We close all temporary connections & shut down nodes + tmp_hub.close() + self.stop_nodes(workaround_nodes) - tmp_hub.close() # We close all temporary connections + # Cleaning workaround stuff + rmtree(self.tmp_dir + '/workaround') # We recover the original topology for the full network # self.num_nodes, self.graph_edges = tmp_num_nodes, tmp_graph_edges self.logger.info('Finished auto-finalization workaround') - async def ensure_autofinalization_is_off(self): + async def ensure_autofinalization_is_off( + self, + workaround_nodes: Dict[int, TestNode] + ): for validator_id in self.validator_node_ids: - validator = self.nodes[validator_id] + validator = workaround_nodes[validator_id] tmp_wallet = validator.get_wallet_rpc(f'n{validator_id}') tmp_wallet.deposit( tmp_wallet.getnewaddress('', 'legacy'), @@ -249,7 +291,7 @@ async def ensure_autofinalization_is_off(self): # We have to wait at least for one epoch :( . await asyncio_sleep(1 + self.block_time_seconds * 50) - lucky_proposer = self.nodes[self.proposer_node_ids[0]] + lucky_proposer = workaround_nodes[self.proposer_node_ids[0]] is_autofinalization_off = False while not is_autofinalization_off: @@ -265,6 +307,8 @@ def safe_run(self, close_loop=True) -> bool: successful_run = False try: successful_run = self.run() + except BaseException as e: + self.logger.critical('The sub-experiment failed', exc_info=e) finally: self.logger.info('Releasing resources') if self.nodes_hub is not None: @@ -299,29 +343,13 @@ def cleanup_directories(self): if self.tmp_dir != '' and path_exists(self.tmp_dir): self.logger.info('Cleaning temporary directories') rmtree(self.tmp_dir) - # TODO: Remove wallet.* files too def setup_chain(self): self.logger.info('Preparing "empty" chain') for i in range(self.num_nodes): initialize_datadir(self.tmp_dir, i) - def setup_nodes(self): - if len(self.nodes) > 0: - self.logger.info('Skipping nodes setup') - return - - self.logger.info('Creating node wrappers') - - all_node_ids = set(range(self.num_nodes)) - self.proposer_node_ids = sample( - all_node_ids, self.num_proposer_nodes - ) - self.validator_node_ids = sample( - all_node_ids.difference(self.proposer_node_ids), - self.num_validator_nodes - ) - + def get_node_args(self, node_id: int) -> List[str]: # Some values are copied from test_framework.util.initialize_datadir, so # they are redundant, but it's easier to see what's going on by having # all of them together. @@ -353,36 +381,61 @@ def setup_nodes(self): for mnemonic in regtest_mnemonics ] } - }, separators=(",",":"))}''' + }, separators=(",", ":"))}''' ] relay_args = ['-proposing=0'] + node_args proposer_args = ['-proposing=1'] + node_args validator_args = ['-proposing=0', '-validating=1'] + node_args + if node_id in self.proposer_node_ids: + _node_args = proposer_args + elif node_id in self.validator_node_ids: + _node_args = validator_args + else: + _node_args = relay_args + return [ + f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}', + f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}', + f'''-stats-log-output-file={ + self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv") + }''', + f'-uacomment=simpatch{node_id}' + ] + _node_args + + def setup_nodes(self): + if len(self.nodes) > 0: + self.logger.info('Skipping nodes setup') + return + + self.logger.info('Creating node wrappers') + + all_node_ids = set(range(self.num_nodes)) + self.proposer_node_ids = sample( + all_node_ids, self.num_proposer_nodes + ) + self.validator_node_ids = sample( + all_node_ids.difference(self.proposer_node_ids), + self.num_validator_nodes + ) + if not self.nodes_stats_directory.exists(): self.nodes_stats_directory.mkdir() - def get_node_args(node_id: int) -> List[str]: - if node_id in self.proposer_node_ids: - _node_args = proposer_args - elif node_id in self.validator_node_ids: - _node_args = validator_args - else: - _node_args = relay_args - return [ - f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}', - f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}', - f'''-stats-log-output-file={ - self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv") - }''', - f'-uacomment=simpatch{node_id}' - ] + _node_args - - self.nodes = [ - TestNode( + self.nodes = self.build_nodes_instances( + base_dir=self.tmp_dir, + node_ids=range(self.num_nodes) + ) + + def build_nodes_instances( + self, + base_dir: str, + node_ids: Iterable[int] + ) -> Dict[int, TestNode]: + return { + i: TestNode( i=i, - datadir=f'{self.tmp_dir}/node{i}', - extra_args=get_node_args(i), + datadir=f'{base_dir}/node{i}', + extra_args=self.get_node_args(i), rpchost=None, timewait=60, unit_e=environ['UNIT_E'], @@ -391,8 +444,8 @@ def get_node_args(node_id: int) -> List[str]: coverage_dir=None, use_cli=False ) - for i in range(self.num_nodes) - ] + for i in node_ids + } def start_node(self, i: int): node = self.nodes[i] @@ -403,20 +456,20 @@ def start_node(self, i: int): self.stop_nodes() raise - def start_nodes(self, nodes: Optional[List[TestNode]] = None): + def start_nodes(self, nodes: Optional[Dict[int, TestNode]] = None): self.logger.info('Starting nodes') if nodes is None: nodes = self.nodes - for node_id, node in enumerate(nodes): + for node_id, node in nodes.items(): try: if not node.running: node.start() except OSError as e: self.logger.critical(f'Node {node_id} failed to start', e) raise - for node_id, node in enumerate(nodes): + for node_id, node in nodes.items(): try: node.wait_for_rpc_connection() except AssertionError as e: @@ -428,14 +481,18 @@ def start_nodes(self, nodes: Optional[List[TestNode]] = None): self.logger.info('Started nodes') - def stop_nodes(self): + def stop_nodes(self, nodes: Optional[Dict[int, TestNode]] = None): self.logger.info('Stopping nodes') - for node in self.nodes: + + if nodes is None: + nodes = self.nodes + + for node in nodes.values(): try: node.stop_node() except AssertionError: continue - for node in self.nodes: + for node in nodes.values(): node.wait_until_stopped() def define_network_topology(self): diff --git a/network/nodes_hub.py b/network/nodes_hub.py index d5724aa..2e712ac 100644 --- a/network/nodes_hub.py +++ b/network/nodes_hub.py @@ -21,6 +21,7 @@ from typing import ( Callable, Dict, + Iterable, List, Optional, Set, @@ -70,17 +71,19 @@ def __init__( self, loop: AbstractEventLoop, latency_policy: LatencyPolicy, - nodes: List[TestNode], + nodes: Union[List[TestNode], Dict[int, TestNode]], network_stats_collector: NetworkStatsCollector, host: str = '127.0.0.1' ): self.loop = loop self.latency_policy = latency_policy + + if isinstance(nodes, list): + nodes = {i: n for i, n in enumerate(nodes)} self.nodes = nodes + self.pid2node_id: Dict[int, int] = { - node.process.pid: node_id for node_id, node in enumerate(self.nodes) - # Could be that some nodes are not started - if node.process is not None + node.process.pid: node_id for node_id, node in self.nodes.items() } self.host = host @@ -101,7 +104,7 @@ def sync_start_proxies(self, node_ids: Optional[List[int]] = None): """Sync wrapper around start_proxies""" self.loop.run_until_complete(self.start_proxies(node_ids)) - async def start_proxies(self, node_ids: Optional[List[int]] = None): + async def start_proxies(self, node_ids: Optional[Iterable[int]] = None): """ This method creates (& starts) a listener proxy for each node, the connections from each proxy to the real node that they represent will be @@ -113,7 +116,7 @@ async def start_proxies(self, node_ids: Optional[List[int]] = None): logger.info('Starting node proxies') if node_ids is None: - node_ids = list(range(len(self.nodes))) + node_ids = self.nodes.keys() for node_id in node_ids: self.ports2nodes_map[self.get_p2p_node_port(node_id)] = node_id @@ -142,7 +145,7 @@ def sync_biconnect_nodes_as_linked_list(self, nodes_list=None): async def biconnect_nodes_as_linked_list(self, nodes_list=None): """Connects nodes as a linked list.""" if nodes_list is None: - nodes_list = range(len(self.nodes)) + nodes_list = list(self.nodes.keys()) if 0 == len(nodes_list): return @@ -183,7 +186,7 @@ def close(self): self.state = 'closing' logger.info('Shutting down NodesHub instance') - for node in self.nodes: + for node in self.nodes.values(): node.disconnect_p2ps() self.network_stats_collector.close() @@ -206,7 +209,7 @@ def get_p2p_node_port(node_idx): return p2p_port(node_idx) def get_p2p_proxy_port(self, node_idx): - return p2p_port(len(self.nodes) + 1 + node_idx) + return p2p_port(max(self.nodes.keys()) + 2 + node_idx) def get_proxy_address(self, node_idx): return f'{self.host}:{self.get_p2p_proxy_port(node_idx)}'