Skip to content

Commit

Permalink
Added support for progress reporting during a sync operation.
Browse files Browse the repository at this point in the history
  • Loading branch information
diamondq committed Jan 27, 2024
1 parent 4c06576 commit 886030f
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,39 +135,52 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>

/* First, let's check if there is a hash to shortcut this */

pInfo.reportSyncStatus(true, SyncInfo.ActionType.GET_A_HASH);
long aHashStartTimer = System.currentTimeMillis();
Optional<String> aHashOpt = pInfo.getAHash();
result.aHashElapsedTime = System.currentTimeMillis() - aHashStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.GET_A_HASH);

if (aHashOpt.isPresent()) {

pInfo.reportSyncStatus(true, SyncInfo.ActionType.GET_B_HASH);
long bHashStartTimer = System.currentTimeMillis();
Optional<String> bHashOpt = pInfo.getBHash();
result.bHashElapsedTime = System.currentTimeMillis() - bHashStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.GET_B_HASH);

if (bHashOpt.isPresent()) {

/* If the hashes match, then we're done */

if (aHashOpt.get().equals(bHashOpt.get())) {
result.totalElapsedTime = System.currentTimeMillis() - startTimer;
return pInfo.complete().thenApply((ignored) -> result);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.COMPLETE);
return pInfo.complete().thenApply((ignored) -> {
pInfo.reportSyncStatus(false, SyncInfo.ActionType.COMPLETE);
return result;
});
}
}
}

/* Stream the data into a set of data */

pInfo.reportSyncStatus(true, SyncInfo.ActionType.GET_A_SOURCE);
long aSourceStartTimer = System.currentTimeMillis();
ExtendedCompletionStage<@NotNull Map<@NotNull A_KEY, @NotNull A_FRAG>> aSourceFuture = pInfo.getASource()
.thenApply((source) -> {
result.aSourceLoadElapsedTime = System.currentTimeMillis() - aSourceStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.GET_A_SOURCE);
return source;
});

pInfo.reportSyncStatus(true, SyncInfo.ActionType.GET_B_SOURCE);
long bSourceStartTimer = System.currentTimeMillis();
ExtendedCompletionStage<@NotNull Map<@NotNull B_KEY, @NotNull B_FRAG>> bSourceFuture = pInfo.getBSource()
.thenApply((source) -> {
result.bSourceLoadElapsedTime = System.currentTimeMillis() - bSourceStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.GET_B_SOURCE);
return source;
});

Expand All @@ -178,6 +191,8 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>

return aSourceFuture.thenCombine(bSourceFuture, (origAMap, origBMap) -> {

pInfo.reportSyncStatus(true, SyncInfo.ActionType.CATEGORIZE_A);

long categorizationStartTimer = System.currentTimeMillis();

result.aSourceCount = origAMap.size();
Expand All @@ -196,8 +211,11 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>
boolean aModSupported = pInfo.isAModificationSupported();
boolean bModSupported = pInfo.isBModificationSupported();

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.CATEGORIZE_A, result.aSourceCount);
for (Map.Entry<A_KEY, A_FRAG> aPair : aMap.entrySet()) {

pInfo.reportIncrementStatus(SyncInfo.ActionType.CATEGORIZE_A);

A_KEY aKey = aPair.getKey();
@SuppressWarnings("unchecked") B_KEY bKey = (keyTypesEqual ? (B_KEY) aKey : pInfo.convertAKeyToBKey(aKey));

Expand Down Expand Up @@ -254,10 +272,16 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>
}
}

pInfo.reportSyncStatus(false, SyncInfo.ActionType.CATEGORIZE_A);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.CATEGORIZE_B);

/* Anything left in B has to be processed */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.CATEGORIZE_A, bMap.size());
for (Map.Entry<B_KEY, B_FRAG> bPair : bMap.entrySet()) {

pInfo.reportIncrementStatus(SyncInfo.ActionType.CATEGORIZE_B);

B_KEY bKey = bPair.getKey();
@SuppressWarnings("unchecked") A_KEY aKey = (keyTypesEqual ? (A_KEY) bKey : pInfo.convertBKeyToAKey(bKey));
boolean wasDeleted = pInfo.getAStatus(aKey);
Expand All @@ -277,6 +301,7 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>
}

