From 5c5c4dd0b288e92d637e9d97b92833dc0e7b5bd6 Mon Sep 17 00:00:00 2001 From: Michael Nabil Date: Wed, 27 Dec 2023 23:30:31 +0200 Subject: [PATCH] Add WebSocket into Swoole and fix the issue in the console --- bin/createSwooleServer.php | 6 +- bin/swoole-server | 99 +++++++++++++++++-- composer.json | 7 +- .../Concerns/InteractsWithServers.php | 38 +++++-- src/Commands/StartCommand.php | 2 - src/Commands/StartFrankenPhpCommand.php | 10 +- src/Concerns/ProvidesConcurrencySupport.php | 9 +- src/Events/WebSocketDisconnect.php | 16 +++ src/Events/WebSocketMessage.php | 17 ++++ src/Events/WebSocketOpen.php | 15 +++ src/Octane.php | 7 +- src/OctaneServiceProvider.php | 14 ++- ...sureRequestsDontExceedMaxExecutionTime.php | 2 +- src/Swoole/Handlers/OnServerStart.php | 2 +- src/Swoole/Handlers/OnWebSocketDisconnect.php | 23 +++++ src/Swoole/Handlers/OnWebSocketHandshake.php | 52 ++++++++++ src/Swoole/Handlers/OnWebSocketMessage.php | 24 +++++ src/Swoole/Handlers/OnWebSocketOpen.php | 24 +++++ src/Swoole/Handlers/OnWorkerStart.php | 14 +-- src/Swoole/SwooleTaskDispatcher.php | 22 +++-- src/Worker.php | 29 ++++++ tests/SwooleTaskDispatcherTest.php | 2 +- 22 files changed, 375 insertions(+), 59 deletions(-) create mode 100644 src/Events/WebSocketDisconnect.php create mode 100644 src/Events/WebSocketMessage.php create mode 100644 src/Events/WebSocketOpen.php create mode 100644 src/Swoole/Handlers/OnWebSocketDisconnect.php create mode 100644 src/Swoole/Handlers/OnWebSocketHandshake.php create mode 100644 src/Swoole/Handlers/OnWebSocketMessage.php create mode 100644 src/Swoole/Handlers/OnWebSocketOpen.php diff --git a/bin/createSwooleServer.php b/bin/createSwooleServer.php index b94bc0d91..821e4b157 100644 --- a/bin/createSwooleServer.php +++ b/bin/createSwooleServer.php @@ -3,11 +3,15 @@ $config = $serverState['octaneConfig']; try { + $serverClass = ($config['swoole']['enableWebSocket'] ?? false) + ? \Swoole\Websocket\Server::class + : \Swoole\Http\Server::class; + $host = $serverState['host'] ?? '127.0.0.1'; $sock = filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) ? SWOOLE_SOCK_TCP : SWOOLE_SOCK_TCP6; - $server = new Swoole\Http\Server( + $server = new $serverClass( $host, $serverState['port'] ?? 8000, $config['swoole']['mode'] ?? SWOOLE_PROCESS, diff --git a/bin/swoole-server b/bin/swoole-server index f51b932bb..cdb5e1feb 100755 --- a/bin/swoole-server +++ b/bin/swoole-server @@ -5,11 +5,18 @@ use Laravel\Octane\RequestContext; use Laravel\Octane\Swoole\Handlers\OnManagerStart; use Laravel\Octane\Swoole\Handlers\OnServerStart; use Laravel\Octane\Swoole\Handlers\OnWorkerStart; +use Laravel\Octane\Swoole\Handlers\OnWebSocketOpen; +use Laravel\Octane\Swoole\Handlers\OnWebSocketDisconnect; +use Laravel\Octane\Swoole\Handlers\OnWebSocketHandshake; +use Laravel\Octane\Swoole\Handlers\OnWebSocketMessage; use Laravel\Octane\Swoole\ServerStateFile; use Laravel\Octane\Swoole\SwooleExtension; use Laravel\Octane\Swoole\WorkerState; -use Swoole\Http\Server; +use Swoole\Http\Request; +use Swoole\Http\Response; +use Swoole\Server; use Swoole\Timer; +use Swoole\WebSocket\Frame; ini_set('display_errors', 'stderr'); @@ -62,7 +69,8 @@ $server->on('managerstart', function () use ($serverState) { require_once __DIR__.'/../src/Swoole/SwooleExtension.php'; (new OnManagerStart( - new SwooleExtension, $serverState['appName'] + new SwooleExtension, + $serverState['appName'] ))(); }); @@ -89,9 +97,13 @@ $workerState->cacheTable = require __DIR__.'/createSwooleCacheTable.php'; $workerState->timerTable = $timerTable; $workerState->tables = require __DIR__.'/createSwooleTables.php'; -$server->on('workerstart', fn (Server $server, $workerId) => - (fn ($basePath) => (new OnWorkerStart( - new SwooleExtension, $basePath, $serverState, $workerState +$server->on( + 'workerstart', + fn (Server $server, $workerId) => (fn ($basePath) => (new OnWorkerStart( + new SwooleExtension, + $basePath, + $serverState, + $workerState ))($server, $workerId))($bootstrap($serverState)) ); @@ -129,6 +141,75 @@ $server->on('request', function ($request, $response) use ($server, $workerState } }); +/* +|-------------------------------------------------------------------------- +| Handle Websocket Events +|-------------------------------------------------------------------------- +| +| The following callbacks manage websocket events when enabled +| +*/ + +if (($serverState['octaneConfig']['swoole']['enableWebSocket'])) { + /* + |-------------------------------------------------------------------------- + | Handle Incoming WebSocket open + |-------------------------------------------------------------------------- + | + | The following callback will handle all incoming new connections. + | + */ + + $server->on('open', fn (Request $request, Response $response) => (new OnWebSocketOpen( + $server, + $serverState, + $workerState + ))($request, $response)); + + /* + |-------------------------------------------------------------------------- + | Handle Incoming WebSocket Connections + |-------------------------------------------------------------------------- + | + | The following callback will handle all incoming connections. + | + */ + + $server->on('handshake', fn (Request $request, Response $response) => (new OnWebSocketHandshake( + $server, + $serverState, + $workerState + ))($request, $response)); + + /* + |-------------------------------------------------------------------------- + | Handle Incoming WebSocket Messages + |-------------------------------------------------------------------------- + | + | The following callback will handle all incoming messages. + | + */ + + $server->on('message', fn (Server $server, Frame $frame) => (new OnWebSocketMessage( + $serverState, + $workerState + ))($server, $frame)); + + /* + |-------------------------------------------------------------------------- + | Handle Disconnect WebSocket Connections + |-------------------------------------------------------------------------- + | + | The following callback will handle all disconnect connections. + | + */ + + $server->on('disconnect', fn (Server $server, int $fd) => (new OnWebSocketDisconnect( + $serverState, + $workerState + ))($server, $fd)); +} + /* |-------------------------------------------------------------------------- | Handle Tasks @@ -140,10 +221,12 @@ $server->on('request', function ($request, $response) use ($server, $workerState | */ -$server->on('task', fn (Server $server, int $taskId, int $fromWorkerId, $data) => +$server->on( + 'task', + fn (Server $server, int $taskId, int $fromWorkerId, $data) => $data === 'octane-tick' - ? $workerState->worker->handleTick() - : $workerState->worker->handleTask($data) + ? $workerState->worker->handleTick() + : $workerState->worker->handleTask($data) ); $server->on('finish', fn (Server $server, int $taskId, $result) => $result); diff --git a/composer.json b/composer.json index 040f68353..56068fe9f 100644 --- a/composer.json +++ b/composer.json @@ -15,8 +15,8 @@ ], "require": { "php": "^8.1.0", - "laravel/framework": "^10.10.1", "laminas/laminas-diactoros": "^3.0", + "laravel/framework": "^10.10.1", "laravel/serializable-closure": "^1.3.0", "nesbot/carbon": "^2.66.0", "symfony/psr-http-message-bridge": "^2.2.0" @@ -32,8 +32,9 @@ "orchestra/testbench": "^8.5.2", "phpstan/phpstan": "^1.10.15", "phpunit/phpunit": "^10.1.3", + "spiral/roadrunner-cli": "^2.5.0", "spiral/roadrunner-http": "^3.0.1", - "spiral/roadrunner-cli": "^2.5.0" + "swoole/ide-helper": "^5.1" }, "bin": [ "bin/roadrunner-worker", @@ -75,6 +76,6 @@ "config": { "sort-packages": true }, - "minimum-stability": "stable", + "minimum-stability": "dev", "prefer-stable": true } diff --git a/src/Commands/Concerns/InteractsWithServers.php b/src/Commands/Concerns/InteractsWithServers.php index 98593901f..375d5680c 100644 --- a/src/Commands/Concerns/InteractsWithServers.php +++ b/src/Commands/Concerns/InteractsWithServers.php @@ -23,7 +23,7 @@ protected function runServer($server, $inspector, $type) sleep(1); } - $this->writeServerRunningMessage(); + $this->writeServerRunningMessage($type); $watcher = $this->startServerWatcher(); @@ -31,15 +31,17 @@ protected function runServer($server, $inspector, $type) while ($server->isRunning()) { $this->writeServerOutput($server); - if ($watcher->isRunning() && - $watcher->getIncrementalOutput()) { + if ( + $watcher->isRunning() && + $watcher->getIncrementalOutput() + ) { $this->info('Application change detected. Restarting workers…'); $inspector->reloadServer(); } elseif ($watcher->isTerminated()) { $this->error( 'Watcher process has terminated. Please ensure Node and chokidar are installed.'.PHP_EOL. - $watcher->getErrorOutput() + $watcher->getErrorOutput() ); return 1; @@ -94,19 +96,41 @@ public function __call($method, $parameters) * * @return void */ - protected function writeServerRunningMessage() + protected function writeServerRunningMessage(string $type) { $this->info('Server running…'); + $urls = [ + " Local: {$this->uri()}", + ]; + + if ($type === 'swoole' && config('octane.swoole.enableWebSocket', false)) { + $uri = (config('octane.https', false) ? 'wss://' : 'ws://').$this->getHost().':'.$this->getPort(); + + $urls += [ + '', + '', + " WebSocket: $uri", + ]; + } + $this->output->writeln([ '', - ' Local: '.($this->hasOption('https') && $this->option('https') ? 'https://' : 'http://').$this->getHost().':'.$this->getPort().' ', + ...$urls, '', - ' Press Ctrl+C to stop the server', + ' Press Ctrl+C to stop the server', '', ]); } + /** + * For the given URI, retrieve the host and port. + */ + protected function uri(): string + { + return (config('octane.https', false) ? 'https://' : 'http://').$this->getHost().':'.$this->getPort(); + } + /** * Retrieve the given server output and flush it. * diff --git a/src/Commands/StartCommand.php b/src/Commands/StartCommand.php index a162d7ab7..a8f158139 100644 --- a/src/Commands/StartCommand.php +++ b/src/Commands/StartCommand.php @@ -25,7 +25,6 @@ class StartCommand extends Command implements SignalableCommandInterface {--max-requests=500 : The number of requests to process before reloading the server} {--rr-config= : The path to the RoadRunner .rr.yaml file} {--caddyfile= : The path to the FrankenPHP Caddyfile file} - {--https : Enable HTTPS, HTTP/2, and HTTP/3, and automatically generate and renew certificates [FrankenPHP only]} {--watch : Automatically reload the server when the application is modified} {--poll : Use file system polling while watching in order to watch files over a network} {--log-level= : Log messages at or above the specified log level}'; @@ -107,7 +106,6 @@ protected function startFrankenPhpServer() '--workers' => $this->option('workers'), '--max-requests' => $this->option('max-requests'), '--caddyfile' => $this->option('caddyfile'), - '--https' => $this->option('https'), '--watch' => $this->option('watch'), '--poll' => $this->option('poll'), '--log-level' => $this->option('log-level'), diff --git a/src/Commands/StartFrankenPhpCommand.php b/src/Commands/StartFrankenPhpCommand.php index 96b105b84..d6422479c 100644 --- a/src/Commands/StartFrankenPhpCommand.php +++ b/src/Commands/StartFrankenPhpCommand.php @@ -29,7 +29,6 @@ class StartFrankenPhpCommand extends Command implements SignalableCommandInterfa {--workers=auto : The number of workers that should be available to handle requests} {--max-requests=500 : The number of requests to process before reloading the server} {--caddyfile= : The path to the FrankenPHP Caddyfile file} - {--https : Enable HTTPS, HTTP/2, and HTTP/3, and automatically generate and renew certificates} {--watch : Automatically reload the server when the application is modified} {--poll : Use file system polling while watching in order to watch files over a network} {--log-level= : Log messages at or above the specified log level}'; @@ -72,13 +71,6 @@ public function handle(ServerProcessInspector $inspector, ServerStateFile $serve $this->forgetEnvironmentVariables(); - $host = $this->option('host'); - $port = $this->getPort(); - - $serverName = $this->option('https') - ? "https://$host:$port" - : "http://:$port"; - $process = tap(new Process([ $frankenphpBinary, 'run', @@ -93,7 +85,7 @@ public function handle(ServerProcessInspector $inspector, ServerStateFile $serve 'CADDY_SERVER_ADMIN_PORT' => $this->adminPort(), 'CADDY_SERVER_LOG_LEVEL' => $this->option('log-level') ?: (app()->environment('local') ? 'INFO' : 'WARN'), 'CADDY_SERVER_LOGGER' => 'json', - 'CADDY_SERVER_SERVER_NAME' => $serverName, + 'CADDY_SERVER_SERVER_NAME' => $this->uri(), 'CADDY_SERVER_WORKER_COUNT' => $this->workerCount() ?: '', 'CADDY_SERVER_EXTRA_DIRECTIVES' => $this->buildMercureConfig(), ])); diff --git a/src/Concerns/ProvidesConcurrencySupport.php b/src/Concerns/ProvidesConcurrencySupport.php index 134f601f6..f3d562607 100644 --- a/src/Concerns/ProvidesConcurrencySupport.php +++ b/src/Concerns/ProvidesConcurrencySupport.php @@ -7,7 +7,6 @@ use Laravel\Octane\Swoole\ServerStateFile; use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher; use Laravel\Octane\Swoole\SwooleTaskDispatcher; -use Swoole\Http\Server; trait ProvidesConcurrencySupport { @@ -33,10 +32,14 @@ public function concurrently(array $tasks, int $waitMilliseconds = 3000) */ public function tasks() { + $serverClass = config('octane.swoole.enableWebSocket', false) + ? \Swoole\Websocket\Server::class + : \Swoole\Http\Server::class; + return match (true) { app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class), - app()->bound(Server::class) => new SwooleTaskDispatcher, - class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher( + app()->bound($serverClass) => new SwooleTaskDispatcher, + class_exists($serverClass) => (fn (array $serverState) => new SwooleHttpTaskDispatcher( $serverState['state']['host'] ?? '127.0.0.1', $serverState['state']['port'] ?? '8000', new SequentialTaskDispatcher diff --git a/src/Events/WebSocketDisconnect.php b/src/Events/WebSocketDisconnect.php new file mode 100644 index 000000000..052ff47d6 --- /dev/null +++ b/src/Events/WebSocketDisconnect.php @@ -0,0 +1,16 @@ +bound(Server::class)) { + $serverClass = config('octane.swoole.enableWebSocket', false) + ? \Swoole\Websocket\Server::class + : \Swoole\Http\Server::class; + + if (! app()->bound($serverClass)) { throw new Exception('Tables may only be accessed when using the Swoole server.'); } diff --git a/src/OctaneServiceProvider.php b/src/OctaneServiceProvider.php index 4785822fe..5f39e1f76 100644 --- a/src/OctaneServiceProvider.php +++ b/src/OctaneServiceProvider.php @@ -87,9 +87,13 @@ public function register() }); $this->app->bind(DispatchesCoroutines::class, function ($app) { - return class_exists('Swoole\Http\Server') - ? new SwooleCoroutineDispatcher($app->bound('Swoole\Http\Server')) - : $app->make(SequentialCoroutineDispatcher::class); + $serverClass = config('octane.swoole.enableWebSocket', false) + ? \Swoole\Websocket\Server::class + : \Swoole\Http\Server::class; + + return class_exists($serverClass) + ? new SwooleCoroutineDispatcher($app->bound($serverClass)) + : $app->make(SequentialCoroutineDispatcher::class); }); } @@ -164,8 +168,8 @@ protected function registerCacheDriver() } $store = $this->app->bound('octane.cacheTable') - ? new OctaneStore($this->app['octane.cacheTable']) - : new OctaneArrayStore; + ? new OctaneStore($this->app['octane.cacheTable']) + : new OctaneArrayStore; Event::listen(TickReceived::class, fn () => $store->refreshIntervalCaches()); diff --git a/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php b/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php index 787ea9fa2..e58f98a8d 100644 --- a/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php +++ b/src/Swoole/Actions/EnsureRequestsDontExceedMaxExecutionTime.php @@ -4,7 +4,7 @@ use Laravel\Octane\Swoole\SwooleExtension; use Swoole\Http\Response; -use Swoole\Http\Server; +use Swoole\Server; class EnsureRequestsDontExceedMaxExecutionTime { diff --git a/src/Swoole/Handlers/OnServerStart.php b/src/Swoole/Handlers/OnServerStart.php index 64943571d..d2d29740f 100644 --- a/src/Swoole/Handlers/OnServerStart.php +++ b/src/Swoole/Handlers/OnServerStart.php @@ -23,7 +23,7 @@ public function __construct( /** * Handle the "start" Swoole event. * - * @param \Swoole\Http\Server $server + * @param \Swoole\Server $server * @return void */ public function __invoke($server) diff --git a/src/Swoole/Handlers/OnWebSocketDisconnect.php b/src/Swoole/Handlers/OnWebSocketDisconnect.php new file mode 100644 index 000000000..01597c2f3 --- /dev/null +++ b/src/Swoole/Handlers/OnWebSocketDisconnect.php @@ -0,0 +1,23 @@ +workerState->worker->handleWebSocketDisconnect($server, $fd); + } +} diff --git a/src/Swoole/Handlers/OnWebSocketHandshake.php b/src/Swoole/Handlers/OnWebSocketHandshake.php new file mode 100644 index 000000000..58e120142 --- /dev/null +++ b/src/Swoole/Handlers/OnWebSocketHandshake.php @@ -0,0 +1,52 @@ +header['sec-websocket-key']; + + if (preg_match('#^[+/0-9A-Za-z]{21}[AQgw]==$#', $secWebSocketKey) === 0 || strlen(base64_decode($secWebSocketKey)) !== 16) { + $response->end(); + + return false; + } + + $headers = [ + 'Upgrade' => 'websocket', + 'Connection' => 'Upgrade', + 'Sec-WebSocket-Accept' => base64_encode(sha1($request->header['sec-websocket-key'].'258EAFA5-E914-47DA-95CA-C5AB0DC85B11', true)), + 'Sec-WebSocket-Version' => '13', + ]; + + if (isset($request->header['sec-websocket-protocol'])) { + $headers['Sec-WebSocket-Protocol'] = $request->header['sec-websocket-protocol']; + } + + foreach ($headers as $key => $value) { + $response->header($key, $value); + } + + $response->status(101); + $response->end(); + + return true; + } +} diff --git a/src/Swoole/Handlers/OnWebSocketMessage.php b/src/Swoole/Handlers/OnWebSocketMessage.php new file mode 100644 index 000000000..12c211137 --- /dev/null +++ b/src/Swoole/Handlers/OnWebSocketMessage.php @@ -0,0 +1,24 @@ +workerState->worker->handleWebSocketMessage($server, $frame); + } +} diff --git a/src/Swoole/Handlers/OnWebSocketOpen.php b/src/Swoole/Handlers/OnWebSocketOpen.php new file mode 100644 index 000000000..34cf8e6d8 --- /dev/null +++ b/src/Swoole/Handlers/OnWebSocketOpen.php @@ -0,0 +1,24 @@ +workerState->worker->handleWebSocketOpen($server, $fd); + } +} diff --git a/src/Swoole/Handlers/OnWorkerStart.php b/src/Swoole/Handlers/OnWorkerStart.php index 0d928d79b..9732a0edd 100644 --- a/src/Swoole/Handlers/OnWorkerStart.php +++ b/src/Swoole/Handlers/OnWorkerStart.php @@ -8,7 +8,7 @@ use Laravel\Octane\Swoole\SwooleExtension; use Laravel\Octane\Swoole\WorkerState; use Laravel\Octane\Worker; -use Swoole\Http\Server; +use Swoole\Server; use Throwable; class OnWorkerStart @@ -25,10 +25,9 @@ public function __construct( /** * Handle the "workerstart" Swoole event. * - * @param \Swoole\Http\Server $server * @return void */ - public function __invoke($server, int $workerId) + public function __invoke(Server $server, int $workerId) { $this->clearOpcodeCache(); @@ -53,10 +52,9 @@ public function __invoke($server, int $workerId) /** * Boot the Octane worker and application. * - * @param \Swoole\Http\Server $server * @return \Laravel\Octane\Worker|null */ - protected function bootWorker($server) + protected function bootWorker(Server $server) { try { return tap(new Worker( @@ -77,10 +75,9 @@ protected function bootWorker($server) /** * Start the Octane server tick to dispatch the tick task every second. * - * @param \Swoole\Http\Server $server * @return void */ - protected function dispatchServerTickTaskEverySecond($server) + protected function dispatchServerTickTaskEverySecond(Server $server) { // ... } @@ -88,10 +85,9 @@ protected function dispatchServerTickTaskEverySecond($server) /** * Register the request handled listener that will output request information per request. * - * @param \Swoole\Http\Server $server * @return void */ - protected function streamRequestsToConsole($server) + protected function streamRequestsToConsole(Server $server) { $this->workerState->worker->onRequestHandled(function ($request, $response, $sandbox) { if (! $sandbox->environment('local', 'testing')) { diff --git a/src/Swoole/SwooleTaskDispatcher.php b/src/Swoole/SwooleTaskDispatcher.php index 2d7910875..b5cd543ba 100644 --- a/src/Swoole/SwooleTaskDispatcher.php +++ b/src/Swoole/SwooleTaskDispatcher.php @@ -8,10 +8,18 @@ use Laravel\Octane\Exceptions\TaskExceptionResult; use Laravel\Octane\Exceptions\TaskTimeoutException; use Laravel\SerializableClosure\SerializableClosure; -use Swoole\Http\Server; class SwooleTaskDispatcher implements DispatchesTasks { + protected string $serverClass; + + public function __construct() + { + $this->serverClass = config('octane.swoole.enableWebSocket', false) + ? \Swoole\Websocket\Server::class + : \Swoole\Http\Server::class; + } + /** * Concurrently resolve the given callbacks via background tasks, returning the results. * @@ -23,14 +31,14 @@ class SwooleTaskDispatcher implements DispatchesTasks */ public function resolve(array $tasks, int $waitMilliseconds = 3000): array { - if (! app()->bound(Server::class)) { + if (! app()->bound($this->serverClass)) { throw new InvalidArgumentException('Tasks can only be resolved within a Swoole server context / web request.'); } - $results = app(Server::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) { + $results = app($this->serverClass)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) { return [$key => $task instanceof Closure - ? new SerializableClosure($task) - : $task, ]; + ? new SerializableClosure($task) + : $task, ]; })->all(), $waitMilliseconds / 1000); if ($results === false) { @@ -61,11 +69,11 @@ public function resolve(array $tasks, int $waitMilliseconds = 3000): array */ public function dispatch(array $tasks): void { - if (! app()->bound(Server::class)) { + if (! app()->bound($this->serverClass)) { throw new InvalidArgumentException('Tasks can only be dispatched within a Swoole server context / web request.'); } - $server = app(Server::class); + $server = app($this->serverClass); collect($tasks)->each(function ($task) use ($server) { $server->task($task instanceof Closure ? new SerializableClosure($task) : $task); diff --git a/src/Worker.php b/src/Worker.php index d3b1e33d1..a49e07586 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -12,12 +12,17 @@ use Laravel\Octane\Events\TaskTerminated; use Laravel\Octane\Events\TickReceived; use Laravel\Octane\Events\TickTerminated; +use Laravel\Octane\Events\WebSocketDisconnect; +use Laravel\Octane\Events\WebSocketMessage; +use Laravel\Octane\Events\WebSocketOpen; use Laravel\Octane\Events\WorkerErrorOccurred; use Laravel\Octane\Events\WorkerStarting; use Laravel\Octane\Events\WorkerStopping; use Laravel\Octane\Exceptions\TaskExceptionResult; use Laravel\Octane\Swoole\TaskResult; use RuntimeException; +use Swoole\Server; +use Swoole\WebSocket\Frame; use Throwable; class Worker implements WorkerContract @@ -239,4 +244,28 @@ public function terminate(): void { $this->dispatchEvent($this->app, new WorkerStopping($this->app)); } + + /** + * Handle an incoming open from the worker. + */ + public function handleWebSocketOpen(Server $server) + { + $this->dispatchEvent($this->app, new WebSocketOpen($this->app, $server)); + } + + /** + * Handle an incoming message from the worker. + */ + public function handleWebSocketMessage(Server $server, Frame $frame) + { + $this->dispatchEvent($this->app, new WebSocketMessage($this->app, $server, $frame)); + } + + /** + * Handle an incoming disconnect from the worker. + */ + public function handleWebSocketDisconnect(Server $server, int $fd) + { + $this->dispatchEvent($this->app, new WebSocketDisconnect($this->app, $server, $fd)); + } } diff --git a/tests/SwooleTaskDispatcherTest.php b/tests/SwooleTaskDispatcherTest.php index b51f7121a..e9e95ca5b 100644 --- a/tests/SwooleTaskDispatcherTest.php +++ b/tests/SwooleTaskDispatcherTest.php @@ -12,7 +12,7 @@ use Laravel\Octane\Swoole\TaskResult; use Mockery; use Orchestra\Testbench\TestCase; -use Swoole\Http\Server; +use Swoole\Server; class SwooleTaskDispatcherTest extends TestCase {