Skip to content

Commit

Permalink
feat: add blob tx listener (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
hal3e authored Aug 10, 2024
1 parent d582de1 commit 1daed88
Show file tree
Hide file tree
Showing 39 changed files with 1,094 additions and 394 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions committer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct App {
/// How often to check the latest fuel block
#[serde(deserialize_with = "human_readable_duration")]
pub block_check_interval: Duration,
/// Number of L1 blocks that need to pass to accept the tx as finalized
pub num_blocks_to_finalize_tx: u64,
}

fn human_readable_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
Expand Down
9 changes: 7 additions & 2 deletions committer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ async fn main() -> Result<()> {
listener_handle,
];

// If the blob pool wallet key is set, we need to start the state committer and state importer
// If the blob pool wallet key is set, we need to start
// the state committer, state importer and state listener
if config.eth.blob_pool_wallet_key.is_some() {
let state_committer_handle = setup::state_committer(
ethereum_rpc,
ethereum_rpc.clone(),
storage.clone(),
&metrics_registry,
cancel_token.clone(),
Expand All @@ -88,8 +89,12 @@ async fn main() -> Result<()> {
&config,
);

let state_listener_handle =
setup::state_listener(ethereum_rpc, storage.clone(), cancel_token.clone(), &config);

handles.push(state_committer_handle);
handles.push(state_importer_handle);
handles.push(state_listener_handle);
}

launch_api_server(
Expand Down
17 changes: 17 additions & 0 deletions committer/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,23 @@ pub fn state_importer(
)
}

pub fn state_listener(
l1: L1,
storage: impl Storage + 'static,
cancel_token: CancellationToken,
config: &config::Config,
) -> tokio::task::JoinHandle<()> {
let state_listener =
services::StateListener::new(l1, storage, config.app.num_blocks_to_finalize_tx);

schedule_polling(
config.app.block_check_interval,
state_listener,
"State Listener",
cancel_token,
)
}

pub async fn l1_adapter(
config: &config::Config,
internal_config: &config::Internal,
Expand Down
1 change: 1 addition & 0 deletions configurations/development/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ block_producer_public_key = "0x73dc6cc8cc0041e4924954b35a71a22ccb520664c522198a6
port = 8080
host = "0.0.0.0"
block_check_interval = "1s"
num_blocks_to_finalize_tx = 12

[app.db]
host = "localhost"
Expand Down
9 changes: 8 additions & 1 deletion packages/eth/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ethers::types::U256;
use futures::{stream::TryStreamExt, Stream};
use ports::{
l1::{Api, Contract, EventStreamer, Result},
types::{FuelBlockCommittedOnL1, L1Height, ValidatedFuelBlock},
types::{FuelBlockCommittedOnL1, L1Height, TransactionResponse, ValidatedFuelBlock},
};
use websocket::EthEventStreamer;

Expand Down Expand Up @@ -51,6 +51,13 @@ impl Api for WebsocketClient {

Ok(height)
}

async fn get_transaction_response(
&self,
tx_hash: [u8; 32],
) -> Result<Option<TransactionResponse>> {
Ok(self.get_transaction_response(tx_hash).await?)
}
}

#[async_trait::async_trait]
Expand Down
9 changes: 8 additions & 1 deletion packages/eth/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use ::metrics::{prometheus::core::Collector, HealthChecker, RegistersMetrics};
use ethers::types::{Address, Chain};
use ports::{
l1::Result,
types::{ValidatedFuelBlock, U256},
types::{TransactionResponse, ValidatedFuelBlock, U256},
};
use std::num::NonZeroU32;
use url::Url;
Expand Down Expand Up @@ -66,6 +66,13 @@ impl WebsocketClient {
Ok(self.inner.get_block_number().await?)
}

pub(crate) async fn get_transaction_response(
&self,
tx_hash: [u8; 32],
) -> Result<Option<TransactionResponse>> {
Ok(self.inner.get_transaction_response(tx_hash).await?)
}

pub(crate) async fn balance(&self) -> Result<U256> {
Ok(self.inner.balance().await?)
}
Expand Down
Loading

0 comments on commit 1daed88

Please sign in to comment.