result.categorizationElapsedTime = System.currentTimeMillis() - categorizationStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.CATEGORIZE_B);

return Sextet.with(aToBeCreated, aToBeDeleted, aToBeModified, bToBeCreated, bToBeDeleted, bToBeModified);
}).thenCompose((sextet) -> {
Expand Down Expand Up @@ -304,18 +329,26 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>

/* Delete A records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.DELETE_A, result.aToBeDeletedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.DELETE_A);
long aDeletedStartTimer = System.currentTimeMillis();
futures.add(pInfo.deleteA(aToBeDeleted.stream()).thenCompose((unused) -> {
futures.add(pInfo.deleteA(aToBeDeleted.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.DELETE_A))).thenCompose((unused) -> {

result.aToBeDeletedElapsedTime = System.currentTimeMillis() - aDeletedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.DELETE_A);

/* Add A records */

pInfo.reportSyncStatus(true, SyncInfo.ActionType.CREATE_A);
pInfo.reportSyncStatusTotal(SyncInfo.ActionType.CREATE_A, result.aToBeCreatedCount);
long aCreatedStartTimer = System.currentTimeMillis();
return pInfo.createA(aToBeCreated.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.CREATE_A))
.map(SyncEngine.bToACreation(pInfo, keyTypesEqual, bFragTypeComplete, typesEqual)))
.<@Nullable Void>thenApply((ignored) -> {
result.aToBeCreatedElapsedTime = System.currentTimeMillis() - aCreatedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.CREATE_A);
return null;
});

Expand All @@ -325,49 +358,69 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>

/* Add A records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.CREATE_A, result.aToBeCreatedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.CREATE_A);
long aCreatedStartTimer = System.currentTimeMillis();
futures.add(pInfo.createA(aToBeCreated.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.CREATE_A))
.map(SyncEngine.bToACreation(pInfo, keyTypesEqual, bFragTypeComplete, typesEqual)))
.<@Nullable Void>thenApply((ignored) -> {
result.aToBeCreatedElapsedTime = System.currentTimeMillis() - aCreatedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.CREATE_A);
return null;
}));

/* Delete A records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.DELETE_A, result.aToBeDeletedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.DELETE_A);
long aDeletedStartTimer = System.currentTimeMillis();
futures.add(pInfo.deleteA(aToBeDeleted.stream()).<@Nullable Void>thenApply((ignored) -> {
result.aToBeDeletedElapsedTime = System.currentTimeMillis() - aDeletedStartTimer;
return null;
}));
futures.add(pInfo.deleteA(aToBeDeleted.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.DELETE_A)))
.<@Nullable Void>thenApply((ignored) -> {
result.aToBeDeletedElapsedTime = System.currentTimeMillis() - aDeletedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.DELETE_A);
return null;
}));
}

/* Modify A records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.MODIFY_A, result.aToBeModifiedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.MODIFY_A);
long aModifiedStartTimer = System.currentTimeMillis();
futures.add(pInfo.modifyA(aToBeModified.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.MODIFY_A))
.map(SyncEngine.modifyA(pInfo, aFragTypeComplete, bFragTypeComplete))).<@Nullable Void>thenApply((ignored) -> {
result.aToBeModifiedElapsedTime = System.currentTimeMillis() - aModifiedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.MODIFY_A);
return null;
}));

if (pInfo.isBDeleteBeforeCreate()) {

/* Delete B records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.DELETE_B, result.bToBeDeletedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.DELETE_B);
long bDeletedStartTimer = System.currentTimeMillis();
futures.add(pInfo.deleteB(bToBeDeleted.stream()).thenCompose((unused) -> {
futures.add(pInfo.deleteB(bToBeDeleted.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.DELETE_B))).thenCompose((unused) -> {

result.bToBeDeletedElapsedTime = System.currentTimeMillis() - bDeletedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.DELETE_B);

/* Add B records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.CREATE_B, result.bToBeCreatedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.CREATE_B);
long bCreatedStartTimer = System.currentTimeMillis();

return pInfo.createB(bToBeCreated.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.CREATE_B))
.map(SyncEngine.aToBCreation(pInfo, keyTypesEqual, aFragTypeComplete, typesEqual)))
.<@Nullable Void>thenApply((ignored) -> {
result.bToBeCreatedElapsedTime = System.currentTimeMillis() - bCreatedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.CREATE_B);
return null;
});

Expand All @@ -377,38 +430,55 @@ public <A, B, A_KEY, B_KEY, A_FRAG, B_FRAG> ExtendedCompletionStage<SyncResult>

/* Add B records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.CREATE_B, result.bToBeCreatedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.CREATE_B);
long bCreatedStartTimer = System.currentTimeMillis();
futures.add(pInfo.createB(bToBeCreated.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.CREATE_B))
.map(SyncEngine.aToBCreation(pInfo, keyTypesEqual, aFragTypeComplete, typesEqual)))
.<@Nullable Void>thenApply((ignored) -> {
result.bToBeCreatedElapsedTime = System.currentTimeMillis() - bCreatedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.CREATE_B);
return null;
}));

/* Delete B records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.DELETE_B, result.bToBeDeletedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.DELETE_B);
long bDeletedStartTimer = System.currentTimeMillis();
futures.add(pInfo.deleteB(bToBeDeleted.stream()).<@Nullable Void>thenApply((ignored) -> {
result.bToBeDeletedElapsedTime = System.currentTimeMillis() - bDeletedStartTimer;
return null;
}));
futures.add(pInfo.deleteB(bToBeDeleted.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.DELETE_B)))
.<@Nullable Void>thenApply((ignored) -> {
result.bToBeDeletedElapsedTime = System.currentTimeMillis() - bDeletedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.DELETE_B);
return null;
}));

}

/* Modify B records */

