diff --git a/build.sh b/build.sh
index feac5769c5808..39bf3ba2923ad 100755
--- a/build.sh
+++ b/build.sh
@@ -95,7 +95,7 @@ Usage: $0
                     --without-starcache
                                build Backend without starcache library
                     -j         build Backend parallel
-                    --output-compile-time
+                    --output-compile-time save a list of the compile time for every C++ file in ${ROOT}/compile_times.txt. Turning this option on automatically disables ccache.
@@ -350,7 +350,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
         fi
         export STARLET_INSTALL_DIR
     fi
-
+
     if [ "${OUTPUT_COMPILE_TIME}" == "ON" ]; then
         rm -f ${ROOT}/compile_times.txt
         CXX_COMPILER_LAUNCHER=${ROOT}/build-support/compile_time.sh
@@ -568,4 +568,4 @@ if [[ ! -z ${STARROCKS_POST_BUILD_HOOK} ]]; then
     eval ${STARROCKS_POST_BUILD_HOOK}
 fi
 
-exit 0
+exit 0
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
index 1a6d1364287f0..52ffa415edc8a 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
@@ -342,7 +342,8 @@ public enum ErrorCode {
     ERR_WAREHOUSE_SUSPENDED(10003, new byte[] {'4', '2', '0', '0', '0'},
             "Warehouse %s has been suspended."),
     ERR_WAREHOUSE_UNAVAILABLE(10004, new byte[] {'4', '2', '0', '0', '0'},
             "Warehouse %s is not available."),
     ERR_NO_NODES_IN_WAREHOUSE(10005, new byte[] {'4', '2', '0', '0', '0'},
-            "No alive backend or compute node in warehouse %s."),
+            "No alive backend or compute node in warehouse %s. It is also possible that no compute node " +
+                    "belongs to the resource isolation group matching the FE."),
     ERR_INVALID_WAREHOUSE_NAME(10006, new byte[] {'4', '2', '0', '0', '0'},
             "Warehouse name can not be null or empty"),
     ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE(10007, new byte[] {'4', '2', '0', '0', '0'},
diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/ResourceIsolationGroupUtils.java b/fe/fe-core/src/main/java/com/starrocks/lake/ResourceIsolationGroupUtils.java
new file mode 100644
index 0000000000000..5a76c57db8dc7
--- /dev/null
+++ b/fe/fe-core/src/main/java/com/starrocks/lake/ResourceIsolationGroupUtils.java
@@ -0,0 +1,16 @@
+package com.starrocks.lake;
+
+import java.util.Objects;
+
+public class ResourceIsolationGroupUtils {
+    public static final String DEFAULT_RESOURCE_ISOLATION_GROUP_ID = "";
+
+    public static boolean resourceIsolationGroupMatches(String rig1, String rig2) {
+        if (Objects.equals(rig1, rig2)) {
+            return true;
+        }
+        boolean unset1 = rig1 == null || rig1.isEmpty();
+        boolean unset2 = rig2 == null || rig2.isEmpty();
+        return unset1 && unset2;
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java b/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
index aec2cabc03118..f1cb6ab78f867 100644
--- a/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
+++ b/fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
@@ -614,7 +614,15 @@ public Set<Long> getAllNodeIdsByShard(long shardId, long workerGroupId, boolean
     }
 
     public Set<Long> getAllNodeIdsByShard(ShardInfo shardInfo, boolean onlyPrimary) {
+        // TODO(cbrennan) Ideally the shardInfo would include resource isolation group info, so we could get a
+        // different node for the shard depending on its resource isolation group. One problem is that
+        // getReplicaInfoList is part of the staros module, which is not modifiable.
+        // Failed idea 1: skip the onlyPrimary filter here and instead filter later on the node's
+        // resourceGroupId. However, staros appears to return only one replica per shard even before the
+        // stream/filter on replica role, so there is nothing left to filter down: right now we are only
+        // getting one replica for each shard.
         List<ReplicaInfo> replicas = shardInfo.getReplicaInfoList();
+
         if (onlyPrimary) {
             replicas = replicas.stream().filter(x -> x.getReplicaRole() == ReplicaRole.PRIMARY)
                     .collect(Collectors.toList());
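The matcher above treats null and "" as the same "ungrouped" state, which is what keeps pre-existing deployments working: an FE with no group still schedules onto CNs with no group. An illustrative sketch of the semantics (not part of the patch; it only assumes the new ResourceIsolationGroupUtils class):

    import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;

    public class MatcherSketch {
        public static void main(String[] args) {
            // Unset groups (null or "") are interchangeable and match each other.
            System.out.println(resourceIsolationGroupMatches(null, ""));            // true
            System.out.println(resourceIsolationGroupMatches("", null));            // true
            // A grouped node never matches an ungrouped peer.
            System.out.println(resourceIsolationGroupMatches("groupA", null));      // false
            // Otherwise matching is exact string equality.
            System.out.println(resourceIsolationGroupMatches("groupA", "groupA"));  // true
            System.out.println(resourceIsolationGroupMatches("groupA", "groupB"));  // false
        }
    }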
diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/Utils.java b/fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
index 14ed7713e35b6..13c55720b2e1e 100644
--- a/fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
+++ b/fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
@@ -114,6 +114,7 @@ public static void publishVersionBatch(@NotNull List<Tablet> tablets, List<TxnInfoPB> txnInfos,
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultSharedDataWorkerProvider.java b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultSharedDataWorkerProvider.java
--- a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultSharedDataWorkerProvider.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultSharedDataWorkerProvider.java
             ImmutableMap.Builder<Long, ComputeNode> builder = ImmutableMap.builder();
             List<Long> computeNodeIds = warehouseManager.getAllComputeNodeIds(warehouseId);
@@ -82,23 +88,26 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
                 LOG.debug("idToComputeNode: {}", idToComputeNode);
             }
 
-            ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode);
+            ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode,
+                    resourceIsolationGroup);
             if (availableComputeNodes.isEmpty()) {
                 Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
                 throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
             }
-
-            return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes);
+            return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes,
+                    resourceIsolationGroup);
         }
     }
 
     /**
-     * All the compute nodes (including backends), including those that are not alive or in block list.
+     * All the compute nodes (including backends), including those that are not alive, in the block list, or not
+     * matching the resource isolation group of the current frontend.
      */
     private final ImmutableMap<Long, ComputeNode> id2ComputeNode;
     /**
      * The available compute nodes, which are alive and not in the block list when creating the snapshot. It is still
      * possible that the node becomes unavailable later, it will be checked again in some of the interfaces.
+     * If resource isolation groups are in use, this only includes ComputeNodes in the same group as the frontend.
      */
     private final ImmutableMap<Long, ComputeNode> availableID2ComputeNode;
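For context, a standalone rehearsal of the filtering rule that captureAvailableWorkers now applies (not part of the patch; the nodes, hosts, ports, and groups are invented, the ComputeNode (id, host, heartbeatPort) constructor is assumed, and aliveness/blocklist checks are elided):

    import com.google.common.collect.ImmutableMap;
    import com.starrocks.system.ComputeNode;

    import java.util.Map;

    import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;

    public class FilterRehearsal {
        // Mirrors the new two-argument filterAvailableWorkers, minus aliveness/blocklist checks.
        static ImmutableMap<Long, ComputeNode> keepFeGroup(Map<Long, ComputeNode> workers, String feGroup) {
            ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
            for (Map.Entry<Long, ComputeNode> e : workers.entrySet()) {
                if (resourceIsolationGroupMatches(feGroup, e.getValue().getResourceIsolationGroup())) {
                    builder.put(e);
                }
            }
            return builder.build();
        }

        public static void main(String[] args) {
            ComputeNode a = new ComputeNode(1L, "host-a", 9050);
            a.setResourceIsolationGroup("groupA");
            ComputeNode b = new ComputeNode(2L, "host-b", 9050);
            b.setResourceIsolationGroup("groupB");
            Map<Long, ComputeNode> all = ImmutableMap.of(1L, a, 2L, b);
            // An FE in groupA only sees node 1; an FE in groupC sees nothing, which is the
            // case that now surfaces ERR_NO_NODES_IN_WAREHOUSE.
            System.out.println(keepFeGroup(all, "groupA").keySet()); // [1]
            System.out.println(keepFeGroup(all, "groupC").keySet()); // []
        }
    }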
@@ -109,16 +118,27 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
     private final Set<Long> selectedWorkerIds;
 
+    private final String resourceIsolationGroup;
+
     @VisibleForTesting
     public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
-                                           ImmutableMap<Long, ComputeNode> availableID2ComputeNode
+                                           ImmutableMap<Long, ComputeNode> availableID2ComputeNode,
+                                           String resourceIsolationGroup
     ) {
         this.id2ComputeNode = id2ComputeNode;
         this.availableID2ComputeNode = availableID2ComputeNode;
         this.selectedWorkerIds = Sets.newConcurrentHashSet();
+        this.resourceIsolationGroup = resourceIsolationGroup;
         this.allComputeNodeIds = null;
     }
 
+    @VisibleForTesting
+    public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
+                                           ImmutableMap<Long, ComputeNode> availableID2ComputeNode
+    ) {
+        this(id2ComputeNode, availableID2ComputeNode, null);
+    }
+
     @Override
     public long selectNextWorker() throws NonRecoverableException {
         ComputeNode worker;
@@ -244,9 +264,12 @@ public long selectBackupWorker(long workerId) {
     public String toString() {
         StringBuilder out = new StringBuilder("compute node: ");
         id2ComputeNode.forEach((backendID, backend) -> out.append(
-                String.format("[%s alive: %b, available: %b, inBlacklist: %b] ", backend.getHost(),
+                String.format("[%s alive: %b, available: %b, inBlacklist: %b, resourceIsolationGroupMatch: %b] ",
+                        backend.getHost(),
                         backend.isAlive(), availableID2ComputeNode.containsKey(backendID),
-                        SimpleScheduler.isInBlocklist(backendID))));
+                        SimpleScheduler.isInBlocklist(backendID),
+                        resourceIsolationGroupMatches(this.resourceIsolationGroup,
+                                backend.getResourceIsolationGroup()))));
         return out.toString();
     }
 
@@ -270,10 +293,12 @@ private static ComputeNode getNextWorker(ImmutableMap<Long, ComputeNode> workers
         return workers.values().asList().get(index);
     }
 
-    private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers) {
+    private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers,
+                                                                          String thisFeResourceIsolationGroup) {
         ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
         for (Map.Entry<Long, ComputeNode> entry : workers.entrySet()) {
-            if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey())) {
+            if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey()) &&
+                    resourceIsolationGroupMatches(thisFeResourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
                 builder.put(entry);
             }
         }
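A test-style sketch of the new three-argument constructor (not part of the patch; it reuses the invented nodes a and b from the previous sketch, inside a test method that declares throws Exception):

    // Availability is prefiltered by the factory; the provider then only selects within it.
    ImmutableMap<Long, ComputeNode> all = ImmutableMap.of(1L, a, 2L, b);
    ImmutableMap<Long, ComputeNode> availableInGroupA = ImmutableMap.of(1L, a);
    DefaultSharedDataWorkerProvider provider =
            new DefaultSharedDataWorkerProvider(all, availableInGroupA, "groupA");
    long workerId = provider.selectNextWorker(); // can only return 1
    // The two-argument overload keeps existing tests compiling; it delegates with a
    // null group, i.e. "ungrouped" semantics.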
+ LOG.info("Ideally, tablet {} mapped to backend id {}", tablet.getId(), replica.getBackendId()); + // for CBO if (!collectedStat && replica.getRowCount() != -1) { actualRows += replica.getRowCount(); diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java index 713c80739b07c..b81b85c3958f6 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java @@ -115,6 +115,10 @@ protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserExcepti * only applicable to HDFS; less than or equal to zero means no * maximum. */ + // TODO(cbrennan): When we want to have multiple replicas in shared-data mode, or if we want official assignments of + // scan ranges to one node per group, we'll ultimately need to change what this returns. Currently it appears that + // this will be a somewhat involved process, left for v2. For our purposes, this should be done first for the + // OlapScanNode. public abstract List getScanRangeLocations(long maxScanRangeLength); @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java index 43cf806d6952a..d0fbe7a62b933 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java @@ -143,6 +143,9 @@ public static TQueryGlobals genQueryGlobals(Instant startTime, String timezone) } private WorkerProvider.Factory newWorkerProviderFactory() { + // TODO(cbrennan): In a future implementation of resource isolation groups, we could create an entirely new + // class and if all the compute nodes are ungrouped or no compute node matches the frontend group, we return + // the Default. 
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
index 43cf806d6952a..d0fbe7a62b933 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java
@@ -143,6 +143,9 @@ public static TQueryGlobals genQueryGlobals(Instant startTime, String timezone) {
     }
 
     private WorkerProvider.Factory newWorkerProviderFactory() {
+        // TODO(cbrennan): A future implementation of resource isolation groups could introduce an entirely new
+        // provider class here, falling back to the default one whenever all compute nodes are ungrouped or no
+        // compute node matches the frontend's group.
        if (RunMode.isSharedDataMode()) {
            return new DefaultSharedDataWorkerProvider.Factory();
        } else {
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/NormalBackendSelector.java b/fe/fe-core/src/main/java/com/starrocks/qe/NormalBackendSelector.java
index 6ad4a9ffc83ba..591ca2a0a9c87 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/NormalBackendSelector.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/NormalBackendSelector.java
@@ -81,13 +81,16 @@ public void computeScanRangeAssignment() throws UserException {
             if (!workerProvider.isDataNodeAvailable(location.getBackend_id())) {
                 if (workerProvider.allowUsingBackupNode()) {
                     long backupNodeId = workerProvider.selectBackupWorker(location.getBackend_id());
-                    LOG.debug("Select a backup node:{} for node:{}", backupNodeId, location.getBackend_id());
                     if (backupNodeId > 0) {
                         // using the backupNode to generate a new ScanRangeLocation
                         TScanRangeLocation backupLocation = new TScanRangeLocation();
                         backupLocation.setBackend_id(backupNodeId);
                         backupLocation.setServer(workerProvider.getWorkerById(backupNodeId).getAddress());
                         backupLocations.add(backupLocation);
+                        // TODO(cbrennan) Turn into a debug statement and move it back up when done testing.
+                        LOG.info("Select a backup node:{} for node:{} {}", backupNodeId,
+                                location.getBackend_id(),
+                                backupLocation.getServer());
                     }
                 }
                 continue;
diff --git a/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java b/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java
index 44cc327e7c330..2e847a24892d3 100644
--- a/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java
+++ b/fe/fe-core/src/main/java/com/starrocks/server/WarehouseManager.java
@@ -227,6 +227,7 @@ public ComputeNode getComputeNodeAssignedToTablet(String warehouseName, LakeTablet tablet) {
         return getComputeNodeAssignedToTablet(warehouse.getId(), tablet);
     }
 
+    // TODO(cbrennan) Ensure that the ComputeNode returned here is checked for its resource isolation group where appropriate.
     public ComputeNode getComputeNodeAssignedToTablet(Long warehouseId, LakeTablet tablet) {
         Long computeNodeId = getComputeNodeId(warehouseId, tablet);
         if (computeNodeId == null) {
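One possible shape of the guard that the WarehouseManager TODO asks for (hypothetical sketch; the fallback policy is deliberately left open, and all calls are ones this PR already uses elsewhere):

    ComputeNode node = warehouseManager.getComputeNodeAssignedToTablet(warehouseId, tablet);
    String feGroup = GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup();
    if (node != null && !resourceIsolationGroupMatches(feGroup, node.getResourceIsolationGroup())) {
        // The stable tablet->node mapping crossed group boundaries; callers could re-select a
        // node within the FE's group or surface ERR_NO_NODES_IN_WAREHOUSE, depending on policy.
    }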
diff --git a/fe/fe-core/src/main/java/com/starrocks/system/NodeSelector.java b/fe/fe-core/src/main/java/com/starrocks/system/NodeSelector.java
index 341056b350c91..4f2b717711436 100644
--- a/fe/fe-core/src/main/java/com/starrocks/system/NodeSelector.java
+++ b/fe/fe-core/src/main/java/com/starrocks/system/NodeSelector.java
@@ -23,6 +23,7 @@
 import com.starrocks.clone.TabletChecker;
 import com.starrocks.common.Pair;
 import com.starrocks.common.UserException;
+import com.starrocks.server.GlobalStateMgr;
 import com.starrocks.server.RunMode;
 import com.starrocks.thrift.TStorageMedium;
 import org.apache.commons.collections.CollectionUtils;
@@ -40,6 +41,8 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;
+
 /**
  * Select nodes when creating table or loading data.
  */
@@ -91,12 +94,19 @@ public List<Long> seqChooseBackendIds(int backendNum, boolean needAvailable,
         }
     }
 
+
     public List<Long> seqChooseComputeNodes(int computeNodeNum, boolean needAvailable, boolean isCreate) {
-        final List<ComputeNode> candidateComputeNodes =
-                needAvailable ? systemInfoService.getAvailableComputeNodes() : systemInfoService.getComputeNodes();
+        List<ComputeNode> candidateComputeNodes = needAvailable ?
+                systemInfoService.getAvailableComputeNodes() : systemInfoService.getComputeNodes();
+
+        String thisFeRig = GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup();
+        candidateComputeNodes = candidateComputeNodes.stream().
+                filter(cn -> resourceIsolationGroupMatches(cn.getResourceIsolationGroup(), thisFeRig)).
+                collect(Collectors.toList());
         if (CollectionUtils.isEmpty(candidateComputeNodes)) {
-            LOG.warn("failed to find any compute nodes, needAvailable={}", needAvailable);
+            LOG.warn("failed to find any compute nodes, needAvailable={}, resourceIsolationGroup={}",
+                    needAvailable, thisFeRig);
             return Collections.emptyList();
         }
diff --git a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
index 796c89363bf45..f291064dc54dd 100644
--- a/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/com/starrocks/system/SystemInfoService.java
@@ -100,6 +100,7 @@
 import java.util.stream.Stream;
 
 import static com.starrocks.common.util.PropertyAnalyzer.getResourceIsolationGroupFromProperties;
+import static com.starrocks.lake.ResourceIsolationGroupUtils.DEFAULT_RESOURCE_ISOLATION_GROUP_ID;
 
 public class SystemInfoService implements GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@@ -108,6 +109,8 @@ public class SystemInfoService implements GsonPostProcessable {
     @SerializedName(value = "be")
     protected volatile ConcurrentHashMap<Long, Backend> idToBackendRef;
 
+    // TODO(cbrennan) Trace all usages of the ComputeNode references, make sure their resource isolation group
+    // information is being used appropriately.
     @SerializedName(value = "ce")
     protected volatile ConcurrentHashMap<Long, ComputeNode> idToComputeNodeRef;
 
@@ -315,7 +318,7 @@ public ShowResultSet modifyComputeNodeProperty(ModifyComputeNodeClause modifyCom
             if (entry.getKey().equals(AlterSystemStmtAnalyzer.PROP_KEY_GROUP)) {
                 // "" means clean group label
                 if (entry.getValue().isEmpty()) {
-                    computeNode.setResourceIsolationGroup("");
+                    computeNode.setResourceIsolationGroup(DEFAULT_RESOURCE_ISOLATION_GROUP_ID);
                     continue;
                 }
                 String oldResourceIsolationGroup = computeNode.getResourceIsolationGroup();
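Tying the last two hunks together: clearing a node's group label via ALTER SYSTEM now assigns DEFAULT_RESOURCE_ISOLATION_GROUP_ID (""), which the matcher treats as equivalent to an FE whose group was never set. An illustrative fragment (not part of the patch):

    computeNode.setResourceIsolationGroup(DEFAULT_RESOURCE_ISOLATION_GROUP_ID);
    // A frontend with no group configured (null) still schedules onto this node:
    assert resourceIsolationGroupMatches(null, computeNode.getResourceIsolationGroup());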
diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java
index 6090b80598420..c2863e0897e3b 100644
--- a/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java
+++ b/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java
@@ -543,6 +543,7 @@ public boolean publishPartitionBatch(Database db, long tableId, long partitionId,
         // used to delete txnLog when publish success
         Map<ComputeNode, List<Long>> nodeToTablets = new HashMap<>();
 
+        // TODO(cbrennan): See here for entrypoint into getting nodes for vacuuming.
         Utils.publishVersionBatch(publishTablets, txnInfos,
                 startVersion - 1, endVersion, compactionScores, warehouseId,
diff --git a/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java
index 6469c7d54589f..3dadeff1b8c28 100644
--- a/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/cluster/SystemInfoServiceTest.java
@@ -54,6 +54,7 @@
 import com.starrocks.sql.ast.DropBackendClause;
 import com.starrocks.system.Backend;
 import com.starrocks.system.ComputeNode;
+import com.starrocks.system.Frontend;
 import com.starrocks.system.NodeSelector;
 import com.starrocks.system.SystemInfoService;
 import mockit.Expectations;
@@ -398,6 +399,16 @@ public void testSeqChooseComputeNodes() {
                 result = analyzer;
             }
         };
+
+        Frontend thisFe = new Frontend();
+        NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
+        new Expectations(nodeMgr) {
+            {
+                nodeMgr.getMySelf();
+                result = thisFe;
+                minTimes = 0;
+            }
+        };
 
         com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null));
         try {
diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java
index c4a7914388af4..bd7e6adeb2acd 100644
--- a/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java
+++ b/fe/fe-core/src/test/java/com/starrocks/lake/qe/scheduler/DefaultSharedDataWorkerProviderTest.java
@@ -37,9 +37,11 @@
 import com.starrocks.qe.scheduler.NonRecoverableException;
 import com.starrocks.qe.scheduler.WorkerProvider;
 import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.server.NodeMgr;
 import com.starrocks.server.WarehouseManager;
 import com.starrocks.system.Backend;
 import com.starrocks.system.ComputeNode;
+import com.starrocks.system.Frontend;
 import com.starrocks.system.SystemInfoService;
 import com.starrocks.thrift.TInternalScanRange;
 import com.starrocks.thrift.TScanRange;
@@ -72,6 +74,7 @@ public class DefaultSharedDataWorkerProviderTest {
     private Map<Long, ComputeNode> id2ComputeNode;
     private Map<Long, ComputeNode> id2AllNodes;
     private DefaultSharedDataWorkerProvider.Factory factory;
+    private Frontend thisFe;
 
     private static Map<Long, ComputeNode> genWorkers(long startId, long endId,
                                                      Supplier<ComputeNode> factory) {
@@ -117,6 +120,16 @@ public void setUp() {
             }
         };
 
+        thisFe = new Frontend();
+        NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
+        new Expectations(nodeMgr) {
+            {
+                nodeMgr.getMySelf();
+                result = thisFe;
+                minTimes = 0;
+            }
+        };
+
         new MockUp<SystemInfoService>() {
             @Mock
             public ComputeNode getBackendOrComputeNode(long nodeId) {
@@ -127,6 +140,8 @@ public ComputeNode getBackendOrComputeNode(long nodeId) {
                 return node;
             }
         };
+
+
     }
 
     private WorkerProvider newWorkerProvider() {
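The mocks above pin NodeMgr.getMySelf() to a fresh Frontend whose group is unset, so the existing tests exercise the ungrouped path. A test wanting grouped behavior could pin the group before building the provider (test-style sketch; a Frontend setter mirroring the getter used elsewhere in this PR is assumed to exist):

    thisFe.setResourceIsolationGroup("groupA");   // hypothetical setter
    for (ComputeNode cn : id2ComputeNode.values()) {
        cn.setResourceIsolationGroup("groupA");   // keep every CN schedulable from this FE
    }
    WorkerProvider provider = newWorkerProvider(); // should now report all workers available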