From 9320ea2f6c496341171ba8799cf0f3d58119ca83 Mon Sep 17 00:00:00 2001 From: Ivan Bodrov Date: Tue, 28 Jan 2025 09:54:44 -0500 Subject: [PATCH] concord-agent-operator: use informers API --- .../concord/agentoperator/Operator.java | 67 ++++++++++--------- .../agentoperator/scheduler/Scheduler.java | 5 +- 2 files changed, 39 insertions(+), 33 deletions(-) diff --git a/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/Operator.java b/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/Operator.java index d865ccd9c1..ef65339237 100644 --- a/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/Operator.java +++ b/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/Operator.java @@ -25,69 +25,74 @@ import com.walmartlabs.concord.agentoperator.crd.AgentPool; import com.walmartlabs.concord.agentoperator.crd.AgentPoolList; import com.walmartlabs.concord.agentoperator.scheduler.AutoScalerFactory; -import com.walmartlabs.concord.agentoperator.scheduler.Event; import com.walmartlabs.concord.agentoperator.scheduler.Scheduler; import io.fabric8.kubernetes.client.DefaultKubernetesClient; -import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; +import java.util.concurrent.Executors; + +import static com.walmartlabs.concord.agentoperator.scheduler.Event.Type.DELETED; +import static com.walmartlabs.concord.agentoperator.scheduler.Event.Type.MODIFIED; + public class Operator { private static final Logger log = LoggerFactory.getLogger(Operator.class); + private static final long RESYNC_PERIOD = Duration.ofSeconds(10).toMillis(); + public static void main(String[] args) { // TODO support overloading the CRD with an external file? - var namespace = System.getenv("WATCH_NAMESPACE"); - if (namespace == null) { - namespace = "default"; - } - - var client = new DefaultKubernetesClient() // NOSONAR - .inNamespace(namespace); - + var namespace = getEnv("WATCH_NAMESPACE", "default"); var baseUrl = getEnv("CONCORD_BASE_URL", "http://192.168.99.1:8001"); // use minikube/vbox host's default address var apiToken = getEnv("CONCORD_API_TOKEN", null); var useMaintenanceMode = Boolean.parseBoolean(getEnv("USE_AGENT_MAINTENANCE_MODE", "false")); // TODO use secrets for the token? var cfg = new Scheduler.Configuration(baseUrl, apiToken); + var client = new DefaultKubernetesClient().inNamespace(namespace); var autoScalerFactory = new AutoScalerFactory(cfg, client); var agentClientFactory = new AgentClientFactory(useMaintenanceMode); + var executor = Executors.newCachedThreadPool(); + var scheduler = new Scheduler(autoScalerFactory, client, agentClientFactory); scheduler.start(); log.info("main -> my watch begins... (namespace={})", namespace); - var dummyClient = client.resources(AgentPool.class, AgentPoolList.class); + var informer = client.resources(AgentPool.class, AgentPoolList.class).inAnyNamespace() + .inform(new ResourceEventHandler<>() { + + @Override + public void onAdd(AgentPool resource) { + executor.submit(() -> scheduler.onEvent(MODIFIED, resource)); + } + + @Override + public void onUpdate(AgentPool oldResource, AgentPool newResource) { + if (oldResource == newResource) { + return; + } + executor.submit(() -> scheduler.onEvent(MODIFIED, newResource)); + } + + @Override + public void onDelete(AgentPool resource, boolean deletedFinalStateUnknown) { + executor.submit(() -> scheduler.onEvent(DELETED, resource)); + } + }, RESYNC_PERIOD); + try { - dummyClient.watch(new Watcher<>() { - @Override - public void eventReceived(Action action, AgentPool resource) { - scheduler.onEvent(actionToEvent(action), resource); - } - - @Override - public void onClose(WatcherException we) { - log.error("Watcher exception {}", we.getMessage(), we); - } - }); + informer.run(); } catch (Exception e) { log.error("Error while watching for CRs (namespace={})", namespace, e); System.exit(2); } } - private static Event.Type actionToEvent(Watcher.Action action) { - return switch (action) { - case ADDED, MODIFIED -> Event.Type.MODIFIED; - case DELETED -> Event.Type.DELETED; - default -> throw new IllegalArgumentException("Unknown action type: " + action); - }; - } - private static String getEnv(String key, String defaultValue) { String s = System.getenv(key); if (s == null) { diff --git a/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/scheduler/Scheduler.java b/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/scheduler/Scheduler.java index 27b49189aa..fc6f6307e4 100644 --- a/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/scheduler/Scheduler.java +++ b/agent-operator/src/main/java/com/walmartlabs/concord/agentoperator/scheduler/Scheduler.java @@ -55,6 +55,7 @@ public Scheduler(AutoScalerFactory autoScalerFactory, KubernetesClient k8sClient } public void onEvent(Event.Type type, AgentPool resource) { + log.info("onEvent -> handling {} for {}/{}", type, resource.getMetadata().getNamespace(), resource.getMetadata().getName()); synchronized (events) { events.add(new Event(type, resource)); } @@ -161,13 +162,13 @@ private AgentPoolInstance updateTargetSize(AgentPoolInstance i) throws IOExcepti } private void processActive(AgentPoolInstance i) throws IOException { - log.info("processActive ['{}']", i); + log.info("processActive ['{}']", i.getName()); List changes = planner.plan(i); apply(changes); } private void processDeleted(AgentPoolInstance i) throws IOException { - log.info("processDeleted ['{}']", i); + log.info("processDeleted ['{}']", i.getName()); String resourceName = i.getName(); // remove all pool's pods