Skip to content

Commit

Permalink
fix(server): exit server instance if any of the sub-tasks exited (#1612)
Browse files Browse the repository at this point in the history
Replaced FutureUnordered with futures::select_all, and put all sub-tasks
into individual tokio tasks.
  • Loading branch information
zonyitoo committed Aug 14, 2024
1 parent d67908f commit 6ffeda5
Showing 1 changed file with 42 additions and 24 deletions.
66 changes: 42 additions & 24 deletions crates/shadowsocks-service/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

use std::{
collections::HashMap,
future::Future,
io::{self, ErrorKind},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{future, ready};
use log::{error, trace};
use shadowsocks::{
config::{ManagerAddr, ServerConfig},
Expand All @@ -16,7 +19,7 @@ use shadowsocks::{
plugin::{Plugin, PluginMode},
ManagerClient,
};
use tokio::time;
use tokio::{task::JoinHandle, time};

use crate::{acl::AccessControl, config::SecurityConfig, net::FlowStat};

Expand Down Expand Up @@ -159,6 +162,27 @@ impl ServerBuilder {
}
}

struct ServerHandle(JoinHandle<io::Result<()>>);

impl Drop for ServerHandle {
#[inline]
fn drop(&mut self) {
self.0.abort();
}
}

impl Future for ServerHandle {
type Output = io::Result<()>;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.0).poll(cx)) {
Ok(res) => res.into(),
Err(err) => Err(io::Error::new(ErrorKind::Other, err)).into(),
}
}
}

/// Shadowsocks Server instance
pub struct Server {
context: Arc<ServiceContext>,
Expand Down Expand Up @@ -187,36 +211,33 @@ impl Server {

/// Start serving
pub async fn run(self) -> io::Result<()> {
let vfut = FuturesUnordered::new();
let mut vfut = Vec::new();

if let Some(plugin) = self.plugin {
vfut.push(
async move {
match plugin.join().await {
Ok(status) => {
error!("plugin exited with status: {}", status);
Ok(())
}
Err(err) => {
error!("plugin exited with error: {}", err);
Err(err)
}
vfut.push(ServerHandle(tokio::spawn(async move {
match plugin.join().await {
Ok(status) => {
error!("plugin exited with status: {}", status);
Ok(())
}
Err(err) => {
error!("plugin exited with error: {}", err);
Err(err)
}
}
.boxed(),
);
})));
}

if let Some(tcp_server) = self.tcp_server {
vfut.push(tcp_server.run().boxed());
vfut.push(ServerHandle(tokio::spawn(tcp_server.run())));
}

if let Some(udp_server) = self.udp_server {
vfut.push(udp_server.run().boxed())
vfut.push(ServerHandle(tokio::spawn(udp_server.run())));
}

if let Some(manager_addr) = self.manager_addr {
let manager_fut = async move {
vfut.push(ServerHandle(tokio::spawn(async move {
loop {
match ManagerClient::connect(
self.context.context_ref(),
Expand Down Expand Up @@ -251,13 +272,10 @@ impl Server {
// Report every 10 seconds
time::sleep(Duration::from_secs(10)).await;
}
}
.boxed();
vfut.push(manager_fut);
})));
}

let (res, _) = vfut.into_future().await;
if let Some(Err(err)) = res {
if let (Err(err), ..) = future::select_all(vfut).await {
error!("servers exited with error: {}", err);
}

Expand Down

0 comments on commit 6ffeda5

Please sign in to comment.