Skip to content

Commit

Permalink
Add gRPC backup services
Browse files Browse the repository at this point in the history
  • Loading branch information
ravi-signal committed Jan 8, 2025
1 parent 3ca9a66 commit a88560e
Show file tree
Hide file tree
Showing 8 changed files with 1,482 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import org.whispersystems.textsecuregcm.grpc.ProfileAnonymousGrpcService;
import org.whispersystems.textsecuregcm.grpc.ProfileGrpcService;
import org.whispersystems.textsecuregcm.grpc.RequestAttributesInterceptor;
import org.whispersystems.textsecuregcm.grpc.ValidatingInterceptor;
import org.whispersystems.textsecuregcm.grpc.net.GrpcClientConnectionManager;
import org.whispersystems.textsecuregcm.grpc.net.ManagedDefaultEventLoopGroup;
import org.whispersystems.textsecuregcm.grpc.net.ManagedLocalGrpcServer;
Expand Down Expand Up @@ -862,6 +863,8 @@ public void run(WhisperServerConfiguration config, Environment environment) thro
final RequestAttributesInterceptor requestAttributesInterceptor =
new RequestAttributesInterceptor(grpcClientConnectionManager);

final ValidatingInterceptor validatingInterceptor = new ValidatingInterceptor();

final LocalAddress anonymousGrpcServerAddress = new LocalAddress("grpc-anonymous");
final LocalAddress authenticatedGrpcServerAddress = new LocalAddress("grpc-authenticated");

Expand All @@ -875,6 +878,7 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
.intercept(
new ExternalRequestFilter(config.getExternalRequestFilterConfiguration().permittedInternalRanges(),
config.getExternalRequestFilterConfiguration().grpcMethods()))
.intercept(validatingInterceptor)
// TODO: specialize metrics with user-agent platform
.intercept(metricCollectingServerInterceptor)
.intercept(errorMappingInterceptor)
Expand All @@ -897,6 +901,7 @@ protected void configureServer(final ServerBuilder<?> serverBuilder) {
// http://grpc.github.io/grpc-java/javadoc/io/grpc/ServerBuilder.html#intercept-io.grpc.ServerInterceptor-
serverBuilder
// TODO: specialize metrics with user-agent platform
.intercept(validatingInterceptor)
.intercept(metricCollectingServerInterceptor)
.intercept(errorMappingInterceptor)
.intercept(remoteDeprecationFilter)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.signal.chat.backup.CopyMediaRequest;
import org.signal.chat.backup.CopyMediaResponse;
import org.signal.chat.backup.DeleteAllRequest;
import org.signal.chat.backup.DeleteAllResponse;
import org.signal.chat.backup.DeleteMediaRequest;
import org.signal.chat.backup.DeleteMediaResponse;
import org.signal.chat.backup.GetBackupInfoRequest;
import org.signal.chat.backup.GetBackupInfoResponse;
import org.signal.chat.backup.GetCdnCredentialsRequest;
import org.signal.chat.backup.GetCdnCredentialsResponse;
import org.signal.chat.backup.GetUploadFormRequest;
import org.signal.chat.backup.GetUploadFormResponse;
import org.signal.chat.backup.ListMediaRequest;
import org.signal.chat.backup.ListMediaResponse;
import org.signal.chat.backup.ReactorBackupsAnonymousGrpc;
import org.signal.chat.backup.RefreshRequest;
import org.signal.chat.backup.RefreshResponse;
import org.signal.chat.backup.SetPublicKeyRequest;
import org.signal.chat.backup.SetPublicKeyResponse;
import org.signal.chat.backup.SignedPresentation;
import org.signal.libsignal.protocol.InvalidKeyException;
import org.signal.libsignal.protocol.ecc.ECPublicKey;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialPresentation;
import org.whispersystems.textsecuregcm.auth.AuthenticatedBackupUser;
import org.whispersystems.textsecuregcm.backup.BackupManager;
import org.whispersystems.textsecuregcm.backup.CopyParameters;
import org.whispersystems.textsecuregcm.backup.MediaEncryptionParameters;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BackupsAnonymousGrpcService extends ReactorBackupsAnonymousGrpc.BackupsAnonymousImplBase {

private final BackupManager backupManager;

public BackupsAnonymousGrpcService(final BackupManager backupManager) {
this.backupManager = backupManager;
}

@Override
public Mono<GetCdnCredentialsResponse> getCdnCredentials(final GetCdnCredentialsRequest request) {
return authenticateBackupUserMono(request.getSignedPresentation())
.map(user -> backupManager.generateReadAuth(user, request.getCdn()))
.map(credentials -> GetCdnCredentialsResponse.newBuilder().putAllHeaders(credentials).build());
}

@Override
public Mono<GetBackupInfoResponse> getBackupInfo(final GetBackupInfoRequest request) {
return Mono.fromFuture(() ->
authenticateBackupUser(request.getSignedPresentation()).thenCompose(backupManager::backupInfo))
.map(info -> GetBackupInfoResponse.newBuilder()
.setBackupName(info.messageBackupKey())
.setCdn(info.cdn())
.setBackupDir(info.backupSubdir())
.setMediaDir(info.mediaSubdir())
.setUsedSpace(info.mediaUsedSpace().orElse(0L))
.build());
}

@Override
public Mono<RefreshResponse> refresh(final RefreshRequest request) {
return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())
.thenCompose(backupManager::ttlRefresh))
.thenReturn(RefreshResponse.getDefaultInstance());
}

