diff --git a/pkg/sql/plan/bind_insert.go b/pkg/sql/plan/bind_insert.go index cb089cb8f124e..fb31e7ff2ff3b 100644 --- a/pkg/sql/plan/bind_insert.go +++ b/pkg/sql/plan/bind_insert.go @@ -33,14 +33,6 @@ func (builder *QueryBuilder) bindInsert(stmt *tree.Insert, bindCtx *BindContext) return 0, err } - // clusterTable, err := getAccountInfoOfClusterTable(ctx, stmt.Accounts, tableDef, tblInfo.isClusterTable[0]) - // if err != nil { - // return 0, err - // } - // if len(stmt.OnDuplicateUpdate) > 0 && clusterTable.IsClusterTable { - // return 0, moerr.NewNotSupported(builder.compCtx.GetContext(), "INSERT ... ON DUPLICATE KEY UPDATE ... for cluster table") - // } - if stmt.IsRestore { builder.isRestore = true if stmt.IsRestoreByTs { @@ -58,7 +50,7 @@ func (builder *QueryBuilder) bindInsert(stmt *tree.Insert, bindCtx *BindContext) }() } - lastNodeID, colName2Idx, skipUniqueIdx, err := builder.initInsertStmt(bindCtx, stmt, dmlCtx.objRefs[0], dmlCtx.tableDefs[0]) + lastNodeID, colName2Idx, skipUniqueIdx, err := builder.initInsertReplaceStmt(bindCtx, stmt.Rows, stmt.Columns, dmlCtx.objRefs[0], dmlCtx.tableDefs[0], false) if err != nil { return 0, err } @@ -90,11 +82,6 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert( skipUniqueIdx []bool, astUpdateExprs tree.UpdateExprs, ) (int32, error) { - var err error - - selectNode := builder.qry.Nodes[lastNodeID] - selectTag := selectNode.BindingTags[0] - tableDef := dmlCtx.tableDefs[0] pkName := tableDef.Pkey.PkeyColName @@ -109,7 +96,13 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert( } } - var onDupAction plan.Node_OnDuplicateAction + var ( + err error + onDupAction plan.Node_OnDuplicateAction + ) + + selectNode := builder.qry.Nodes[lastNodeID] + selectTag := selectNode.BindingTags[0] scanTag := builder.genNewTag() updateExprs := make(map[string]*plan.Expr) @@ -676,20 +669,20 @@ func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindInsert( // If the INSERT statement specifies the columns, it validates the column names against the table definition // and returns an error if any of the column names are invalid. // The function returns the list of insert columns and an error, if any. -func (builder *QueryBuilder) getInsertColsFromStmt(stmt *tree.Insert, tableDef *TableDef) ([]string, error) { +func (builder *QueryBuilder) getInsertColsFromStmt(astCols tree.IdentifierList, tableDef *TableDef) ([]string, error) { var insertColNames []string colToIdx := make(map[string]int) for i, col := range tableDef.Cols { colToIdx[strings.ToLower(col.Name)] = i } - if stmt.Columns == nil { + if astCols == nil { for _, col := range tableDef.Cols { if !col.Hidden { insertColNames = append(insertColNames, col.Name) } } } else { - for _, column := range stmt.Columns { + for _, column := range astCols { colName := strings.ToLower(string(column)) idx, ok := colToIdx[colName] if !ok { @@ -701,7 +694,7 @@ func (builder *QueryBuilder) getInsertColsFromStmt(stmt *tree.Insert, tableDef * return insertColNames, nil } -func (builder *QueryBuilder) initInsertStmt(bindCtx *BindContext, stmt *tree.Insert, objRef *plan.ObjectRef, tableDef *plan.TableDef) (int32, map[string]int32, []bool, error) { +func (builder *QueryBuilder) initInsertReplaceStmt(bindCtx *BindContext, astRows *tree.Select, astCols tree.IdentifierList, objRef *plan.ObjectRef, tableDef *plan.TableDef, isReplace bool) (int32, map[string]int32, []bool, error) { var ( lastNodeID int32 err error @@ -711,12 +704,12 @@ func (builder *QueryBuilder) initInsertStmt(bindCtx *BindContext, stmt *tree.Ins var insertColumns []string //var ifInsertFromUniqueColMap map[string]bool - if insertColumns, err = builder.getInsertColsFromStmt(stmt, tableDef); err != nil { + if insertColumns, err = builder.getInsertColsFromStmt(astCols, tableDef); err != nil { return 0, nil, nil, err } var astSelect *tree.Select - switch selectImpl := stmt.Rows.Select.(type) { + switch selectImpl := astRows.Select.(type) { // rewrite 'insert into tbl values (1,1)' to 'insert into tbl select * from (values row(1,1))' case *tree.ValuesClause: isAllDefault := false @@ -741,7 +734,7 @@ func (builder *QueryBuilder) initInsertStmt(bindCtx *BindContext, stmt *tree.Ins // example1:insert into a values (); // but it does not work at the case: // insert into a(a) values (); insert into a values (0),(); - if isAllDefault && stmt.Columns != nil { + if isAllDefault && astCols != nil { return 0, nil, nil, moerr.NewInvalidInput(builder.GetContext(), "insert values does not match the number of columns") } lastNodeID, err = builder.buildValueScan(isAllDefault, bindCtx, tableDef, selectImpl, insertColumns) @@ -750,7 +743,7 @@ func (builder *QueryBuilder) initInsertStmt(bindCtx *BindContext, stmt *tree.Ins } case *tree.SelectClause: - astSelect = stmt.Rows + astSelect = astRows subCtx := NewBindContext(builder, bindCtx) lastNodeID, err = builder.bindSelect(astSelect, subCtx, false) @@ -810,7 +803,11 @@ func (builder *QueryBuilder) initInsertStmt(bindCtx *BindContext, stmt *tree.Ins insertColToExpr[column] = projExpr } - return builder.appendNodesForInsertStmt(bindCtx, lastNodeID, tableDef, objRef, insertColToExpr) + if isReplace { + return builder.appendNodesForReplaceStmt(bindCtx, lastNodeID, tableDef, objRef, insertColToExpr) + } else { + return builder.appendNodesForInsertStmt(bindCtx, lastNodeID, tableDef, objRef, insertColToExpr) + } } func (builder *QueryBuilder) appendNodesForInsertStmt( diff --git a/pkg/sql/plan/bind_replace.go b/pkg/sql/plan/bind_replace.go new file mode 100644 index 0000000000000..4f6e980bb496c --- /dev/null +++ b/pkg/sql/plan/bind_replace.go @@ -0,0 +1,800 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +import ( + "fmt" + "strings" + + "github.com/matrixorigin/matrixone/pkg/catalog" + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" +) + +func (builder *QueryBuilder) bindReplace(stmt *tree.Replace, bindCtx *BindContext) (int32, error) { + dmlCtx := NewDMLContext() + err := dmlCtx.ResolveTables(builder.compCtx, tree.TableExprs{stmt.Table}, nil, nil, true) + if err != nil { + return 0, err + } + + lastNodeID, colName2Idx, skipUniqueIdx, err := builder.initInsertReplaceStmt(bindCtx, stmt.Rows, stmt.Columns, dmlCtx.objRefs[0], dmlCtx.tableDefs[0], true) + if err != nil { + return 0, err + } + + return builder.appendDedupAndMultiUpdateNodesForBindReplace(bindCtx, dmlCtx, lastNodeID, colName2Idx, skipUniqueIdx) +} + +func (builder *QueryBuilder) appendDedupAndMultiUpdateNodesForBindReplace( + bindCtx *BindContext, + dmlCtx *DMLContext, + lastNodeID int32, + colName2Idx map[string]int32, + skipUniqueIdx []bool, +) (int32, error) { + objRef := dmlCtx.objRefs[0] + tableDef := dmlCtx.tableDefs[0] + pkName := tableDef.Pkey.PkeyColName + + if tableDef.TableType != catalog.SystemOrdinaryRel && + tableDef.TableType != catalog.SystemIndexRel { + return 0, moerr.NewUnsupportedDML(builder.GetContext(), "insert into vector/text index table") + } + + for _, idxDef := range tableDef.Indexes { + if !catalog.IsRegularIndexAlgo(idxDef.IndexAlgo) { + return 0, moerr.NewUnsupportedDML(builder.GetContext(), "have vector index table") + } + } + + if pkName == catalog.FakePrimaryKeyColName { + return 0, moerr.NewUnsupportedDML(builder.GetContext(), "fake primary key") + //return builder.appendDedupAndMultiUpdateNodesForBindInsert(bindCtx, dmlCtx, lastNodeID, colName2Idx, skipUniqueIdx, nil) + } + + selectNode := builder.qry.Nodes[lastNodeID] + selectTag := selectNode.BindingTags[0] + + fullProjTag := builder.genNewTag() + fullProjList := make([]*plan.Expr, 0, len(selectNode.ProjectList)+len(tableDef.Cols)) + for i, expr := range selectNode.ProjectList { + fullProjList = append(fullProjList, &plan.Expr{ + Typ: expr.Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: selectTag, + ColPos: int32(i), + }, + }, + }) + } + + idxObjRefs := make([]*plan.ObjectRef, len(tableDef.Indexes)) + idxTableDefs := make([]*plan.TableDef, len(tableDef.Indexes)) + + oldColName2Idx := make(map[string][2]int32) + + // get old columns from existing main table + { + oldScanTag := builder.genNewTag() + + builder.addNameByColRef(oldScanTag, tableDef) + + oldScanNodeID := builder.appendNode(&plan.Node{ + NodeType: plan.Node_TABLE_SCAN, + TableDef: tableDef, + ObjRef: objRef, + BindingTags: []int32{oldScanTag}, + ScanSnapshot: bindCtx.snapshot, + }, bindCtx) + + for i, col := range tableDef.Cols { + oldColName2Idx[tableDef.Name+"."+col.Name] = [2]int32{fullProjTag, int32(len(fullProjList))} + fullProjList = append(fullProjList, &plan.Expr{ + Typ: col.Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: oldScanTag, + ColPos: int32(i), + }, + }, + }) + } + + for i, idxDef := range tableDef.Indexes { + if !idxDef.TableExist { + continue + } + + idxObjRefs[i], idxTableDefs[i] = builder.compCtx.ResolveIndexTableByRef(objRef, idxDef.IndexTableName, bindCtx.snapshot) + + if len(idxDef.Parts) == 1 { + oldColName2Idx[idxDef.IndexTableName+"."+catalog.IndexTableIndexColName] = oldColName2Idx[tableDef.Name+"."+idxDef.Parts[0]] + } else { + args := make([]*plan.Expr, len(idxDef.Parts)) + for j, part := range idxDef.Parts { + colIdx := tableDef.Name2ColIndex[catalog.ResolveAlias(part)] + args[j] = &plan.Expr{ + Typ: tableDef.Cols[colIdx].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: oldScanTag, + ColPos: colIdx, + }, + }, + } + } + + idxExpr := args[0] + if len(idxDef.Parts) > 1 { + funcName := "serial" + if !idxDef.Unique { + funcName = "serial_full" + } + idxExpr, _ = BindFuncExprImplByPlanExpr(builder.GetContext(), funcName, args) + } + + oldColName2Idx[idxDef.IndexTableName+"."+catalog.IndexTableIndexColName] = [2]int32{fullProjTag, int32(len(fullProjList))} + fullProjList = append(fullProjList, idxExpr) + } + } + + pkPos := tableDef.Name2ColIndex[pkName] + pkTyp := tableDef.Cols[pkPos].Typ + leftExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: selectTag, + ColPos: colName2Idx[tableDef.Name+"."+pkName], + }, + }, + } + rightExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: oldScanTag, + ColPos: pkPos, + }, + }, + } + + joinCond, _ := BindFuncExprImplByPlanExpr(builder.GetContext(), "=", []*plan.Expr{ + leftExpr, + rightExpr, + }) + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_JOIN, + Children: []int32{lastNodeID, oldScanNodeID}, + JoinType: plan.Node_LEFT, + OnList: []*plan.Expr{joinCond}, + }, bindCtx) + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_PROJECT, + ProjectList: fullProjList, + Children: []int32{lastNodeID}, + BindingTags: []int32{fullProjTag}, + }, bindCtx) + } + + // detect primary key confliction + { + scanTag := builder.genNewTag() + + // handle primary/unique key confliction + builder.addNameByColRef(scanTag, tableDef) + + scanNodeID := builder.appendNode(&plan.Node{ + NodeType: plan.Node_TABLE_SCAN, + TableDef: tableDef, + ObjRef: objRef, + BindingTags: []int32{scanTag}, + ScanSnapshot: bindCtx.snapshot, + }, bindCtx) + + pkPos := tableDef.Name2ColIndex[pkName] + pkTyp := tableDef.Cols[pkPos].Typ + leftExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: scanTag, + ColPos: pkPos, + }, + }, + } + + rightExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: fullProjTag, + ColPos: colName2Idx[tableDef.Name+"."+pkName], + }, + }, + } + + joinCond, _ := BindFuncExprImplByPlanExpr(builder.GetContext(), "=", []*plan.Expr{ + leftExpr, + rightExpr, + }) + + var dedupColName string + dedupColTypes := make([]plan.Type, len(tableDef.Pkey.Names)) + + if len(tableDef.Pkey.Names) == 1 { + dedupColName = tableDef.Pkey.Names[0] + } else { + dedupColName = "(" + strings.Join(tableDef.Pkey.Names, ",") + ")" + } + + for i, part := range tableDef.Pkey.Names { + dedupColTypes[i] = tableDef.Cols[tableDef.Name2ColIndex[part]].Typ + } + + oldPkPos := oldColName2Idx[tableDef.Name+"."+pkName] + + dedupJoinNode := &plan.Node{ + NodeType: plan.Node_JOIN, + Children: []int32{scanNodeID, lastNodeID}, + JoinType: plan.Node_DEDUP, + OnList: []*plan.Expr{joinCond}, + OnDuplicateAction: plan.Node_FAIL, + DedupColName: dedupColName, + DedupColTypes: dedupColTypes, + DedupJoinCtx: &plan.DedupJoinCtx{ + OldColList: []plan.ColRef{ + { + RelPos: oldPkPos[0], + ColPos: oldPkPos[1], + }, + }, + }, + } + + lastNodeID = builder.appendNode(dedupJoinNode, bindCtx) + } + + // detect unique key confliction + for i, idxDef := range tableDef.Indexes { + if !idxDef.TableExist || !idxDef.Unique { + continue + } + + idxTag := builder.genNewTag() + builder.addNameByColRef(idxTag, idxTableDefs[i]) + + idxScanNode := &plan.Node{ + NodeType: plan.Node_TABLE_SCAN, + TableDef: idxTableDefs[i], + ObjRef: idxObjRefs[i], + BindingTags: []int32{idxTag}, + ScanSnapshot: bindCtx.snapshot, + } + idxTableNodeID := builder.appendNode(idxScanNode, bindCtx) + + idxPkPos := idxTableDefs[i].Name2ColIndex[catalog.IndexTableIndexColName] + pkTyp := idxTableDefs[i].Cols[idxPkPos].Typ + + leftExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: idxTag, + ColPos: idxPkPos, + }, + }, + } + + rightExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: fullProjTag, + ColPos: colName2Idx[idxTableDefs[i].Name+"."+catalog.IndexTableIndexColName], + }, + }, + } + + joinCond, _ := BindFuncExprImplByPlanExpr(builder.GetContext(), "=", []*plan.Expr{ + leftExpr, + rightExpr, + }) + + var dedupColName string + dedupColTypes := make([]plan.Type, len(idxDef.Parts)) + + if len(idxDef.Parts) == 1 { + dedupColName = idxDef.Parts[0] + } else { + dedupColName = "(" + for j, part := range idxDef.Parts { + if j == 0 { + dedupColName += catalog.ResolveAlias(part) + } else { + dedupColName += "," + catalog.ResolveAlias(part) + } + } + dedupColName += ")" + } + + for j, part := range idxDef.Parts { + dedupColTypes[j] = tableDef.Cols[tableDef.Name2ColIndex[catalog.ResolveAlias(part)]].Typ + } + + oldPkPos := oldColName2Idx[idxTableDefs[i].Name+"."+catalog.IndexTableIndexColName] + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_JOIN, + Children: []int32{idxTableNodeID, lastNodeID}, + JoinType: plan.Node_DEDUP, + OnList: []*plan.Expr{joinCond}, + OnDuplicateAction: plan.Node_FAIL, + DedupColName: dedupColName, + DedupColTypes: dedupColTypes, + DedupJoinCtx: &plan.DedupJoinCtx{ + OldColList: []plan.ColRef{ + { + RelPos: oldPkPos[0], + ColPos: oldPkPos[1], + }, + }, + }, + }, bindCtx) + } + + // get old RowID for index tables + for i, idxDef := range tableDef.Indexes { + if !idxDef.TableExist { + continue + } + + idxTag := builder.genNewTag() + builder.addNameByColRef(idxTag, idxTableDefs[i]) + + idxScanNode := &plan.Node{ + NodeType: plan.Node_TABLE_SCAN, + TableDef: idxTableDefs[i], + ObjRef: idxObjRefs[i], + BindingTags: []int32{idxTag}, + ScanSnapshot: bindCtx.snapshot, + } + idxTableNodeID := builder.appendNode(idxScanNode, bindCtx) + + oldColName2Idx[idxTableDefs[i].Name+"."+catalog.Row_ID] = [2]int32{idxTag, idxTableDefs[i].Name2ColIndex[catalog.Row_ID]} + + idxPkPos := idxTableDefs[i].Name2ColIndex[catalog.IndexTableIndexColName] + pkTyp := idxTableDefs[i].Cols[idxPkPos].Typ + + leftExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: idxTag, + ColPos: idxPkPos, + }, + }, + } + + oldPkPos := oldColName2Idx[idxTableDefs[i].Name+"."+catalog.IndexTableIndexColName] + oldColName2Idx[idxTableDefs[i].Name+"."+catalog.IndexTableIndexColName] = [2]int32{idxTag, idxTableDefs[i].Name2ColIndex[catalog.IndexTableIndexColName]} + + rightExpr := &plan.Expr{ + Typ: pkTyp, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: oldPkPos[0], + ColPos: oldPkPos[1], + }, + }, + } + + joinCond, _ := BindFuncExprImplByPlanExpr(builder.GetContext(), "=", []*plan.Expr{ + leftExpr, + rightExpr, + }) + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_JOIN, + Children: []int32{lastNodeID, idxTableNodeID}, + JoinType: plan.Node_LEFT, + OnList: []*plan.Expr{joinCond}, + }, bindCtx) + } + + lockTargets := make([]*plan.LockTarget, 0) + updateCtxList := make([]*plan.UpdateCtx, 0) + + finalProjTag := builder.genNewTag() + finalProjList := make([]*plan.Expr, 0, len(tableDef.Cols)+len(tableDef.Indexes)*2) + var newPkIdx int32 + + { + insertCols := make([]plan.ColRef, len(tableDef.Cols)-1) + deleteCols := make([]plan.ColRef, 2) + + for i, col := range tableDef.Cols { + finalColIdx := len(finalProjList) + + if col.Name != catalog.Row_ID { + insertCols[i].RelPos = finalProjTag + insertCols[i].ColPos = int32(finalColIdx) + } + + colIdx := colName2Idx[tableDef.Name+"."+col.Name] + finalProjList = append(finalProjList, &plan.Expr{ + Typ: fullProjList[colIdx].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: fullProjTag, + ColPos: int32(colIdx), + }, + }, + }) + + if col.Name == tableDef.Pkey.PkeyColName { + newPkIdx = int32(finalColIdx) + } + } + + lockTargets = append(lockTargets, &plan.LockTarget{ + TableId: tableDef.TblId, + ObjRef: objRef, + PrimaryColIdxInBat: newPkIdx, + PrimaryColRelPos: finalProjTag, + PrimaryColTyp: finalProjList[newPkIdx].Typ, + }) + + oldRowIdPos := oldColName2Idx[tableDef.Name+"."+catalog.Row_ID] + deleteCols[0].RelPos = finalProjTag + deleteCols[0].ColPos = int32(len(finalProjList)) + finalProjList = append(finalProjList, &plan.Expr{ + Typ: fullProjList[oldRowIdPos[1]].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: fullProjTag, + ColPos: oldRowIdPos[1], + }, + }, + }) + + oldPkPos := oldColName2Idx[tableDef.Name+"."+tableDef.Pkey.PkeyColName] + deleteCols[1].RelPos = finalProjTag + deleteCols[1].ColPos = int32(len(finalProjList)) + lockTargets = append(lockTargets, &plan.LockTarget{ + TableId: tableDef.TblId, + ObjRef: objRef, + PrimaryColIdxInBat: int32(len(finalProjList)), + PrimaryColRelPos: finalProjTag, + PrimaryColTyp: finalProjList[newPkIdx].Typ, + }) + finalProjList = append(finalProjList, &plan.Expr{ + Typ: fullProjList[oldPkPos[1]].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: fullProjTag, + ColPos: oldPkPos[1], + }, + }, + }) + + updateCtxList = append(updateCtxList, &plan.UpdateCtx{ + ObjRef: objRef, + TableDef: tableDef, + InsertCols: insertCols, + DeleteCols: deleteCols, + }) + } + + for i, idxDef := range tableDef.Indexes { + if !idxDef.TableExist { + continue + } + + insertCols := make([]plan.ColRef, 2) + deleteCols := make([]plan.ColRef, 2) + + newIdxPos := colName2Idx[idxDef.IndexTableName+"."+catalog.IndexTableIndexColName] + if len(idxDef.Parts) > 1 { + idxExpr := &plan.Expr{ + Typ: fullProjList[newIdxPos].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: fullProjTag, + ColPos: newIdxPos, + }, + }, + } + newIdxPos = int32(len(finalProjList)) + finalProjList = append(finalProjList, idxExpr) + } + + oldRowIdPos := int32(len(finalProjList)) + oldColRef := oldColName2Idx[idxDef.IndexTableName+"."+catalog.Row_ID] + rowIdExpr := &plan.Expr{ + Typ: idxTableDefs[i].Cols[idxTableDefs[i].Name2ColIndex[catalog.Row_ID]].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: oldColRef[0], + ColPos: oldColRef[1], + }, + }, + } + finalProjList = append(finalProjList, rowIdExpr) + + oldIdxPos := int32(len(finalProjList)) + oldColRef = oldColName2Idx[idxDef.IndexTableName+"."+catalog.IndexTableIndexColName] + idxExpr := &plan.Expr{ + Typ: fullProjList[newIdxPos].Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: oldColRef[0], + ColPos: oldColRef[1], + }, + }, + } + finalProjList = append(finalProjList, idxExpr) + + insertCols[0].RelPos = finalProjTag + insertCols[0].ColPos = int32(newIdxPos) + insertCols[1].RelPos = finalProjTag + insertCols[1].ColPos = newPkIdx + + deleteCols[0].RelPos = finalProjTag + deleteCols[0].ColPos = oldRowIdPos + deleteCols[1].RelPos = finalProjTag + deleteCols[1].ColPos = int32(oldIdxPos) + + updateCtxList = append(updateCtxList, &plan.UpdateCtx{ + ObjRef: idxObjRefs[i], + TableDef: idxTableDefs[i], + InsertCols: insertCols, + DeleteCols: deleteCols, + }) + + if idxDef.Unique { + lockTargets = append(lockTargets, &plan.LockTarget{ + TableId: idxTableDefs[i].TblId, + ObjRef: idxObjRefs[i], + PrimaryColIdxInBat: int32(newIdxPos), + PrimaryColRelPos: finalProjTag, + PrimaryColTyp: finalProjList[newIdxPos].Typ, + }, &plan.LockTarget{ + TableId: idxTableDefs[i].TblId, + ObjRef: idxObjRefs[i], + PrimaryColIdxInBat: int32(oldIdxPos), + PrimaryColRelPos: finalProjTag, + PrimaryColTyp: finalProjList[oldIdxPos].Typ, + }) + } + } + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_PROJECT, + Children: []int32{lastNodeID}, + ProjectList: finalProjList, + BindingTags: []int32{finalProjTag}, + }, bindCtx) + + if len(lockTargets) > 0 { + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_LOCK_OP, + Children: []int32{lastNodeID}, + TableDef: tableDef, + BindingTags: []int32{builder.genNewTag()}, + LockTargets: lockTargets, + }, bindCtx) + reCheckifNeedLockWholeTable(builder) + } + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_MULTI_UPDATE, + Children: []int32{lastNodeID}, + BindingTags: []int32{builder.genNewTag()}, + UpdateCtxList: updateCtxList, + }, bindCtx) + + return lastNodeID, nil +} + +func (builder *QueryBuilder) appendNodesForReplaceStmt( + bindCtx *BindContext, + lastNodeID int32, + tableDef *TableDef, + objRef *ObjectRef, + insertColToExpr map[string]*Expr, +) (int32, map[string]int32, []bool, error) { + colName2Idx := make(map[string]int32) + hasAutoCol := false + for _, col := range tableDef.Cols { + if col.Typ.AutoIncr { + hasAutoCol = true + break + } + } + + projList1 := make([]*plan.Expr, 0, len(tableDef.Cols)-1) + projList2 := make([]*plan.Expr, 0, len(tableDef.Cols)-1) + projTag1 := builder.genNewTag() + preInsertTag := builder.genNewTag() + + var ( + compPkeyExpr *plan.Expr + clusterByExpr *plan.Expr + ) + + columnIsNull := make(map[string]bool) + + for i, col := range tableDef.Cols { + if oldExpr, exists := insertColToExpr[col.Name]; exists { + projList2 = append(projList2, &plan.Expr{ + Typ: oldExpr.Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: projTag1, + ColPos: int32(len(projList1)), + }, + }, + }) + projList1 = append(projList1, oldExpr) + } else if col.Name == catalog.Row_ID { + continue + } else if col.Name == catalog.CPrimaryKeyColName { + //args := make([]*plan.Expr, len(tableDef.Pkey.Names)) + // + //for k, part := range tableDef.Pkey.Names { + // args[k] = DeepCopyExpr(insertColToExpr[part]) + //} + // + //compPkeyExpr, _ = BindFuncExprImplByPlanExpr(builder.GetContext(), "serial", args) + compPkeyExpr = makeCompPkeyExpr(tableDef, tableDef.Name2ColIndex) + projList2 = append(projList2, &plan.Expr{ + Typ: compPkeyExpr.Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: preInsertTag, + ColPos: 0, + }, + }, + }) + } else if tableDef.ClusterBy != nil && col.Name == tableDef.ClusterBy.Name { + //names := util.SplitCompositeClusterByColumnName(tableDef.ClusterBy.Name) + //args := make([]*plan.Expr, len(names)) + // + //for k, part := range names { + // args[k] = DeepCopyExpr(insertColToExpr[part]) + //} + // + //clusterByExpr, _ = BindFuncExprImplByPlanExpr(builder.GetContext(), "serial_full", args) + clusterByExpr = makeClusterByExpr(tableDef, tableDef.Name2ColIndex) + projList2 = append(projList2, &plan.Expr{ + Typ: clusterByExpr.Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: preInsertTag, + ColPos: 0, + }, + }, + }) + } else { + defExpr, err := getDefaultExpr(builder.GetContext(), col) + if err != nil { + return 0, nil, nil, err + } + + if !col.Typ.AutoIncr { + if lit := defExpr.GetLit(); lit != nil { + if lit.Isnull { + columnIsNull[col.Name] = true + } + } + } + + projList2 = append(projList2, &plan.Expr{ + Typ: defExpr.Typ, + Expr: &plan.Expr_Col{ + Col: &plan.ColRef{ + RelPos: projTag1, + ColPos: int32(len(projList1)), + }, + }, + }) + projList1 = append(projList1, defExpr) + } + + colName2Idx[tableDef.Name+"."+col.Name] = int32(i) + } + + skipUniqueIdx := make([]bool, len(tableDef.Indexes)) + pkName := tableDef.Pkey.PkeyColName + pkPos := tableDef.Name2ColIndex[pkName] + for i, idxDef := range tableDef.Indexes { + if !idxDef.TableExist { + continue + } + + skipUniqueIdx[i] = true + for _, part := range idxDef.Parts { + if !columnIsNull[catalog.ResolveAlias(part)] { + skipUniqueIdx[i] = false + break + } + } + + idxTableName := idxDef.IndexTableName + colName2Idx[idxTableName+"."+catalog.IndexTablePrimaryColName] = pkPos + argsLen := len(idxDef.Parts) + if argsLen == 1 { + colName2Idx[idxTableName+"."+catalog.IndexTableIndexColName] = colName2Idx[tableDef.Name+"."+idxDef.Parts[0]] + } else { + args := make([]*plan.Expr, argsLen) + + var colPos int32 + var ok bool + for k := 0; k < argsLen; k++ { + if colPos, ok = colName2Idx[tableDef.Name+"."+catalog.ResolveAlias(idxDef.Parts[k])]; !ok { + errMsg := fmt.Sprintf("bind insert err, can not find colName = %s", idxDef.Parts[k]) + return 0, nil, nil, moerr.NewInternalError(builder.GetContext(), errMsg) + } + args[k] = DeepCopyExpr(projList2[colPos]) + } + + funcName := "serial" + if !idxDef.Unique { + funcName = "serial_full" + } + idxExpr, _ := BindFuncExprImplByPlanExpr(builder.GetContext(), funcName, args) + colName2Idx[idxTableName+"."+catalog.IndexTableIndexColName] = int32(len(projList2)) + projList2 = append(projList2, idxExpr) + } + } + + tmpCtx := NewBindContext(builder, bindCtx) + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_PROJECT, + ProjectList: projList1, + Children: []int32{lastNodeID}, + BindingTags: []int32{projTag1}, + }, tmpCtx) + + if hasAutoCol || compPkeyExpr != nil || clusterByExpr != nil { + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_PRE_INSERT, + Children: []int32{lastNodeID}, + PreInsertCtx: &plan.PreInsertCtx{ + Ref: objRef, + TableDef: tableDef, + HasAutoCol: hasAutoCol, + CompPkeyExpr: compPkeyExpr, + ClusterByExpr: clusterByExpr, + }, + BindingTags: []int32{preInsertTag}, + }, tmpCtx) + } + + lastNodeID = builder.appendNode(&plan.Node{ + NodeType: plan.Node_PROJECT, + ProjectList: projList2, + Children: []int32{lastNodeID}, + BindingTags: []int32{builder.genNewTag()}, + }, tmpCtx) + + return lastNodeID, colName2Idx, skipUniqueIdx, nil +} diff --git a/pkg/sql/plan/build.go b/pkg/sql/plan/build.go index 0cc77c5188b49..58680edc4b0f1 100644 --- a/pkg/sql/plan/build.go +++ b/pkg/sql/plan/build.go @@ -93,6 +93,41 @@ func bindAndOptimizeInsertQuery(ctx CompilerContext, stmt *tree.Insert, isPrepar }, err } +func bindAndOptimizeReplaceQuery(ctx CompilerContext, stmt *tree.Replace, isPrepareStmt bool, skipStats bool) (*Plan, error) { + start := time.Now() + defer func() { + v2.TxnStatementBuildInsertHistogram.Observe(time.Since(start).Seconds()) + }() + + builder := NewQueryBuilder(plan.Query_INSERT, ctx, isPrepareStmt, true) + builder.parseOptimizeHints() + bindCtx := NewBindContext(builder, nil) + if IsSnapshotValid(ctx.GetSnapshot()) { + bindCtx.snapshot = ctx.GetSnapshot() + } + + rootId, err := builder.bindReplace(stmt, bindCtx) + if err != nil { + if err.(*moerr.Error).ErrorCode() == moerr.ErrUnsupportedDML { + return buildReplace(stmt, ctx, false, isPrepareStmt) + } + return nil, err + } + ctx.SetViews(bindCtx.views) + + builder.qry.Steps = append(builder.qry.Steps, rootId) + builder.skipStats = skipStats + query, err := builder.createQuery() + if err != nil { + return nil, err + } + return &Plan{ + Plan: &plan.Plan_Query{ + Query: query, + }, + }, err +} + func bindAndOptimizeLoadQuery(ctx CompilerContext, stmt *tree.Load, isPrepareStmt bool, skipStats bool) (*Plan, error) { // return buildLoad(stmt, ctx, isPrepareStmt) start := time.Now() @@ -163,10 +198,6 @@ func bindAndOptimizeDeleteQuery(ctx CompilerContext, stmt *tree.Delete, isPrepar } func bindAndOptimizeUpdateQuery(ctx CompilerContext, stmt *tree.Update, isPrepareStmt bool, skipStats bool) (*Plan, error) { - // if !isExplain { - // return buildTableUpdate(stmt, ctx, isPrepareStmt) - // } - start := time.Now() defer func() { v2.TxnStatementBuildDeleteHistogram.Observe(time.Since(start).Seconds()) @@ -258,7 +289,7 @@ func BuildPlan(ctx CompilerContext, stmt tree.Statement, isPrepareStmt bool) (*P case *tree.Insert: return bindAndOptimizeInsertQuery(ctx, stmt, isPrepareStmt, false) case *tree.Replace: - return buildReplace(stmt, ctx, isPrepareStmt, false) + return bindAndOptimizeReplaceQuery(ctx, stmt, isPrepareStmt, false) case *tree.Update: return bindAndOptimizeUpdateQuery(ctx, stmt, isPrepareStmt, false) case *tree.Delete: diff --git a/test/distributed/cases/dml/replace/replace.result b/test/distributed/cases/dml/replace/replace.result index aa152a9e28e18..b21f00ce3b172 100644 --- a/test/distributed/cases/dml/replace/replace.result +++ b/test/distributed/cases/dml/replace/replace.result @@ -44,10 +44,23 @@ replace into t4 values (1, 'b'); select * from t4; a b 1 b +drop table if exists t1; +create table t1(a int primary key, b int unique, c varchar(255), key(c)); +insert into t1 values (1,1,"1"), (2,2,"2"); +select * from t1; +a b c +1 1 1 +2 2 2 +replace into t1 values (1,4,"4"), (3,3,"3"); +select * from t1; +a b c +2 2 2 +1 4 4 +3 3 3 create database replace_db; use replace_db; replace into `replace`.`names` values (2, "Dylan", 20); select name, age from `replace`.`names` where id = 2; name age Dylan 20 -drop database replace_db; \ No newline at end of file +drop database replace_db; diff --git a/test/distributed/cases/dml/replace/replace.test b/test/distributed/cases/dml/replace/replace.test index 77dadb3b7a4fe..64ad7859c6900 100644 --- a/test/distributed/cases/dml/replace/replace.test +++ b/test/distributed/cases/dml/replace/replace.test @@ -28,8 +28,16 @@ replace into t4 values (1, 'a'); select * from t4; replace into t4 values (1, 'b'); select * from t4; + +drop table if exists t1; +create table t1(a int primary key, b int unique, c varchar(255), key(c)); +insert into t1 values (1,1,"1"), (2,2,"2"); +select * from t1; +replace into t1 values (1,4,"4"), (3,3,"3"); +select * from t1; + create database replace_db; use replace_db; replace into `replace`.`names` values (2, "Dylan", 20); select name, age from `replace`.`names` where id = 2; -drop database replace_db; \ No newline at end of file +drop database replace_db;