Skip to content

Commit

Permalink
Feature: support replace into distributed table
Browse files Browse the repository at this point in the history
  • Loading branch information
donhardman committed Jan 14, 2025
1 parent 0670d60 commit f344f18
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
23 changes: 13 additions & 10 deletions src/Plugin/DistributedInsert/Handler.php
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,11 @@ protected function assignId(Struct $struct): int {
$id = match (true) {
// _bulk
isset($struct['index']['_id']) => $struct['index']['_id'],
// insert, deletc etc
// insert, delete etc
isset($struct['id']) => $struct['id'],
// bulk
isset($struct['insert']['id']) => $struct['insert']['id'],
isset($struct['replace']['id']) => $struct['replace']['id'],
default => array_pop($idPool),
};
// When id = 0 we generate
Expand All @@ -343,10 +344,11 @@ protected function assignId(Struct $struct): int {
$index['_id'] = "$id";
$struct['index'] = $index;
} elseif ($this->payload->type === 'sql') {
/** @var Struct<"insert",array{id:string|int}> $struct */
$insert = $struct['insert'];
$insert['id'] = (int)$id;
$struct['insert'] = $insert;
/** @var Struct<"insert"|"replace",array{id:string|int}> $struct */
$key = isset($struct['replace']) ? 'replace' : 'insert';
$row = $struct[$key];
$row['id'] = (int)$id;
$struct[$key] = $row;
} else {
/** @var Struct<"id",string|int> $struct */
$struct['id'] = $id;
Expand All @@ -369,11 +371,12 @@ protected function assignTable(Struct $struct, string $cluster, string $shardNam
$index['_index'] = $table;
$struct['index'] = $index;
} elseif ($this->payload->type === 'sql') {
/** @var Struct<"insert",array{table:string}> $struct */
$insert = $struct['insert'];
/** @var array{table:string} $insert */
$insert['table'] = $table;
$struct['insert'] = $insert;
/** @var Struct<"insert"|"replace",array{table:string}> $struct */
$key = isset($struct['replace']) ? 'replace' : 'insert';
$row = $struct[$key];
/** @var array{table:string} $row */
$row['table'] = $table;
$struct[$key] = $row;
} else {
/** @var Struct<string,string|int> $struct */
$struct['table'] = $table;
Expand Down
31 changes: 16 additions & 15 deletions src/Plugin/DistributedInsert/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ public static function hasMatch(Request $request): bool {
$isSqlEndpoint = $request->endpointBundle === Endpoint::Sql
|| $request->endpointBundle === Endpoint::Cli;
$hasMatch = $isSqlEndpoint
&& strpos($request->error, 'not support insert') !== true
&& strpos($request->payload, 'insert') === 0;
&& ($request->command === 'insert' || $request->command === 'replace')
&& $hasErrorMessage
;
}

return $hasMatch;
Expand Down Expand Up @@ -228,39 +229,39 @@ protected static function parseSinglePayload(Request $request): array {
* @return Batch
*/
protected static function parseSqlPayload(Request $request): array {
static $insertPattern = '/^insert\s+into\s+'
static $queryPattern = '/^(insert|replace)\s+into\s+'
. '`?([a-z][a-z\_\-0-9]*)`?'
. '(?:\s*\(([^)]+)\))?\s+'
. 'values\s*\((.*)\)/ius';
static $valuePattern = "/'(?:[^'\\\\]*(?:\\\\.[^'\\\\]*)*)'"
. '|\\d+(?:\\.\\d+)?|NULL|\\{(?:[^{}]|\\{[^{}]*\\})*\\}/';


preg_match($insertPattern, $request->payload, $matches);
$table = $matches[1] ?? null;
preg_match($queryPattern, $request->payload, $matches);
$type = strtolower($matches[1]);
$table = $matches[2] ?? null;
$fields = [];
if (isset($matches[2])) {
$fields = array_map('trim', explode(',', $matches[2]));
if (isset($matches[3])) {
$fields = array_map('trim', explode(',', $matches[3]));
$fields = array_map(
function ($field) {
return trim($field, '`');
}, $fields
);
}
if (!$fields) {
throw QueryParseError::create('INSERT into a sharded table requires specifying the fields.');
throw QueryParseError::create(strtoupper($type) . ' into a sharded table requires specifying the fields.');
}
if (!$table) {
throw QueryParseError::create('Failed to parse table from the query');
}
[$cluster, $table] = static::parseCluster($table);

// It's time to parse values
if (!isset($matches[3])) {
if (!isset($matches[4])) {
throw QueryParseError::create('Failed to parse values from the query');
}

$values = &$matches[3];
$values = &$matches[4];
preg_match_all($valuePattern, $values, $matches);
$values = &$matches[0];
/* $values = array_map(trim(...), $matches[0]); */
Expand All @@ -277,13 +278,13 @@ function ($field) {
if (!$isLast) {
continue;
}
$insert = [];
$row = [];
if (isset($doc['id'])) {
$insert['id'] = (int)$doc['id'];
$row['id'] = (int)$doc['id'];
unset($doc['id']);
}
$insert['doc'] = $doc;
$batch[] = Struct::fromData(['insert' => $insert]);
$row['doc'] = $doc;
$batch[] = Struct::fromData([$type => $row]);
$doc = [];
}
/** @var Batch */
Expand Down

0 comments on commit f344f18

Please sign in to comment.