Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent-operator: scaling strategies and configurable requirements #893

Merged
merged 7 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.walmartlabs.concord.agentoperator.crd.AgentPool;
import com.walmartlabs.concord.agentoperator.crd.AgentPoolList;
import com.walmartlabs.concord.agentoperator.scheduler.AutoScalerFactory;
import com.walmartlabs.concord.agentoperator.scheduler.Event;
import com.walmartlabs.concord.agentoperator.scheduler.Scheduler;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
Expand Down Expand Up @@ -54,7 +55,8 @@ public static void main(String[] args) {

// TODO use secrets for the token?
Scheduler.Configuration cfg = new Scheduler.Configuration(baseUrl, apiToken);
Scheduler scheduler = new Scheduler(client, cfg);
AutoScalerFactory autoScalerFactory = new AutoScalerFactory(cfg, client);
Scheduler scheduler = new Scheduler(autoScalerFactory, client);
scheduler.start();

// TODO retries
Expand All @@ -74,7 +76,7 @@ public void onClose(WatcherException we) {
});
}

private static final Event.Type actionToEvent(Watcher.Action action) {
private static Event.Type actionToEvent(Watcher.Action action) {
switch (action) {
case ADDED:
case MODIFIED: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* =====
*/

import com.walmartlabs.concord.agentoperator.scheduler.DefaultAutoScaler;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Pod;

Expand All @@ -43,6 +44,8 @@ public class AgentPoolConfiguration implements Serializable {
private static final int DEFAULT_MAX_SIZE = 10;
private static final int DEFAULT_MIN_SIZE = 1;
private static final int DEFAULT_SIZE = 1;
private static final int DEFAULT_SIZE_INCREMENT = 1;

private static final double DEFAULT_INCREMENT_THRESHOLD_FACTOR = 1.5;
private static final double DEFAULT_DECREMENT_THRESHOLD_FACTOR = 1.0;
private static final double DEFAULT_INCREMENT_PERCENTAGE = 50;
Expand All @@ -55,6 +58,9 @@ public class AgentPoolConfiguration implements Serializable {
private int minSize = DEFAULT_MIN_SIZE;
private int size = DEFAULT_SIZE;

private String autoScaleStrategy = DefaultAutoScaler.NAME;
private int sizeIncrement = DEFAULT_SIZE_INCREMENT;

private int queueQueryLimit = DEFAULT_QUEUE_QUERY_LIMIT;

/**
Expand Down Expand Up @@ -98,10 +104,26 @@ public boolean isAutoScale() {
return autoScale;
}

public int getSizeIncrement() {
return sizeIncrement;
}

public void setSizeIncrement(int sizeIncrement) {
this.sizeIncrement = sizeIncrement;
}

public void setAutoScale(boolean autoScale) {
this.autoScale = autoScale;
}

public String getAutoScaleStrategy() {
return autoScaleStrategy;
}

public void setAutoScaleStrategy(String autoScaleStrategy) {
this.autoScaleStrategy = autoScaleStrategy;
}

public long getScaleUpDelayMs() {
return scaleUpDelayMs;
}
Expand Down Expand Up @@ -215,4 +237,9 @@ private static double getDoubleFromEnv(String key, double defaultValue) {
String envValue = System.getenv(key);
return envValue != null ? Double.parseDouble(envValue) : defaultValue;
}

private static String getStringFromEnv(String key, String defaultValue) {
String envValue = System.getenv(key);
return envValue != null ? envValue : defaultValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@
* =====
*/

import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListMeta;
import io.fabric8.kubernetes.client.CustomResourceList;

import java.util.List;

public class AgentPoolList extends CustomResourceList<AgentPool> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@

public interface Change {

public void apply(KubernetesClient client);
void apply(KubernetesClient client);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ public void apply(KubernetesClient client) {
log.error("apply -> error while creating a configmap {}: {}", configMapName, e.getMessage());
}
}

@Override
public String toString() {
return "CreateConfigMapChange{" +
"configMapName='" + configMapName + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,13 @@ public void apply(KubernetesClient client) {
log.error("apply -> error while creating a pod {}/{}: {}", poolInstance.getName(), podName, e.getMessage());
}
}

@Override
public String toString() {
return "CreatePodChange{" +
"podName='" + podName + '\'' +
", configMapName='" + configMapName + '\'' +
", hash='" + hash + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ public void apply(KubernetesClient client) {
log.info("apply -> removed a configmap {}", configMapName);
}
}

@Override
public String toString() {
return "DeleteConfigMapChange{" +
"configMapName='" + configMapName + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class Planner {

private static final Logger log = LoggerFactory.getLogger(Planner.class);

private final KubernetesClient client;

public Planner(KubernetesClient client) {
Expand Down Expand Up @@ -70,6 +76,8 @@ public List<Change> plan(AgentPoolInstance poolInstance) throws IOException {

int targetSize = poolInstance.getTargetSize();

log.info("plan ['{}'] -> currentSize = {}, targetSize= {}, configMap = {}", resourceName, currentSize, targetSize, m != null);

AgentPoolInstance.Status poolStatus = poolInstance.getStatus();
if (poolStatus == AgentPoolInstance.Status.DELETED) {
targetSize = 0;
Expand Down Expand Up @@ -124,35 +132,51 @@ public List<Change> plan(AgentPoolInstance poolInstance) throws IOException {
}

// create or remove pods according to the configured pool size

for (int i = 0; i < targetSize; i++) {
String podName = podName(resourceName, i);

boolean exists = hasPod(pods, podName);
if (exists) {
continue;
if (targetSize > currentSize) {
Set<String> podNames = pods.stream().map(p -> p.getMetadata().getName()).collect(Collectors.toSet());
for (int i = 0; i < targetSize - currentSize; i++) {
String podName = generatePodName(resourceName, podNames);
changes.add(new CreatePodChange(poolInstance, podName, configMapName(resourceName), newHash));
podNames.add(podName);
}

changes.add(new CreatePodChange(poolInstance, podName, configMapName(resourceName), newHash));
}

if (currentSize > targetSize) {
for (int i = targetSize; i < currentSize; i++) {
String podName = podName(resourceName, i);

boolean exists = hasPod(pods, podName);
if (!exists) {
int podsToDelete = currentSize - targetSize;
for (Pod pod : pods) {
if (pod.getMetadata().getLabels().containsKey(AgentPod.TAGGED_FOR_REMOVAL_LABEL)) {
continue;
}

String podName = pod.getMetadata().getName();
changes.add(new TagForRemovalChange(podName));
changes.add(new TryToDeletePodChange(podName));

podsToDelete--;
if (podsToDelete == 0) {
break;
}
}
}

if (!changes.isEmpty()) {
log.info("plan ['{}'] -> changes: {}", resourceName, changes);
}

return changes;
}

private static String generatePodName(String resourceName, Set<String> podNames) {
for (int i = 0; i < podNames.size() + 1; i++) {
String podName = podName(resourceName, i);

if (!podNames.contains(podName)) {
return podName;
}
}
throw new RuntimeException("Can't generate pod name for '" + resourceName + "', current pods: " + podNames);
}

private static boolean hasPod(List<Pod> pods, String podName) {
return pods.stream().anyMatch(p -> p.getMetadata().getName().equals(podName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ public void apply(KubernetesClient client) {
public static void apply(KubernetesClient client, String podName) {
PodUtils.applyTag(client, podName, AgentPod.TAGGED_FOR_REMOVAL_LABEL, "true");
}

@Override
public String toString() {
return "TagForRemovalChange{" +
"podName='" + podName + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,11 @@ public void apply(KubernetesClient client) {
PodUtils.applyTag(client, podName, AgentPod.PRE_STOP_HOOK_TERMINATION_LABEL, "true");
log.info("apply ['{}'] -> Marked for termination (former phase: {})", podName, pod.getStatus().getPhase());
}

@Override
public String toString() {
return "TryToDeletePodChange{" +
"podName='" + podName + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers;
import com.walmartlabs.concord.agentoperator.scheduler.QueueSelector;
import okhttp3.*;

import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -50,13 +54,20 @@ public ProcessQueueClient(String baseUrl, String apiToken) {
this.objectMapper = new ObjectMapper();
}

public List<ProcessQueueEntry> query(String processStatus, int limit, String flavor) throws IOException {
String queryUrl = baseUrl + "/api/v2/process/requirements?status=" + processStatus + "&limit=" + limit + "&startAt.len=";
public List<ProcessQueueEntry> query(String processStatus, int limit, QueueSelector queueSelector) throws IOException {
StringBuilder queryUrl = new StringBuilder(baseUrl + "/api/v2/process/requirements?status=" + processStatus + "&limit=" + limit + "&startAt.len=");
String flavor = queueSelector.getFlavor();
if (flavor != null) {
queryUrl = queryUrl + "&requirements.agent.flavor.eq=" + flavor;
queryUrl.append("&requirements.agent.flavor.eq=").append(flavor);
}
List<String> queryParams = queueSelector.getQueryParams();
if (queryParams != null) {
for (String queryParam : queryParams) {
queryUrl.append("&").append(escapeQueryParam(queryParam));
}
}
Request req = new Request.Builder()
.url(queryUrl)
.url(queryUrl.toString())
.header("Authorization", apiToken)
.addHeader("User-Agent", "k8s-agent-operator")
.build();
Expand Down Expand Up @@ -160,4 +171,16 @@ public Response intercept(Chain chain) throws IOException {
return resp;
}
}

@VisibleForTesting
static String escapeQueryParam(String s) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
int i = s.indexOf("=");
if (i < 0) {
return escaper.escape(s);
}
String key = s.substring(0, i);
String value = s.substring(i + 1);
return escaper.escape(key) + "=" + escaper.escape(value);
}
}
Loading
Loading