Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JS-13] Copy global model when accessing it from local tablet #14

Merged
merged 5 commits into from
Dec 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private CachedModelAccessor(@Parameter(DolphinParameters.ModelTableId.class) fin
final List<K> keyList = new ArrayList<>(keys.size());
try {
pullTracer.startTimer();
final Map<K, V> kvMap = modelTable.multiGetOrInit(keyList).get();
final Map<K, V> kvMap = modelTable.multiGetOrInit(keyList, true).get();
pullTracer.recordTime(keys.size());

kvMap.forEach(modelLoadingCache::put);
Expand All @@ -87,7 +87,7 @@ private LoadingCache<K, V> initCache() {
@Override
public V load(final K key) throws Exception {
pullTracer.startTimer();
final Future<V> pullFuture = modelTable.getOrInit(key);
final Future<V> pullFuture = modelTable.getOrInit(key, true);
final V value = pullFuture.get();
pullTracer.recordTime(1);
return value;
Expand All @@ -99,7 +99,7 @@ public Map<K, V> loadAll(final Iterable<? extends K> keys) throws Exception {
keys.forEach(keyList::add);

pullTracer.startTimer();
final Map<K, V> kvMap = modelTable.multiGetOrInit(keyList).get();
final Map<K, V> kvMap = modelTable.multiGetOrInit(keyList, true).get();
pullTracer.recordTime(kvMap.size());

return kvMap;
Expand Down Expand Up @@ -168,7 +168,7 @@ public List<V> pull(final List<K> keys) {
@Override
public List<V> pull(final List<K> keys, final Table<K, V, P> aModelTable) {
try {
final Map<K, V> result = aModelTable.multiGetOrInit(keys).get();
final Map<K, V> result = aModelTable.multiGetOrInit(keys, true).get();

final List<V> valueList = new ArrayList<>(keys.size());
keys.forEach(key -> valueList.add(result.get(key)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void push(final Map<K, P> keyToDeltaValueMap) {
public V pull(final K key) {
pullTracer.startTimer();

final Future<V> future = modelTable.getOrInit(key);
final Future<V> future = modelTable.getOrInit(key, true);
V result;
while (true) {
try {
Expand Down Expand Up @@ -104,7 +104,7 @@ public List<V> pull(final List<K> keys) {
@Override
public List<V> pull(final List<K> keys, final Table<K, V, P> aModelTable) {
try {
final Map<K, V> result = aModelTable.multiGetOrInit(keys).get();
final Map<K, V> result = aModelTable.multiGetOrInit(keys, true).get();

final List<V> valueList = new ArrayList<>(keys.size());
keys.forEach(key -> valueList.add(result.get(key)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private LDAStatCalculator(@Parameter(LDAParameters.Alpha.class) final double alp
final Document document = documentPair.getValue();
final LDALocalModel localModel;
try {
localModel = localModelTable.get(documentPair.getKey()).get();
localModel = localModelTable.get(documentPair.getKey(), false).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void updateModel(final Map.Entry<Long, Document> documentPair, final LDA
final LDALocalModel localModel;
try {
final long docId = documentPair.getKey();
localModel = localModelTable.get(docId).get();
localModel = localModelTable.get(docId, false).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private NMFModel pullModelToEvaluate(final List<Integer> keys, final Table<Integ

private NMFLocalModel getLocalModel(final List<Integer> keys) {
try {
final Map<Integer, Vector> localModelMatrix = localModelTable.multiGetOrInit(keys).get();
final Map<Integer, Vector> localModelMatrix = localModelTable.multiGetOrInit(keys, false).get();
if (localModelMatrix.size() != keys.size()) {
throw new RuntimeException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,48 +91,64 @@ public interface Table<K, V, U> {
/**
* Retrieves the value to which the specified key is associated,
* or {@code null} if this table contains no value for the key.
* This method provides an option whether to retrieve the copy of value or the origin, when accessing local tablet.
* Retrieving the origin is good for performance, but users should be aware of that
* the type of value should be immutable or returned values can mutate without consistency.
* It returns a {@link Future} of result, which
* allows users to retrieve the result from the object when the request is complete.
* @param key key with which value is to be associated
* @param copy a boolean for whether to return the copy of data or the origin from the local tablet
* @return {@link Future} that will provide the value to which the specified key is associated,
* or {@code null} if no value is associated with the given key
*/
Future<V> get(K key);
Future<V> get(K key, boolean copy);

/**
* Retrieves the values to which the specified keys are associated.
* or {@code null} for the keys that this table contains no value.
* This method provides an option whether to retrieve the copy of value or the origin, when accessing local tablet.
* Retrieving the origin is good for performance, but users should be aware of that
* the type of value should be immutable or returned values can mutate without consistency.
* It returns a {@link Future} of result,
* which allows users to retrieve the result from the object when the request is complete.
* @param keys keys with which values are to be associated
* @param copy a boolean for whether to return the copy of data or the origin from the local tablet
* @return {@link Future} that will provide the map containing values to which the specified keys are associated,
* or {@code null} if no value is associated with the given keys
*/
Future<Map<K, V>> multiGet(List<K> keys);
Future<Map<K, V>> multiGet(List<K> keys, boolean copy);

/**
* Retrieves the value to which the specified key is associated.
* If this table contains no value for the key, it returns a value of {@link UpdateFunction#initValue(K)}
* after associating this value with the key.
* This method provides an option whether to retrieve the copy of value or the origin, when accessing local tablet.
* Retrieving the origin is good for performance, but users should be aware of that
* the type of value should be immutable or returned values can mutate without consistency.
* It returns a {@link Future} of result, which
* allows users to retrieve the result from the object when the request is complete.
* @param key key with which value is to be associated
* @param copy a boolean for whether to return the copy of data or the origin from the local tablet
* @return {@link Future} that will provide the value to which the specified key is associated,
* or a value obtained by {@link UpdateFunction#initValue(K)} if there is no mapping for the key
*/
Future<V> getOrInit(K key);
Future<V> getOrInit(K key, boolean copy);

/**
* Retrieves the values to which the specified keys are associated.
* For the entries that this table has not added yet, {@link UpdateFunction#initValue(K)} are associated and inserted
* to the table with the keys.
* This method provides an option whether to retrieve the copy of value or the origin, when accessing local tablet.
* Retrieving the origin is good for performance, but users should be aware of that
* the type of value should be immutable or returned values can mutate without consistency.
* It returns a {@link Future} of result,
* which allows users to retrieve the result from the object when the request is complete.
* @param keys keys with which values are to be associated
* @param copy a boolean for whether to return the copy of data or the origin from the local tablet
* @return {@link Future} that will provide the map containing values to which the specified keys are associated,
* or values obtained by {@link UpdateFunction#initValue(K)} for the keys that have no mapping
*/
Future<Map<K, V>> multiGetOrInit(List<K> keys);
Future<Map<K, V>> multiGetOrInit(List<K> keys, boolean copy);

/**
* Update a value associated with the specified key using {@link UpdateFunction}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private DataOpResult<V> putIfAbsentInternal(final K key, @Nonnull final V value,
}

@Override
public DataOpResult<V> get(final K key) {
public DataOpResult<V> get(final K key, final boolean copy) {
final EncodedKey<K> encodedKey = new EncodedKey<>(key, keyCodec);

final int blockId = blockPartitioner.getBlockId(encodedKey);
Expand All @@ -257,15 +257,18 @@ public DataOpResult<V> get(final K key) {
remoteIdOptional = remoteIdWithLock.getKey();

// execute operation in local, holding ownershipLock
if (!remoteIdOptional.isPresent()) {
if (!copy && !remoteIdOptional.isPresent()) {
final V result = tablet.get(blockId, key);
return new SingleKeyDataOpResult<>(result, true);
} else {
// TODO #11: Optimize local access routine of get operation
final String targetExecutorId = remoteIdOptional.orElse(executorId);

final DataOpResult<V> dataOpResult = new SingleKeyDataOpResult<>();
// send operation to remote
remoteAccessOpSender.sendSingleKeyOpToRemote(
OpType.GET, tableId, blockId, key, null, null,
remoteIdOptional.get(), true, tableComponents, dataOpResult);
targetExecutorId, true, tableComponents, dataOpResult);

return dataOpResult;
}
Expand All @@ -278,7 +281,7 @@ public DataOpResult<V> get(final K key) {
}

@Override
public Future<Map<K, V>> multiGet(final List<K> keys) {
public Future<Map<K, V>> multiGet(final List<K> keys, final boolean copy) {
final Map<Integer, List<K>> blockToKeyListMap = new HashMap<>();
for (final K key : keys) {
final int blockId = blockPartitioner.getBlockId(key);
Expand All @@ -293,7 +296,7 @@ public Future<Map<K, V>> multiGet(final List<K> keys) {
remoteIdOptional = remoteIdWithLock.getKey();
try {
// execute operation in local
if (!remoteIdOptional.isPresent()) {
if (!copy && !remoteIdOptional.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you help me understand how values are copied when copy is true? Is it enough to put the request to a queue for handling remote operations?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're copied through encoding and decoding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the operation should go into the queue not to get undetermined states by concurrent updates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This way of copying have unnecessary overhead.
Issue #11.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. It looks working well; we may have to keep an eye on #11, though.
I will merge it.

final Map<K, V> localResultMap = new HashMap<>();
for (final K key : keyList) {
final V output = tablet.get(blockId, key);
Expand All @@ -303,10 +306,12 @@ public Future<Map<K, V>> multiGet(final List<K> keys) {
}
aggregateDataOpResult.onCompleted(localResultMap, true);
} else {
// TODO #11: Optimize local access routine of get operation
final String targetExecutorId = remoteIdOptional.orElse(executorId);

// send operation to remote
remoteAccessOpSender.sendMultiKeyOpToRemote(OpType.GET, tableId, blockId, keyList,
Collections.emptyList(), Collections.emptyList(), remoteIdOptional.get(),
Collections.emptyList(), Collections.emptyList(), targetExecutorId,
true, tableComponents, aggregateDataOpResult);
}
} catch (BlockNotExistsException e) {
Expand All @@ -321,7 +326,7 @@ public Future<Map<K, V>> multiGet(final List<K> keys) {
}

@Override
public Future<V> getOrInit(final K key) {
public Future<V> getOrInit(final K key, final boolean copy) {
final EncodedKey<K> encodedKey = new EncodedKey<>(key, keyCodec);

final int blockId = blockPartitioner.getBlockId(encodedKey);
Expand All @@ -333,15 +338,18 @@ public Future<V> getOrInit(final K key) {
remoteIdOptional = remoteIdWithLock.getKey();

// execute operation in local, holding ownershipLock
if (!remoteIdOptional.isPresent()) {
if (!copy && !remoteIdOptional.isPresent()) {
final V result = tablet.getOrInit(blockId, key);
return new SingleKeyDataOpResult<>(result, true);
} else {
// TODO #11: Optimize local access routine of get operation
final String targetExecutorId = remoteIdOptional.orElse(executorId);

final DataOpResult<V> dataOpResult = new SingleKeyDataOpResult<>();
// send operation to remote
remoteAccessOpSender.sendSingleKeyOpToRemote(
OpType.GET_OR_INIT, tableId, blockId, key, null, null,
remoteIdOptional.get(), true, tableComponents, dataOpResult);
targetExecutorId, true, tableComponents, dataOpResult);

return dataOpResult;
}
Expand All @@ -355,7 +363,7 @@ public Future<V> getOrInit(final K key) {
}

@Override
public Future<Map<K, V>> multiGetOrInit(final List<K> keys) {
public Future<Map<K, V>> multiGetOrInit(final List<K> keys, final boolean copy) {
final Map<Integer, List<K>> blockToKeyListMap = new HashMap<>();
for (final K key : keys) {
final int blockId = blockPartitioner.getBlockId(key);
Expand All @@ -370,7 +378,7 @@ public Future<Map<K, V>> multiGetOrInit(final List<K> keys) {
remoteIdOptional = remoteIdWithLock.getKey();
try {
// execute operation in local
if (!remoteIdOptional.isPresent()) {
if (!copy && !remoteIdOptional.isPresent()) {
final Map<K, V> localResultMap = new HashMap<>();
for (final K key : keyList) {
final V output = tablet.getOrInit(blockId, key);
Expand All @@ -380,10 +388,12 @@ public Future<Map<K, V>> multiGetOrInit(final List<K> keys) {
}
aggregateDataOpResult.onCompleted(localResultMap, true);
} else {
// TODO #11: Optimize local access routine of get operation
final String targetExecutorId = remoteIdOptional.orElse(executorId);

// send operation to remote
remoteAccessOpSender.sendMultiKeyOpToRemote(OpType.GET_OR_INIT, tableId, blockId, keyList,
Collections.emptyList(), Collections.emptyList(), remoteIdOptional.get(),
Collections.emptyList(), Collections.emptyList(), targetExecutorId,
true, tableComponents, aggregateDataOpResult);
}
} catch (BlockNotExistsException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void run() throws Exception {

for (int k = 0; k < numKeys; k++) {
final int key = startKey + k;
final Integer getResult = modelTable.get(key).get();
final Integer getResult = modelTable.get(key, false).get();
LOG.log(Level.INFO, "Get result for key {0}: {1}", new Object[]{key, getResult});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ private boolean validate(final int expectedResult)

for (int i = 0; i < numKeys; i++) {
final int key = startKey + i;
final int result = modelTable.get(key).get();
final int result = modelTable.get(key, false).get();

if (expectedResult != result) {
LOG.log(Level.WARNING, "For key {0}, expected value {1} but received {2}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void run() throws Exception {

int itemCount = 0;
for (long key = 0; key < NUM_ITEMS; key++) {
final String value = table.get(key).get();
final String value = table.get(key, false).get();

if (value != null) { // value can be null, if the checkpoint was done with sampling
itemCount++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public void run() throws Exception {
final Table<Long, String, ?> hashedTable = tableAccessor.getTable(HASHED_TABLE_ID);
final Table<Long, String, ?> orderedTable = tableAccessor.getTable(ORDERED_TABLE_ID);

final String value00 = hashedTable.getOrInit(KEY0).get();
final String value01 = hashedTable.getOrInit(KEY1).get();
final String value10 = orderedTable.getOrInit(KEY0).get();
final String value11 = orderedTable.getOrInit(KEY1).get();
final String value00 = hashedTable.getOrInit(KEY0, false).get();
final String value01 = hashedTable.getOrInit(KEY1, false).get();
final String value10 = orderedTable.getOrInit(KEY0, false).get();
final String value11 = orderedTable.getOrInit(KEY1, false).get();

LOG.log(Level.INFO, "value for key {0} in a hashedTable is {1}", new Object[]{KEY0, value00});
LOG.log(Level.INFO, "value for key {0} in a hashedTable is {1}", new Object[]{KEY1, value01});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ public void run() {
for (int i = 0; i < NUM_OPERATIONS; i++) {
final long key = getKeyByTestType(i);
final String expectedValue = getExpectedValue(key, isUpdated, isNullTest);
final String value = table.get(key).get();
final String value = table.get(key, false).get();

// if data are removed, get method returns null
if (isNullTest) {
Expand Down