Skip to content

Commit

Permalink
Add a command to remove e164-associated "recently-deleted account" re…
Browse files Browse the repository at this point in the history
…cords
  • Loading branch information
jon-signal committed Nov 27, 2024
1 parent d08bc4c commit 4d87b74
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@
import org.whispersystems.textsecuregcm.workers.MessagePersisterServiceCommand;
import org.whispersystems.textsecuregcm.workers.NotifyIdleDevicesCommand;
import org.whispersystems.textsecuregcm.workers.ProcessScheduledJobsServiceCommand;
import org.whispersystems.textsecuregcm.workers.RemoveE164RecentlyDeletedAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredAccountsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredBackupsCommand;
import org.whispersystems.textsecuregcm.workers.RemoveExpiredLinkedDevicesCommand;
Expand Down Expand Up @@ -329,6 +330,8 @@ public void initialize(final Bootstrap<WhisperServerConfiguration> bootstrap) {
bootstrap.addCommand(new ProcessScheduledJobsServiceCommand("process-idle-device-notification-jobs",
"Processes scheduled jobs to send notifications to idle devices",
new IdleDeviceNotificationSchedulerFactory()));

bootstrap.addCommand(new RemoveE164RecentlyDeletedAccountsCommand());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import software.amazon.awssdk.services.dynamodb.model.CancellationReason;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.Delete;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.Put;
Expand Down Expand Up @@ -1261,7 +1262,15 @@ Flux<Account> getAll(final int segments, final Scheduler scheduler) {
.sequential();
}

public Flux<String> getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) {
CompletableFuture<Void> removeRecentlyDeletedAccountRecord(final String e164) {
return asyncClient.deleteItem(DeleteItemRequest.builder()
.tableName(deletedAccountsTableName)
.key(Map.of(DELETED_ACCOUNTS_KEY_ACCOUNT_PNI, AttributeValues.fromString(e164)))
.build())
.thenRun(Util.NOOP);
}

Flux<String> getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) {
if (segments < 1) {
throw new IllegalArgumentException("Total number of segments must be positive");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.whispersystems.textsecuregcm.util.Util;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.util.function.Tuple3;
import software.amazon.awssdk.services.dynamodb.model.TransactWriteItem;
import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException;

Expand Down Expand Up @@ -1213,6 +1212,14 @@ public Optional<UUID> findRecentlyDeletedPhoneNumberIdentifier(final UUID accoun
return accounts.findRecentlyDeletedPhoneNumberIdentifier(accountIdentifier);
}

public Flux<String> getE164sForRecentlyDeletedAccounts(final int segments, final Scheduler scheduler) {
return accounts.getE164sForRecentlyDeletedAccounts(segments, scheduler);
}

public CompletableFuture<Void> removeRecentlyDeletedAccountRecord(final String e164) {
return accounts.removeRecentlyDeletedAccountRecord(e164);
}

public Flux<Account> streamAllFromDynamo(final int segments, final Scheduler scheduler) {
return accounts.getAll(segments, scheduler);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/

package org.whispersystems.textsecuregcm.workers;

import io.dropwizard.core.Application;
import io.dropwizard.core.setup.Environment;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
import org.whispersystems.textsecuregcm.metrics.MetricsUtil;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public class RemoveE164RecentlyDeletedAccountsCommand extends AbstractCommandWithDependencies {

private static final String DRY_RUN_ARGUMENT = "dry-run";
private static final String MAX_CONCURRENCY_ARGUMENT = "max-concurrency";
private static final String SEGMENTS_ARGUMENT = "segments";
private static final String BUFFER_ARGUMENT = "buffer";

private static final String RECORDS_INSPECTED_COUNTER_NAME =
MetricsUtil.name(RemoveE164RecentlyDeletedAccountsCommand.class, "recordsInspected");

private static final String RECORDS_DELETED_COUNTER_NAME =
MetricsUtil.name(RemoveE164RecentlyDeletedAccountsCommand.class, "recordsDeleted");

private static final String DRY_RUN_TAG = "dryRun";

private static final Logger logger = LoggerFactory.getLogger(RemoveE164RecentlyDeletedAccountsCommand.class);

public RemoveE164RecentlyDeletedAccountsCommand() {

super(new Application<>() {
@Override
public void run(final WhisperServerConfiguration configuration, final Environment environment) {
}
}, "remove-e164-recently-deleted-accounts", "Delete e164-associated recently-deleted account records");
}

@Override
public void configure(final Subparser subparser) {
super.configure(subparser);

subparser.addArgument("--dry-run")
.type(Boolean.class)
.dest(DRY_RUN_ARGUMENT)
.required(false)
.setDefault(true)
.help("If true, don’t actually delete any registration recovery passwords");

subparser.addArgument("--max-concurrency")
.type(Integer.class)
.dest(MAX_CONCURRENCY_ARGUMENT)
.setDefault(16)
.help("Max concurrency for DynamoDB operations");

subparser.addArgument("--segments")
.type(Integer.class)
.dest(SEGMENTS_ARGUMENT)
.required(false)
.setDefault(1)
.help("The total number of segments for a DynamoDB scan");

subparser.addArgument("--buffer")
.type(Integer.class)
.dest(BUFFER_ARGUMENT)
.setDefault(16_384)
.help("Records to buffer");
}

@Override
protected void run(final Environment environment, final Namespace namespace,
final WhisperServerConfiguration configuration, final CommandDependencies commandDependencies) throws Exception {

final boolean dryRun = namespace.getBoolean(DRY_RUN_ARGUMENT);
final int maxConcurrency = namespace.getInt(MAX_CONCURRENCY_ARGUMENT);
final int segments = namespace.getInt(SEGMENTS_ARGUMENT);
final int bufferSize = namespace.getInt(BUFFER_ARGUMENT);

final Counter recordsInspectedCounter =
Metrics.counter(RECORDS_INSPECTED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));

final Counter recordsDeletedCounter =
Metrics.counter(RECORDS_DELETED_COUNTER_NAME, DRY_RUN_TAG, String.valueOf(dryRun));

final AccountsManager accountsManager = commandDependencies.accountsManager();

accountsManager.getE164sForRecentlyDeletedAccounts(segments, Schedulers.parallel())
.buffer(bufferSize)
.map(source -> {
final List<String> shuffled = new ArrayList<>(source);
Collections.shuffle(shuffled);
return shuffled;
})
.limitRate(2)
.flatMapIterable(Function.identity())
.doOnNext(e164 -> recordsInspectedCounter.increment())
.flatMap(e164 -> {
final Mono<Void> deleteMono = dryRun
? Mono.empty()
: Mono.fromFuture(() -> accountsManager.removeRecentlyDeletedAccountRecord(e164))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.onErrorResume(throwable -> {
logger.warn("Failed to remove recently-deleted account record for {}", e164, throwable);
return Mono.empty();
});

return deleteMono.doOnSuccess(ignored -> recordsDeletedCounter.increment());
}, maxConcurrency)
.then()
.block();
}
}

0 comments on commit 4d87b74

Please sign in to comment.