Skip to content
This repository was archived by the owner on Sep 26, 2024. It is now read-only.

Commit

Permalink
KAFKA-16837, KAFKA-16838: Ignore task configs for deleted connectors,…
Browse files Browse the repository at this point in the history
… and compare raw task configs before publishing them (apache#16122)

Reviewers: Mickael Maison <[email protected]>
  • Loading branch information
C0urante authored Jun 4, 2024
1 parent a08db65 commit 0409003
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
*/
public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {

private final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class);

private final String workerId;
protected final Worker worker;
Expand Down Expand Up @@ -1039,16 +1039,16 @@ public static List<Map<String, String>> reverseTransform(String connName,
return result;
}

public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
public static boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> rawTaskProps) {
int currentNumTasks = configState.taskCount(connName);
boolean result = false;
if (taskProps.size() != currentNumTasks) {
log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
if (rawTaskProps.size() != currentNumTasks) {
log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
result = true;
} else {
for (int index = 0; index < currentNumTasks; index++) {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
result = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2229,11 +2229,11 @@ private void reconfigureConnector(final String connName, final Callback<Void> cb
}

private void publishConnectorTaskConfigs(String connName, List<Map<String, String>> taskProps, Callback<Void> cb) {
if (!taskConfigsChanged(configState, connName, taskProps)) {
List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (!taskConfigsChanged(configState, connName, rawTaskProps)) {
return;
}

List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
if (isLeader()) {
writeTaskConfigs(connName, rawTaskProps);
cb.onCompletion(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,10 @@ private synchronized void updateConnectorTasks(String connName) {
}

List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);

if (taskConfigsChanged(configState, connName, newTaskConfigs)) {
if (taskConfigsChanged(configState, connName, rawTaskConfigs)) {
removeConnectorTasks(connName);
List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
createConnectorTasks(connName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -997,11 +997,8 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
synchronized (lock) {
if (value.value() == null) {
// Connector deletion will be written as a null value
processConnectorRemoval(connectorName);
log.info("Successfully processed removal of connector '{}'", connectorName);
connectorConfigs.remove(connectorName);
connectorTaskCounts.remove(connectorName);
taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
deferredTaskUpdates.remove(connectorName);
removed = true;
} else {
// Connector configs can be applied and callbacks invoked immediately
Expand Down Expand Up @@ -1064,6 +1061,21 @@ private void processTaskConfigRecord(ConnectorTaskId taskId, SchemaAndValue valu
private void processTasksCommitRecord(String connectorName, SchemaAndValue value) {
List<ConnectorTaskId> updatedTasks = new ArrayList<>();
synchronized (lock) {
// Edge case: connector was deleted before these task configs were published,
// but compaction took place and both the original connector config and the
// tombstone message for it have been removed from the config topic.
// We should ignore these task configs
if (!connectorConfigs.containsKey(connectorName)) {
processConnectorRemoval(connectorName);
log.debug(
"Ignoring task configs for connector {}; it appears that the connector was deleted previously "
+ "and that log compaction has since removed any trace of its previous configurations "
+ "from the config topic",
connectorName
);
return;
}

// Apply any outstanding deferred task updates for the given connector. Note that just because we
// encounter a commit message does not mean it will result in consistent output. In particular due to
// compaction, there may be cases where the result is inconsistent. For example, if we have the following sequence of writes:
Expand Down Expand Up @@ -1168,7 +1180,7 @@ private void processTaskCountRecord(String connectorName, SchemaAndValue value)

log.debug("Setting task count record for connector '{}' to {}", connectorName, taskCount);
connectorTaskCountRecords.put(connectorName, taskCount);
// If a task count record appears after the latest task configs, the connectors doesn't need a round of zombie
// If a task count record appears after the latest task configs, the connector doesn't need a round of zombie
// fencing before it can start tasks with the latest configs
connectorsPendingFencing.remove(connectorName);
}
Expand Down Expand Up @@ -1244,6 +1256,13 @@ private void processLoggerLevelRecord(String namespace, SchemaAndValue value) {
}
}

/**
 * Discards the in-memory state held for a single connector: its entry in
 * {@code connectorConfigs}, its entry in {@code connectorTaskCounts}, every
 * {@code taskConfigs} entry whose task ID belongs to the connector, and its
 * entry in {@code deferredTaskUpdates}.
 * <p>
 * NOTE(review): this method performs no locking of its own; the callers visible
 * alongside it invoke it from inside {@code synchronized (lock)} blocks. Confirm
 * that any new caller does the same.
 *
 * @param connectorName name of the connector whose state should be dropped
 */
private void processConnectorRemoval(String connectorName) {
    connectorConfigs.remove(connectorName);
    connectorTaskCounts.remove(connectorName);
    // Removing through the entry set drops exactly the same mappings as removing through the key set
    taskConfigs.entrySet().removeIf(entry -> entry.getKey().connector().equals(connectorName));
    deferredTaskUpdates.remove(connectorName);
}

private ConnectorTaskId parseTaskId(String key) {
String[] parts = key.split("-");
if (parts.length < 3) return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.connect.integration;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -39,11 +40,14 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -56,6 +60,9 @@

import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG;
import static org.apache.kafka.connect.integration.BlockingConnectorTest.TASK_STOP;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
Expand All @@ -71,6 +78,7 @@
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_PREFIX;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.REBALANCE_TIMEOUT_MS_CONFIG;
Expand Down Expand Up @@ -108,6 +116,9 @@ public class ConnectWorkerIntegrationTest {
@Rule
public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);

@Rule
public TemporaryFolder tmp = new TemporaryFolder();

@Before
public void setup() {
// setup Connect worker properties
Expand Down Expand Up @@ -1123,6 +1134,194 @@ public void testTasksMaxEnforcement() throws Exception {
);
}

/**
 * Task configs are not removed from the config topic after a connector is deleted.
 * When topic compaction takes place, this can cause the tombstone message for the
 * connector config to be deleted, leaving the task configs in the config topic with no
 * explicit record of the connector's deletion.
 * <p>
 * This test guarantees that those older task configs are never used, even when the
 * connector is recreated later. It does so by recreating the connector with a different
 * target topic and asserting that the end offset of the original topic does not advance.
 */
@Test
public void testCompactedDeletedOlderConnectorConfig() throws Exception {
    // Broker-side log cleaner overrides; the small values are intended to let compaction
    // of the config topic happen within the lifetime of the test
    brokerProps.put("log.cleaner.backoff.ms", "100");
    brokerProps.put("log.cleaner.delete.retention.ms", "1");
    brokerProps.put("log.cleaner.max.compaction.lag.ms", "1");
    brokerProps.put("log.cleaner.min.cleanable.ratio", "0");
    brokerProps.put("log.cleaner.min.compaction.lag.ms", "1");
    brokerProps.put("log.cleaner.threads", "1");

    // Use a dedicated config topic with short segment and tombstone-retention settings
    final String configTopic = "kafka-16838-configs";
    final int offsetCommitIntervalMs = 100;
    workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
    workerProps.put(CONFIG_STORAGE_PREFIX + SEGMENT_MS_CONFIG, "100");
    workerProps.put(CONFIG_STORAGE_PREFIX + DELETE_RETENTION_MS_CONFIG, "1");
    workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));

    final int numWorkers = 1;
    connect = connectBuilder
            .numWorkers(numWorkers)
            .build();
    // start the clusters
    connect.start();

    connect.assertions().assertAtLeastNumWorkersAreUp(
            numWorkers,
            "Initial group of workers did not start in time."
    );

    final String connectorTopic = "connector-topic";
    connect.kafka().createTopic(connectorTopic, 1);

    ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
    connectorHandle.expectedCommits(NUM_TASKS * 2);

    // First incarnation of the connector: writes to connectorTopic
    Map<String, String> connectorConfig = defaultSourceConnectorProps(connectorTopic);
    connect.configureConnector(CONNECTOR_NAME, connectorConfig);
    connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
            CONNECTOR_NAME,
            NUM_TASKS,
            "Connector or its tasks did not start in time"
    );
    connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);

    connect.deleteConnector(CONNECTOR_NAME);

    // Roll the entire cluster
    connect.activeWorkers().forEach(connect::removeWorker);

    // Miserable hack: produce directly to the config topic and then wait a little bit
    // in order to trigger segment rollover and allow compaction to take place
    connect.kafka().produce(configTopic, "garbage-key-1", null);
    Thread.sleep(1_000);
    connect.kafka().produce(configTopic, "garbage-key-2", null);
    Thread.sleep(1_000);

    for (int i = 0; i < numWorkers; i++)
        connect.addWorker();

    // This is the post-roll restart, not the initial startup, so report it as such on failure
    connect.assertions().assertAtLeastNumWorkersAreUp(
            numWorkers,
            "Workers did not restart in time after the cluster was rolled."
    );

    // Record how far the first incarnation of the connector got in the original topic
    final TopicPartition connectorTopicPartition = new TopicPartition(connectorTopic, 0);
    final long initialEndOffset = connect.kafka().endOffset(connectorTopicPartition);
    assertTrue(
            "Source connector should have published at least one record to Kafka",
            initialEndOffset > 0
    );

    connectorHandle.expectedCommits(NUM_TASKS * 2);

    // Re-create the connector with a different config (targets a different topic)
    final String otherConnectorTopic = "other-topic";
    connect.kafka().createTopic(otherConnectorTopic, 1);
    connectorConfig.put(TOPIC_CONFIG, otherConnectorTopic);
    connect.configureConnector(CONNECTOR_NAME, connectorConfig);
    connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
            CONNECTOR_NAME,
            NUM_TASKS,
            "Connector or its tasks did not start in time"
    );
    connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);

    // See if any new records got written to the old topic
    final long nextEndOffset = connect.kafka().endOffset(connectorTopicPartition);
    assertEquals(
            "No new records should have been written to the older topic",
            initialEndOffset,
            nextEndOffset
    );
}

/**
* If a connector has existing tasks, and then generates new task configs, workers compare the
* new and existing configs before publishing them to the config topic. If there is no difference,
* workers do not publish task configs (this is a workaround to prevent infinite loops with eager
* rebalancing).
* <p>
* This test tries to guarantee that, if the old task configs become invalid because of
* an invalid config provider reference, it will still be possible to reconfigure the connector.
* <p>
* The test starts a connector whose "throughput" property is resolved from a secrets file,
* deletes that file, and then reconfigures the connector with a literal value and a new topic.
*/
@Test
public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
final int offsetCommitIntervalMs = 100;
// Register FileConfigProvider under the alias "file" so that connector configs can
// contain ${file:<path>:<key>} references
workerProps.put(CONFIG_PROVIDERS_CONFIG, "file");
workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName());
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));

final int numWorkers = 1;
connect = connectBuilder
.numWorkers(numWorkers)
.build();
// start the clusters
connect.start();

connect.assertions().assertAtLeastNumWorkersAreUp(
numWorkers,
"Initial group of workers did not start in time."
);

final String firstConnectorTopic = "connector-topic-1";
connect.kafka().createTopic(firstConnectorTopic);

// Write a properties file that holds the value the connector config will reference
final File secretsFile = tmp.newFile("test-secrets");
final Properties secrets = new Properties();
final String throughputSecretKey = "secret-throughput";
secrets.put(throughputSecretKey, "10");
try (FileOutputStream secretsOutputStream = new FileOutputStream(secretsFile)) {
secrets.store(secretsOutputStream, null);
}

ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
connectorHandle.expectedCommits(NUM_TASKS * 2);

// Supply "throughput" indirectly, as a config provider reference into the secrets file
Map<String, String> connectorConfig = defaultSourceConnectorProps(firstConnectorTopic);
connectorConfig.put(
"throughput",
"${file:" + secretsFile.getAbsolutePath() + ":" + throughputSecretKey + "}"
);
connect.configureConnector(CONNECTOR_NAME, connectorConfig);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector or its tasks did not start in time"
);
connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);

// Delete the secrets file, which should render the old task configs invalid
assertTrue("Failed to delete secrets file", secretsFile.delete());

// Use a start latch here instead of assertConnectorAndExactlyNumTasksAreRunning
// since failure to reconfigure the tasks (which may occur if the bug this test was written
// to help catch resurfaces) will not cause existing tasks to fail or stop running
StartAndStopLatch restarts = connectorHandle.expectedStarts(1);
connectorHandle.expectedCommits(NUM_TASKS * 2);

final String secondConnectorTopic = "connector-topic-2";
connect.kafka().createTopic(secondConnectorTopic, 1);

// Stop using the config provider for this connector, and instruct it to start writing to
// the newly-created second topic instead of the first one
connectorConfig.put("throughput", "10");
connectorConfig.put(TOPIC_CONFIG, secondConnectorTopic);
connect.configureConnector(CONNECTOR_NAME, connectorConfig);
assertTrue(
"Connector tasks were not restarted in time",
restarts.await(10, TimeUnit.SECONDS)
);
connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);

// Records in the second topic show that the reconfiguration took effect
final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
assertTrue(
"Source connector should have published at least one record to new Kafka topic "
+ "after being reconfigured",
endOffset > 0
);
}

private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();
Expand Down
Loading

0 comments on commit 0409003

Please sign in to comment.