diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/DefaultSharedStoreManagerImpl.java b/mist-core/src/main/java/edu/snu/mist/core/master/DefaultSharedStoreManagerImpl.java new file mode 100644 index 00000000..961f0962 --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/master/DefaultSharedStoreManagerImpl.java @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.master; + +import edu.snu.mist.core.parameters.SharedStorePath; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.io.File; + +/** + * The default shared store manager implementation class. + */ +public final class DefaultSharedStoreManagerImpl implements SharedStoreManager { + + /** + * The shared store path. + */ + private final String sharedStorePath; + + @Inject + private DefaultSharedStoreManagerImpl(@Parameter(SharedStorePath.class) final String sharedStorePath) { + this.sharedStorePath = sharedStorePath; + } + + @Override + public boolean clearSharedStore() { + final File sharedStoreDir = new File(sharedStorePath); + final File[] listing = sharedStoreDir.listFiles(); + if (listing == null) { + throw new RuntimeException("Inavlid shared store directory path. Terminate MIST..."); + } + if (listing.length == 0) { + return true; + } + for (final File file: listing) { + final String fileName = file.getName(); + if (fileName.endsWith(".checkpoint") || fileName.endsWith(".query") || fileName.endsWith(".querylist")) { + // Try to delete the already existing query and checkpoint files... + if (!file.delete()) { + // If we fail to delete the existing files, there should be problems with the shared store... + return false; + } + } + } + return true; + } +} diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/MistMaster.java b/mist-core/src/main/java/edu/snu/mist/core/master/MistMaster.java index 62e0a200..6565a504 100644 --- a/mist-core/src/main/java/edu/snu/mist/core/master/MistMaster.java +++ b/mist-core/src/main/java/edu/snu/mist/core/master/MistMaster.java @@ -75,6 +75,11 @@ public final class MistMaster implements Task { */ private final ApplicationCodeManager applicationCodeManager; + /** + * The shared store manager. + */ + private final SharedStoreManager sharedStoreManager; + @Inject private MistMaster( @Parameter(DriverToMasterPort.class) final int driverToMasterPort, @@ -88,13 +93,15 @@ private MistMaster( final MasterSetupFinished masterSetupFinished, @Parameter(MasterRecovery.class) final boolean masterRecovery, final ApplicationCodeManager applicationCodeManager, - final DynamicScalingManager dynamicScalingManager) throws Exception { + final DynamicScalingManager dynamicScalingManager, + final SharedStoreManager sharedStoreManager) throws Exception { this.initialTaskNum = initialTaskNum; this.taskRequestor = taskRequestor; this.masterSetupFinished = masterSetupFinished; this.masterRecovery = masterRecovery; this.applicationCodeManager = applicationCodeManager; this.dynamicScalingManager = dynamicScalingManager; + this.sharedStoreManager = sharedStoreManager; // Initialize countdown latch this.countDownLatch = new CountDownLatch(1); @@ -114,6 +121,10 @@ public byte[] call(final byte[] memento) throws Exception { if (!masterRecovery) { taskRequestor.setupTaskAndConn(initialTaskNum); masterSetupFinished.setFinished(); + // Clear the shared store when the master is initiated for the first time. + if (!sharedStoreManager.clearSharedStore()) { + throw new RuntimeException("Cannot clear the shared store.. shutting down MIST..."); + } } else { applicationCodeManager.recoverAppJarInfo(); taskRequestor.recoverTaskConn(); diff --git a/mist-core/src/main/java/edu/snu/mist/core/master/SharedStoreManager.java b/mist-core/src/main/java/edu/snu/mist/core/master/SharedStoreManager.java new file mode 100644 index 00000000..b00e88f3 --- /dev/null +++ b/mist-core/src/main/java/edu/snu/mist/core/master/SharedStoreManager.java @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.mist.core.master; + +import org.apache.reef.tang.annotations.DefaultImplementation; + +/** + * The interface for master-side. shared store manager + */ +@DefaultImplementation(DefaultSharedStoreManagerImpl.class) +public interface SharedStoreManager { + + /** + * Clear the legacy query and checkpoint files on the shared store. + */ + boolean clearSharedStore(); +}