@Override
public Mono<SetPublicKeyResponse> setPublicKey(final SetPublicKeyRequest request) {
final ECPublicKey publicKey = deserialize(ECPublicKey::new, request.getPublicKey().toByteArray());
final BackupAuthCredentialPresentation presentation = deserialize(
BackupAuthCredentialPresentation::new,
request.getSignedPresentation().getPresentation().toByteArray());
final byte[] signature = request.getSignedPresentation().getPresentationSignature().toByteArray();

return Mono.fromFuture(() -> backupManager.setPublicKey(presentation, signature, publicKey))
.thenReturn(SetPublicKeyResponse.getDefaultInstance());
}


@Override
public Mono<GetUploadFormResponse> getUploadForm(final GetUploadFormRequest request) {
return authenticateBackupUserMono(request.getSignedPresentation())
.flatMap(backupUser -> switch (request.getUploadTypeCase()) {
case MESSAGES -> Mono.fromFuture(backupManager.createMessageBackupUploadDescriptor(backupUser));
case MEDIA -> Mono.fromCompletionStage(backupManager.createTemporaryAttachmentUploadDescriptor(backupUser));
case UPLOADTYPE_NOT_SET -> Mono.error(Status.INVALID_ARGUMENT
.withDescription("Must set upload_type")
.asRuntimeException());
})
.map(uploadDescriptor -> GetUploadFormResponse.newBuilder()
.setCdn(uploadDescriptor.cdn())
.setKey(uploadDescriptor.key())
.setSignedUploadLocation(uploadDescriptor.signedUploadLocation())
.putAllHeaders(uploadDescriptor.headers())
.build());
}

@Override
public Flux<CopyMediaResponse> copyMedia(final CopyMediaRequest request) {
return authenticateBackupUserMono(request.getSignedPresentation())
.flatMapMany(backupUser -> backupManager.copyToBackup(backupUser,
request.getItemsList().stream().map(item -> new CopyParameters(
item.getSourceAttachmentCdn(), item.getSourceKey(),
// uint32 in proto, make sure it fits in a signed int
fromUnsignedExact(item.getObjectLength()),
new MediaEncryptionParameters(item.getEncryptionKey().toByteArray(), item.getHmacKey().toByteArray()),
item.getMediaId().toByteArray())).toList()))
.map(copyResult -> {
CopyMediaResponse.Builder builder = CopyMediaResponse
.newBuilder()
.setMediaId(ByteString.copyFrom(copyResult.mediaId()));
builder = switch (copyResult.outcome()) {
case SUCCESS -> builder
.setSuccess(CopyMediaResponse.CopySuccess.newBuilder().setCdn(copyResult.cdn()).build());
case OUT_OF_QUOTA -> builder
.setOutOfSpace(CopyMediaResponse.OutOfSpace.getDefaultInstance());
case SOURCE_WRONG_LENGTH -> builder
.setWrongSourceLength(CopyMediaResponse.WrongSourceLength.getDefaultInstance());
case SOURCE_NOT_FOUND -> builder
.setSourceNotFound(CopyMediaResponse.SourceNotFound.getDefaultInstance());
};
return builder.build();
});

}

