diff --git a/src/Plugin/DistributedInsert/Handler.php b/src/Plugin/DistributedInsert/Handler.php index 583bc550..f23b36fa 100644 --- a/src/Plugin/DistributedInsert/Handler.php +++ b/src/Plugin/DistributedInsert/Handler.php @@ -318,10 +318,11 @@ protected function assignId(Struct $struct): int { $id = match (true) { // _bulk isset($struct['index']['_id']) => $struct['index']['_id'], - // insert, deletc etc + // insert, delete etc isset($struct['id']) => $struct['id'], // bulk isset($struct['insert']['id']) => $struct['insert']['id'], + isset($struct['replace']['id']) => $struct['replace']['id'], default => array_pop($idPool), }; // When id = 0 we generate @@ -343,10 +344,11 @@ protected function assignId(Struct $struct): int { $index['_id'] = "$id"; $struct['index'] = $index; } elseif ($this->payload->type === 'sql') { - /** @var Struct<"insert",array{id:string|int}> $struct */ - $insert = $struct['insert']; - $insert['id'] = (int)$id; - $struct['insert'] = $insert; + /** @var Struct<"insert"|"replace",array{id:string|int}> $struct */ + $key = isset($struct['replace']) ? 'replace' : 'insert'; + $row = $struct[$key]; + $row['id'] = (int)$id; + $struct[$key] = $row; } else { /** @var Struct<"id",string|int> $struct */ $struct['id'] = $id; @@ -369,11 +371,12 @@ protected function assignTable(Struct $struct, string $cluster, string $shardNam $index['_index'] = $table; $struct['index'] = $index; } elseif ($this->payload->type === 'sql') { - /** @var Struct<"insert",array{table:string}> $struct */ - $insert = $struct['insert']; - /** @var array{table:string} $insert */ - $insert['table'] = $table; - $struct['insert'] = $insert; + /** @var Struct<"insert"|"replace",array{table:string}> $struct */ + $key = isset($struct['replace']) ? 'replace' : 'insert'; + $row = $struct[$key]; + /** @var array{table:string} $row */ + $row['table'] = $table; + $struct[$key] = $row; } else { /** @var Struct $struct */ $struct['table'] = $table; diff --git a/src/Plugin/DistributedInsert/Payload.php b/src/Plugin/DistributedInsert/Payload.php index 44b782ae..41a02f07 100644 --- a/src/Plugin/DistributedInsert/Payload.php +++ b/src/Plugin/DistributedInsert/Payload.php @@ -100,8 +100,9 @@ public static function hasMatch(Request $request): bool { $isSqlEndpoint = $request->endpointBundle === Endpoint::Sql || $request->endpointBundle === Endpoint::Cli; $hasMatch = $isSqlEndpoint - && strpos($request->error, 'not support insert') !== true - && strpos($request->payload, 'insert') === 0; + && ($request->command === 'insert' || $request->command === 'replace') + && $hasErrorMessage + ; } return $hasMatch; @@ -228,19 +229,19 @@ protected static function parseSinglePayload(Request $request): array { * @return Batch */ protected static function parseSqlPayload(Request $request): array { - static $insertPattern = '/^insert\s+into\s+' + static $queryPattern = '/^(insert|replace)\s+into\s+' . '`?([a-z][a-z\_\-0-9]*)`?' . '(?:\s*\(([^)]+)\))?\s+' . 'values\s*\((.*)\)/ius'; static $valuePattern = "/'(?:[^'\\\\]*(?:\\\\.[^'\\\\]*)*)'" . '|\\d+(?:\\.\\d+)?|NULL|\\{(?:[^{}]|\\{[^{}]*\\})*\\}/'; - - preg_match($insertPattern, $request->payload, $matches); - $table = $matches[1] ?? null; + preg_match($queryPattern, $request->payload, $matches); + $type = strtolower($matches[1]); + $table = $matches[2] ?? null; $fields = []; - if (isset($matches[2])) { - $fields = array_map('trim', explode(',', $matches[2])); + if (isset($matches[3])) { + $fields = array_map('trim', explode(',', $matches[3])); $fields = array_map( function ($field) { return trim($field, '`'); @@ -248,7 +249,7 @@ function ($field) { ); } if (!$fields) { - throw QueryParseError::create('INSERT into a sharded table requires specifying the fields.'); + throw QueryParseError::create(strtoupper($type) . ' into a sharded table requires specifying the fields.'); } if (!$table) { throw QueryParseError::create('Failed to parse table from the query'); @@ -256,11 +257,11 @@ function ($field) { [$cluster, $table] = static::parseCluster($table); // It's time to parse values - if (!isset($matches[3])) { + if (!isset($matches[4])) { throw QueryParseError::create('Failed to parse values from the query'); } - $values = &$matches[3]; + $values = &$matches[4]; preg_match_all($valuePattern, $values, $matches); $values = &$matches[0]; /* $values = array_map(trim(...), $matches[0]); */ @@ -277,13 +278,13 @@ function ($field) { if (!$isLast) { continue; } - $insert = []; + $row = []; if (isset($doc['id'])) { - $insert['id'] = (int)$doc['id']; + $row['id'] = (int)$doc['id']; unset($doc['id']); } - $insert['doc'] = $doc; - $batch[] = Struct::fromData(['insert' => $insert]); + $row['doc'] = $doc; + $batch[] = Struct::fromData([$type => $row]); $doc = []; } /** @var Batch */