High level abstraction on top of innmind/stream
to work with streams in a more functional way.
composer require innmind/io
Note
examples below use innmind/operating-system
use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Immutable\Str;
$os = Factory::build();
$streams = Streams::fromAmbientAuthority();
$io = IO::of($os->sockets()->watch(...));
$chunks = $io
->readable()
->wrap(
$streams
->readable()
->acquire(\fopen('/some/file.ext', 'r')),
)
->toEncoding(Str\Encoding::ascii)
// or call ->watch() to wait forever for the stream to be ready before
// reading from it
->timeoutAfter(ElapsedPeriod::of(1_000))
->chunks(8192) // max length of each chunk
->lazy()
->sequence();
The $chunks
variable is an Innmind\Immutable\Sequence
containing Innmind\Immutable\Str
values, where each value is of a maximum length of 8192
bytes. Before each value is yielded, it makes sure data is available to read from the stream. If no data is available within 1
second the Sequence
will throw an exception saying it can't read from the stream. If you don't want it to throw, replace timeoutAfter()
with watch()
so it waits as long as it needs to.
use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Immutable\Str;
$os = Factory::build();
$streams = Streams::fromAmbientAuthority();
$io = IO::of($os->sockets()->watch(...));
$lines = $io
->readable()
->wrap(
$streams
->readable()
->acquire(\fopen('/some/file.ext', 'r')),
)
->toEncoding(Str\Encoding::ascii)
// or call ->watch() to wait forever for the stream to be ready before
// reading from it
->timeoutAfter(ElapsedPeriod::of(1_000))
->lines()
->lazy()
->sequence();
This is the same as reading by chunks (described above) except that the delimiter is the end of line character \n
.
use Innmind\IO\{
IO,
Readable\Frame,
};
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Socket\{
Address,
Client,
};
use Innmind\Stream\Streams;
use Innmind\Immutable\{
Str,
Sequence,
};
$socket = Client\Unix::of(Address\Unix::of('/tmp/foo'))->match(
static fn($socket) => $socket,
static fn() => throw new \RuntimeException,
);
$os = Factory::build();
$io = IO::of($os->sockets()->watch(...));
$frame = $io
->sockets()
->clients()
->wrap($socket)
->toEncoding(Str\Encoding::ascii)
->timeoutAfter(ElapsedPeriod::of(1_000))
->heartbeatWith(static fn() => Sequence::of(Str::of('heartbeat')))
->frames(Frame\Line::new())
->one()
->match(
static fn($line) => $line,
static fn() => throw new \RuntimeException,
);
This example will wait to read a single line from the socket /tmp/foo.sock
and it will send a heartbeat
message every second until the expected line is received.
use Innmind\IO\IO;
use Innmind\OperatingSystem\Factory;
use Innmind\TimeContinuum\Earth\ElapsedPeriod;
use Innmind\Stream\Streams;
use Innmind\Socket\Address\Unix;
use Innmind\Immutable\{
Str,
Fold,
Either,
};
$os = Factory::build();
$streams = Streams::fromAmbientAuthority();
$io = IO::of($os->sockets()->watch(...));
$io
->readable()
->wrap(
$os
->sockets()
->connectTo(Unix::of('/some/socket')),
)
->toEncoding('ASCII')
// or call ->watch() to wait forever for the stream to be ready before
// reading from it
->timeoutAfter(ElapsedPeriod::of(1_000))
->chunks(8192) // max length of each chunk
->fold(
Fold::with([]),
static function(array $chunks, Str $chunk) {
$chunks[] = $chunk->toString();
if ($chunk->contains('quit')) {
return Fold::result($chunks);
}
if ($chunk->contains('throw')) {
return Fold::fail('some error');
}
return Fold::with($chunks);
},
)
->match(
static fn(Either $result) => $result->match(
static fn(array $chunks) => doStuff($chunks),
static fn(string $error) => throw new \Exception($error), // $error === 'some error'
),
static fn() => throw new \RuntimeException('Failed to read from the stream or it timed out'),
);
This example will:
- open the local socket
/some/socket
- watch the socket to be ready for
1
second before it times out each time it tries to read from it
- read chunks of a maximum length of
8192
- use the encoding
ASCII
- call the function passed to
->fold()
each time a chunk is read
- it will continue reading from the stream until one of the chunks contains
quit
or throw
- return a
Maybe<Either<string, list<string>>>
- contains nothing when it failed to read from the stream or it timed out
- string
is the value passed to Fold::fail()
- list<string>
is the value passed to Fold::result()
You can think of this fold
operation as a reduce where you can control when to stop iterating by returning either Fold::fail()
or Fold::result()
.