@Override
public Mono<ListMediaResponse> listMedia(final ListMediaRequest request) {
return authenticateBackupUserMono(request.getSignedPresentation()).zipWhen(
backupUser -> Mono.fromFuture(backupManager.list(
backupUser,
request.hasCursor() ? Optional.of(request.getCursor()) : Optional.empty(),
request.getLimit()).toCompletableFuture()),

(backupUser, listResult) -> {
final ListMediaResponse.Builder builder = ListMediaResponse.newBuilder();
for (BackupManager.StorageDescriptorWithLength sd : listResult.media()) {
builder.addPage(ListMediaResponse.ListEntry.newBuilder()
.setMediaId(ByteString.copyFrom(sd.key()))
.setCdn(sd.cdn())
.setLength(sd.length())
.build());
}
builder
.setBackupDir(backupUser.backupDir())
.setMediaDir(backupUser.mediaDir());
listResult.cursor().ifPresent(builder::setCursor);
return builder.build();
});

}

@Override
public Mono<DeleteAllResponse> deleteAll(final DeleteAllRequest request) {
return Mono.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation())
.thenCompose(backupManager::deleteEntireBackup))
.thenReturn(DeleteAllResponse.getDefaultInstance());
}

@Override
public Flux<DeleteMediaResponse> deleteMedia(final DeleteMediaRequest request) {
return Mono
.fromFuture(() -> authenticateBackupUser(request.getSignedPresentation()))
.flatMapMany(backupUser -> backupManager.deleteMedia(backupUser, request
.getItemsList()
.stream()
.map(item -> new BackupManager.StorageDescriptor(item.getCdn(), item.getMediaId().toByteArray()))
.toList()))
.map(storageDescriptor -> DeleteMediaResponse.newBuilder()
.setMediaId(ByteString.copyFrom(storageDescriptor.key()))
.setCdn(storageDescriptor.cdn()).build());
}

private Mono<AuthenticatedBackupUser> authenticateBackupUserMono(final SignedPresentation signedPresentation) {
return Mono.fromFuture(() -> authenticateBackupUser(signedPresentation));
}

private CompletableFuture<AuthenticatedBackupUser> authenticateBackupUser(
final SignedPresentation signedPresentation) {
if (signedPresentation == null) {
throw Status.UNAUTHENTICATED.asRuntimeException();
}
try {
return backupManager.authenticateBackupUser(
new BackupAuthCredentialPresentation(signedPresentation.getPresentation().toByteArray()),
signedPresentation.getPresentationSignature().toByteArray());
} catch (InvalidInputException e) {
throw Status.UNAUTHENTICATED.withDescription("Could not deserialize presentation").asRuntimeException();
}
}

/**
* Convert an int from a proto uint32 to a signed positive integer, throwing if the value exceeds
* {@link Integer#MAX_VALUE}. To convert to a long, see {@link Integer#toUnsignedLong(int)}
*/
private static int fromUnsignedExact(final int i) {
if (i < 0) {
throw Status.INVALID_ARGUMENT.withDescription("Invalid size").asRuntimeException();
}
return i;
}

private interface Deserializer<T> {

T deserialize(byte[] bytes) throws InvalidInputException, InvalidKeyException;
}

