(Old strategy) Resource Isolation Feature Checkpoint 2: Selection of compute node considering resource isolation group #9

Draft · wants to merge 6 commits into base: pinterest-integration-3.3
6 changes: 3 additions & 3 deletions build.sh
@@ -95,7 +95,7 @@ Usage: $0 <options>
--without-starcache
build Backend without starcache library
-j build Backend parallel
--output-compile-time
--output-compile-time
save a list of the compile time for every C++ file in ${ROOT}/compile_times.txt.
Turning this option on automatically disables ccache.

@@ -350,7 +350,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
fi
export STARLET_INSTALL_DIR
fi

if [ "${OUTPUT_COMPILE_TIME}" == "ON" ]; then
rm -f ${ROOT}/compile_times.txt
CXX_COMPILER_LAUNCHER=${ROOT}/build-support/compile_time.sh
@@ -568,4 +568,4 @@ if [[ ! -z ${STARROCKS_POST_BUILD_HOOK} ]]; then
eval ${STARROCKS_POST_BUILD_HOOK}
fi

exit 0
exit 0
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
@@ -342,7 +342,8 @@ public enum ErrorCode {
ERR_WAREHOUSE_SUSPENDED(10003, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s has been suspended."),
ERR_WAREHOUSE_UNAVAILABLE(10004, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s is not available."),
ERR_NO_NODES_IN_WAREHOUSE(10005, new byte[] {'4', '2', '0', '0', '0'},
"No alive backend or compute node in warehouse %s."),
"No alive backend or compute node in warehouse %s. Also possible that there are no CN of the " +
"resource isolation group matching the FE."),
ERR_INVALID_WAREHOUSE_NAME(10006, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse name can not be null or empty"),

ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE(10007, new byte[] {'4', '2', '0', '0', '0'},
@@ -0,0 +1,16 @@
package com.starrocks.lake;

import java.util.Objects;

public class ResourceIsolationGroupUtils {
public static final String DEFAULT_RESOURCE_ISOLATION_GROUP_ID = "";

public static boolean resourceIsolationGroupMatches(String rig1, String rig2) {
if (Objects.equals(rig1, rig2)) {
return true;
}
boolean unset1 = rig1 == null || rig1.isEmpty();
boolean unset2 = rig2 == null || rig2.isEmpty();
return unset1 && unset2;
}
}
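
The helper above treats null and the empty string as the same "unset" group. A minimal sketch of the resulting semantics (resourceIsolationGroupMatches is the method added above; the wrapper class and main method exist only for illustration):

    import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;

    public class ResourceIsolationGroupMatchExample {
        public static void main(String[] args) {
            // Identical group ids match.
            assert resourceIsolationGroupMatches("groupA", "groupA");
            // null and "" are both "unset", so they match each other.
            assert resourceIsolationGroupMatches(null, "");
            assert resourceIsolationGroupMatches(null, null);
            // An unset group never matches a concrete group.
            assert !resourceIsolationGroupMatches("groupA", null);
            assert !resourceIsolationGroupMatches("groupA", "");
            // Two different concrete groups do not match.
            assert !resourceIsolationGroupMatches("groupA", "groupB");
        }
    }

Run with java -ea to enable the assertions.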
8 changes: 8 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarOSAgent.java
@@ -614,7 +614,15 @@ public Set<Long> getAllNodeIdsByShard(long shardId, long workerGroupId, boolean
}

public Set<Long> getAllNodeIdsByShard(ShardInfo shardInfo, boolean onlyPrimary) {
// TODO(cbrennan) Ideally, shardInfo would include resource isolation group info, so we could return a
// different node for the shard depending on its resource isolation group. One problem is that
// getReplicaInfoList is part of the staros module, which is not modifiable.
// Failed idea 1: skip the onlyPrimary filter here and filter down later using each node's
// resourceGroupId. However, staros appears to return only one replica even before the stream/filter
// on replica role; right now we are seeing only one replica for each shard.
List<ReplicaInfo> replicas = shardInfo.getReplicaInfoList();

if (onlyPrimary) {
replicas = replicas.stream().filter(x -> x.getReplicaRole() == ReplicaRole.PRIMARY)
.collect(Collectors.toList());
1 change: 1 addition & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
@@ -114,6 +114,7 @@ public static void publishVersionBatch(@NotNull List<Tablet> tablets, List<TxnIn
}

for (Tablet tablet : tablets) {
// TODO(cbrennan) I believe this is where we should enforce group matching for version vacuuming.
ComputeNode computeNode = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getComputeNodeAssignedToTablet(warehouseId, (LakeTablet) tablet);
if (computeNode == null) {
@@ -45,6 +45,8 @@
import java.util.function.IntSupplier;
import java.util.stream.Collectors;

import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;

/**
* WorkerProvider for SHARED_DATA mode. Compared to its counterpart for SHARED_NOTHING mode:
* 1. All Backends and ComputeNodes are treated the same as ComputeNodes.
@@ -71,7 +73,11 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
int numUsedComputeNodes,
ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy,
long warehouseId) {

String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState()
.getNodeMgr().getMySelf().getResourceIsolationGroup();
return captureAvailableWorkers(warehouseId, thisFeResourceIsolationGroup);
}

public DefaultSharedDataWorkerProvider captureAvailableWorkers(long warehouseId, String resourceIsolationGroup) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
ImmutableMap.Builder<Long, ComputeNode> builder = ImmutableMap.builder();
List<Long> computeNodeIds = warehouseManager.getAllComputeNodeIds(warehouseId);
@@ -82,23 +88,26 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
LOG.debug("idToComputeNode: {}", idToComputeNode);
}

ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode);
ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode,
resourceIsolationGroup);
if (availableComputeNodes.isEmpty()) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}

return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes);
return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes,
resourceIsolationGroup);
}
}

/**
* All the compute nodes (including backends), including those that are not alive or in block list.
* All the compute nodes (including backends), including those that are not alive, are in the block list, or do
* not match the resource isolation group of the current frontend.
*/
private final ImmutableMap<Long, ComputeNode> id2ComputeNode;
/**
* The available compute nodes, which are alive and not in the block list when creating the snapshot. It is still
* possible that a node becomes unavailable later; it will be checked again in some of the interfaces.
* If resource isolation groups are in use, this only includes ComputeNodes in the same group as the frontend.
*/
private final ImmutableMap<Long, ComputeNode> availableID2ComputeNode;

@@ -109,16 +118,27 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService

private final Set<Long> selectedWorkerIds;

private final String resourceIsolationGroup;

@VisibleForTesting
public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
ImmutableMap<Long, ComputeNode> availableID2ComputeNode
ImmutableMap<Long, ComputeNode> availableID2ComputeNode,
String resourceIsolationGroup
) {
this.id2ComputeNode = id2ComputeNode;
this.availableID2ComputeNode = availableID2ComputeNode;
this.selectedWorkerIds = Sets.newConcurrentHashSet();
this.resourceIsolationGroup = resourceIsolationGroup;
this.allComputeNodeIds = null;
}

@VisibleForTesting
public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
ImmutableMap<Long, ComputeNode> availableID2ComputeNode
) {
this(id2ComputeNode, availableID2ComputeNode, null);
}

@Override
public long selectNextWorker() throws NonRecoverableException {
ComputeNode worker;
@@ -244,9 +264,12 @@ public long selectBackupWorker(long workerId)
public String toString() {
StringBuilder out = new StringBuilder("compute node: ");
id2ComputeNode.forEach((backendID, backend) -> out.append(
String.format("[%s alive: %b, available: %b, inBlacklist: %b] ", backend.getHost(),
String.format("[%s alive: %b, available: %b, inBlacklist: %b, resourceIsolationGroupMatch: %b] ",
backend.getHost(),
backend.isAlive(), availableID2ComputeNode.containsKey(backendID),
SimpleScheduler.isInBlocklist(backendID))));
SimpleScheduler.isInBlocklist(backendID),
resourceIsolationGroupMatches(this.resourceIsolationGroup,
backend.getResourceIsolationGroup()))));
return out.toString();
}

@@ -270,10 +293,12 @@ private static ComputeNode getNextWorker(ImmutableMap<Long, ComputeNode> workers
return workers.values().asList().get(index);
}

private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers) {
private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers,
String thisFeResourceIsolationGroup) {
ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
for (Map.Entry<Long, ComputeNode> entry : workers.entrySet()) {
if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey())) {
if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey()) &&
resourceIsolationGroupMatches(thisFeResourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
builder.put(entry);
}
}
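
Putting the pieces of this file together: a provider snapshot now excludes nodes that are dead, blocklisted, or in a different group. A hedged sketch of how the test-visible constructor reflects that (buildNode is a hypothetical helper standing in for ComputeNode setup; only the constructor and selectNextWorker come from this diff):

    // Hypothetical setup: buildNode(id, group, alive) is not part of the PR.
    ImmutableMap<Long, ComputeNode> all = ImmutableMap.of(
            1L, buildNode(1L, "groupA", true),   // FE's group, alive -> available
            2L, buildNode(2L, "groupB", true),   // other group -> filtered out
            3L, buildNode(3L, "groupA", false)); // FE's group, dead -> filtered out
    ImmutableMap<Long, ComputeNode> available = ImmutableMap.of(1L, all.get(1L));

    DefaultSharedDataWorkerProvider provider =
            new DefaultSharedDataWorkerProvider(all, available, "groupA");
    long workerId = provider.selectNextWorker(); // can only ever pick node 1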
@@ -539,6 +539,8 @@ public void addScanRangeLocations(Partition partition,
List<Replica> allQueryableReplicas = Lists.newArrayList();
List<Replica> localReplicas = Lists.newArrayList();
if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
/* TODO(cbrennan) This would be best if it could return only replicas belonging to CNs in this FE's
resource isolation group. */
tablet.getQueryableReplicas(allQueryableReplicas, localReplicas,
visibleVersion, localBeId, schemaHash, warehouseId);
} else {
@@ -608,6 +610,9 @@ public void addScanRangeLocations(Partition partition,
internalRange.setFill_data_cache(fillDataCache);
tabletIsNull = false;

// TODO(cbrennan) turn this into a debug statement once we've confirmed that we have a stable mapping for each group.
LOG.info("Ideally, tablet {} mapped to backend id {}", tablet.getId(), replica.getBackendId());

// for CBO
if (!collectedStat && replica.getRowCount() != -1) {
actualRows += replica.getRowCount();
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -115,6 +115,10 @@ protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserExcepti
* only applicable to HDFS; less than or equal to zero means no
* maximum.
*/
// TODO(cbrennan): When we want to have multiple replicas in shared-data mode, or if we want official assignments of
// scan ranges to one node per group, we'll ultimately need to change what this returns. Currently it appears that
// this will be a somewhat involved process, left for v2. For our purposes, this should be done first for the
// OlapScanNode.
public abstract List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength);

@Override
@@ -143,6 +143,9 @@ public static TQueryGlobals genQueryGlobals(Instant startTime, String timezone)
}

private WorkerProvider.Factory newWorkerProviderFactory() {
// TODO(cbrennan): In a future implementation of resource isolation groups, we could create an entirely new
// WorkerProvider class, and fall back to the Default when all the compute nodes are ungrouped or no compute
// node matches the frontend's group.
if (RunMode.isSharedDataMode()) {
return new DefaultSharedDataWorkerProvider.Factory();
} else {
@@ -81,13 +81,16 @@ public void computeScanRangeAssignment() throws UserException {
if (!workerProvider.isDataNodeAvailable(location.getBackend_id())) {
if (workerProvider.allowUsingBackupNode()) {
long backupNodeId = workerProvider.selectBackupWorker(location.getBackend_id());
LOG.debug("Select a backup node:{} for node:{}", backupNodeId, location.getBackend_id());
if (backupNodeId > 0) {
// using the backupNode to generate a new ScanRangeLocation
TScanRangeLocation backupLocation = new TScanRangeLocation();
backupLocation.setBackend_id(backupNodeId);
backupLocation.setServer(workerProvider.getWorkerById(backupNodeId).getAddress());
backupLocations.add(backupLocation);
// TODO(cbrennan) turn into a debug statement and move it back up when done testing
LOG.info("Select a backup node:{} for node:{} {}", backupNodeId,
location.getBackend_id(),
backupLocation.getServer());
}
}
continue;
@@ -227,6 +227,7 @@ public ComputeNode getComputeNodeAssignedToTablet(String warehouseName, LakeTabl
return getComputeNodeAssignedToTablet(warehouse.getId(), tablet);
}

// TODO(cbrennan) Ensure that the ComputeNode returned here is checked for its resource group id if appropriate.
public ComputeNode getComputeNodeAssignedToTablet(Long warehouseId, LakeTablet tablet) {
Long computeNodeId = getComputeNodeId(warehouseId, tablet);
if (computeNodeId == null) {
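
Until that TODO is resolved, callers could apply the group check themselves. A hedged sketch using only APIs that appear elsewhere in this PR (where and whether to reject a mismatched node is an open design question):

    ComputeNode cn = warehouseManager.getComputeNodeAssignedToTablet(warehouseId, tablet);
    String feGroup = GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup();
    if (cn != null && !resourceIsolationGroupMatches(feGroup, cn.getResourceIsolationGroup())) {
        // Treat a cross-group assignment as "no node found", or re-route to a node in the FE's group.
        cn = null;
    }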
14 changes: 12 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/system/NodeSelector.java
@@ -23,6 +23,7 @@
import com.starrocks.clone.TabletChecker;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.thrift.TStorageMedium;
import org.apache.commons.collections.CollectionUtils;
@@ -40,6 +41,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;

/**
* Select nodes when creating table or loading data.
*/
@@ -91,12 +94,19 @@ public List<Long> seqChooseBackendIds(int backendNum, boolean needAvailable,
}
}


public List<Long> seqChooseComputeNodes(int computeNodeNum, boolean needAvailable, boolean isCreate) {

final List<ComputeNode> candidateComputeNodes =
List<ComputeNode> candidateComputeNodes =
needAvailable ? systemInfoService.getAvailableComputeNodes() : systemInfoService.getComputeNodes();

String thisFeRig = GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf().getResourceIsolationGroup();
candidateComputeNodes = candidateComputeNodes.stream()
.filter(cn -> resourceIsolationGroupMatches(cn.getResourceIsolationGroup(), thisFeRig))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(candidateComputeNodes)) {
LOG.warn("failed to find any compute nodes, needAvailable={}", needAvailable);
LOG.warn("failed to find any compute nodes, needAvailable={}, resourceIsolationGroup={}",
needAvailable, thisFeRig);
return Collections.emptyList();
}

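
The effect of the new filter: an FE only ever chooses compute nodes from its own group, and an ungrouped FE only chooses ungrouped nodes. A standalone illustration (the Node record is a stand-in for ComputeNode, used here so the snippet is self-contained):

    import java.util.List;
    import java.util.stream.Collectors;
    import static com.starrocks.lake.ResourceIsolationGroupUtils.resourceIsolationGroupMatches;

    public class GroupFilterExample {
        record Node(long id, String group) {}

        public static void main(String[] args) {
            List<Node> candidates = List.of(
                    new Node(1, "groupA"), new Node(2, ""), new Node(3, "groupB"));

            // An FE in "groupA" keeps only node 1; an ungrouped FE (null group) keeps only node 2.
            for (String feGroup : new String[] {"groupA", null}) {
                List<Node> kept = candidates.stream()
                        .filter(n -> resourceIsolationGroupMatches(n.group(), feGroup))
                        .collect(Collectors.toList());
                System.out.println(feGroup + " -> " + kept);
            }
        }
    }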
@@ -100,6 +100,7 @@
import java.util.stream.Stream;

import static com.starrocks.common.util.PropertyAnalyzer.getResourceIsolationGroupFromProperties;
import static com.starrocks.lake.ResourceIsolationGroupUtils.DEFAULT_RESOURCE_ISOLATION_GROUP_ID;

public class SystemInfoService implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@@ -108,6 +109,8 @@ public class SystemInfoService implements GsonPostProcessable {
@SerializedName(value = "be")
protected volatile ConcurrentHashMap<Long, Backend> idToBackendRef;

// TODO(cbrennan) Trace all usages of the ComputeNode references, make sure their resource isolation group
// information is being used appropriately.
@SerializedName(value = "ce")
protected volatile ConcurrentHashMap<Long, ComputeNode> idToComputeNodeRef;

@@ -315,7 +318,7 @@ public ShowResultSet modifyComputeNodeProperty(ModifyComputeNodeClause modifyCom
if (entry.getKey().equals(AlterSystemStmtAnalyzer.PROP_KEY_GROUP)) {
// "" means clean group label
if (entry.getValue().isEmpty()) {
computeNode.setResourceIsolationGroup("");
computeNode.setResourceIsolationGroup(DEFAULT_RESOURCE_ISOLATION_GROUP_ID);
continue;
}
String oldResourceIsolationGroup = computeNode.getResourceIsolationGroup();
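
Since DEFAULT_RESOURCE_ISOLATION_GROUP_ID is the empty string, clearing a node's group label returns it to the unset group, where it again matches ungrouped FEs. A short sketch of that consequence (fragment only; computeNode setup omitted):

    computeNode.setResourceIsolationGroup(DEFAULT_RESOURCE_ISOLATION_GROUP_ID);
    // An unset group matches both null and "" on the FE side.
    assert resourceIsolationGroupMatches(computeNode.getResourceIsolationGroup(), null);
    assert resourceIsolationGroupMatches(computeNode.getResourceIsolationGroup(), "");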
@@ -543,6 +543,7 @@ public boolean publishPartitionBatch(Database db, long tableId, long partitionId

// used to delete txnLog when publish success
Map<ComputeNode, List<Long>> nodeToTablets = new HashMap<>();
// TODO(cbrennan): This call is the entry point for selecting the nodes used for vacuuming.
Utils.publishVersionBatch(publishTablets, txnInfos,
startVersion - 1, endVersion, compactionScores,
warehouseId,
@@ -54,6 +54,7 @@
import com.starrocks.sql.ast.DropBackendClause;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.Frontend;
import com.starrocks.system.NodeSelector;
import com.starrocks.system.SystemInfoService;
import mockit.Expectations;
@@ -398,6 +399,16 @@ public void testSeqChooseComputeNodes() {
result = analyzer;
}
};

Frontend thisFe = new Frontend();
NodeMgr nodeMgr = GlobalStateMgr.getCurrentState().getNodeMgr();
new Expectations(nodeMgr) {
{
nodeMgr.getMySelf();
result = thisFe;
minTimes = 0;
}
};
com.starrocks.sql.analyzer.Analyzer.analyze(new AlterSystemStmt(stmt), new ConnectContext(null));

try {
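
The mocked getMySelf() returns a freshly constructed Frontend, i.e. an FE with no resource isolation group, so the existing assertions still see every ungrouped compute node. A possible follow-up case for a grouped FE, sketched under the assumption that Frontend exposes a group setter (not shown in this diff):

    Frontend groupedFe = new Frontend();
    groupedFe.setResourceIsolationGroup("groupA"); // assumed setter, not part of this PR
    new Expectations(nodeMgr) {
        {
            nodeMgr.getMySelf();
            result = groupedFe;
            minTimes = 0;
        }
    };
    // With no compute node labeled "groupA", seqChooseComputeNodes should return an empty list.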