pInfo.reportSyncStatusTotal(SyncInfo.ActionType.MODIFY_B, result.bToBeModifiedCount);
pInfo.reportSyncStatus(true, SyncInfo.ActionType.MODIFY_B);
long bModifiedStartTimer = System.currentTimeMillis();
futures.add(pInfo.modifyB(bToBeModified.stream()
.peek((pair) -> pInfo.reportIncrementStatus(SyncInfo.ActionType.MODIFY_B))
.map(SyncEngine.modifyB(pInfo, aFragTypeComplete, bFragTypeComplete))).<@Nullable Void>thenApply((ignored) -> {
result.bToBeModifiedElapsedTime = System.currentTimeMillis() - bModifiedStartTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.MODIFY_B);
return null;
}));

/* Now set up a future for all these */

return ExtendedCompletableFuture.newCompletableFuture().relatedAllOf(futures);
}).thenCompose((ignored) -> pInfo.complete()).thenApply((ignored) -> {
}).thenCompose((ignored) -> {
pInfo.reportSyncStatus(true, SyncInfo.ActionType.COMPLETE);
return pInfo.complete();
}).thenApply((ignored) -> {
result.totalElapsedTime = System.currentTimeMillis() - startTimer;
pInfo.reportSyncStatus(false, SyncInfo.ActionType.COMPLETE);
return result;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,4 +282,82 @@ default Optional<String> getBHash() {
* @return future to indicate success or failure
*/
ExtendedCompletionStage<@Nullable Void> complete();

enum ActionType {
/**
* Getting the hash of the A data
*/
GET_A_HASH,
/**
* Getting the hash of the B data
*/
GET_B_HASH,
/**
* Getting the actual A data
*/
GET_A_SOURCE,
/**
* Getting the actual B data
*/
GET_B_SOURCE,
/**
* Calling the completion
*/
COMPLETE,
/**
* Process A data
*/
CATEGORIZE_A,
/**
* Process B data
*/
CATEGORIZE_B,
/**
* Delete A
*/
DELETE_A,
/**
* Delete B
*/
DELETE_B,
/**
* Create A
*/
CREATE_A,
/**
* Create B
*/
CREATE_B,
/**
* Modify A
*/
MODIFY_A,
/**
* Modify B
*/
MODIFY_B
}

/**
* Called to report either starting or ending a particular action
*
* @param pIsStart true if starting or false if ending
* @param pType the type
*/
default void reportSyncStatus(boolean pIsStart, ActionType pType) {}

/**
* Called to set the total (used for progress bars) on an action
*
* @param pType the action
* @param pTotal the total number of steps
*/
default void reportSyncStatusTotal(ActionType pType, int pTotal) {}

/**
* Called to increment the completion of a step on an action
*
* @param pType the action
*/
default void reportIncrementStatus(ActionType pType) {}
}

0 comments on commit 886030f

Please sign in to comment.