private static <T> T deserialize(Deserializer<T> deserializer, byte[] bytes) {
try {
return deserializer.deserialize(bytes);
} catch (InvalidInputException | InvalidKeyException e) {
throw Status.INVALID_ARGUMENT.withDescription("Invalid serialization").asRuntimeException();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2024 Signal Messenger, LLC
* SPDX-License-Identifier: AGPL-3.0-only
*/
package org.whispersystems.textsecuregcm.grpc;

import com.google.protobuf.ByteString;
import io.grpc.Status;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import org.signal.chat.backup.GetBackupAuthCredentialsRequest;
import org.signal.chat.backup.GetBackupAuthCredentialsResponse;
import org.signal.chat.backup.ReactorBackupsGrpc;
import org.signal.chat.backup.RedeemReceiptRequest;
import org.signal.chat.backup.RedeemReceiptResponse;
import org.signal.chat.backup.SetBackupIdRequest;
import org.signal.chat.backup.SetBackupIdResponse;
import org.signal.chat.common.ZkCredential;
import org.signal.libsignal.zkgroup.InvalidInputException;
import org.signal.libsignal.zkgroup.backups.BackupAuthCredentialRequest;
import org.signal.libsignal.zkgroup.backups.BackupCredentialType;
import org.signal.libsignal.zkgroup.receipts.ReceiptCredentialPresentation;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticatedDevice;
import org.whispersystems.textsecuregcm.auth.grpc.AuthenticationUtil;
import org.whispersystems.textsecuregcm.backup.BackupAuthManager;
import org.whispersystems.textsecuregcm.storage.Account;
import org.whispersystems.textsecuregcm.storage.AccountsManager;
import reactor.core.publisher.Mono;

public class BackupsGrpcService extends ReactorBackupsGrpc.BackupsImplBase {

private final AccountsManager accountManager;
private final BackupAuthManager backupAuthManager;

public BackupsGrpcService(final AccountsManager accountManager, final BackupAuthManager backupAuthManager) {
this.accountManager = accountManager;
this.backupAuthManager = backupAuthManager;
}


@Override
public Mono<SetBackupIdResponse> setBackupId(SetBackupIdRequest request) {

final BackupAuthCredentialRequest messagesCredentialRequest = deserialize(
BackupAuthCredentialRequest::new,
request.getMessagesBackupAuthCredentialRequest().toByteArray());

final BackupAuthCredentialRequest mediaCredentialRequest = deserialize(
BackupAuthCredentialRequest::new,
request.getMediaBackupAuthCredentialRequest().toByteArray());

return authenticatedAccount()
.flatMap(account -> Mono.fromFuture(
backupAuthManager.commitBackupId(account, messagesCredentialRequest, mediaCredentialRequest)))
.thenReturn(SetBackupIdResponse.getDefaultInstance());
}

public Mono<RedeemReceiptResponse> redeemReceipt(RedeemReceiptRequest request) {
final ReceiptCredentialPresentation receiptCredentialPresentation = deserialize(
ReceiptCredentialPresentation::new,
request.getPresentation().toByteArray());
return authenticatedAccount()
.flatMap(account -> Mono.fromFuture(backupAuthManager.redeemReceipt(account, receiptCredentialPresentation)))
.thenReturn(RedeemReceiptResponse.getDefaultInstance());
}

@Override
public Mono<GetBackupAuthCredentialsResponse> getBackupAuthCredentials(GetBackupAuthCredentialsRequest request) {
return authenticatedAccount().flatMap(account -> {
final Mono<List<BackupAuthManager.Credential>> messageCredentials = Mono.fromCompletionStage(() ->
backupAuthManager.getBackupAuthCredentials(
account,
BackupCredentialType.MESSAGES,
Instant.ofEpochSecond(request.getRedemptionStart()),
Instant.ofEpochSecond(request.getRedemptionStop())));
final Mono<List<BackupAuthManager.Credential>> mediaCredentials = Mono.fromCompletionStage(() ->
backupAuthManager.getBackupAuthCredentials(
account,
BackupCredentialType.MEDIA,
Instant.ofEpochSecond(request.getRedemptionStart()),
Instant.ofEpochSecond(request.getRedemptionStop())));

return messageCredentials.zipWith(mediaCredentials, (messageCreds, mediaCreds) ->
GetBackupAuthCredentialsResponse.newBuilder()
.putAllMessageCredentials(messageCreds.stream().collect(Collectors.toMap(
c -> c.redemptionTime().getEpochSecond(),
c -> ZkCredential.newBuilder()
.setCredential(ByteString.copyFrom(c.credential().serialize()))
.setRedemptionTime(c.redemptionTime().getEpochSecond())
.build())))
.putAllMediaCredentials(mediaCreds.stream().collect(Collectors.toMap(
c -> c.redemptionTime().getEpochSecond(),
c -> ZkCredential.newBuilder()
.setCredential(ByteString.copyFrom(c.credential().serialize()))
.setRedemptionTime(c.redemptionTime().getEpochSecond())
.build())))
.build());
});
}

private Mono<Account> authenticatedAccount() {
final AuthenticatedDevice authenticatedDevice = AuthenticationUtil.requireAuthenticatedDevice();
return Mono
.fromFuture(() -> accountManager.getByAccountIdentifierAsync(authenticatedDevice.accountIdentifier()))
.map(maybeAccount -> maybeAccount.orElseThrow(Status.UNAUTHENTICATED::asRuntimeException));
}

private interface Deserializer<T> {

T deserialize(byte[] bytes) throws InvalidInputException;
}

private <T> T deserialize(Deserializer<T> deserializer, byte[] bytes) {
try {
return deserializer.deserialize(bytes);
} catch (InvalidInputException e) {
throw Status.INVALID_ARGUMENT.withDescription("Invalid serialization").asRuntimeException();
}
}

}
Loading

0 comments on commit a88560e

Please sign in to comment.