From 91970285c6e39ec6b4d30e204e328d5f76643f6c Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Sun, 18 Aug 2024 14:10:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20max=5Fwait=5Ftime=20in=20http=20handler?= =?UTF-8?q?=20not=20work=20when=20result=20set=20is=20large=20=E2=80=A6=20?= =?UTF-8?q?(#16267)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: max_wait_time in http handler not work when result set is large and return fast. * fix --- .../src/servers/http/v1/query/page_manager.rs | 72 +++++++++---------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/src/query/service/src/servers/http/v1/query/page_manager.rs b/src/query/service/src/servers/http/v1/query/page_manager.rs index 6f6360c1d397..2fc8a7910557 100644 --- a/src/query/service/src/servers/http/v1/query/page_manager.rs +++ b/src/query/service/src/servers/http/v1/query/page_manager.rs @@ -124,75 +124,71 @@ impl PageManager { fn append_block( &mut self, - rows: &mut Vec>>, + res: &mut Vec>>, block: DataBlock, - remain_rows: usize, + remain_rows: &mut usize, remain_size: &mut usize, ) -> Result<()> { let format_settings = { let guard = self.format_settings.read(); guard.as_ref().unwrap().clone() }; - let mut iter = block_to_strings(&block, &format_settings)? - .into_iter() - .peekable(); - let chunk: Vec<_> = iter - .by_ref() - .take(remain_rows) - .take_while(|r| { - let size = row_size(r); - let ok = *remain_size > size; - if ok { - *remain_size -= size; - } - ok - }) - .collect(); - rows.extend(chunk); - self.row_buffer = iter.by_ref().collect(); + let rows = block_to_strings(&block, &format_settings)?; + let mut i = 0; + while *remain_rows > 0 && *remain_size > 0 && i < rows.len() { + let size = row_size(&rows[i]); + if *remain_size > size { + *remain_size -= size; + *remain_rows -= 1; + i += 1; + } else { + *remain_size = 0; + } + } + res.extend_from_slice(&rows[..i]); + self.row_buffer = rows[i..].iter().cloned().collect(); Ok(()) } #[async_backtrace::framed] async fn collect_new_page(&mut self, tp: &Wait) -> Result<(StringBlock, bool)> { let mut res: Vec>> = Vec::with_capacity(self.max_rows_per_page); - let mut max_size_per_page = 10 * 1024 * 1024; - while res.len() < self.max_rows_per_page { + let mut remain_size = 10 * 1024 * 1024; + let mut remain_rows = self.max_rows_per_page; + while remain_rows > 0 && remain_size > 0 { if let Some(row) = self.row_buffer.pop_front() { let size = row_size(&row); - if max_size_per_page > size { + if remain_size > size { res.push(row); - max_size_per_page -= size; - continue; + remain_size -= size; + remain_rows -= 1; + } else { + remain_size = 0; } - } - break; - } - loop { - assert!(self.max_rows_per_page >= res.len()); - let remain_rows = self.max_rows_per_page - res.len(); - if remain_rows == 0 { + } else { break; } + } + + while remain_rows > 0 && remain_size > 0 { match tp { Wait::Async => match self.block_receiver.try_recv() { Some(block) => { - self.append_block(&mut res, block, remain_rows, &mut max_size_per_page)? + self.append_block(&mut res, block, &mut remain_rows, &mut remain_size)? } None => break, }, Wait::Deadline(t) => { let now = Instant::now(); let d = *t - now; + if d.is_zero() { + // timeout() will return Ok if the future completes immediately + break; + } match tokio::time::timeout(d, self.block_receiver.recv()).await { Ok(Some(block)) => { debug!("http query got new block with {} rows", block.num_rows()); - self.append_block( - &mut res, - block, - remain_rows, - &mut max_size_per_page, - )?; + self.append_block(&mut res, block, &mut remain_rows, &mut remain_size)?; } Ok(None) => { info!("http query reach end of blocks");