Commit c9fef251 authored by Mickaël Bourgier's avatar Mickaël Bourgier
Browse files

🔥 Remove groupable events

This is not the responsibility of this library to provide this kind of feature
parent 2e78eb02
<?php
namespace Webf\ProjectionBundle\Event;
interface GroupableEventInterface extends EventInterface
{
public function getGroup(): string;
public function isLastOfGroup(): bool;
}
<?php
namespace Webf\ProjectionBundle\Stream;
use Webf\ProjectionBundle\Event\GroupableEventInterface;
/**
* A group is a set of events, a closed group is when there will be no more
* events of that group from the inner stream, an unclosed group is when there
* will eventually be more events of that group from the inner stream.
*/
class GroupByStream implements GroupableStreamInterface
{
private GroupableStreamInterface $innerStream;
/** @var GroupableEventInterface[][] a buffer to store events of unclosed groups */
private array $allGroupsBuffer = [];
/** @var GroupableEventInterface[] */
private array $currentGroupBuffer = [];
public function __construct(GroupableStreamInterface $innerStream)
{
$this->innerStream = $innerStream;
}
public function getEventTypes(): array
{
return $this->innerStream->getEventTypes();
}
public function nextEvent(): ?GroupableEventInterface
{
if (!empty($this->currentGroupBuffer)) {
// Return events of the current group buffer in priority
return array_shift($this->currentGroupBuffer);
}
// Poll events from the inner stream and store them while it doesn't return a last-of-group event
do {
$event = $this->innerStream->nextEvent();
if (null !== $event) {
$this->allGroupsBuffer[$event->getGroup()][] = $event;
}
} while (null !== $event && !$event->isLastOfGroup());
if (empty($this->allGroupsBuffer)) {
// No more events are polled from the inner stream
return null;
}
if (null !== $event) {
// The last event of a group has been catched
// Move events into the current group buffer
$this->currentGroupBuffer = $this->allGroupsBuffer[$event->getGroup()];
unset($this->allGroupsBuffer[$event->getGroup()]);
} else {
// There is no more event from the inner stream, all groups are considered as closed
// Move all the events into the current group buffer so they can all be consumed on later calls
foreach ($this->allGroupsBuffer as $groupBuffer) {
foreach ($groupBuffer as $event) {
$this->currentGroupBuffer[] = $event;
}
}
$this->allGroupsBuffer = [];
}
return array_shift($this->currentGroupBuffer);
}
}
<?php
namespace Webf\ProjectionBundle\Stream;
use Webf\ProjectionBundle\Event\GroupableEventInterface;
interface GroupableStreamInterface extends StreamInterface
{
public function nextEvent(): ?GroupableEventInterface;
}
<?php
namespace Tests\Event\Stub;
use DateTimeImmutable;
use Webf\ProjectionBundle\Event\GroupableEventInterface;
class GroupableEventStub extends EventStub implements GroupableEventInterface
{
private string $group;
private bool $isLastOfGroup;
public function __construct(
string $type,
string $group,
bool $isLastOfGroup = false,
DateTimeImmutable $date = null
) {
parent::__construct($type, $date);
$this->group = $group;
$this->isLastOfGroup = $isLastOfGroup;
}
public function getGroup(): string
{
return $this->group;
}
public function isLastOfGroup(): bool
{
return $this->isLastOfGroup;
}
}
<?php
namespace Tests\Stream;
use PHPUnit\Framework\TestCase;
use Tests\Event\Stub\GroupableEventStub;
use Tests\Stream\Stub\GroupableStreamStub;
use Webf\ProjectionBundle\Stream\GroupByStream;
/**
* @internal
* @covers \Webf\ProjectionBundle\Stream\GroupByStream
*/
class GroupByStreamTest extends TestCase
{
public function test_get_event_types_returns_event_types_of_inner_stream()
{
$eventTypes = ['a', 'b', 'c'];
$innerStream = new GroupableStreamStub($eventTypes);
$stream = new GroupByStream($innerStream);
$this->assertEquals($eventTypes, $stream->getEventTypes());
}
public function test_next_event_returns_events_of_inner_stream_by_group()
{
$innerStream = GroupableStreamStub::fromEvents([
$e1 = new GroupableEventStub('a', '1'),
$e2 = new GroupableEventStub('a', '2'),
$e3 = new GroupableEventStub('a', '3'),
$e4 = new GroupableEventStub('a', '1'),
$e5 = new GroupableEventStub('a', '3'),
$e6 = new GroupableEventStub('a', '1'),
$e7 = new GroupableEventStub('a', '3', true),
$e8 = new GroupableEventStub('a', '2'),
$e9 = new GroupableEventStub('a', '2'),
]);
$stream = new GroupByStream($innerStream);
$events = [];
while (null !== ($event = $stream->nextEvent())) {
$events[] = $event;
}
$this->assertEquals([$e3, $e5, $e7, $e1, $e4, $e6, $e2, $e8, $e9], $events);
}
}
<?php
namespace Tests\Stream\Stub;
use Webf\ProjectionBundle\Event\GroupableEventInterface;
use Webf\ProjectionBundle\Stream\GroupableStreamInterface;
class GroupableStreamStub implements GroupableStreamInterface
{
/** @var string[] */
private array $eventTypes;
/** @var GroupableEventInterface[] */
private array $events;
/**
* @param string[] $eventTypes
* @param GroupableEventInterface[] $events
*/
public function __construct(array $eventTypes = [], array $events = [])
{
$this->eventTypes = $eventTypes;
$this->events = $events;
}
/**
* @param GroupableEventInterface[] $events
*/
public static function fromEvents(array $events = []): self
{
$eventTypes = [];
foreach ($events as $event) {
$eventTypes[$event->getType()] = true;
}
return new self(array_keys($eventTypes), $events);
}
public function getEventTypes(): array
{
return $this->eventTypes;
}
public function nextEvent(): ?GroupableEventInterface
{
return array_shift($this->events);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment