diff --git a/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java b/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java new file mode 100644 index 000000000..6720e49d4 --- /dev/null +++ b/src/main/java/com/uid2/operator/service/IModifiedBucketReadWriteStream.java @@ -0,0 +1,74 @@ +package com.uid2.operator.service; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; + +public interface IModifiedBucketReadWriteStream extends ReadStream, WriteStream { + IModifiedBucketReadWriteStream exceptionHandler(Handler handler); + + Future write(Buffer buffer); + + void write(Buffer buffer, Handler> handler); + + void end(Handler> handler); + + WriteStream setWriteQueueMaxSize(int i); + + boolean writeQueueFull(); + + WriteStream drainHandler(Handler handler); + + ReadStream handler(Handler handler); + + ReadStream pause(); + + ReadStream resume(); + + ReadStream fetch(long l); + + ReadStream endHandler(Handler handler); + + @Override + default Pipe pipe() { + return ReadStream.super.pipe(); + } + + @Override + default Future pipeTo(WriteStream dst) { + return ReadStream.super.pipeTo(dst); + } + + @Override + default void pipeTo(WriteStream dst, Handler> handler) { + ReadStream.super.pipeTo(dst, handler); + } + + default void succeededAsyncResult(Handler> handler) { + handler.handle(new AsyncResult<>() { + @Override + public Void result() { + return null; + } + + @Override + public Throwable cause() { + return null; + } + + @Override + public boolean succeeded() { + return true; + } + + @Override + public boolean failed() { + return false; + } + }); + } +} diff --git a/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java b/src/main/java/com/uid2/operator/service/ModifiedBucketEncodeStream.java new file mode 
package com.uid2.operator.service;

import com.uid2.shared.Utils;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
 * Pipeline stage that base64-encodes whatever is written to it and re-emits the
 * encoded text on the read side.
 *
 * Mid-stream chunks are always a multiple of 3 input bytes so each chunk encodes
 * without padding and the concatenation of all chunks is valid base64; the 0-2
 * leftover bytes are carried over to the next chunk. Only the final chunk (after
 * {@link #end}) may carry '=' padding.
 */
public class ModifiedBucketEncodeStream implements IModifiedBucketReadWriteStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncodeStream.class);

    private final Context context;

    private Handler<Void> endHandler;
    private Handler<Buffer> dataHandler;
    private Handler<Void> drainHandler; // used by pipe; signalled once the buffer drains below half capacity

    private boolean readInProgress;
    private boolean incomingStreamEnded = false;
    private boolean outgoingStreamEnded = false;
    private long maxBufferSizeBytes = 5242880; // 5 MB
    private long demand = Long.MAX_VALUE;      // Long.MAX_VALUE means "unbounded"

    Buffer data;

    public ModifiedBucketEncodeStream(Context context) {
        this.context = context;
        this.data = Buffer.buffer();
    }

    @Override
    public synchronized IModifiedBucketReadWriteStream exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override
    public Future<Void> write(Buffer buffer) {
        synchronized (this) {
            data.appendBuffer(buffer);
        }
        return Future.succeededFuture();
    }

    @Override
    public void write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
        synchronized (this) {
            data.appendBuffer(buffer);
        }
        succeededAsyncResult(handler);
    }

    @Override
    public synchronized void end(Handler<AsyncResult<Void>> handler) {
        this.incomingStreamEnded = true;
        succeededAsyncResult(handler);
    }

    @Override
    public synchronized WriteStream<Buffer> setWriteQueueMaxSize(int i) {
        maxBufferSizeBytes = i;
        return this;
    }

    @Override
    public synchronized boolean writeQueueFull() {
        return data.length() > maxBufferSizeBytes;
    }

    @Override
    public synchronized WriteStream<Buffer> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    // ReadStream methods

    @Override
    public synchronized ReadStream<Buffer> handler(Handler<Buffer> handler) {
        this.dataHandler = handler;
        if (handler != null && demand > 0) {
            read();
        }
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> pause() {
        this.demand = 0;
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> resume() {
        fetch(Long.MAX_VALUE);
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> fetch(long amount) {
        demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE;
        read();
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    private synchronized void read() {
        if (this.readInProgress) {
            if (!incomingStreamEnded || !outgoingStreamEnded) {
                this.context.runOnContext(v -> this.read());
            }
            return;
        }

        if (demand <= 0) {
            return;
        }
        // Long.MAX_VALUE is the "unbounded" sentinel; don't erode it one read at a time.
        if (demand != Long.MAX_VALUE) {
            demand--;
        }

        this.readInProgress = true;

        this.context.executeBlocking(() -> {
            synchronized (this) {
                if (data.length() == 0) {
                    // FIX: when end() arrived with nothing buffered, the outgoing
                    // side previously never terminated and endHandler never fired.
                    if (incomingStreamEnded) {
                        outgoingStreamEnded = true;
                    }
                    return "";
                }

                byte[] bytes = data.getBytes();
                // Encode everything once input has ended (padding is fine on the last
                // chunk); otherwise hold back 0-2 leftover bytes so this chunk encodes
                // to a padding-free multiple of 4 output characters.
                int emit = incomingStreamEnded ? bytes.length : bytes.length - (bytes.length % 3);

                String chunk;
                if (emit == bytes.length) {
                    chunk = Utils.toBase64String(bytes);
                    data = Buffer.buffer();
                } else {
                    chunk = Utils.toBase64String(Arrays.copyOfRange(bytes, 0, emit));
                    data = Buffer.buffer(Arrays.copyOfRange(bytes, emit, bytes.length));
                }

                if (incomingStreamEnded) {
                    outgoingStreamEnded = true;
                }
                return chunk;
            }
        }, asyncResult -> {
            Handler<Buffer> dataOut;
            Handler<Void> drain;
            synchronized (this) {
                this.readInProgress = false;
                dataOut = this.dataHandler;
                // FIX: drainHandler was never invoked, so a piped upstream that saw
                // writeQueueFull() would pause forever. Signal it once we are below
                // half capacity (mirrors Vert.x's usual drain convention).
                drain = data.length() <= maxBufferSizeBytes / 2 ? this.drainHandler : null;
            }
            if (asyncResult.succeeded()) {
                String chunk = asyncResult.result();
                if (dataOut != null && !chunk.isEmpty()) {
                    dataOut.handle(Buffer.buffer(chunk));
                }
            } else {
                // FIX: a failed blocking task previously produced result() == null and an NPE.
                LOGGER.error("modified bucket base64 encoding failed", asyncResult.cause());
            }
            if (drain != null) {
                drain.handle(null);
            }
            scheduleNextRead();
        });
    }

    private synchronized void scheduleNextRead() {
        // Polling loop: writes never trigger reads directly, so keep re-arming until
        // the incoming stream has ended AND its tail has been flushed downstream.
        if (demand > 0 && (!incomingStreamEnded || !outgoingStreamEnded)) {
            context.runOnContext(unused -> read());
        } else if (outgoingStreamEnded && endHandler != null) {
            endHandler.handle(null);
        }
    }
}
package com.uid2.operator.service;

import com.uid2.shared.encryption.Random;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import java.security.InvalidAlgorithmParameterException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

/**
 * Pipeline stage that AES/GCM-encrypts whatever is written to it and re-emits the
 * ciphertext on the read side. The first emitted chunk is prefixed with the IV and
 * the encrypted (timestamp + nonce) header expected by V2 response envelopes; the
 * final chunk (after {@link #end}) carries the GCM authentication tag via doFinal.
 */
public class ModifiedBucketEncryptStream implements IModifiedBucketReadWriteStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketEncryptStream.class);

    private final Context context;

    private Handler<Void> endHandler;
    private Handler<Buffer> dataHandler;
    private Handler<Void> drainHandler; // used by pipe; signalled once the buffer drains below half capacity

    private boolean readInProgress;
    private boolean wroteStart = false;
    private boolean incomingStreamEnded = false;
    private boolean outgoingStreamEnded = false;
    private long maxBufferSizeBytes = 5242880; // 5 MB
    private long demand = Long.MAX_VALUE;      // Long.MAX_VALUE means "unbounded"

    Buffer data;

    final String cipherScheme = "AES/GCM/NoPadding";
    final int GCM_AUTHTAG_LENGTH = 16;
    final int GCM_IV_LENGTH = 12;
    final SecretKey k;
    final byte[] nonce;
    final Cipher c;
    final byte[] ivBytes = Random.getBytes(GCM_IV_LENGTH);
    GCMParameterSpec gcmParameterSpec;

    public ModifiedBucketEncryptStream(Context context, byte[] encryptionKey, byte[] nonce)
            throws NoSuchPaddingException, NoSuchAlgorithmException, InvalidAlgorithmParameterException, InvalidKeyException {
        this.context = context;
        this.data = Buffer.buffer();
        this.k = new SecretKeySpec(encryptionKey, "AES");
        this.nonce = nonce;
        this.gcmParameterSpec = new GCMParameterSpec(GCM_AUTHTAG_LENGTH * 8, ivBytes);
        this.c = Cipher.getInstance(cipherScheme);
        c.init(Cipher.ENCRYPT_MODE, k, gcmParameterSpec);
    }

    @Override
    public synchronized IModifiedBucketReadWriteStream exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override
    public Future<Void> write(Buffer buffer) {
        synchronized (this) {
            data.appendBuffer(buffer);
        }
        return Future.succeededFuture();
    }

    @Override
    public void write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
        synchronized (this) {
            data.appendBuffer(buffer);
        }
        succeededAsyncResult(handler);
    }

    @Override
    public synchronized void end(Handler<AsyncResult<Void>> handler) {
        this.incomingStreamEnded = true;
        succeededAsyncResult(handler);
    }

    @Override
    public synchronized WriteStream<Buffer> setWriteQueueMaxSize(int i) {
        maxBufferSizeBytes = i;
        return this;
    }

    @Override
    public synchronized boolean writeQueueFull() {
        return data.length() > maxBufferSizeBytes;
    }

    @Override
    public synchronized WriteStream<Buffer> drainHandler(Handler<Void> handler) {
        this.drainHandler = handler;
        return this;
    }

    // ReadStream methods

    @Override
    public synchronized ReadStream<Buffer> handler(Handler<Buffer> handler) {
        this.dataHandler = handler;
        if (handler != null && demand > 0) {
            read();
        }
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> pause() {
        this.demand = 0;
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> resume() {
        fetch(Long.MAX_VALUE);
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> fetch(long amount) {
        demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE;
        read();
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    // FIX: was not synchronized, unlike every sibling stream, racing on
    // readInProgress/demand against the event-loop setters above.
    private synchronized void read() {
        if (this.readInProgress) {
            if (!incomingStreamEnded || !outgoingStreamEnded) {
                this.context.runOnContext(v -> this.read());
            }
            return;
        }

        if (demand <= 0) {
            return;
        }
        // Long.MAX_VALUE is the "unbounded" sentinel; don't erode it one read at a time.
        if (demand != Long.MAX_VALUE) {
            demand--;
        }

        this.readInProgress = true;

        this.context.executeBlocking(() -> {
            Buffer chunk = Buffer.buffer();
            synchronized (this) {
                // FIX: previously any empty buffer returned early. If the input had
                // already ended that skipped doFinal forever, so the GCM auth tag was
                // never emitted and the stream never terminated.
                if (data.length() == 0 && !incomingStreamEnded) {
                    return chunk;
                }

                if (!wroteStart) {
                    // V2 envelope header: IV, then encrypted (timestamp || nonce).
                    chunk.appendBytes(ivBytes);
                    Buffer b = Buffer.buffer();
                    b.appendLong(EncodingUtils.NowUTCMillis().toEpochMilli());
                    b.appendBytes(this.nonce);
                    appendIfPresent(chunk, c.update(b.getBytes()));
                    wroteStart = true;
                }

                if (!incomingStreamEnded) {
                    appendIfPresent(chunk, c.update(data.getBytes()));
                    data = Buffer.buffer();
                } else {
                    // doFinal must run exactly once, even on an empty tail, to flush
                    // buffered ciphertext and append the GCM authentication tag.
                    chunk.appendBytes(c.doFinal(data.getBytes()));
                    data = Buffer.buffer();
                    outgoingStreamEnded = true;
                }
                return chunk;
            }
        }, asyncResult -> {
            Handler<Buffer> dataOut;
            Handler<Void> drain;
            synchronized (this) {
                this.readInProgress = false;
                dataOut = this.dataHandler;
                // FIX: drainHandler was never invoked; a piped upstream that saw
                // writeQueueFull() would pause forever.
                drain = data.length() <= maxBufferSizeBytes / 2 ? this.drainHandler : null;
            }
            if (asyncResult.succeeded()) {
                Buffer chunk = asyncResult.result();
                if (dataOut != null && chunk.length() > 0) {
                    dataOut.handle(chunk);
                }
            } else {
                // FIX: doFinal's checked exceptions fail the blocking future; the old
                // callback then dereferenced a null result(). Log instead of NPE-ing.
                LOGGER.error("modified bucket encryption failed", asyncResult.cause());
            }
            if (drain != null) {
                drain.handle(null);
            }
            scheduleNextRead();
        });
    }

    // FIX: Cipher.update may return null when GCM buffers all input internally;
    // Buffer.appendBytes(null) would throw.
    private static void appendIfPresent(Buffer chunk, byte[] bytes) {
        if (bytes != null) {
            chunk.appendBytes(bytes);
        }
    }

    private synchronized void scheduleNextRead() {
        // Polling loop: writes never trigger reads directly, so keep re-arming until
        // the incoming stream has ended AND the auth tag has been flushed downstream.
        if (demand > 0 && (!incomingStreamEnded || !outgoingStreamEnded)) {
            context.runOnContext(unused -> read());
        } else if (outgoingStreamEnded && endHandler != null) {
            endHandler.handle(null);
        }
    }
}
package com.uid2.operator.service;

import com.uid2.shared.model.SaltEntry;
import io.vertx.core.*;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source stage of the modified-buckets pipeline: serializes a list of
 * {@link SaltEntry} into the JSON envelope
 * {@code {"body":[{"bucket_id":...,"last_updated":...},...],"status":"success"}},
 * emitting at most {@code chunkSize} entries per read so very large responses
 * stream instead of being materialized in one buffer.
 */
public class ModifiedBucketReadStream implements ReadStream<Buffer> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ModifiedBucketReadStream.class);
    private static final DateTimeFormatter APIDateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));

    private final Context context;

    private Handler<Buffer> dataHandler;
    private Handler<Void> endHandler;

    private boolean readInProgress;
    private boolean wroteStart = false;
    private boolean streamEnded = false;
    private long demand = Long.MAX_VALUE; // Long.MAX_VALUE means "unbounded"
    private final int chunkSize;

    private final List<SaltEntry> modified;
    // FIX: cursor into `modified` instead of the old modified.remove(0), which was
    // O(n^2) over large responses and destructively mutated the caller's list.
    private int nextEntry = 0;

    public ModifiedBucketReadStream(Context context, List<SaltEntry> modified, int chunkSize) {
        this.context = context;
        this.modified = modified;
        this.chunkSize = chunkSize;
    }

    /**
     * Serializes up to chunkSize remaining entries, each rendered as a JSON object
     * followed by a trailing comma (stripped from the very last entry in read()).
     */
    private String makeSaltEntriesString() {
        StringBuilder s = new StringBuilder();
        for (int i = 0; i < chunkSize && nextEntry < modified.size(); i++, nextEntry++) {
            SaltEntry e = modified.get(nextEntry);
            s.append("{\"bucket_id\":\"")
                    .append(e.getHashedId())
                    .append("\",\"last_updated\":\"")
                    .append(APIDateTimeFormatter.format(Instant.ofEpochMilli(e.getLastUpdated())))
                    .append("\"},");
        }
        return s.toString();
    }

    @Override
    public synchronized ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> handler(Handler<Buffer> handler) {
        this.dataHandler = handler;
        if (handler != null && demand > 0) {
            read();
        }
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> pause() {
        this.demand = 0;
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> resume() {
        fetch(Long.MAX_VALUE);
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> fetch(long amount) {
        demand = Long.MAX_VALUE - amount >= demand ? demand + amount : Long.MAX_VALUE;
        read();
        return this;
    }

    @Override
    public synchronized ReadStream<Buffer> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    // FIX: was not synchronized while mutating readInProgress/demand shared with the
    // event-loop setters above.
    private synchronized void read() {
        if (this.readInProgress) {
            if (!streamEnded && nextEntry < modified.size()) {
                this.context.runOnContext(v -> this.read());
            }
            return;
        }

        if (demand <= 0) {
            return;
        }
        // Long.MAX_VALUE is the "unbounded" sentinel; don't erode it one read at a time.
        if (demand != Long.MAX_VALUE) {
            demand--;
        }

        this.readInProgress = true;

        this.context.executeBlocking(() -> {
            StringBuilder salts = new StringBuilder();

            if (!wroteStart) {
                salts.append("{\"body\":[");
                wroteStart = true;
            }

            salts.append(makeSaltEntriesString());

            if (nextEntry >= modified.size()) {
                // FIX: only strip the trailing comma when one was actually written.
                // With an empty entry list the last char here is '[' and the old
                // unconditional deleteCharAt produced malformed JSON: {"body":],...}
                if (salts.charAt(salts.length() - 1) == ',') {
                    salts.deleteCharAt(salts.length() - 1);
                }
                salts.append("],\"status\":\"success\"}");
                streamEnded = true;
            }
            return salts.toString();
        }, asyncResult -> {
            Handler<Buffer> dataOut;
            synchronized (this) {
                this.readInProgress = false;
                dataOut = this.dataHandler;
            }
            if (asyncResult.succeeded()) {
                if (dataOut != null && !asyncResult.result().isEmpty()) {
                    // FIX: pin UTF-8 instead of the platform-default charset.
                    dataOut.handle(Buffer.buffer(asyncResult.result().getBytes(StandardCharsets.UTF_8)));
                }
            } else {
                // FIX: failed blocking task previously NPE'd on a null result().
                LOGGER.error("failed serializing modified buckets", asyncResult.cause());
            }
            scheduleNextRead();
        });
    }

    private synchronized void scheduleNextRead() {
        if (demand > 0 && !streamEnded) {
            context.runOnContext(unused -> read());
        } else if (streamEnded && endHandler != null) {
            endHandler.handle(null);
        }
    }
}
b/src/main/java/com/uid2/operator/vertx/UIDOperatorVerticle.java @@ -16,9 +16,11 @@ import com.uid2.operator.util.PrivacyBits; import com.uid2.operator.util.Tuple; import com.uid2.shared.Const.Data; +import com.uid2.shared.InstantClock; import com.uid2.shared.Utils; import com.uid2.shared.auth.*; import com.uid2.shared.encryption.AesGcm; +import com.uid2.shared.encryption.Random; import com.uid2.shared.health.HealthComponent; import com.uid2.shared.health.HealthManager; import com.uid2.shared.middleware.AuthMiddleware; @@ -528,6 +530,12 @@ public void handleKeysRequest(RoutingContext rc) { private String getSharingTokenExpirySeconds() { return config.getString(Const.Config.SharingTokenExpiryProp); } + private int getMaxIdentityBucketsResponseEntries() { + return config.getInteger(Const.Config.MaxIdentityBucketsResponseEntries, 1048576); + } + private int getIdentityBucketsResponseChunkSize() { + return config.getInteger(Const.Config.IdentityBucketsResponseChunkSize, 1048576); + } public void handleKeysSharing(RoutingContext rc) { try { @@ -1088,17 +1096,12 @@ private void handleBucketsV1(RoutingContext rc) { return; } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); - final JsonArray resp = new JsonArray(); if (modified != null) { - for (SaltEntry e : modified) { - final JsonObject o = new JsonObject(); - o.put("bucket_id", e.getHashedId()); - Instant lastUpdated = Instant.ofEpochMilli(e.getLastUpdated()); - - o.put("last_updated", APIDateTimeFormatter.format(lastUpdated)); - resp.add(o); + if (modified.size() > getMaxIdentityBucketsResponseEntries()) { + ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. 
please provide a more recent since_timestamp or remap all with /identity/map"); + return; } - ResponseUtil.Success(rc, resp); + transmitModifiedBucketsInChunks(rc, modified); } } else { ResponseUtil.ClientError(rc, "missing parameter since_timestamp"); @@ -1119,23 +1122,47 @@ private void handleBucketsV2(RoutingContext rc) { return; } final List modified = this.idService.getModifiedBuckets(sinceTimestamp); - final JsonArray resp = new JsonArray(); if (modified != null) { - for (SaltEntry e : modified) { - final JsonObject o = new JsonObject(); - o.put("bucket_id", e.getHashedId()); - Instant lastUpdated = Instant.ofEpochMilli(e.getLastUpdated()); - - o.put("last_updated", APIDateTimeFormatter.format(lastUpdated)); - resp.add(o); + if (modified.size() > getMaxIdentityBucketsResponseEntries()) { + ResponseUtil.ClientError(rc, "provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map"); + return; + } + try { + transmitModifiedBucketsInChunksEncrypted(rc, modified); + } catch (InvalidKeyException | InvalidAlgorithmParameterException | NoSuchAlgorithmException | + NoSuchPaddingException | IllegalBlockSizeException | BadPaddingException e) { + ResponseUtil.Error(ResponseUtil.ResponseStatus.GenericError, 500, rc, ""); } - ResponseUtil.SuccessV2(rc, resp); } } else { ResponseUtil.ClientError(rc, "missing parameter since_timestamp"); } } + private void transmitModifiedBucketsInChunks(RoutingContext rc, List modified) { + HttpServerResponse response = rc.response(); + + ModifiedBucketReadStream readStream = new ModifiedBucketReadStream(this.vertx.getOrCreateContext(), modified, getIdentityBucketsResponseChunkSize()); + + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + readStream.pipe().endOnSuccess(true).to(response); + } + + private void transmitModifiedBucketsInChunksEncrypted(RoutingContext rc, List modified) throws NoSuchPaddingException, NoSuchAlgorithmException, 
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException, BadPaddingException { + V2RequestUtil.V2Request request = V2RequestUtil.parseRequest(rc.body().asString(), AuthMiddleware.getAuthClient(ClientKey.class, rc), new InstantClock()); + HttpServerResponse response = rc.response(); + + ModifiedBucketReadStream readStream = new ModifiedBucketReadStream(this.vertx.getOrCreateContext(), modified, getIdentityBucketsResponseChunkSize()); + ModifiedBucketEncryptStream encryptStream = new ModifiedBucketEncryptStream(this.vertx.getOrCreateContext(), request.encryptionKey, request.nonce); + ModifiedBucketEncodeStream encodeStream = new ModifiedBucketEncodeStream(this.vertx.getOrCreateContext()); + + response.setChunked(true).putHeader(HttpHeaders.CONTENT_TYPE, "text/plain"); + readStream.pipe().endOnSuccess(true).to(encryptStream); + encryptStream.pipe().endOnSuccess(true).to(encodeStream); + encodeStream.pipe().to(response); + + } + private void handleIdentityMapV1(RoutingContext rc) { final InputUtil.InputVal input = this.phoneSupport ? this.getTokenInputV1(rc) : this.getTokenInput(rc); if (this.phoneSupport ? 
!checkTokenInputV1(input, rc) : !checkTokenInput(input, rc)) { diff --git a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java index e59e49619..bb503ef74 100644 --- a/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java +++ b/src/main/java/com/uid2/operator/vertx/V2PayloadHandler.java @@ -170,15 +170,19 @@ public void handleTokenRefresh(RoutingContext rc, Handler apiHan private void passThrough(RoutingContext rc, Handler apiHandler) { rc.data().put("request", rc.body().asJsonObject()); apiHandler.handle(rc); - if (rc.response().getStatusCode() != 200) { + if (rc.response().getStatusCode() != 200 || rc.response().ended() || rc.request().path().contains("/identity/buckets")) { return; } JsonObject respJson = (JsonObject) rc.data().get("response"); rc.response().putHeader(HttpHeaders.CONTENT_TYPE, "application/json") .end(respJson.encode()); + } private void writeResponse(RoutingContext rc, byte[] nonce, JsonObject resp, byte[] keyBytes) { + if (rc.response().ended() || rc.request().path().contains("/identity/buckets")) { + return; + } Buffer buffer = Buffer.buffer(); buffer.appendLong(EncodingUtils.NowUTCMillis().toEpochMilli()); buffer.appendBytes(nonce); diff --git a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java index 5bbb03bfa..c7914eeac 100644 --- a/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java +++ b/src/test/java/com/uid2/operator/UIDOperatorVerticleTest.java @@ -61,8 +61,11 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -78,6 +81,10 @@ @ExtendWith(VertxExtension.class) public class UIDOperatorVerticleTest { + + private static 
final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(UIDOperatorVerticleTest.class); + + private final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); private static final Instant legacyClientCreationDateTime = Instant.ofEpochSecond(OPT_OUT_CHECK_CUTOFF_DATE).minus(1, ChronoUnit.SECONDS); private static final Instant newClientCreationDateTime = Instant.ofEpochSecond(OPT_OUT_CHECK_CUTOFF_DATE).plus(1, ChronoUnit.SECONDS); @@ -155,6 +162,9 @@ private void setupConfig(JsonObject config) { config.put("client_side_token_generate_log_invalid_http_origins", true); config.put(Const.Config.AllowClockSkewSecondsProp, 3600); + + config.put(Const.Config.IdentityBucketsResponseChunkSize, 100); + config.put(Const.Config.MaxIdentityBucketsResponseEntries, 500000); } private static byte[] makeAesKey(String prefix) { @@ -209,9 +219,11 @@ private void send(String apiVersion, Vertx vertx, String endpoint, boolean isV1G assertEquals(expectedHttpCode, ar.result().statusCode()); if (ar.result().statusCode() == 200) { + LOGGER.info(ar.result().bodyAsString()); byte[] decrypted = AesGcm.decrypt(Utils.decodeBase64String(ar.result().bodyAsString()), 0, ck.getSecretBytes()); assertArrayEquals(Buffer.buffer().appendLong(nonce).getBytes(), Buffer.buffer(decrypted).slice(8, 16).getBytes()); + System.out.println(new String(decrypted, 16, decrypted.length - 16, StandardCharsets.UTF_8)); JsonObject respJson = new JsonObject(new String(decrypted, 16, decrypted.length - 16, StandardCharsets.UTF_8)); handler.handle(respJson); @@ -802,6 +814,65 @@ RefreshToken decodeRefreshToken(EncryptedTokenEncoder encoder, String refreshTok return decodeRefreshToken(encoder, refreshTokenString, IdentityType.Email); } + private void validateIdentityBuckets(List expectedList, JsonArray actualList) { + final DateTimeFormatter APIDateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")); + + assertEquals(expectedList.size(), actualList.size()); + for(int i = 0; i < 
actualList.size(); i++) { + JsonObject actual = actualList.getJsonObject(i); + SaltEntry expected = expectedList.get(i); + assertAll("Salt Entry Matches", + () -> assertEquals(expected.getHashedId(), actual.getString("bucket_id")), + () -> assertEquals(APIDateTimeFormatter.format(Instant.ofEpochMilli(expected.getLastUpdated())), actual.getString("last_updated"))); + } + } + + @ParameterizedTest + @CsvSource({"v1,1", "v2,1", "v1,11", "v2,11", "v1,15", "v2,15", "v1,20","v2,20", "v1,30", "v2,30", "v1,35", "v2,35", "v1,45", "v2,45", "v1,50", "v2,50", "v1,10001", "v2,10001", "v1,255000", "v2,255000", "v1,500000", "v2,500000"}) + void identityBucketsChunked(String apiVersion, int numModifiedSalts, Vertx vertx, VertxTestContext testContext) throws InterruptedException { + final int clientSiteId = 201; + fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); + List modifiedSalts = new ArrayList<>(); + List actualSalts = new ArrayList<>(); + for(int i = 0; i < numModifiedSalts; i++) { + modifiedSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); + actualSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); + } + when(saltProviderSnapshot.getModifiedSince(any())).thenReturn(modifiedSalts); + + JsonObject req = new JsonObject(); + req.put("since_timestamp", "2023-04-08T13:00:00"); + + send(apiVersion, vertx, apiVersion + "/identity/buckets", true, "since_timestamp=" +urlEncode("2023-04-08T13:00:00"), req, 200, respJson -> { + assertTrue(respJson.containsKey("body")); + assertFalse(respJson.containsKey("client_error")); + validateIdentityBuckets(actualSalts, respJson.getJsonArray("body")); + testContext.completeNow(); + }); + } + + @ParameterizedTest + @ValueSource(strings = {"v1", "v2"}) + void identityBucketsLimit(String apiVersion, Vertx vertx, VertxTestContext testContext) { + final int clientSiteId = 201; + fakeAuth(clientSiteId, newClientCreationDateTime, Role.MAPPER); + List modifiedSalts = new ArrayList<>(); + for(int i = 0; i < 51; i++) { + 
modifiedSalts.add(new SaltEntry(i, "someId" + i, i, "someSalt")); + } + when(saltProviderSnapshot.getModifiedSince(any())).thenReturn(modifiedSalts); + + JsonObject req = new JsonObject(); + req.put("since_timestamp", "2023-04-08T13:00:00"); + + send(apiVersion, vertx, apiVersion + "/identity/buckets", true, "since_timestamp=" + urlEncode("2023-04-08T13:00:00"), req, 400, respJson -> { + assertFalse(respJson.containsKey("body")); + assertEquals("client_error", respJson.getString("status")); + assertEquals("provided since_timestamp produced large response. please provide a more recent since_timestamp or remap all with /identity/map", respJson.getString("message")); + testContext.completeNow(); + }); + } + @ParameterizedTest @ValueSource(strings = {"v1", "v2"}) void identityMapNewClientNoPolicySpecified(String apiVersion, Vertx vertx, VertxTestContext testContext) {