Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use system tables for sharding #406

Merged
merged 6 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/Plugin/DistributedInsert/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,6 @@ protected function getNewDocIds(int $count = 1): array {
* @throws RuntimeException
*/
protected function getShards(string $table): array {
static $map = [];
if (isset($map[$table])) {
return $map[$table];
}
[$locals, $agents] = $this->parseShards($table);

$shards = [];
Expand Down
5 changes: 3 additions & 2 deletions src/Plugin/DistributedInsert/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,10 @@ function ($field) {
$batch = new Vector();
$doc = [];
for ($i = 0; $i < $valueCount; $i++) {
$index = $i % $fieldCount;
$index = ($i + 1) % $fieldCount;
$doc[$fields[$index]] = trim($values[$i], "'");
$isLast = ($index - 1) === 0;
// We have edge case when single field and last is first also
$isLast = $index === 0;
if (!$isLast) {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/ModifyTable/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static function hasMatch(Request $request): bool {

$payload = preg_replace(
[
'/(?P<key>rf|shards)\s*=\s*(?P<value>\d+)/',
'/(?P<key>rf|shards|timeout)\s*=\s*(?P<value>\d+)/ius',
'/(?P<key>[a-zA-Z_]+)\s*=\s*\'(?P<value>[^\']+)\'/',
],
'', $request->payload
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Sharding/CreateHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ protected static function getShardingFn(): Closure {
$timeout = $payload->getShardingTimeout();
while (true) {
// TODO: think about the way to refactor it and remove duplication
$q = "select value[0] as value from _sharding_state where `key` = 'table:{$payload->table}'";
$q = "select value[0] as value from system.sharding_state where `key` = 'table:{$payload->table}'";
$resp = $client->sendRequest($q);
$result = $resp->getResult();
/** @var array{0:array{data?:array{0:array{value:string}}}} $result */
Expand Down
2 changes: 1 addition & 1 deletion src/Plugin/Sharding/DropHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ protected function validate(): ?Task {
*/
protected function getTableState(string $table): array {
// TODO: think about the way to refactor it and remove duplication
$q = "select value[0] as value from _sharding_state where `key` = 'table:{$table}'";
$q = "select value[0] as value from system.sharding_state where `key` = 'table:{$table}'";
$resp = $this->manticoreClient->sendRequest($q);

/** @var array{0:array{data?:array{0:array{value:string}}}} $result */
Expand Down
4 changes: 2 additions & 2 deletions src/Plugin/Sharding/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected static function fromCreate(Request $request): static {
/** @var array{table:string,cluster?:string,structure:string,extra:string} $matches */
$options = [];
if ($matches['extra']) {
$pattern = '/(?P<key>rf|shards|timeout)\s*=\s*(?P<value>\'?\d+\'?)/';
$pattern = '/(?P<key>rf|shards|timeout)\s*=\s*(?P<value>\'?\d+\'?)/ius';
if (preg_match_all($pattern, $matches['extra'], $optionMatches, PREG_SET_ORDER)) {
foreach ($optionMatches as $optionMatch) {
$key = strtolower($optionMatch['key']);
Expand Down Expand Up @@ -141,7 +141,7 @@ public static function hasMatch(Request $request): bool {
&& (
(stripos($request->payload, 'create table') === 0
&& stripos($request->payload, 'shards') !== false
&& preg_match('/(?P<key>rf|shards)\s*=\s*(?P<value>[\'"]?\d+[\'"]?)/', $request->payload)
&& preg_match('/(?P<key>rf|shards)\s*=\s*(?P<value>[\'"]?\d+[\'"]?)/ius', $request->payload)
) || stripos($request->payload, 'drop') === 0
);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Plugin/Sharding/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function __construct(
protected Cluster $cluster,
protected Client $client
) {
$this->table = '_sharding_queue';
$this->table = 'system.sharding_queue';
}

/**
Expand Down Expand Up @@ -270,7 +270,7 @@ public function setup(): void {
'Trying to initialize while already initialized.'
);
}
$query = "CREATE TABLE `{$this->table}` (
$query = "CREATE TABLE {$this->table} (
`node` string,
`query` string,
`wait_for_id` bigint,
Expand Down
4 changes: 2 additions & 2 deletions src/Plugin/Sharding/State.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final class State {
public function __construct(
protected Client $client
) {
$this->table = '_sharding_state';
$this->table = 'system.sharding_state';
}

/**
Expand Down Expand Up @@ -151,7 +151,7 @@ public function setup(): void {
'Trying to initialize while already initialized.'
);
}
$query = "CREATE TABLE `{$this->table}` (
$query = "CREATE TABLE {$this->table} (
`key` string,
`value` json,
`updated_at` timestamp
Expand Down
13 changes: 7 additions & 6 deletions src/Plugin/Sharding/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public function __construct(
protected readonly string $structure,
protected readonly string $extra
) {
$this->table = '_sharding_table';
$this->table = 'system.sharding_table';
}

/**
Expand Down Expand Up @@ -618,7 +618,7 @@ protected function getCreateTableShardSQL(int $shard): string {
// We can call this method on rebalancing, that means
// table may exist, so to suppress error we use
// if not exists to keep logic simpler
return "CREATE TABLE IF NOT EXISTS `{$this->getTableShardName($shard)}` {$structure} {$this->extra}";
return "CREATE TABLE IF NOT EXISTS {$this->getTableShardName($shard)} {$structure} {$this->extra}";
}

/**
Expand All @@ -627,7 +627,7 @@ protected function getCreateTableShardSQL(int $shard): string {
* @return string
*/
protected function getTableShardName(int $shard): string {
return "{$this->name}_s{$shard}";
return "system.{$this->name}_s{$shard}";
}

/**
Expand All @@ -639,7 +639,7 @@ protected function getCreateShardedTableSQL(Set $shards): string {
// Calculate local tables
$locals = new Set;
foreach ($shards as $shard) {
$locals->add("local='{$this->name}_s{$shard}'");
$locals->add("local='{$this->getTableShardName($shard)}'");
}

// Calculate external tables
Expand All @@ -649,8 +649,9 @@ protected function getCreateShardedTableSQL(Set $shards): string {
foreach ($nodes as $row) {
foreach ($row['shards'] as $shard) {
$map[$shard] ??= new Set;
$shardName = $this->getTableShardName($shard);
// @phpstan-ignore-next-line
$map[$shard]->add("{$row['node']}:{$this->name}_s{$shard}");
$map[$shard]->add("{$row['node']}:{$shardName}");
}
}

Expand Down Expand Up @@ -710,7 +711,7 @@ public function setup(): void {
'Trying to initialize while already initialized.'
);
}
$query = "CREATE TABLE `{$this->table}` (
$query = "CREATE TABLE {$this->table} (
`cluster` string,
`node` string,
`table` string,
Expand Down