diff --git a/CHANGELOG.md b/CHANGELOG.md index 5377d35..e053796 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 2.1.0 +- Added `multi` parameter to `SocketConnector.serverToSocket` - whether to + create new connections on the "B" side every time there is a new "A" side + connection to the bound server port. Also added `onConnect` parameter, + so that callers can be informed when every new connection is made, and + can thus take whatever action they require. +- feat: Added grace period so that SocketConnector doesn't close until both + (a) initial timeout has expired and (b) number of established connections + is zero or has dropped to zero + ## 2.0.1 - Removed an unnecessary dependency diff --git a/lib/src/socket_connector.dart b/lib/src/socket_connector.dart index fab09bb..927d3f8 100644 --- a/lib/src/socket_connector.dart +++ b/lib/src/socket_connector.dart @@ -21,6 +21,8 @@ import 'package:socket_connector/src/types.dart'; class SocketConnector { static const defaultTimeout = Duration(seconds: 30); + bool gracePeriodPassed = false; + SocketConnector({ this.verbose = false, this.logTraffic = false, @@ -29,6 +31,7 @@ class SocketConnector { }) { this.logger = logger ?? stderr; Timer(timeout, () { + gracePeriodPassed = true; if (connections.isEmpty) { close(); } @@ -106,11 +109,12 @@ class SocketConnector { } } catch (e) { thisSide.authenticated = false; - _log('Error while authenticating side ${thisSide.name} : $e'); + _log('Error while authenticating side ${thisSide.name} : $e', + force: true); } } if (!thisSide.authenticated) { - _log('Authentication failed on side ${thisSide.name}'); + _log('Authentication failed on side ${thisSide.name}', force: true); _destroySide(thisSide); return; } @@ -149,7 +153,7 @@ class SocketConnector { _log('stream.onDone on side ${side.name}'); _destroySide(side); }, onError: (error) { - _log('stream.onError on side ${side.name}: $error'); + _log('stream.onError on side ${side.name}: $error', force: true); _destroySide(side); }); } @@ -181,8 +185,9 @@ class SocketConnector { if (connectionToRemove != null) { connections.remove(connectionToRemove); _log(chalk.brightBlue('Removed connection')); - if (connections.isEmpty) { - _log(chalk.brightBlue('No established connections remain - ' + if (connections.isEmpty && gracePeriodPassed) { + _log(chalk.brightBlue('No established connections remain' + ' and grace period has passed - ' ' will close connector')); close(); } @@ -214,8 +219,8 @@ class SocketConnector { pendingB.clear(); } - void _log(String s) { - if (verbose) { + void _log(String s, {bool force = false}) { + if (verbose || force) { logger.writeln('${DateTime.now()} | SocketConnector | $s'); } } @@ -331,6 +336,7 @@ class SocketConnector { /// - Creates socket to [portB] on [addressB] /// - Relays data between the sockets static Future socketToSocket({ + SocketConnector? connector, required InternetAddress addressA, required int portA, required InternetAddress addressB, @@ -343,7 +349,7 @@ class SocketConnector { IOSink? logger, }) async { IOSink logSink = logger ?? stderr; - SocketConnector connector = SocketConnector( + connector ??= SocketConnector( verbose: verbose, logTraffic: logTraffic, timeout: timeout, @@ -374,22 +380,27 @@ class SocketConnector { /// - Binds to [portA] on [addressA] /// - Listens for a socket connection on [portA] port and joins it to /// the 'B' side - /// /// - If [portA] is not provided then a port is chosen by the OS. /// - [addressA] defaults to [InternetAddress.anyIPv4] - static Future serverToSocket({ - /// Defaults to [InternetAddress.anyIPv4] - InternetAddress? addressA, - int portA = 0, - required InternetAddress addressB, - required int portB, - DataTransformer? transformAtoB, - DataTransformer? transformBtoA, - bool verbose = false, - bool logTraffic = false, - Duration timeout = SocketConnector.defaultTimeout, - IOSink? logger, - }) async { + /// - [multi] flag controls whether or not to allow multiple connections + /// to the bound server port [portA] + /// - [onConnect] is called when [portA] has got a new connection and a + /// corresponding outbound socket has been created to [addressB]:[portB] + static Future serverToSocket( + { + /// Defaults to [InternetAddress.anyIPv4] + InternetAddress? addressA, + int portA = 0, + required InternetAddress addressB, + required int portB, + DataTransformer? transformAtoB, + DataTransformer? transformBtoA, + bool verbose = false, + bool logTraffic = false, + Duration timeout = SocketConnector.defaultTimeout, + IOSink? logger, + bool multi = false, + Function(Socket sideA, Socket sideB)? onConnect}) async { IOSink logSink = logger ?? stderr; addressA ??= InternetAddress.anyIPv4; @@ -400,18 +411,27 @@ class SocketConnector { logger: logSink, ); + int connections = 0; // bind to a local port for side 'A' connector._serverSocketA = await ServerSocket.bind(addressA, portA); // listen on the local port and connect the inbound socket - connector._serverSocketA?.listen((socket) { - Side sideA = Side(socket, true, transformer: transformAtoB); + connector._serverSocketA?.listen((sideASocket) async { + if (!multi) { + unawaited(connector._serverSocketA?.close()); + } + Side sideA = Side(sideASocket, true, transformer: transformAtoB); unawaited(connector.handleSingleConnection(sideA)); - }); - // connect to the side 'B' address and port - Socket sideBSocket = await Socket.connect(addressB, portB); - Side sideB = Side(sideBSocket, false, transformer: transformBtoA); - unawaited(connector.handleSingleConnection(sideB)); + if (verbose) { + logSink.writeln('Making connection ${++connections} to the "B" side'); + } + // connect to the side 'B' address and port + Socket sideBSocket = await Socket.connect(addressB, portB); + Side sideB = Side(sideBSocket, false, transformer: transformBtoA); + unawaited(connector.handleSingleConnection(sideB)); + + onConnect?.call(sideASocket, sideBSocket); + }); return (connector); } diff --git a/pubspec.yaml b/pubspec.yaml index 473c0d7..26ff2a7 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: socket_connector -description: Package allows you to join two TCP clients or two servers this package includes all the tools you need to connect and optionally print the traffic. +description: Package for joining sockets together to create socket relays. -version: 2.0.1 +version: 2.1.0 repository: https://github.com/cconstab/socket_connector environment: diff --git a/test/socket_connector_test.dart b/test/socket_connector_test.dart index 1258d65..c401ee2 100644 --- a/test/socket_connector_test.dart +++ b/test/socket_connector_test.dart @@ -67,7 +67,8 @@ void main() { }); test('Test ServerToServer', () async { - Duration timeout = Duration(milliseconds: 200); + int timeoutMs = 200; + Duration timeout = Duration(milliseconds: timeoutMs); SocketConnector connector = await SocketConnector.serverToServer( portA: 0, portB: 0, @@ -118,12 +119,13 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); test('Test socketToServer', () async { + int timeoutMs = 100; // Bind to a port that SocketConnector.socketToServer can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); @@ -131,7 +133,7 @@ void main() { addressA: testExternalServer.address, portA: testExternalServer.port, verbose: false, - timeout: Duration(milliseconds: 100), + timeout: Duration(milliseconds: timeoutMs), ); expect(connector.connections.isEmpty, true); @@ -176,12 +178,13 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); test('Test socketToSocket', () async { + int timeoutMs = 200; // Bind two ports that SocketConnector.socketToSocket can connect to ServerSocket testExternalServerA = await ServerSocket.bind('127.0.0.1', 0); @@ -194,6 +197,7 @@ void main() { addressB: testExternalServerB.address, portB: testExternalServerB.port, verbose: false, + timeout: Duration(milliseconds: timeoutMs), ); String rcvdA = ''; @@ -235,20 +239,21 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); - test('Test serverToSocket', () async { + test('Test serverToSocket single', () async { // Bind to a port that SocketConnector.serverToSocket can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + int timeoutMs = 100; SocketConnector connector = await SocketConnector.serverToSocket( addressB: testExternalServer.address, portB: testExternalServer.port, verbose: false, - timeout: Duration(milliseconds: 100), + timeout: Duration(milliseconds: timeoutMs), ); expect(connector.connections.isEmpty, true); @@ -265,15 +270,14 @@ void main() { }); }); - await readyB.future; - expect(connector.connections.isEmpty, true); - Socket socketA = await Socket.connect( 'localhost', connector.sideAPort!, ); // Wait for SocketConnector to handle the events await (Future.delayed(Duration(milliseconds: 10))); + await readyB.future; + expect(connector.connections.isEmpty, false); socketA.listen((List data) { @@ -292,11 +296,91 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events + await (Future.delayed(Duration(milliseconds: timeoutMs))); + expect(connector.closed, true); + await connector.done.timeout(Duration.zero); + }); + + test('Test serverToSocket multi', () async { + // Bind to a port that SocketConnector.serverToSocket can connect to + ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + + int serverConnections = 0; + SocketConnector connector = await SocketConnector.serverToSocket( + addressB: testExternalServer.address, + portB: testExternalServer.port, + verbose: true, + timeout: Duration(milliseconds: 100), + multi: true, + onConnect: (Socket sideA, Socket sideB) { + serverConnections++; + print('SocketConnector.serverToSocket onConnect called back'); + }); + expect(connector.connections.isEmpty, true); + + List rcvdA = []; + List rcvdB = []; + List bSockets = []; + + Socket? currentSocketB; + testExternalServer.listen((socket) { + currentSocketB = socket; + bSockets.add(socket); + int which = bSockets.length; + socket.listen((List data) { + var msg = '$which: ${String.fromCharCodes(data)}'; + print('socket B ultimate destination received $msg'); + rcvdB.add(msg); + }); + }); + + expect(connector.connections.isEmpty, true); + + int howMany = 5; + List aSockets = []; + for (int i = 0; i < howMany; i++) { + Socket socketA = await Socket.connect( + 'localhost', + connector.sideAPort!, + ); + aSockets.add(socketA); + // Wait for SocketConnector to handle the events + await (Future.delayed(Duration(milliseconds: 10))); + expect(connector.connections.isEmpty, false); + + socketA.listen((List data) { + var msg = '${aSockets.length}: ${String.fromCharCodes(data)}'; + print('socket A ultimate client received $msg'); + rcvdA.add(msg); + }); + + // Wait for the sockets to send and receive data + await Future.delayed(Duration(milliseconds: 10)); + + socketA.write('hello world from side A'); + expect(currentSocketB != null, true); + currentSocketB?.write('hello world from side B'); + // Wait for the sockets to send and receive data + await Future.delayed(Duration(milliseconds: 10)); + + expect(rcvdA.last, "${aSockets.length}: hello world from side B"); + expect(rcvdB.last, "${bSockets.length}: hello world from side A"); + expect(rcvdA.length, i + 1); + expect(rcvdB.length, i + 1); + } + + expect(serverConnections, howMany); + + for (final s in aSockets) { + s.destroy(); + } await (Future.delayed(Duration(milliseconds: 10))); expect(connector.closed, true); + await connector.done.timeout(Duration.zero); }); }); + group('Authenticator tests', () { Future<(bool, Stream?)> goAuthVerifier(Socket socket) async { Completer<(bool, Stream?)> completer = Completer(); @@ -386,7 +470,7 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -473,7 +557,7 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -554,7 +638,7 @@ void main() { expect(connector.closed, false); authedB[2].destroy(); - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -564,11 +648,12 @@ void main() { // Bind to a port that SocketConnector.socketToServer can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + var timeout = Duration(milliseconds: 100); SocketConnector connector = await SocketConnector.socketToServer( addressA: testExternalServer.address, portA: testExternalServer.port, transformAtoB: reverser, - timeout: Duration(milliseconds: 100), + timeout: timeout, ); expect(connector.connections.isEmpty, true); @@ -611,12 +696,13 @@ void main() { socketB.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); test('Test socketToSocket with two prefixing transformers', () async { + int timeoutMs = 100; // Bind two ports that SocketConnector.socketToSocket can connect to ServerSocket testExternalServerA = await ServerSocket.bind('127.0.0.1', 0); @@ -630,6 +716,7 @@ void main() { addressB: testExternalServerB.address, portB: testExternalServerB.port, transformBtoA: bToA, + timeout: Duration(milliseconds: timeoutMs), ); String rcvdA = ''; @@ -670,7 +757,7 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(Duration(milliseconds: timeoutMs))); expect(connector.closed, true); await connector.done.timeout(Duration.zero); }); @@ -680,12 +767,13 @@ void main() { // Bind to a port that SocketConnector.serverToSocket can connect to ServerSocket testExternalServer = await ServerSocket.bind('127.0.0.1', 0); + var timeout = Duration(milliseconds: 100); SocketConnector connector = await SocketConnector.serverToSocket( addressB: testExternalServer.address, portB: testExternalServer.port, transformAtoB: reverser, transformBtoA: reverser, - timeout: Duration(milliseconds: 100), + timeout: timeout, ); expect(connector.connections.isEmpty, true); @@ -702,15 +790,13 @@ void main() { }); }); - await readyB.future; - expect(connector.connections.isEmpty, true); - Socket socketA = await Socket.connect( 'localhost', connector.sideAPort!, ); // Wait for SocketConnector to handle the events await (Future.delayed(Duration(milliseconds: 10))); + await readyB.future; expect(connector.connections.isEmpty, false); socketA.listen((List data) { @@ -727,7 +813,7 @@ void main() { socketA.destroy(); // Wait for SocketConnector to handle the events - await (Future.delayed(Duration(milliseconds: 10))); + await (Future.delayed(timeout)); expect(connector.closed, true); await connector.done.timeout(Duration.zero); });