Skip to content

Commit

Permalink
[MIST-1157] NullPointerException in QueryAllocationManager (#1160)
Browse files Browse the repository at this point in the history
This PR closes #1157 via
* Fix `NullPointerException` in ApplicationAwareQueryAllocationManager by adopting global read/write lock.
* Update `AppTaskInfoMap` up-to-date.
  • Loading branch information
DifferentSC authored and taegeonum committed Jul 11, 2018
1 parent 10a2e52 commit 3e14542
Show file tree
Hide file tree
Showing 18 changed files with 474 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public void onNext(final FailedEvaluator failedEvaluator) {
runningTaskInfoStore.remove(failedTaskId);
proxyToMaster.notifyFailedTask(failedTaskId);
} catch (final AvroRemoteException e) {
e.printStackTrace();
LOG.log(Level.SEVERE, "Cannot connect to MistMaster for notifying failure! " + e.toString());
throw new IllegalStateException("Cannot connect to MistMaster while failure recovery");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public final class DefaultTaskRequestorImpl implements TaskRequestor {
*/
private CountDownLatch countDownLatch;

/**
* The shared task info read/write lock.
*/
private TaskInfoRWLock taskInfoRWLock;

@Inject
private DefaultTaskRequestorImpl(
final TaskStatsMap taskStatsMap,
Expand All @@ -132,7 +137,8 @@ private DefaultTaskRequestorImpl(
@Parameter(NewRatio.class) final int newRatio,
@Parameter(ReservedCodeCacheSize.class) final int reservedCodeCacheSize,
final MistCommonConfigs commonConfigs,
final MistTaskConfigs taskConfigs) throws IOException {
final MistTaskConfigs taskConfigs,
final TaskInfoRWLock taskInfoRWLock) throws IOException {
this.taskIdIndex = 0;
this.taskStatsMap = taskStatsMap;
this.proxyToTaskMap = proxyToTaskMap;
Expand Down Expand Up @@ -190,6 +196,7 @@ public synchronized void recoverTaskConn() {
// Retrieve the internal data structures.
for (final TaskInfo taskInfo : runningTaskIdList) {
final String taskId = taskInfo.getTaskId();
taskInfoRWLock.writeLock().lock();
taskStatsMap.addTask(taskId);
taskAddressInfoMap.put(taskInfo.getTaskId(), new TaskAddressInfo(taskInfo.getTaskHostname(),
taskInfo.getClientToTaskPort(), taskInfo.getMasterToTaskPort()));
Expand All @@ -199,7 +206,9 @@ public synchronized void recoverTaskConn() {
taskInfo.getTaskHostname(), taskInfo.getMasterToTaskPort())));
} catch (final IOException e) {
e.printStackTrace();
return;
throw new RuntimeException("Failed to recovery the connection to mist task...");
} finally {
taskInfoRWLock.writeLock().unlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,34 @@ public final class ProxyToTaskMap {

private ConcurrentMap<String, MasterToTaskMessage> innerMap;

/**
* The shared lock for task info synchronization.
*/
private final TaskInfoRWLock taskInfoRWLock;

@Inject
private ProxyToTaskMap() {
private ProxyToTaskMap(final TaskInfoRWLock taskInfoRWLock) {
this.taskInfoRWLock = taskInfoRWLock;
this.innerMap = new ConcurrentHashMap<>();
}

public void addNewProxy(final String taskId, final MasterToTaskMessage proxyToTask) {
assert taskInfoRWLock.isWriteLockedByCurrentThread();
innerMap.put(taskId, proxyToTask);
}

public Set<Map.Entry<String, MasterToTaskMessage>> entrySet() {
assert taskInfoRWLock.isReadHoldByCurrentThread();
return innerMap.entrySet();
}

public MasterToTaskMessage get(final String taskId) {
assert taskInfoRWLock.isReadHoldByCurrentThread();
return innerMap.get(taskId);
}

public MasterToTaskMessage remove(final String taskId) {
assert taskInfoRWLock.isWriteLockedByCurrentThread();
return innerMap.remove(taskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,37 @@ public final class TaskAddressInfoMap {

private final ConcurrentMap<String, TaskAddressInfo> innerMap;

/**
* The read/write lock for task info synchronization.
*/
private final TaskInfoRWLock taskInfoRWLock;

@Inject
private TaskAddressInfoMap() {
private TaskAddressInfoMap(final TaskInfoRWLock taskInfoRWLock) {
this.taskInfoRWLock = taskInfoRWLock;
this.innerMap = new ConcurrentHashMap<>();
}

public TaskAddressInfo get(final String taskId) {
assert taskInfoRWLock.isReadHoldByCurrentThread();
return innerMap.get(taskId);
}

public TaskAddressInfo put(final String taskId, final TaskAddressInfo taskAddressInfo) {
assert taskInfoRWLock.isWriteLockedByCurrentThread();
return innerMap.put(taskId, taskAddressInfo);
}

public IPAddress getClientToTaskAddress(final String taskId) {
return new IPAddress(innerMap.get(taskId).getHostname(), innerMap.get(taskId).getClientToTaskPort());
return new IPAddress(this.get(taskId).getHostname(), this.get(taskId).getClientToTaskPort());
}

public IPAddress getMasterToTaskAddress(final String taskId) {
return new IPAddress(innerMap.get(taskId).getHostname(), innerMap.get(taskId).getMasterToTaskPort());
return new IPAddress(this.get(taskId).getHostname(), this.get(taskId).getMasterToTaskPort());
}

public TaskAddressInfo remove(final String taskId) {
assert taskInfoRWLock.isWriteLockedByCurrentThread();
return innerMap.remove(taskId);
}
}
111 changes: 111 additions & 0 deletions mist-core/src/main/java/edu/snu/mist/core/master/TaskInfoRWLock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.master;

import javax.inject.Inject;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* The shared read/write lock used for synchronizing adding / deleting task info
* based on ReentrantReadWriteLock class.
*/
public final class TaskInfoRWLock extends ReentrantReadWriteLock {

private static final Logger LOG = Logger.getLogger(TaskInfoRWLock.class.getName());

private final LoggableReadLock loggableReadLock;

private final LoggableWriteLock loggableWriteLock;

// The injectable constructor.
@Inject
private TaskInfoRWLock() {
super();
// Do nothing.
this.loggableReadLock = new LoggableReadLock(this);
this.loggableWriteLock = new LoggableWriteLock(this);
}

/**
* This method utilizes getReadHoldCount() for determining
* whether the current thread is holding a read lock or not.
* @return true / false
*/
public boolean isReadHoldByCurrentThread() {
return this.getReadHoldCount() > 0 || this.isWriteLockedByCurrentThread();
}

@Override
public WriteLock writeLock() {
return loggableWriteLock;
}

@Override
public ReadLock readLock() {
return loggableReadLock;
}

private static class LoggableReadLock extends ReadLock {
public LoggableReadLock(final ReentrantReadWriteLock lock) {
super(lock);
}

@Override
public void lock() {
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Acquiring Readlock on TaskInfoRWLock: {0}",
Arrays.toString(Thread.currentThread().getStackTrace()));
}
super.lock();
}

@Override
public void unlock() {
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Releasing Readlock on TaskInfoRWLock: {0}",
Arrays.toString(Thread.currentThread().getStackTrace()));
}
super.unlock();
}
}

private static class LoggableWriteLock extends WriteLock {
public LoggableWriteLock(final ReentrantReadWriteLock lock) {
super(lock);
}

@Override
public void lock() {
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Acquiring Writelock on TaskInfoRWLock: {0}",
Arrays.toString(Thread.currentThread().getStackTrace()));
}
super.lock();
}

@Override
public void unlock() {
if (LOG.isLoggable(Level.FINE)) {
LOG.log(Level.FINE, "Releasing Readlock on TaskInfoRWLock: {0}",
Arrays.toString(Thread.currentThread().getStackTrace()));
}
super.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,25 @@ public final class TaskStatsMap {
*/
private final List<String> innerList;

/**
* The read/write lock for task info update.
*/
private final TaskInfoRWLock taskInfoRWLock;

@Inject
private TaskStatsMap() {
private TaskStatsMap(final TaskInfoRWLock taskInfoRWLock) {
this.taskInfoRWLock = taskInfoRWLock;
this.innerMap = new ConcurrentHashMap<>();
this.innerList = new CopyOnWriteArrayList<>();
}

public TaskStats get(final String taskId) {
assert taskInfoRWLock.isReadHoldByCurrentThread();
return innerMap.get(taskId);
}

public TaskStats addTask(final String taskId) {
assert taskInfoRWLock.isWriteLockedByCurrentThread();
innerList.add(taskId);
return innerMap.putIfAbsent(taskId, TaskStats.newBuilder()
.setTaskLoad(0.0)
Expand All @@ -65,6 +73,7 @@ public TaskStats addTask(final String taskId) {
}

public TaskStats removeTask(final String taskId) {
assert taskInfoRWLock.isWriteLockedByCurrentThread();
innerList.remove(taskId);
return innerMap.remove(taskId);
}
Expand All @@ -76,10 +85,12 @@ public void updateTaskStats(final String taskId, final TaskStats updatedTaskStat
}

public Set<Map.Entry<String, TaskStats>> entrySet() {
assert taskInfoRWLock.isReadHoldByCurrentThread();
return innerMap.entrySet();
}

public List<String> getTaskList() {
assert taskInfoRWLock.isReadHoldByCurrentThread();
return new ArrayList<>(innerMap.keySet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (C) 2018 Seoul National University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.snu.mist.core.master.lb;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* The map which contains app-task mapping information.
*/
public final class AppTaskListMap {

private final ConcurrentMap<String, List<String>> innerMap;

@Inject
private AppTaskListMap() {
this.innerMap = new ConcurrentHashMap<>();
}

public synchronized void removeTask(final String removedTaskId) {
for (final Map.Entry<String, List<String>> entry: innerMap.entrySet()) {
final List<String> taskIdList = entry.getValue();
taskIdList.removeIf(taskId -> taskId.equals(removedTaskId));
if (taskIdList.isEmpty()) {
innerMap.remove(entry.getKey());
}
}
}

public synchronized void addTaskToApp(final String appId, final String taskId) {
if (!innerMap.containsKey(appId)) {
innerMap.putIfAbsent(appId, new ArrayList<>());
}
innerMap.get(appId).add(taskId);
}

public synchronized List<String> getTaskListForApp(final String appId) {
return innerMap.get(appId);
}
}
Loading

0 comments on commit 3e14542

Please sign in to comment.