From 83d538b9191213055c8cf7ce8c7f6c67a64db5b6 Mon Sep 17 00:00:00 2001 From: Petar Vujovic Date: Tue, 17 Sep 2024 11:52:37 +0200 Subject: [PATCH] feat(core,host): initial aggregation API --- core/src/interfaces.rs | 84 +++++++++++- host/src/server/api/mod.rs | 1 + host/src/server/api/v2/mod.rs | 2 +- host/src/server/api/v2/proof/mod.rs | 8 +- host/src/server/api/v3/mod.rs | 166 ++++++++++++++++++++++++ host/src/server/api/v3/proof/cancel.rs | 77 +++++++++++ host/src/server/api/v3/proof/mod.rs | 172 +++++++++++++++++++++++++ 7 files changed, 498 insertions(+), 12 deletions(-) create mode 100644 host/src/server/api/v3/mod.rs create mode 100644 host/src/server/api/v3/proof/cancel.rs create mode 100644 host/src/server/api/v3/proof/mod.rs diff --git a/core/src/interfaces.rs b/core/src/interfaces.rs index 962baf1d..f89f5247 100644 --- a/core/src/interfaces.rs +++ b/core/src/interfaces.rs @@ -442,14 +442,84 @@ impl TryFrom for ProofRequest { } } -#[serde_as] -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Default, Clone, Serialize, Deserialize, Debug, ToSchema)] +#[serde(default)] /// A request for proof aggregation of multiple proofs. pub struct AggregationRequest { - /// All the proofs to verify - pub proofs: Vec, + /// The block numbers and l1 inclusion block numbers for the blocks to aggregate proofs for. + pub block_numbers: Vec<(u64, u64)>, + /// The network to generate the proof for. + pub network: Option, + /// The L1 network to generate the proof for. + pub l1_network: Option, + // Graffiti. + pub graffiti: Option, + /// The protocol instance data. + pub prover: Option, /// The proof type. - pub proof_type: ProofType, - /// Additional prover params. - pub prover_args: HashMap, + pub proof_type: Option, + /// Blob proof type. + pub blob_proof_type: Option, + #[serde(flatten)] + /// Any additional prover params in JSON format. + pub prover_args: ProverSpecificOpts, +} + +impl AggregationRequest { + /// Merge proof request options into aggregation request options. + pub fn merge(&mut self, opts: &ProofRequestOpt) -> RaikoResult<()> { + let this = serde_json::to_value(&self)?; + let mut opts = serde_json::to_value(opts)?; + merge(&mut opts, &this); + *self = serde_json::from_value(opts)?; + Ok(()) + } +} + +impl From for Vec { + fn from(value: AggregationRequest) -> Self { + value + .block_numbers + .iter() + .map( + |&(block_number, l1_inclusion_block_number)| ProofRequestOpt { + block_number: Some(block_number), + l1_inclusion_block_number: Some(l1_inclusion_block_number), + network: value.network.clone(), + l1_network: value.l1_network.clone(), + graffiti: value.graffiti.clone(), + prover: value.prover.clone(), + proof_type: value.proof_type.clone(), + blob_proof_type: value.blob_proof_type.clone(), + prover_args: value.prover_args.clone(), + }, + ) + .collect() + } +} + +impl From for AggregationRequest { + fn from(value: ProofRequestOpt) -> Self { + let block_numbers = if let Some(block_number) = value.block_number { + vec![( + block_number, + value + .l1_inclusion_block_number + .unwrap_or_else(|| block_number - 1), + )] + } else { + vec![] + }; + + Self { + block_numbers, + network: value.network, + l1_network: value.l1_network, + graffiti: value.graffiti, + prover: value.prover, + proof_type: value.proof_type, + blob_proof_type: value.blob_proof_type, + prover_args: value.prover_args, + } + } } diff --git a/host/src/server/api/mod.rs b/host/src/server/api/mod.rs index 4aa8e098..7558b81e 100644 --- a/host/src/server/api/mod.rs +++ b/host/src/server/api/mod.rs @@ -18,6 +18,7 @@ use crate::ProverState; pub mod v1; pub mod v2; +pub mod v3; pub fn create_router(concurrency_limit: usize, jwt_secret: Option<&str>) -> Router { let cors = CorsLayer::new() diff --git a/host/src/server/api/v2/mod.rs b/host/src/server/api/v2/mod.rs index 7c32b4ff..109e362d 100644 --- a/host/src/server/api/v2/mod.rs +++ b/host/src/server/api/v2/mod.rs @@ -11,7 +11,7 @@ use crate::{ ProverState, }; -mod proof; +pub mod proof; #[derive(OpenApi)] #[openapi( diff --git a/host/src/server/api/v2/proof/mod.rs b/host/src/server/api/v2/proof/mod.rs index ce089375..b02079c3 100644 --- a/host/src/server/api/v2/proof/mod.rs +++ b/host/src/server/api/v2/proof/mod.rs @@ -11,10 +11,10 @@ use crate::{ Message, ProverState, }; -mod cancel; -mod list; -mod prune; -mod report; +pub mod cancel; +pub mod list; +pub mod prune; +pub mod report; #[utoipa::path(post, path = "/proof", tag = "Proving", diff --git a/host/src/server/api/v3/mod.rs b/host/src/server/api/v3/mod.rs new file mode 100644 index 00000000..ffcfff1c --- /dev/null +++ b/host/src/server/api/v3/mod.rs @@ -0,0 +1,166 @@ +use axum::{response::IntoResponse, Json, Router}; +use raiko_lib::prover::Proof; +use raiko_tasks::TaskStatus; +use serde::{Deserialize, Serialize}; +use utoipa::{OpenApi, ToSchema}; +use utoipa_scalar::{Scalar, Servable}; +use utoipa_swagger_ui::SwaggerUi; + +use crate::{ + server::api::v1::{self, GuestOutputDoc}, + ProverState, +}; + +mod proof; + +#[derive(OpenApi)] +#[openapi( + info( + title = "Raiko Proverd Server API", + version = "3.0", + description = "Raiko Proverd Server API", + contact( + name = "API Support", + url = "https://community.taiko.xyz", + email = "info@taiko.xyz", + ), + license( + name = "MIT", + url = "https://github.com/taikoxyz/raiko/blob/main/LICENSE" + ), + ), + components( + schemas( + raiko_core::interfaces::ProofRequestOpt, + raiko_core::interfaces::ProverSpecificOpts, + crate::interfaces::HostError, + GuestOutputDoc, + ProofResponse, + TaskStatus, + CancelStatus, + PruneStatus, + Proof, + Status, + ) + ), + tags( + (name = "Proving", description = "Routes that handle proving requests"), + (name = "Health", description = "Routes that report the server health status"), + (name = "Metrics", description = "Routes that give detailed insight into the server") + ) +)] +/// The root API struct which is generated from the `OpenApi` derive macro. +pub struct Docs; + +#[derive(Debug, Deserialize, Serialize, ToSchema)] +#[serde(untagged)] +pub enum ProofResponse { + Status { + /// The status of the submitted task. + status: TaskStatus, + }, + Proof { + /// The proof. + proof: Proof, + }, +} + +#[derive(Debug, Deserialize, Serialize, ToSchema)] +#[serde(tag = "status", rename_all = "lowercase")] +pub enum Status { + Ok { data: ProofResponse }, + Error { error: String, message: String }, +} + +impl From> for Status { + fn from(proof: Vec) -> Self { + Self::Ok { + data: ProofResponse::Proof { + proof: serde_json::from_slice(&proof).unwrap_or_default(), + }, + } + } +} + +impl From for Status { + fn from(proof: Proof) -> Self { + Self::Ok { + data: ProofResponse::Proof { proof }, + } + } +} + +impl From for Status { + fn from(status: TaskStatus) -> Self { + Self::Ok { + data: ProofResponse::Status { status }, + } + } +} + +impl IntoResponse for Status { + fn into_response(self) -> axum::response::Response { + Json(serde_json::to_value(self).unwrap()).into_response() + } +} + +#[derive(Debug, Deserialize, Serialize, ToSchema)] +#[serde(tag = "status", rename_all = "lowercase")] +/// Status of cancellation request. +/// Can be `ok` for a successful cancellation or `error` with message and error type for errors. +pub enum CancelStatus { + /// Cancellation was successful. + Ok, + /// Cancellation failed. + Error { error: String, message: String }, +} + +impl IntoResponse for CancelStatus { + fn into_response(self) -> axum::response::Response { + Json(serde_json::to_value(self).unwrap()).into_response() + } +} + +#[derive(Debug, Serialize, ToSchema, Deserialize)] +#[serde(tag = "status", rename_all = "lowercase")] +/// Status of prune request. +/// Can be `ok` for a successful prune or `error` with message and error type for errors. +pub enum PruneStatus { + /// Prune was successful. + Ok, + /// Prune failed. + Error { error: String, message: String }, +} + +impl IntoResponse for PruneStatus { + fn into_response(self) -> axum::response::Response { + Json(serde_json::to_value(self).unwrap()).into_response() + } +} + +#[must_use] +pub fn create_docs() -> utoipa::openapi::OpenApi { + [ + v1::health::create_docs(), + v1::metrics::create_docs(), + proof::create_docs(), + ] + .into_iter() + .fold(Docs::openapi(), |mut doc, sub_doc| { + doc.merge(sub_doc); + doc + }) +} + +pub fn create_router() -> Router { + let docs = create_docs(); + + Router::new() + // Only add the concurrency limit to the proof route. We want to still be able to call + // healthchecks and metrics to have insight into the system. + .nest("/proof", proof::create_router()) + .nest("/health", v1::health::create_router()) + .nest("/metrics", v1::metrics::create_router()) + .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", docs.clone())) + .merge(Scalar::with_url("/scalar", docs)) +} diff --git a/host/src/server/api/v3/proof/cancel.rs b/host/src/server/api/v3/proof/cancel.rs new file mode 100644 index 00000000..7f771616 --- /dev/null +++ b/host/src/server/api/v3/proof/cancel.rs @@ -0,0 +1,77 @@ +use axum::{debug_handler, extract::State, routing::post, Json, Router}; +use raiko_core::{ + interfaces::{AggregationRequest, ProofRequest, ProofRequestOpt}, + provider::get_task_data, +}; +use raiko_tasks::{TaskDescriptor, TaskManager, TaskStatus}; +use serde_json::Value; +use utoipa::OpenApi; + +use crate::{interfaces::HostResult, server::api::v2::CancelStatus, Message, ProverState}; + +#[utoipa::path(post, path = "/proof/cancel", + tag = "Proving", + request_body = ProofRequestOpt, + responses ( + (status = 200, description = "Successfully cancelled proof task", body = CancelStatus) + ) +)] +#[debug_handler(state = ProverState)] +/// Cancel a proof task with requested config. +/// +/// Accepts a proof request and cancels a proving task with the specified guest prover. +/// The guest provers currently available are: +/// - native - constructs a block and checks for equality +/// - sgx - uses the sgx environment to construct a block and produce proof of execution +/// - sp1 - uses the sp1 prover +/// - risc0 - uses the risc0 prover +async fn cancel_handler( + State(prover_state): State, + Json(aggregation_request): Json, +) -> HostResult { + // Override the existing proof request config from the config file and command line + // options with the request from the client. + aggregation_request.merge(&prover_state.request_config()); + + let proof_request_opts: Vec = aggregation_request.into(); + + for opt in proof_request_opts { + let proof_request = ProofRequest::try_from(opt)?; + + let (chain_id, block_hash) = get_task_data( + &proof_request.network, + proof_request.block_number, + &prover_state.chain_specs, + ) + .await?; + + let key = TaskDescriptor::from(( + chain_id, + block_hash, + proof_request.proof_type, + proof_request.prover.clone().to_string(), + )); + + prover_state.task_channel.try_send(Message::from(&key))?; + + let mut manager = prover_state.task_manager(); + + manager + .update_task_progress(key, TaskStatus::Cancelled, None) + .await?; + } + + Ok(CancelStatus::Ok) +} + +#[derive(OpenApi)] +#[openapi(paths(cancel_handler))] +struct Docs; + +pub fn create_docs() -> utoipa::openapi::OpenApi { + Docs::openapi() +} + +pub fn create_router() -> Router { + Router::new().route("/", post(cancel_handler)) +} diff --git a/host/src/server/api/v3/proof/mod.rs b/host/src/server/api/v3/proof/mod.rs new file mode 100644 index 00000000..72c5d1a5 --- /dev/null +++ b/host/src/server/api/v3/proof/mod.rs @@ -0,0 +1,172 @@ +use std::str::FromStr; + +use axum::{debug_handler, extract::State, routing::post, Json, Router}; +use raiko_core::{ + interfaces::{AggregationRequest, ProofRequest, ProofRequestOpt, ProofType}, + provider::get_task_data, +}; +use raiko_lib::input::{AggregationGuestInput, AggregationGuestOutput}; +use raiko_tasks::{TaskDescriptor, TaskManager, TaskStatus}; +use reth_primitives::B256; +use serde_json::Value; +use utoipa::OpenApi; + +use crate::{ + interfaces::HostResult, + metrics::{inc_current_req, inc_guest_req_count, inc_host_req_count}, + server::api::v2::{self, Status}, + Message, ProverState, +}; + +mod cancel; + +#[utoipa::path(post, path = "/proof", + tag = "Proving", + request_body = ProofRequestOpt, + responses ( + (status = 200, description = "Successfully submitted proof task, queried tasks in progress or retrieved proof.", body = Status) + ) +)] +#[debug_handler(state = ProverState)] +/// Submit a proof aggregation task with requested config, get task status or get proof value. +/// +/// Accepts a proof request and creates a proving task with the specified guest prover. +/// The guest provers currently available are: +/// - native - constructs a block and checks for equality +/// - sgx - uses the sgx environment to construct a block and produce proof of execution +/// - sp1 - uses the sp1 prover +/// - risc0 - uses the risc0 prover +async fn proof_handler( + State(prover_state): State, + Json(aggregation_request): Json, +) -> HostResult { + inc_current_req(); + // Override the existing proof request config from the config file and command line + // options with the request from the client. + aggregation_request.merge(&prover_state.request_config()); + + let mut tasks = Vec::with_capacity(aggregation_request.block_numbers.len()); + + let proof_request_opts: Vec = aggregation_request.into(); + + // Construct the actual proof request from the available configs. + for proof_request_opt in proof_request_opts { + let proof_request = ProofRequest::try_from(proof_request_opt)?; + + inc_host_req_count(proof_request.block_number); + inc_guest_req_count(&proof_request.proof_type, proof_request.block_number); + + let (chain_id, blockhash) = get_task_data( + &proof_request.network, + proof_request.block_number, + &prover_state.chain_specs, + ) + .await?; + + let key = TaskDescriptor::from(( + chain_id, + blockhash, + proof_request.proof_type, + proof_request.prover.to_string(), + )); + + tasks.push(key); + } + + let mut manager = prover_state.task_manager(); + + let mut is_registered = false; + let mut is_success = true; + + let mut statuses = Vec::with_capacity(tasks.len()); + + for task in tasks { + let status = manager.get_task_proving_status(&task).await?; + + let Some((latest_status, ..)) = status.last() else { + // If there are no tasks with provided config, create a new one. + manager.enqueue_task(&task).await?; + + prover_state + .task_channel + .try_send(Message::from(&aggregation_request))?; + is_registered = true; + }; + + match latest_status { + // If task has been cancelled add it to the queue again + TaskStatus::Cancelled + | TaskStatus::Cancelled_Aborted + | TaskStatus::Cancelled_NeverStarted + | TaskStatus::CancellationInProgress => { + manager + .update_task_progress(task, TaskStatus::Registered, None) + .await?; + + prover_state + .task_channel + .try_send(Message::from(&aggregation_request))?; + + is_registered = true; + } + // If the task has succeeded, return the proof. + TaskStatus::Success => { + is_success = is_success && true; + } + // For all other statuses just return the status. + status => Ok((*status).into()), + } + } + + if is_registered { + Ok(TaskStatus::Registered.into()) + } else if is_success { + // TODO:(petar) aggregate the proofs and return the result without blocking + let mut proofs = Vec::with_capacity(tasks.len()); + for task in tasks { + let raw_proof = manager.get_task_proof(&task).await?; + let proof = serde_json::from_slice(&raw_proof)?; + proofs.push(proof); + } + + let proof_type = ProofType::from_str(aggregation_request.proof_type)?; + let input = AggregationGuestInput { proofs }; + let output = AggregationGuestOutput { hash: B256::ZERO }; + let config = serde_json::to_value(aggregation_request)?; + + let proof = proof_type + .aggregate_proofs(input, &output, &config, manager) + .await?; + + Ok(proof.into()) + } else { + Ok(TaskStatus::WorkInProgress.into()) + } +} + +#[derive(OpenApi)] +#[openapi(paths(proof_handler))] +struct Docs; + +pub fn create_docs() -> utoipa::openapi::OpenApi { + [ + cancel::create_docs(), + v2::proof::report::create_docs(), + v2::proof::list::create_docs(), + v2::proof::prune::create_docs(), + ] + .into_iter() + .fold(Docs::openapi(), |mut docs, curr| { + docs.merge(curr); + docs + }) +} + +pub fn create_router() -> Router { + Router::new() + .route("/", post(proof_handler)) + .nest("/cancel", cancel::create_router()) + .nest("/report", v2::proof::report::create_router()) + .nest("/list", v2::proof::list::create_router()) + .nest("/prune", v2::proof::prune::create_router()) +}