Skip to content

Commit

Permalink
Isolate workaround nodes from experiment network
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Correa Casablanca <[email protected]>
  • Loading branch information
Andres Correa Casablanca committed May 29, 2019
1 parent 51e3076 commit 30de9e9
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 82 deletions.
203 changes: 130 additions & 73 deletions experiments/forking_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@
getLogger
)
from math import floor
from os import environ
from os import (
environ,
mkdir
)
from os.path import (
dirname,
exists as path_exists,
Expand All @@ -51,6 +54,8 @@
from tempfile import mkdtemp
from time import time as _time
from typing import (
Dict,
Iterable,
List,
Optional,
Set,
Expand Down Expand Up @@ -132,7 +137,7 @@ def __init__(

# Required to interact with the network & the nodes
self.loop = loop
self.nodes: List[TestNode] = []
self.nodes: Dict[int, TestNode] = {}
self.nodes_hub: Optional[NodesHub] = None
self.proposer_node_ids: List[int] = []
self.validator_node_ids: List[int] = []
Expand All @@ -145,11 +150,18 @@ def run(self) -> bool:
self.setup_chain()
self.setup_nodes()

try:
if self.num_validator_nodes > 0:
if self.num_validator_nodes > 0:
try:
self.autofinalization_workaround()
except BaseException as e:
self.logger.critical(
'Workaround execution failure', exc_info=e
)
return False
try:
self.start_nodes()
except (OSError, AssertionError):
except (OSError, AssertionError) as e:
self.logger.critical('Unable to start nodes', exc_info=e)
return False # Early shutdown

self.nodes_hub = NodesHub(
Expand All @@ -163,14 +175,19 @@ def run(self) -> bool:
self.nodes_hub.sync_start_proxies()
self.nodes_hub.sync_connect_nodes_graph(self.graph_edges)

# Notice that the validators have already loaded their wallets
self.logger.info('Importing wallets')
for idx, proposer_id in enumerate(self.proposer_node_ids):
if idx > 0:
self.nodes[proposer_id].createwallet(f'n{proposer_id}')
for node_id, node in self.nodes.items():
node.createwallet(f'n{node_id}')
tmp_wallet = node.get_wallet_rpc(f'n{node_id}')

tmp_wallet = self.nodes[proposer_id].get_wallet_rpc(f'n{proposer_id}')
tmp_wallet.importwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet'))
if self.num_validator_nodes > 0:
tmp_wallet.importwallet(
normpath(self.tmp_dir + f'/n{node_id}.wallet')
)
else:
tmp_wallet.importmasterkey(
regtest_mnemonics[node_id]['mnemonics']
)

self.loop.run_until_complete(self.trigger_simulation_stop())
return True
Expand All @@ -183,10 +200,20 @@ def autofinalization_workaround(self):
self.logger.info('Running auto-finalization workaround')

lucky_proposer_id = self.proposer_node_ids[0]
validators = [self.nodes[i] for i in self.validator_node_ids]
lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids

self.start_node(lucky_proposer_id)
self.start_nodes(validators)
# We'll start nodes isolated from the experiment's network, and reload
# their wallets later once the experiment starts after the workaround.
if not path_exists(self.tmp_dir + '/workaround'):
mkdir(self.tmp_dir + '/workaround')
for node_id in lucky_node_ids:
initialize_datadir(self.tmp_dir + '/workaround', node_id)

workaround_nodes = self.build_nodes_instances(
base_dir=self.tmp_dir + '/workaround',
node_ids=lucky_node_ids
)
self.start_nodes(workaround_nodes)

# Although we don't need to collect data during this initialization
# phase, we'll connect the nodes through a NodesHub instance to ensure
Expand All @@ -195,47 +222,62 @@ def autofinalization_workaround(self):
tmp_hub = NodesHub(
loop=self.loop,
latency_policy=StaticLatencyPolicy(0),
nodes=self.nodes,
nodes=workaround_nodes,
network_stats_collector=NullNetworkStatsCollector()
)
lucky_node_ids = [lucky_proposer_id] + self.validator_node_ids
tmp_hub.sync_start_proxies(lucky_node_ids)
dense_graph = create_simple_dense_graph(node_ids=lucky_node_ids)
tmp_hub.sync_connect_nodes_graph(dense_graph)

# We have to load some money into the nodes
lucky_proposer = self.nodes[lucky_proposer_id]
lucky_proposer = workaround_nodes[lucky_proposer_id]
for proposer_id in self.proposer_node_ids:
lucky_proposer.createwallet(f'n{proposer_id}')
tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}')
tmp_wallet.importmasterkey(
regtest_mnemonics[proposer_id]['mnemonics']
)
for validator_id in self.validator_node_ids:
self.nodes[validator_id].createwallet(f'n{validator_id}')
tmp_wallet = self.nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
workaround_nodes[validator_id].createwallet(f'n{validator_id}')
tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
tmp_wallet.importmasterkey(
regtest_mnemonics[validator_id]['mnemonics']
)

self.loop.run_until_complete(self.ensure_autofinalization_is_off())
self.logger.info('Imported mnemonics into workaround nodes')

# Unloading the wallets that don't belong to the lucky proposer
self.loop.run_until_complete(self.ensure_autofinalization_is_off(
workaround_nodes
))

# Dumping wallets to be loaded later
for proposer_id in self.proposer_node_ids:
tmp_wallet = lucky_proposer.get_wallet_rpc(f'n{proposer_id}')
tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{proposer_id}.wallet'))
if proposer_id != lucky_proposer_id:
lucky_proposer.unloadwallet(f'n{proposer_id}')
lucky_proposer.unloadwallet(f'n{proposer_id}')
for validator_id in self.validator_node_ids:
tmp_wallet = workaround_nodes[validator_id].get_wallet_rpc(f'n{validator_id}')
tmp_wallet.dumpwallet(normpath(self.tmp_dir + f'/n{validator_id}.wallet'))

self.logger.info('Dumped workaround wallets to be reused later')

# We close all temporary connections & shut down nodes
tmp_hub.close()
self.stop_nodes(workaround_nodes)

tmp_hub.close() # We close all temporary connections
# Cleaning workaround stuff
rmtree(self.tmp_dir + '/workaround')

# We recover the original topology for the full network
# self.num_nodes, self.graph_edges = tmp_num_nodes, tmp_graph_edges
self.logger.info('Finished auto-finalization workaround')

async def ensure_autofinalization_is_off(self):
async def ensure_autofinalization_is_off(
self,
workaround_nodes: Dict[int, TestNode]
):
for validator_id in self.validator_node_ids:
validator = self.nodes[validator_id]
validator = workaround_nodes[validator_id]
tmp_wallet = validator.get_wallet_rpc(f'n{validator_id}')
tmp_wallet.deposit(
tmp_wallet.getnewaddress('', 'legacy'),
Expand All @@ -249,7 +291,7 @@ async def ensure_autofinalization_is_off(self):
# We have to wait at least for one epoch :( .
await asyncio_sleep(1 + self.block_time_seconds * 50)

lucky_proposer = self.nodes[self.proposer_node_ids[0]]
lucky_proposer = workaround_nodes[self.proposer_node_ids[0]]
is_autofinalization_off = False

while not is_autofinalization_off:
Expand All @@ -265,6 +307,8 @@ def safe_run(self, close_loop=True) -> bool:
successful_run = False
try:
successful_run = self.run()
except BaseException as e:
self.logger.critical('The sub-experiment failed', exc_info=e)
finally:
self.logger.info('Releasing resources')
if self.nodes_hub is not None:
Expand Down Expand Up @@ -299,29 +343,13 @@ def cleanup_directories(self):
if self.tmp_dir != '' and path_exists(self.tmp_dir):
self.logger.info('Cleaning temporary directories')
rmtree(self.tmp_dir)
# TODO: Remove wallet.* files too

def setup_chain(self):
self.logger.info('Preparing "empty" chain')
for i in range(self.num_nodes):
initialize_datadir(self.tmp_dir, i)

def setup_nodes(self):
if len(self.nodes) > 0:
self.logger.info('Skipping nodes setup')
return

self.logger.info('Creating node wrappers')

all_node_ids = set(range(self.num_nodes))
self.proposer_node_ids = sample(
all_node_ids, self.num_proposer_nodes
)
self.validator_node_ids = sample(
all_node_ids.difference(self.proposer_node_ids),
self.num_validator_nodes
)

def get_node_args(self, node_id: int) -> List[str]:
# Some values are copied from test_framework.util.initialize_datadir, so
# they are redundant, but it's easier to see what's going on by having
# all of them together.
Expand Down Expand Up @@ -353,36 +381,61 @@ def setup_nodes(self):
for mnemonic in regtest_mnemonics
]
}
}, separators=(",",":"))}'''
}, separators=(",", ":"))}'''
]
relay_args = ['-proposing=0'] + node_args
proposer_args = ['-proposing=1'] + node_args
validator_args = ['-proposing=0', '-validating=1'] + node_args

