Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: "multi" support for SocketConnector.serverToSocket #9

Merged
merged 9 commits into from
Feb 15, 2024
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 2.1.0
- Added `multi` parameter to `SocketConnector.serverToSocket` - whether to
create new connections on the "B" side every time there is a new "A" side
connection to the bound server port. Also added `onConnect` parameter,
so that callers can be informed when every new connection is made, and
can thus take whatever action they require.
- feat: Added grace period so that SocketConnector doesn't close until both
(a) initial timeout has expired and (b) number of established connections
is zero or has dropped to zero

## 2.0.1
- Removed an unnecessary dependency

Expand Down
78 changes: 49 additions & 29 deletions lib/src/socket_connector.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import 'package:socket_connector/src/types.dart';
class SocketConnector {
static const defaultTimeout = Duration(seconds: 30);

bool gracePeriodPassed = false;

SocketConnector({
this.verbose = false,
this.logTraffic = false,
Expand All @@ -29,6 +31,7 @@ class SocketConnector {
}) {
this.logger = logger ?? stderr;
Timer(timeout, () {
gracePeriodPassed = true;
if (connections.isEmpty) {
close();
}
Expand Down Expand Up @@ -106,11 +109,12 @@ class SocketConnector {
}
} catch (e) {
thisSide.authenticated = false;
_log('Error while authenticating side ${thisSide.name} : $e');
_log('Error while authenticating side ${thisSide.name} : $e',
force: true);
}
}
if (!thisSide.authenticated) {
_log('Authentication failed on side ${thisSide.name}');
_log('Authentication failed on side ${thisSide.name}', force: true);
_destroySide(thisSide);
return;
}
Expand Down Expand Up @@ -149,7 +153,7 @@ class SocketConnector {
_log('stream.onDone on side ${side.name}');
_destroySide(side);
}, onError: (error) {
_log('stream.onError on side ${side.name}: $error');
_log('stream.onError on side ${side.name}: $error', force: true);
_destroySide(side);
});
}
Expand Down Expand Up @@ -181,8 +185,9 @@ class SocketConnector {
if (connectionToRemove != null) {
connections.remove(connectionToRemove);
_log(chalk.brightBlue('Removed connection'));
if (connections.isEmpty) {
_log(chalk.brightBlue('No established connections remain - '
if (connections.isEmpty && gracePeriodPassed) {
_log(chalk.brightBlue('No established connections remain'
' and grace period has passed - '
' will close connector'));
close();
}
Expand Down Expand Up @@ -214,8 +219,8 @@ class SocketConnector {
pendingB.clear();
}

void _log(String s) {
if (verbose) {
void _log(String s, {bool force = false}) {
if (verbose || force) {
logger.writeln('${DateTime.now()} | SocketConnector | $s');
}
}
Expand Down Expand Up @@ -331,6 +336,7 @@ class SocketConnector {
/// - Creates socket to [portB] on [addressB]
/// - Relays data between the sockets
static Future<SocketConnector> socketToSocket({
SocketConnector? connector,
required InternetAddress addressA,
required int portA,
required InternetAddress addressB,
Expand All @@ -343,7 +349,7 @@ class SocketConnector {
IOSink? logger,
}) async {
IOSink logSink = logger ?? stderr;
SocketConnector connector = SocketConnector(
connector ??= SocketConnector(
verbose: verbose,
logTraffic: logTraffic,
timeout: timeout,
Expand Down Expand Up @@ -374,22 +380,27 @@ class SocketConnector {
/// - Binds to [portA] on [addressA]
/// - Listens for a socket connection on [portA] port and joins it to
/// the 'B' side
///
/// - If [portA] is not provided then a port is chosen by the OS.
/// - [addressA] defaults to [InternetAddress.anyIPv4]
static Future<SocketConnector> serverToSocket({
/// Defaults to [InternetAddress.anyIPv4]
InternetAddress? addressA,
int portA = 0,
required InternetAddress addressB,
required int portB,
DataTransformer? transformAtoB,
DataTransformer? transformBtoA,
bool verbose = false,
bool logTraffic = false,
Duration timeout = SocketConnector.defaultTimeout,
IOSink? logger,
}) async {
/// - [multi] flag controls whether or not to allow multiple connections
/// to the bound server port [portA]
/// - [onConnect] is called when [portA] has got a new connection and a
/// corresponding outbound socket has been created to [addressB]:[portB]
static Future<SocketConnector> serverToSocket(
{
/// Defaults to [InternetAddress.anyIPv4]
InternetAddress? addressA,
int portA = 0,
required InternetAddress addressB,
required int portB,
DataTransformer? transformAtoB,
DataTransformer? transformBtoA,
bool verbose = false,
bool logTraffic = false,
Duration timeout = SocketConnector.defaultTimeout,
IOSink? logger,
bool multi = false,
Function(Socket sideA, Socket sideB)? onConnect}) async {
IOSink logSink = logger ?? stderr;
addressA ??= InternetAddress.anyIPv4;

Expand All @@ -400,18 +411,27 @@ class SocketConnector {
logger: logSink,
);

int connections = 0;
// bind to a local port for side 'A'
connector._serverSocketA = await ServerSocket.bind(addressA, portA);
// listen on the local port and connect the inbound socket
connector._serverSocketA?.listen((socket) {
Side sideA = Side(socket, true, transformer: transformAtoB);
connector._serverSocketA?.listen((sideASocket) async {
if (!multi) {
unawaited(connector._serverSocketA?.close());
}
Side sideA = Side(sideASocket, true, transformer: transformAtoB);
unawaited(connector.handleSingleConnection(sideA));
});

// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
unawaited(connector.handleSingleConnection(sideB));
if (verbose) {
logSink.writeln('Making connection ${++connections} to the "B" side');
}
// connect to the side 'B' address and port
Socket sideBSocket = await Socket.connect(addressB, portB);
Side sideB = Side(sideBSocket, false, transformer: transformBtoA);
unawaited(connector.handleSingleConnection(sideB));

onConnect?.call(sideASocket, sideBSocket);
});

return (connector);
}
Expand Down
4 changes: 2 additions & 2 deletions pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: socket_connector
description: Package allows you to join two TCP clients or two servers this package includes all the tools you need to connect and optionally print the traffic.
description: Package for joining sockets together to create socket relays.

version: 2.0.1
version: 2.1.0
repository: https://github.com/cconstab/socket_connector

environment:
Expand Down
Loading