Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\PostgreSql\Schema\SortingStrategy;

use Flow\ETL\Adapter\PostgreSql\EntryTypesMap;
use Flow\ETL\Adapter\PostgreSql\Exception\TypeMappingException;
use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
use Flow\ETL\Schema\Definition;
use Flow\ETL\Schema\SortingStrategy;

use function Flow\Types\DSL\type_string;

final readonly class TypeStrategy implements SortingStrategy
{
public function __construct(
private EntryTypesMap $typesMap = new EntryTypesMap(),
) {}

/**
* @param Definition<mixed> $left
* @param Definition<mixed> $right
*/
public function compare(Definition $left, Definition $right): int
{
$leftPrimaryKey = $left->metadata()->has(PostgreSqlMetadata::PRIMARY_KEY->value);
$rightPrimaryKey = $right->metadata()->has(PostgreSqlMetadata::PRIMARY_KEY->value);

if ($leftPrimaryKey !== $rightPrimaryKey) {
return $leftPrimaryKey ? -1 : 1;
}

$typeComparison = $this->postgreSqlType($left) <=> $this->postgreSqlType($right);

if ($typeComparison !== 0) {
return $typeComparison;
}

return $left->entry()->name() <=> $right->entry()->name();
}

/**
* @param Definition<mixed> $definition
*/
private function postgreSqlType(Definition $definition): string
{
$metadata = $definition->metadata();

if ($metadata->has(PostgreSqlMetadata::TYPE->value)) {
return (string) $metadata->getAs(PostgreSqlMetadata::TYPE->value, type_string());
}

try {
return $this->typesMap->toColumnType($definition->type())->normalize()['name'];
} catch (TypeMappingException) {
return "\xff" . $definition->type()::class;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Flow\ETL\Adapter\PostgreSql\Pagination\Key;
use Flow\ETL\Adapter\PostgreSql\Pagination\KeySet;
use Flow\ETL\Adapter\PostgreSql\Pagination\Order;
use Flow\ETL\Adapter\PostgreSql\Schema\SortingStrategy\TypeStrategy;
use Flow\ETL\Attribute\DocumentationDSL;
use Flow\ETL\Attribute\Module;
use Flow\ETL\Attribute\Type as DSLType;
Expand Down Expand Up @@ -179,3 +180,9 @@ function pgsql_table_to_flow_schema(Table $table, ?EntryTypesMap $typesMap = nul
{
return (new SchemaConverter($typesMap))->toFlowSchema($table);
}

#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::HELPER)]
function pgsql_schema_sort_by_type(?EntryTypesMap $typesMap = null): TypeStrategy
{
return new TypeStrategy($typesMap ?? new EntryTypesMap());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Adapter\PostgreSql\Tests\Unit\Schema\SortingStrategy;

use Flow\ETL\Adapter\PostgreSql\PostgreSqlMetadata;
use Flow\ETL\Adapter\PostgreSql\Schema\SortingStrategy\TypeStrategy;
use PHPUnit\Framework\TestCase;

use function array_keys;
use function Flow\ETL\Adapter\PostgreSql\pgsql_schema_sort_by_type;
use function Flow\ETL\DSL\bool_schema;
use function Flow\ETL\DSL\int_schema;
use function Flow\ETL\DSL\schema;
use function Flow\ETL\DSL\str_schema;

final class TypeStrategyTest extends TestCase
{
public function test_explicit_type_metadata_overrides_mapping(): void
{
$schema = schema(int_schema('a', metadata: PostgreSqlMetadata::type('zzz_custom')), int_schema('b'));

static::assertSame(['b', 'a'], array_keys($schema->sort(new TypeStrategy())->definitions()));
}

public function test_groups_by_type_then_name(): void
{
$schema = schema(str_schema('label'), int_schema('count'), bool_schema('active'), int_schema('amount'));

static::assertSame(
['active', 'amount', 'count', 'label'],
array_keys($schema->sort(new TypeStrategy())->definitions()),
);
}

public function test_primary_key_columns_come_first(): void
{
$schema = schema(
str_schema('name'),
int_schema('id', metadata: PostgreSqlMetadata::primaryKey()),
str_schema('email'),
);

static::assertSame(
['id', 'email', 'name'],
array_keys($schema->sort(pgsql_schema_sort_by_type())->definitions()),
);
}
}
43 changes: 43 additions & 0 deletions src/core/etl/src/Flow/ETL/DSL/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@
use Flow\ETL\Schema\Formatter\PHPSchemaFormatter;
use Flow\ETL\Schema\Metadata;
use Flow\ETL\Schema\SchemaFormatter;
use Flow\ETL\Schema\SortingStrategy;
use Flow\ETL\Schema\SortingStrategy\AlphabeticalStrategy;
use Flow\ETL\Schema\SortingStrategy\CombinedStrategy;
use Flow\ETL\Schema\SortingStrategy\MetadataStrategy;
use Flow\ETL\Schema\SortingStrategy\TypeStrategy;
use Flow\ETL\Schema\SortingStrategy\TypeStrategy\TypePriorities as SchemaTypePriorities;
use Flow\ETL\Schema\Validator\EvolvingValidator;
use Flow\ETL\Schema\Validator\SelectiveValidator;
use Flow\ETL\Schema\Validator\StrictValidator;
Expand Down Expand Up @@ -2368,6 +2374,43 @@ function compare_entries_by_type_and_name(
);
}

#[DocumentationDSL(module: Module::CORE, type: DSLType::SCHEMA)]
function schema_sort_by_name(SortOrder $order = SortOrder::ASC): SortingStrategy
{
return new AlphabeticalStrategy($order);
}

/**
* @param array<class-string<Type<mixed>>, int> $priorities
*/
#[DocumentationDSL(module: Module::CORE, type: DSLType::SCHEMA)]
function schema_sort_by_type(
array $priorities = SchemaTypePriorities::PRIORITIES,
SortOrder $order = SortOrder::ASC,
): SortingStrategy {
return new TypeStrategy(new SchemaTypePriorities($priorities), $order);
}

/**
* @param array<class-string<Type<mixed>>, int> $priorities
*/
#[DocumentationDSL(module: Module::CORE, type: DSLType::SCHEMA)]
function schema_sort_by_type_and_name(
array $priorities = SchemaTypePriorities::PRIORITIES,
SortOrder $order = SortOrder::ASC,
): SortingStrategy {
return new CombinedStrategy(
new TypeStrategy(new SchemaTypePriorities($priorities), $order),
new AlphabeticalStrategy($order),
);
}

#[DocumentationDSL(module: Module::CORE, type: DSLType::SCHEMA)]
function schema_sort_by_metadata(string $key, SortOrder $order = SortOrder::ASC): SortingStrategy
{
return new MetadataStrategy($key, $order);
}

/**
* @param array<string|Type<mixed>>|Type<mixed> $type
* @param mixed $value
Expand Down
17 changes: 17 additions & 0 deletions src/core/etl/src/Flow/ETL/Schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use Flow\ETL\Row\References;
use Flow\ETL\Schema\Definition;
use Flow\ETL\Schema\Metadata;
use Flow\ETL\Schema\SortingStrategy;
use Flow\ETL\Schema\SortingStrategy\AlphabeticalStrategy;

use function array_key_exists;
use function array_keys;
Expand All @@ -27,6 +29,7 @@
use function implode;
use function is_array;
use function sprintf;
use function usort;

final class Schema implements Countable
{
Expand Down Expand Up @@ -585,6 +588,20 @@ public function setMetadata(string $definition, Metadata $metadata): self
return $this;
}

/**
* @return Schema
*/
public function sort(SortingStrategy $strategy = new AlphabeticalStrategy()): self
{
$definitions = array_values($this->definitions);

usort($definitions, static fn(Definition $left, Definition $right): int => $strategy->compare($left, $right));

$this->setDefinitions(...$definitions);

return $this;
}

private function indexOf(string|Reference $reference): int
{
$index = array_search(EntryReference::init($reference)->name(), array_keys($this->definitions), true);
Expand Down
14 changes: 14 additions & 0 deletions src/core/etl/src/Flow/ETL/Schema/SortingStrategy.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Schema;

interface SortingStrategy
{
/**
* @param Definition<mixed> $left
* @param Definition<mixed> $right
*/
public function compare(Definition $left, Definition $right): int;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Schema\SortingStrategy;

use Flow\ETL\Row\SortOrder;
use Flow\ETL\Schema\Definition;
use Flow\ETL\Schema\SortingStrategy;

final readonly class AlphabeticalStrategy implements SortingStrategy
{
public function __construct(
private SortOrder $order = SortOrder::ASC,
) {}

/**
* @param Definition<mixed> $left
* @param Definition<mixed> $right
*/
public function compare(Definition $left, Definition $right): int
{
if ($this->order === SortOrder::ASC) {
return $left->entry()->name() <=> $right->entry()->name();
}

return $right->entry()->name() <=> $left->entry()->name();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Schema\SortingStrategy;

use Flow\ETL\Schema\Definition;
use Flow\ETL\Schema\SortingStrategy;

final readonly class CombinedStrategy implements SortingStrategy
{
public function __construct(
private SortingStrategy $first,
private SortingStrategy $second,
) {}

/**
* @param Definition<mixed> $left
* @param Definition<mixed> $right
*/
public function compare(Definition $left, Definition $right): int
{
$result = $this->first->compare($left, $right);

if ($result === 0) {
return $this->second->compare($left, $right);
}

return $result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Schema\SortingStrategy;

use Flow\ETL\Row\SortOrder;
use Flow\ETL\Schema\Definition;
use Flow\ETL\Schema\SortingStrategy;

final readonly class MetadataStrategy implements SortingStrategy
{
public function __construct(
private string $key,
private SortOrder $order = SortOrder::ASC,
) {}

/**
* @param Definition<mixed> $left
* @param Definition<mixed> $right
*/
public function compare(Definition $left, Definition $right): int
{
$leftHas = $left->metadata()->has($this->key);
$rightHas = $right->metadata()->has($this->key);

if ($leftHas && $rightHas) {
$comparison = $this->order === SortOrder::ASC
? $left->metadata()->get($this->key) <=> $right->metadata()->get($this->key)
: $right->metadata()->get($this->key) <=> $left->metadata()->get($this->key);

if ($comparison !== 0) {
return $comparison;
}

return $left->entry()->name() <=> $right->entry()->name();
}

if ($leftHas) {
return -1;
}

if ($rightHas) {
return 1;
}

return $left->entry()->name() <=> $right->entry()->name();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Flow\ETL\Schema\SortingStrategy;

use Flow\ETL\Row\SortOrder;
use Flow\ETL\Schema\Definition;
use Flow\ETL\Schema\SortingStrategy;
use Flow\ETL\Schema\SortingStrategy\TypeStrategy\TypePriorities;

final readonly class TypeStrategy implements SortingStrategy
{
public function __construct(
private TypePriorities $priorities = new TypePriorities(),
private SortOrder $order = SortOrder::ASC,
) {}

/**
* @param Definition<mixed> $left
* @param Definition<mixed> $right
*/
public function compare(Definition $left, Definition $right): int
{
$leftPriority = $this->priorities->for($left);
$rightPriority = $this->priorities->for($right);

return $this->order === SortOrder::ASC ? $leftPriority <=> $rightPriority : $rightPriority <=> $leftPriority;
}
}
Loading
Loading