if node_id in self.proposer_node_ids:
_node_args = proposer_args
elif node_id in self.validator_node_ids:
_node_args = validator_args
else:
_node_args = relay_args
return [
f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}',
f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}',
f'''-stats-log-output-file={
self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv")
}''',
f'-uacomment=simpatch{node_id}'
] + _node_args

def setup_nodes(self):
if len(self.nodes) > 0:
self.logger.info('Skipping nodes setup')
return

self.logger.info('Creating node wrappers')

all_node_ids = set(range(self.num_nodes))
self.proposer_node_ids = sample(
all_node_ids, self.num_proposer_nodes
)
self.validator_node_ids = sample(
all_node_ids.difference(self.proposer_node_ids),
self.num_validator_nodes
)

if not self.nodes_stats_directory.exists():
self.nodes_stats_directory.mkdir()

def get_node_args(node_id: int) -> List[str]:
if node_id in self.proposer_node_ids:
_node_args = proposer_args
elif node_id in self.validator_node_ids:
_node_args = validator_args
else:
_node_args = relay_args
return [
f'-bind=127.0.0.1:{NodesHub.get_p2p_node_port(node_id)}',
f'-rpcbind=127.0.0.1:{NodesHub.get_rpc_node_port(node_id)}',
f'''-stats-log-output-file={
self.nodes_stats_directory.joinpath(f"stats_{node_id}.csv")
}''',
f'-uacomment=simpatch{node_id}'
] + _node_args

self.nodes = [
TestNode(
self.nodes = self.build_nodes_instances(
base_dir=self.tmp_dir,
node_ids=range(self.num_nodes)
)

def build_nodes_instances(
self,
base_dir: str,
node_ids: Iterable[int]
) -> Dict[int, TestNode]:
return {
i: TestNode(
i=i,
datadir=f'{self.tmp_dir}/node{i}',
extra_args=get_node_args(i),
datadir=f'{base_dir}/node{i}',
extra_args=self.get_node_args(i),
rpchost=None,
timewait=60,
unit_e=environ['UNIT_E'],
Expand All @@ -391,8 +444,8 @@ def get_node_args(node_id: int) -> List[str]:
coverage_dir=None,
use_cli=False
)
for i in range(self.num_nodes)
]
for i in node_ids
}

def start_node(self, i: int):
node = self.nodes[i]
Expand All @@ -403,20 +456,20 @@ def start_node(self, i: int):
self.stop_nodes()
raise

def start_nodes(self, nodes: Optional[List[TestNode]] = None):
def start_nodes(self, nodes: Optional[Dict[int, TestNode]] = None):
self.logger.info('Starting nodes')

if nodes is None:
nodes = self.nodes

for node_id, node in enumerate(nodes):
for node_id, node in nodes.items():
try:
if not node.running:
node.start()
except OSError as e:
self.logger.critical(f'Node {node_id} failed to start', e)
raise
for node_id, node in enumerate(nodes):
for node_id, node in nodes.items():
try:
node.wait_for_rpc_connection()
except AssertionError as e:
Expand All @@ -428,14 +481,18 @@ def start_nodes(self, nodes: Optional[List[TestNode]] = None):

self.logger.info('Started nodes')

def stop_nodes(self):
def stop_nodes(self, nodes: Optional[Dict[int, TestNode]] = None):
self.logger.info('Stopping nodes')
for node in self.nodes:

if nodes is None:
nodes = self.nodes

for node in nodes.values():
try:
node.stop_node()
except AssertionError:
continue
for node in self.nodes:
for node in nodes.values():
node.wait_until_stopped()

def define_network_topology(self):
Expand Down
Loading

0 comments on commit 30de9e9

Please sign in to comment.