From 56971b34c3917003b376f7a74aa4f7e12a3daf05 Mon Sep 17 00:00:00 2001 From: DanteNiewenhuis <d.niewenhuis@hotmail.com> Date: Tue, 7 Jan 2025 17:45:27 +0100 Subject: [PATCH 1/6] Added maxCpuDemand to TraceWorkload, don't know if this will be needed so might remove later. Updated SimTraceWorkload to properly handle creating checkpoints Fixed a bug with the updatedConsumers in the FlowDistributor Implemented a first version of scaling the runtime of fragments. --- .../compute/simulator/internal/Guest.kt | 8 +- .../compute/workload/ComputeWorkloadLoader.kt | 11 +- .../org/opendc/compute/workload/Task.kt | 2 +- .../experiments/base/runner/ScenarioRunner.kt | 7 + .../opendc/experiments/base/ExperimentTest.kt | 2 +- .../base/FailuresAndCheckpointingTest.kt | 10 +- .../experiments/base/FlowDistributorTest.kt | 2 +- .../experiments/base/FragmentScalingTest.kt | 503 ++++++++++++++++++ .../opendc/experiments/base/TestingUtils.kt | 8 +- .../compute/workload/SimWorkload.java | 8 +- .../{ => trace}/SimTraceWorkload.java | 120 +++-- .../workload/{ => trace}/TraceFragment.java | 2 +- .../workload/{ => trace}/TraceWorkload.java | 42 +- .../trace/scaling/NoDelayScaling.java | 25 + .../trace/scaling/PerfectScaling.java | 25 + .../workload/trace/scaling/ScalingPolicy.java | 10 + .../opendc/simulator/compute/Coroutines.kt | 2 +- .../simulator/engine/engine/FlowEngine.java | 14 +- .../engine/graph/FlowDistributor.java | 33 +- .../simulator/engine/graph/FlowGraph.java | 4 +- 20 files changed, 737 insertions(+), 101 deletions(-) create mode 100644 opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt rename opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/{ => trace}/SimTraceWorkload.java (71%) rename opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/{ => trace}/TraceFragment.java (96%) rename 
opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/{ => trace}/TraceWorkload.java (77%) create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java create mode 100644 opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index ee2cb319e..2151bcadb 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -31,8 +31,9 @@ import org.opendc.compute.simulator.telemetry.GuestSystemStats import org.opendc.simulator.compute.machine.SimMachine import org.opendc.simulator.compute.machine.VirtualMachine import org.opendc.simulator.compute.workload.ChainWorkload -import org.opendc.simulator.compute.workload.TraceFragment -import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceWorkload +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling import java.time.Duration import java.time.Instant import java.time.InstantSource @@ -93,6 +94,8 @@ public class Guest( onStart() + val scalingPolicy = NoDelayScaling() + val bootworkload = TraceWorkload( ArrayList( @@ -107,6 +110,7 @@ public class Guest( 0, 0, 0.0, + scalingPolicy ) if (task.workload is TraceWorkload) { diff --git 
a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index 655bacb9c..e01054179 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -23,7 +23,9 @@ package org.opendc.compute.workload import mu.KotlinLogging -import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceWorkload +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy import org.opendc.trace.Trace import org.opendc.trace.conv.TABLE_RESOURCES import org.opendc.trace.conv.TABLE_RESOURCE_STATES @@ -53,6 +55,7 @@ public class ComputeWorkloadLoader( private val checkpointInterval: Long = 0L, private val checkpointDuration: Long = 0L, private val checkpointIntervalScaling: Double = 1.0, + private val scalingPolicy: ScalingPolicy = NoDelayScaling() ) : WorkloadLoader(subMissionTime) { /** * The logger for this instance. @@ -84,7 +87,7 @@ public class ComputeWorkloadLoader( val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) } + val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy) } builder.add(durationMs, cpuUsage, cores) } @@ -178,7 +181,7 @@ public class ComputeWorkloadLoader( /** * A builder for a VM trace. 
*/ - private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double) { + private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double, scalingPolicy: ScalingPolicy) { /** * The total load of the trace. */ @@ -187,7 +190,7 @@ public class ComputeWorkloadLoader( /** * The internal builder for the trace. */ - private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling) + private val builder = TraceWorkload.builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy) /** * Add a fragment to the trace. diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt index 60be9299d..7a5089b9a 100644 --- a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/Task.kt @@ -22,7 +22,7 @@ package org.opendc.compute.workload -import org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceWorkload import java.time.Instant import java.util.UUID diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index c9c2729de..22c0b3ec0 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -34,6 +34,9 @@ import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import 
org.opendc.experiments.base.experiment.Scenario import org.opendc.experiments.base.experiment.specs.getWorkloadLoader +import org.opendc.experiments.base.experiment.specs.getWorkloadType +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling +import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling import org.opendc.simulator.kotlin.runSimulation import java.io.File import java.time.Duration @@ -80,6 +83,9 @@ public fun runScenario( val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 + val scalingPolicy = NoDelayScaling(); +// val scalingPolicy = PerfectScaling(); + val workloadLoader = getWorkloadLoader( scenario.workloadSpec.type, @@ -88,6 +94,7 @@ public fun runScenario( checkpointInterval, checkpointDuration, checkpointIntervalScaling, + scalingPolicy ) val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction) diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt index 568505583..e271fce7d 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/ExperimentTest.kt @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.workload.Task -import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceFragment import java.util.ArrayList /** diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt 
b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt index 90737ab68..a3c3b3852 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt @@ -27,7 +27,7 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.workload.Task import org.opendc.experiments.base.experiment.specs.TraceBasedFailureModelSpec -import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceFragment import java.util.ArrayList /** @@ -225,7 +225,7 @@ class FailuresAndCheckpointingTest { } /** - * Failure test 1: Single Task with checkpointing + * Checkpointing test 1: Single Task with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. * @@ -263,7 +263,7 @@ class FailuresAndCheckpointingTest { } /** - * Failure test 2: Single Task with scaling checkpointing + * Checkpointing test 2: Single Task with scaling checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. * @@ -302,7 +302,7 @@ class FailuresAndCheckpointingTest { } /** - * Checkpoint test 3: Single Task, single failure with checkpointing + * Checkpointing test 3: Single Task, single failure with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. 
* @@ -360,7 +360,7 @@ class FailuresAndCheckpointingTest { } /** - * Checkpoint test 4: Single Task, repeated failure with checkpointing + * Checkpointing test 4: Single Task, repeated failure with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. * diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt index 4a7c9341b..3d7333607 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FlowDistributorTest.kt @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.workload.Task -import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceFragment import java.util.ArrayList /** diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt new file mode 100644 index 000000000..8c94f0737 --- /dev/null +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt @@ -0,0 +1,503 @@ +/* + * Copyright (c) 2020 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, 
sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package org.opendc.experiments.base + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertAll +import org.opendc.compute.simulator.provisioner.Provisioner +import org.opendc.compute.simulator.provisioner.registerComputeMonitor +import org.opendc.compute.simulator.provisioner.setupComputeService +import org.opendc.compute.simulator.provisioner.setupHosts +import org.opendc.compute.simulator.scheduler.FilterScheduler +import org.opendc.compute.simulator.scheduler.filters.ComputeFilter +import org.opendc.compute.simulator.scheduler.filters.RamFilter +import org.opendc.compute.simulator.scheduler.filters.VCpuFilter +import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher +import org.opendc.compute.simulator.service.ComputeService +import org.opendc.compute.simulator.telemetry.ComputeMonitor +import org.opendc.compute.simulator.telemetry.table.HostTableReader +import org.opendc.compute.simulator.telemetry.table.ServiceTableReader +import org.opendc.compute.simulator.telemetry.table.TaskTableReader +import org.opendc.compute.topology.clusterTopology +import 
org.opendc.compute.topology.specs.ClusterSpec +import org.opendc.compute.workload.ComputeWorkloadLoader +import org.opendc.compute.workload.Task +import org.opendc.experiments.base.runner.replay +import org.opendc.simulator.compute.workload.trace.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceWorkload +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling +import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy +import org.opendc.simulator.kotlin.runSimulation +import java.io.File +import java.time.Duration +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneId +import java.util.ArrayList +import java.util.UUID + +/** + * Testing suite containing tests that specifically test the scaling of trace fragments + */ +class FragmentScalingTest { + /** + * The monitor used to keep track of the metrics. + */ + private lateinit var monitor: TestComputeMonitor + + /** + * The [FilterScheduler] to use for all experiments. + */ + private lateinit var computeScheduler: FilterScheduler + + /** + * The [ComputeWorkloadLoader] responsible for loading the traces. + */ + private lateinit var workloadLoader: ComputeWorkloadLoader + + private val basePath = "src/test/resources/FragmentScaling" + + /** + * Set up the experimental environment. 
+ */ + @BeforeEach + fun setUp() { + monitor = TestComputeMonitor() + computeScheduler = + FilterScheduler( + filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), + weighers = listOf(CoreRamWeigher(multiplier = 1.0)), + ) + workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) + } + + private fun createTestTask( + name: String, + cpuCount: Int = 1, + cpuCapacity: Double = 0.0, + memCapacity: Long = 0L, + submissionTime: String = "1970-01-01T00:00", + duration: Long = 0L, + fragments: ArrayList<TraceFragment>, + scalingPolicy: ScalingPolicy = NoDelayScaling(), + ): Task { + return Task( + UUID.nameUUIDFromBytes(name.toByteArray()), + name, + cpuCount, + cpuCapacity, + memCapacity, + 1800000.0, + LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(), + duration, + TraceWorkload( + fragments, + 0L, 0L, 0.0, + scalingPolicy + ), + ) + } + + private fun runTest( + topology: List<ClusterSpec>, + workload: ArrayList<Task> + ): TestComputeMonitor { + + val monitor = TestComputeMonitor() + runSimulation { + val seed = 0L + Provisioner(dispatcher, seed).use { provisioner -> + provisioner.runSteps( + setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), + registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), + setupHosts(serviceDomain = "compute.opendc.org", topology), + ) + + val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! + service.replay(timeSource, workload) + } + } + return monitor + } + + /** + * Scaling test 1: A single fitting task + * In this test, a single task is scheduled that should fit the system. 
+ * This means nothing will be delayed regardless of the scaling policy + */ + @Test + fun testScaling1() { + val workloadNoDelay: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 2000.0, 1), + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + ) + + val workloadPerfect: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 2000.0, 1), + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + ) + val topology = createTopology("single_1_2000.json") + + val monitorNoDelay = runTest(topology, workloadNoDelay) + val monitorPerfect = runTest(topology, workloadPerfect) + + assertAll( + { assertEquals(1200000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(1200000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." 
} }, + + { assertEquals(2000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + + { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + + { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + + { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + ) + } + + /** + * Scaling test 2: A single task with a single non-fitting fragment + * In this test, a single task is scheduled that should not fit. + * This means the Task is getting only 2000 Mhz while it was demanding 4000 Mhz + * + * For the NoDelay scaling policy, the task should take the planned 10 min. + * For the Perfect scaling policy, the task should be slowed down by 50% resulting in a runtime of 20 min. 
+ */ + @Test + fun testScaling2() { + val workloadNoDelay: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 4000.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + ) + + val workloadPerfect: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 4000.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + ) + val topology = createTopology("single_1_2000.json") + + val monitorNoDelay = runTest(topology, workloadNoDelay) + val monitorPerfect = runTest(topology, workloadPerfect) + + assertAll( + { assertEquals(600000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(1200000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." } }, + + { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + + { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + ) + } + + /** + * Scaling test 3: A single task that switches between fitting and not fitting + * In this test, a single task is scheduled that has one fragment that does not fit. + * This means the second fragment is getting only 2000 Mhz while it was demanding 4000 Mhz + * + * For the NoDelay scaling policy, the task should take the planned 30 min. + * For the Perfect scaling policy, the second fragment should be slowed down by 50% resulting in a runtime of 20 min, + * and a total runtime of 40 min. 
+ */ + @Test + fun testScaling3() { + val workloadNoDelay: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 4000.0, 1), + TraceFragment(10 * 60 * 1000, 1500.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + ) + + val workloadPerfect: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 4000.0, 1), + TraceFragment(10 * 60 * 1000, 1500.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + ) + val topology = createTopology("single_1_2000.json") + + val monitorNoDelay = runTest(topology, workloadNoDelay) + val monitorPerfect = runTest(topology, workloadPerfect) + + assertAll( + { assertEquals(1800000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(2400000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." 
} }, + + { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + + { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + + { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, + + { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, + + { assertEquals(1500.0, monitorNoDelay.taskCpuDemands["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } }, + + { assertEquals(1500.0, monitorNoDelay.taskCpuSupplied["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(19)) { "The cpu supplied to task 0 is incorrect" } }, + + { assertEquals(1500.0, monitorPerfect.taskCpuDemands["0"]?.get(29)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(1500.0, monitorPerfect.taskCpuSupplied["0"]?.get(29)) { "The cpu supplied to task 0 is incorrect" } }, + ) + } + + /** + * Scaling test 4: Two tasks, that both fit + * In this test, two tasks are scheduled that both fit + * + * For both scaling policies, the tasks should run without delay. 
+ */ + @Test + fun testScaling4() { + val workloadNoDelay: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 3000.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + ) + + val workloadPerfect: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 3000.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + ) + val topology = createTopology("single_2_2000.json") + + val monitorNoDelay = runTest(topology, workloadNoDelay) + val monitorPerfect = runTest(topology, workloadPerfect) + + assertAll( + { assertEquals(600000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(600000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." 
} }, + + { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, + { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, + { assertEquals(3000.0, monitorPerfect.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, + + { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(3000.0, monitorNoDelay.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, + { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, + { assertEquals(3000.0, monitorPerfect.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, + ) + } + + /** + * Scaling test 5: Two tasks, that don't fit together + * In this test, two tasks are scheduled that do not fit together + * This means the Task_1 is getting only 2000 Mhz while it was demanding 4000 Mhz + * + * For the NoDelay scaling policy, the tasks should complete in 10 min + * For the Perfect scaling policy, task_1 is delayed while task_0 is still going. + * In the first 10 min (while Task_0 is still running), Task_1 is running at 50%. + * This means that after Task_0 is done, Task_1 still needs to run for 5 minutes, making the total runtime 15 min. 
+ */ + @Test + fun testScaling5() { + val workloadNoDelay: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 4000.0, 1), + ), + scalingPolicy = NoDelayScaling() + ), + ) + + val workloadPerfect: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + createTestTask( + name = "1", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 4000.0, 1), + ), + scalingPolicy = PerfectScaling() + ), + ) + val topology = createTopology("single_2_2000.json") + +// val monitorNoDelay = runTest(topology, workloadNoDelay) + val monitorPerfect = runTest(topology, workloadPerfect) + +// assertAll( +// { assertEquals(600000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, +// { assertEquals(900000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." 
} }, +// +// { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, +// { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, +// { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, +// { assertEquals(3000.0, monitorPerfect.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, +// +// { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, +// { assertEquals(3000.0, monitorNoDelay.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, +// { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, +// { assertEquals(3000.0, monitorPerfect.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, +// ) + } + /** + * Load the cluster topology with the given file name from the FragmentScaling test resources. 
+ */ + private fun createTopology(name: String): List<ClusterSpec> { + val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/FragmentScaling/topologies/$name")) + return stream.use { clusterTopology(stream) } + } + + class TestComputeMonitor : ComputeMonitor { + var finalTimestamp: Long = 0L; + + + override fun record(reader: ServiceTableReader) { + finalTimestamp = reader.timestamp.toEpochMilli(); + + super.record(reader) + } + + + + var hostCpuDemands = ArrayList<Double>() + var hostCpuSupplied = ArrayList<Double>() + + override fun record(reader: HostTableReader) { + hostCpuDemands.add(reader.cpuDemand) + hostCpuSupplied.add(reader.cpuUsage) + } + + var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>() + var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>() + + override fun record(reader: TaskTableReader) { + val taskName: String = reader.taskInfo.name + + if (taskName in taskCpuDemands) { + taskCpuDemands[taskName]?.add(reader.cpuDemand) + taskCpuSupplied[taskName]?.add(reader.cpuUsage) + } else { + taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) + taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) + } + } + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt index eadd97e48..6f91ec43f 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt @@ -43,8 +43,10 @@ import org.opendc.compute.topology.specs.ClusterSpec import org.opendc.compute.workload.Task import org.opendc.experiments.base.experiment.specs.FailureModelSpec import org.opendc.experiments.base.runner.replay -import org.opendc.simulator.compute.workload.TraceFragment -import 
org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceWorkload +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy import org.opendc.simulator.kotlin.runSimulation import java.time.Duration import java.time.LocalDateTime @@ -69,6 +71,7 @@ fun createTestTask( checkpointInterval: Long = 0L, checkpointDuration: Long = 0L, checkpointIntervalScaling: Double = 1.0, + scalingPolicy: ScalingPolicy = NoDelayScaling() ): Task { return Task( UUID.nameUUIDFromBytes(name.toByteArray()), @@ -84,6 +87,7 @@ fun createTestTask( checkpointInterval, checkpointDuration, checkpointIntervalScaling, + scalingPolicy ), ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index 2919fc3a2..ebfcc552b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -55,11 +55,11 @@ public SimWorkload(FlowGraph parentGraph) { public abstract Workload getSnapshot(); - abstract void createCheckpointModel(); + public abstract void createCheckpointModel(); - abstract long getCheckpointInterval(); + public abstract long getCheckpointInterval(); - abstract long getCheckpointDuration(); + public abstract long getCheckpointDuration(); - abstract double getCheckpointIntervalScaling(); + public abstract double getCheckpointIntervalScaling(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java 
b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java similarity index 71% rename from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java rename to opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 9b12b1e34..8f0d7c7d5 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -20,7 +20,7 @@ * SOFTWARE. */ -package org.opendc.simulator.compute.workload; +package org.opendc.simulator.compute.workload.trace; import java.util.LinkedList; import org.opendc.simulator.engine.graph.FlowConsumer; @@ -29,6 +29,10 @@ import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; + public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private LinkedList<TraceFragment> remainingFragments; private int fragmentIndex; @@ -37,13 +41,18 @@ public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private long startOfFragment; private FlowEdge machineEdge; - private double currentDemand; - private double currentSupply; + + private double cpuFreqDemand = 0.0; // The Cpu demanded by fragment + private double cpuFreqSupplied = 0.0; // The Cpu speed supplied + private double newCpuFreqSupplied = 0.0; // The Cpu speed supplied + private double remainingWork = 0.0; // The duration of the fragment at the demanded speed private long checkpointDuration; private TraceWorkload snapshot; + private 
ScalingPolicy scalingPolicy = new NoDelayScaling(); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Basic Getters and Setters //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -57,27 +66,20 @@ public TraceWorkload getSnapshot() { } @Override - long getCheckpointInterval() { + public long getCheckpointInterval() { return 0; } @Override - long getCheckpointDuration() { + public long getCheckpointDuration() { return 0; } @Override - double getCheckpointIntervalScaling() { + public double getCheckpointIntervalScaling() { return 0; } - public TraceFragment getNextFragment() { - this.currentFragment = this.remainingFragments.pop(); - this.fragmentIndex++; - - return this.currentFragment; - } - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Constructors //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -87,15 +89,12 @@ public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload, long now) this.snapshot = workload; this.checkpointDuration = workload.getCheckpointDuration(); + this.scalingPolicy = workload.getScalingPolicy(); this.remainingFragments = new LinkedList<>(workload.getFragments()); this.fragmentIndex = 0; final FlowGraph graph = ((FlowNode) supplier).getGraph(); graph.addEdge(this, supplier); - - this.currentFragment = this.getNextFragment(); - pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); - this.startOfFragment = now; } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -105,38 +104,54 @@ public SimTraceWorkload(FlowSupplier supplier, TraceWorkload workload, long now) @Override public long onUpdate(long now) { long passedTime = getPassedTime(now); - long duration = 
this.currentFragment.duration(); + this.startOfFragment = now; - // The current Fragment has not yet been finished, continue - if (passedTime < duration) { - return now + (duration - passedTime); + // The amount of work done since last update + double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime); + + this.remainingWork -= finishedWork; + + // If this.remainingWork <= 0, the fragment has been completed + if (this.remainingWork <= 0) { + this.startNextFragment(); + + this.invalidate(); + return Long.MAX_VALUE; } - // Loop through fragments until the passed time is filled. - // We need a while loop to account for skipping of fragments. - while (passedTime >= duration) { - if (this.remainingFragments.isEmpty()) { - this.stopWorkload(); - return Long.MAX_VALUE; - } + this.cpuFreqSupplied = this.newCpuFreqSupplied; + + // The amount of time required to finish the fragment at this speed + long remainingDuration = this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork); - passedTime = passedTime - duration; + return now + remainingDuration; + } - // get next Fragment - currentFragment = this.getNextFragment(); - duration = currentFragment.duration(); + public TraceFragment getNextFragment() { + if (this.remainingFragments.isEmpty()) { + return null; } + this.currentFragment = this.remainingFragments.pop(); + this.fragmentIndex++; - // start new fragment - this.startOfFragment = now - passedTime; + return this.currentFragment; + } - // Change the cpu Usage to the new Fragment - pushOutgoingDemand(machineEdge, this.currentFragment.cpuUsage()); + private void startNextFragment() { - // Return the time when the current fragment will complete - return this.startOfFragment + duration; + // TODO: turn this into a loop, should not be needed, but might be safer + TraceFragment nextFragment = this.getNextFragment(); + if (nextFragment == null) { + this.stopWorkload(); + 
return; + } + double demand = nextFragment.cpuUsage(); + this.remainingWork = this.scalingPolicy.getRemainingWork(demand, nextFragment.duration()); + this.pushOutgoingDemand(this.machineEdge, demand); } + + @Override public void stopWorkload() { if (this.machineEdge == null) { @@ -159,7 +174,7 @@ public void stopWorkload() { * TODO: Maybe add checkpoint models for SimTraceWorkload */ @Override - void createCheckpointModel() {} + public void createCheckpointModel() {} /** * Create a new snapshot based on the current status of the workload. @@ -171,7 +186,14 @@ public void makeSnapshot(long now) { // Get remaining time of current fragment long passedTime = getPassedTime(now); - long remainingTime = currentFragment.duration() - passedTime; + + // The amount of work done since last update + double finishedWork = this.scalingPolicy.getFinishedWork(this.cpuFreqDemand, this.cpuFreqSupplied, passedTime); + + this.remainingWork -= finishedWork; + + // The amount of time required to finish the fragment at this speed + long remainingTime = this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork); // If this is the end of the Task, don't make a snapshot if (remainingTime <= 0 && remainingFragments.isEmpty()) { @@ -180,7 +202,7 @@ public void makeSnapshot(long now) { // Create a new fragment based on the current fragment and remaining duration TraceFragment newFragment = - new TraceFragment(remainingTime, currentFragment.cpuUsage(), currentFragment.coreCount()); + new TraceFragment(remainingTime, currentFragment.cpuUsage(), currentFragment.coreCount()); // Alter the snapshot by removing finished fragments this.snapshot.removeFragments(this.fragmentIndex); @@ -194,8 +216,8 @@ public void makeSnapshot(long now) { this.remainingFragments.addFirst(snapshotFragment); this.fragmentIndex = -1; - this.currentFragment = getNextFragment(); - pushOutgoingDemand(this.machineEdge, this.currentFragment.cpuUsage()); + startNextFragment(); + 
this.startOfFragment = now; this.invalidate(); @@ -213,11 +235,14 @@ public void makeSnapshot(long now) { */ @Override public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { - if (newSupply == this.currentSupply) { + if (newSupply == this.cpuFreqSupplied) { return; } - this.currentSupply = newSupply; + this.cpuFreqSupplied = this.newCpuFreqSupplied; + this.newCpuFreqSupplied = newSupply; + + this.invalidate(); } /** @@ -228,11 +253,11 @@ public void handleIncomingSupply(FlowEdge supplierEdge, double newSupply) { */ @Override public void pushOutgoingDemand(FlowEdge supplierEdge, double newDemand) { - if (newDemand == this.currentDemand) { + if (newDemand == this.cpuFreqDemand) { return; } - this.currentDemand = newDemand; + this.cpuFreqDemand = newDemand; this.machineEdge.pushDemand(newDemand); } @@ -257,6 +282,7 @@ public void removeSupplierEdge(FlowEdge supplierEdge) { if (this.machineEdge == null) { return; } + this.stopWorkload(); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java similarity index 96% rename from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java rename to opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java index 550c2135d..a09206a15 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceFragment.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceFragment.java @@ -20,7 +20,7 @@ * SOFTWARE. 
*/ -package org.opendc.simulator.compute.workload; +package org.opendc.simulator.compute.workload.trace; public record TraceFragment(long duration, double cpuUsage, int coreCount) { diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java similarity index 77% rename from opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java rename to opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 7f82ab714..7dcda4c57 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -20,10 +20,16 @@ * SOFTWARE. 
*/ -package org.opendc.simulator.compute.workload; +package org.opendc.simulator.compute.workload.trace; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; + +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.Workload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; import org.opendc.simulator.engine.graph.FlowSupplier; public class TraceWorkload implements Workload { @@ -31,20 +37,32 @@ public class TraceWorkload implements Workload { private final long checkpointInterval; private final long checkpointDuration; private final double checkpointIntervalScaling; + private final double maxCpuDemand; + + public ScalingPolicy getScalingPolicy() { + return scalingPolicy; + } + + private final ScalingPolicy scalingPolicy; public TraceWorkload( - ArrayList<TraceFragment> fragments, - long checkpointInterval, - long checkpointDuration, - double checkpointIntervalScaling) { + ArrayList<TraceFragment> fragments, + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { this.fragments = fragments; this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; this.checkpointIntervalScaling = checkpointIntervalScaling; + this.scalingPolicy = scalingPolicy; + + // TODO: remove if we decide not to use it. 
+ this.maxCpuDemand = fragments.stream().max(Comparator.comparing(TraceFragment::cpuUsage)).get().cpuUsage(); } public TraceWorkload(ArrayList<TraceFragment> fragments) { - this(fragments, 0L, 0L, 1.0); + this(fragments, 0L, 0L, 1.0, new NoDelayScaling()); } public ArrayList<TraceFragment> getFragments() { @@ -83,11 +101,11 @@ public SimWorkload startWorkload(FlowSupplier supplier, long now) { } public static Builder builder() { - return builder(0L, 0L, 0.0); + return builder(0L, 0L, 0.0, new NoDelayScaling()); } - public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) { - return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling); + public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy) { + return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy); } /** @@ -125,15 +143,17 @@ public static final class Builder { private final long checkpointInterval; private final long checkpointDuration; private final double checkpointIntervalScaling; + private final ScalingPolicy scalingPolicy; /** * Construct a new {@link Builder} instance. 
*/ - private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling) { + private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy) { this.fragments = new ArrayList<>(); this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; this.checkpointIntervalScaling = checkpointIntervalScaling; + this.scalingPolicy = scalingPolicy; } /** @@ -152,7 +172,7 @@ public void add(long duration, double usage, int cores) { */ public TraceWorkload build() { return new TraceWorkload( - this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling); + this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling, this.scalingPolicy); } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java new file mode 100644 index 000000000..c0ec750d7 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java @@ -0,0 +1,25 @@ +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * The NoDelay scaling policy states that there will be no delay + * when less CPU can be provided than needed. + * + * This could be used in situations where the data is streamed. + * This will also result in the same behaviour as older OpenDC. 
+ */ +public class NoDelayScaling implements ScalingPolicy { + @Override + public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) { + return cpuFreqDemand * passedTime; + } + + @Override + public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) { + return (long) (remainingWork / cpuFreqDemand); + } + + @Override + public double getRemainingWork(double cpuFreqDemand, long duration) { + return cpuFreqDemand * duration; + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java new file mode 100644 index 000000000..281068880 --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java @@ -0,0 +1,25 @@ +package org.opendc.simulator.compute.workload.trace.scaling; + +/** + * PerfectScaling scales the workload duration perfectly + * based on the CPU capacity. + * + * This means that if a fragment has a duration of 10 min at 4000 MHz, + * it will take 20 min at 2000 MHz. 
+ */ +public class PerfectScaling implements ScalingPolicy { + @Override + public double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime) { + return cpuFreqSupplied * passedTime; + } + + @Override + public long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork) { + return (long) (remainingWork / cpuFreqSupplied); + } + + @Override + public double getRemainingWork(double cpuFreqDemand, long duration) { + return cpuFreqDemand * duration; + } +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java new file mode 100644 index 000000000..ee734aeec --- /dev/null +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java @@ -0,0 +1,10 @@ +package org.opendc.simulator.compute.workload.trace.scaling; + +public interface ScalingPolicy { + + double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime); + + long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork); + + double getRemainingWork(double cpuFreqDemand, long duration); +} diff --git a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt index ad69a3d62..49baaf48f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt +++ b/opendc-simulator/opendc-simulator-compute/src/main/kotlin/org/opendc/simulator/compute/Coroutines.kt @@ -24,7 +24,7 @@ package org.opendc.simulator.compute import kotlinx.coroutines.suspendCancellableCoroutine import org.opendc.simulator.compute.machine.SimMachine -import 
org.opendc.simulator.compute.workload.TraceWorkload +import org.opendc.simulator.compute.workload.trace.TraceWorkload import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java index 67540f4e6..3f18ac768 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/engine/FlowEngine.java @@ -139,16 +139,16 @@ public void scheduleDelayed(FlowNode ctx) { * This method should only be invoked while inside an engine cycle. */ public void scheduleDelayedInContext(FlowNode ctx) { - FlowEventQueue timerQueue = this.eventQueue; - timerQueue.enqueue(ctx); + FlowEventQueue eventQueue = this.eventQueue; + eventQueue.enqueue(ctx); } /** * Run all the enqueued actions for the specified timestamp (<code>now</code>). */ private void doRunEngine(long now) { - final FlowCycleQueue queue = this.cycleQueue; - final FlowEventQueue timerQueue = this.eventQueue; + final FlowCycleQueue cycleQueue = this.cycleQueue; + final FlowEventQueue eventQueue = this.eventQueue; try { // Mark the engine as active to prevent concurrent calls to this method @@ -156,7 +156,7 @@ private void doRunEngine(long now) { // Execute all scheduled updates at current timestamp while (true) { - final FlowNode ctx = timerQueue.poll(now); + final FlowNode ctx = eventQueue.poll(now); if (ctx == null) { break; } @@ -166,7 +166,7 @@ private void doRunEngine(long now) { // Execute all immediate updates while (true) { - final FlowNode ctx = queue.poll(); + final FlowNode ctx = cycleQueue.poll(); if (ctx == null) { break; } @@ -178,7 +178,7 @@ private void doRunEngine(long now) { } // Schedule an engine invocation for the next update to occur. 
- long headDeadline = timerQueue.peekDeadline(); + long headDeadline = eventQueue.peekDeadline(); if (headDeadline != Long.MAX_VALUE && headDeadline >= now) { trySchedule(futureInvocations, now, headDeadline); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 16bb161fd..54eb6636a 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -24,6 +24,9 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsumer { private final ArrayList<FlowEdge> consumerEdges = new ArrayList<>(); @@ -36,12 +39,12 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private double currentIncomingSupply; // The current supply provided by the supplier private boolean outgoingDemandUpdateNeeded = false; + private final Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle private boolean overloaded = false; private double capacity; // What is the max capacity. 
Can probably be removed - private final ArrayList<Integer> updatedDemands = new ArrayList<>(); public FlowDistributor(FlowGraph graph) { super(graph); @@ -68,7 +71,9 @@ public long onUpdate(long now) { return Long.MAX_VALUE; } - this.updateOutgoingSupplies(); + if (!this.outgoingSupplies.isEmpty()) { + this.updateOutgoingSupplies(); + } return Long.MAX_VALUE; } @@ -100,7 +105,7 @@ private void updateOutgoingSupplies() { // provide all consumers with their demand if (this.overloaded) { for (int idx = 0; idx < this.consumerEdges.size(); idx++) { - if (this.outgoingSupplies.get(idx) != this.incomingDemands.get(idx)) { + if (!Objects.equals(this.outgoingSupplies.get(idx), this.incomingDemands.get(idx))) { this.pushOutgoingSupply(this.consumerEdges.get(idx), this.incomingDemands.get(idx)); } } @@ -190,23 +195,25 @@ public void removeConsumerEdge(FlowEdge consumerEdge) { this.totalIncomingDemand -= consumerEdge.getDemand(); + // Remove idx from consumers that updated their demands + this.updatedDemands.remove(idx); + this.consumerEdges.remove(idx); this.incomingDemands.remove(idx); this.outgoingSupplies.remove(idx); // update the consumer index for all consumerEdges higher than this. 
for (int i = idx; i < this.consumerEdges.size(); i++) { - this.consumerEdges.get(i).setConsumerIndex(i); + FlowEdge other = this.consumerEdges.get(i); + + other.setConsumerIndex(other.getConsumerIndex() - 1); } - for (int i = 0; i < this.updatedDemands.size(); i++) { - int j = this.updatedDemands.get(i); + for (int idx_other : this.updatedDemands) { - if (j == idx) { - this.updatedDemands.remove(idx); - } - if (j > idx) { - this.updatedDemands.set(i, j - 1); + if (idx_other > idx) { + this.updatedDemands.remove(idx_other); + this.updatedDemands.add(idx_other - 1); } } @@ -220,7 +227,9 @@ public void removeSupplierEdge(FlowEdge supplierEdge) { this.capacity = 0; this.currentIncomingSupply = 0; - this.invalidate(); + this.updatedDemands.clear(); + + this.closeNode(); } @Override diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java index 0e6e137cc..916629503 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowGraph.java @@ -63,7 +63,7 @@ public void removeNode(FlowNode node) { // Remove all edges connected to node final ArrayList<FlowEdge> connectedEdges = nodeToEdge.get(node); - while (connectedEdges.size() > 0) { + while (!connectedEdges.isEmpty()) { removeEdge(connectedEdges.get(0)); } @@ -90,7 +90,7 @@ public void addEdge(FlowConsumer flowConsumer, FlowSupplier flowSupplier) { throw new IllegalArgumentException("The consumer is not a node in this graph"); } if (!(this.nodes.contains((FlowNode) flowSupplier))) { - throw new IllegalArgumentException("The consumer is not a node in this graph"); + throw new IllegalArgumentException("The supplier is not a node in this graph"); } final FlowEdge flowEdge = new FlowEdge(flowConsumer, flowSupplier); 
From 4bc9b767ef1e9e67e0d9f2c7c6dab495e29c9c02 Mon Sep 17 00:00:00 2001 From: DanteNiewenhuis <d.niewenhuis@hotmail.com> Date: Wed, 22 Jan 2025 10:08:00 +0100 Subject: [PATCH 2/6] small update --- .../compute/workload/trace/SimTraceWorkload.java | 2 +- .../compute/workload/trace/TraceWorkload.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index 8f0d7c7d5..a194aea5e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -212,7 +212,7 @@ public void makeSnapshot(long now) { // Create and add a fragment for processing the snapshot process // TODO: improve the implementation of cpuUsage and coreCount - TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, 123456, 1); + TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount()); this.remainingFragments.addFirst(snapshotFragment); this.fragmentIndex = -1; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index 7dcda4c57..ab0c8e4a0 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -38,6 +38,7 @@ public class TraceWorkload 
implements Workload { private final long checkpointDuration; private final double checkpointIntervalScaling; private final double maxCpuDemand; + private final int maxCoreCount; public ScalingPolicy getScalingPolicy() { return scalingPolicy; @@ -59,6 +60,7 @@ public TraceWorkload( // TODO: remove if we decide not to use it. this.maxCpuDemand = fragments.stream().max(Comparator.comparing(TraceFragment::cpuUsage)).get().cpuUsage(); + this.maxCoreCount = fragments.stream().max(Comparator.comparing(TraceFragment::coreCount)).get().coreCount(); } public TraceWorkload(ArrayList<TraceFragment> fragments) { @@ -84,6 +86,14 @@ public double getCheckpointIntervalScaling() { return checkpointIntervalScaling; } + public int getMaxCoreCount() { + return maxCoreCount; + } + + public double getMaxCpuDemand() { + return maxCpuDemand; + } + public void removeFragments(int numberOfFragments) { if (numberOfFragments <= 0) { return; From e7774571b83892f41cfc184c9e26b16b55772ffc Mon Sep 17 00:00:00 2001 From: DanteNiewenhuis <d.niewenhuis@hotmail.com> Date: Thu, 23 Jan 2025 11:55:29 +0100 Subject: [PATCH 3/6] updated tests to reflect the changes in the checkpointing model --- .../org/opendc/experiments/base/CarbonTest.kt | 2 +- .../base/FailuresAndCheckpointingTest.kt | 124 +++++++++------ .../experiments/base/FragmentScalingTest.kt | 143 ++---------------- .../opendc/experiments/base/SchedulerTest.kt | 2 +- 4 files changed, 93 insertions(+), 178 deletions(-) diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt index 091f506a2..895eee927 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/CarbonTest.kt @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Assertions.assertEquals 
import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll import org.opendc.compute.workload.Task -import org.opendc.simulator.compute.workload.TraceFragment +import org.opendc.simulator.compute.workload.trace.TraceFragment import java.util.ArrayList /** diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt index a3c3b3852..8867f2fde 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt @@ -256,14 +256,12 @@ class FailuresAndCheckpointingTest { assertAll( { assertEquals((10 * 60000) + (9 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, - { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, - { assertEquals(((10 * 30000) + (9 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, - { assertEquals((10 * 60 * 150.0) + (9 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + { assertEquals((10 * 60 * 150.0) + (9 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, ) } /** - * Checkpointing test 2: Single Task with scaling checkpointing + * Checkpointing test 1: Single Task with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. 
* @@ -275,6 +273,80 @@ class FailuresAndCheckpointingTest { */ @Test fun testCheckpoints2() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 2000.0, 1), + TraceFragment(10 * 60 * 1000, 1000.0, 1), + ), + checkpointInterval = 60 * 1000L, + checkpointDuration = 1000L, + ), + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals((20 * 60000) + (19 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals((10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Checkpointing test 1: Single Task with checkpointing + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. 
+ */ + @Test + fun testCheckpoints3() { + val workload: ArrayList<Task> = + arrayListOf( + createTestTask( + name = "0", + fragments = + arrayListOf( + TraceFragment(10 * 60 * 1000, 1000.0, 1), + TraceFragment(10 * 60 * 1000, 2000.0, 1), + ), + checkpointInterval = 60 * 1000L, + checkpointDuration = 1000L, + ), + ) + + val topology = createTopology("single_1_2000.json") + + val monitor = runTest(topology, workload) + + assertAll( + { assertEquals((20 * 60000) + (19 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, + { assertEquals((10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + ) + } + + /** + * Checkpointing test 2: Single Task with scaling checkpointing + * In this test, a single task is scheduled that is interrupted by a failure after 5 min. + * Because there is no checkpointing, the full task has to be rerun. + * + * This means the final runtime is 20 minutes + * + * When the task is running, it is using 50% of the cpu. + * This means that half of the time is active, and half is idle. + * When the task is failed, all time is idle. 
+ */ + @Test + fun testCheckpoints4() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -295,9 +367,7 @@ class FailuresAndCheckpointingTest { assertAll( { assertEquals((10 * 60000) + (4 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, - { assertEquals(((10 * 30000)).toLong(), monitor.hostIdleTimes["H01"]?.sum()) { "Idle time incorrect" } }, - { assertEquals(((10 * 30000) + (4 * 1000)).toLong(), monitor.hostActiveTimes["H01"]?.sum()) { "Active time incorrect" } }, - { assertEquals((10 * 60 * 150.0) + (4 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + { assertEquals((10 * 60 * 150.0) + (4 * 150.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, ) } @@ -308,7 +378,7 @@ class FailuresAndCheckpointingTest { * */ @Test - fun testCheckpoints3() { + fun testCheckpoints5() { val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -336,23 +406,7 @@ class FailuresAndCheckpointingTest { { assertEquals((960 * 1000) + 5000, monitor.maxTimestamp) { "Total runtime incorrect" } }, { assertEquals( - ((300 * 1000) + (296 * 500) + (360 * 500)).toLong(), - monitor.hostIdleTimes["H01"]?.sum(), - ) { "Idle time incorrect" } - }, - { - assertEquals( - ((296 * 500) + 4000 + (360 * 500) + 5000).toLong(), - monitor.hostActiveTimes["H01"]?.sum(), - ) { "Active time incorrect" } - }, - { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, - { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, - { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, - { - assertEquals( - (296 * 150.0) + (4 * 200.0) + (300 * 100.0) + - (360 * 150.0) + (5 * 200.0), + (665 * 150.0) + (300 * 100.0), monitor.hostEnergyUsages["H01"]?.sum(), ) { "Incorrect energy usage" } }, @@ -366,7 +420,7 @@ class FailuresAndCheckpointingTest { * */ @Test - fun testCheckpoints4() { + fun testCheckpoints6() { 
val workload: ArrayList<Task> = arrayListOf( createTestTask( @@ -394,23 +448,7 @@ class FailuresAndCheckpointingTest { { assertEquals((22 * 60000) + 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, { assertEquals( - ((10 * 60000) + (2 * 296 * 500) + (120 * 500)).toLong(), - monitor.hostIdleTimes["H01"]?.sum(), - ) { "Idle time incorrect" } - }, - { - assertEquals( - ((2 * 296 * 500) + 8000 + (120 * 500) + 1000).toLong(), - monitor.hostActiveTimes["H01"]?.sum(), - ) { "Active time incorrect" } - }, - { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(0)) { "Incorrect energy usage" } }, - { assertEquals(6000.0, monitor.hostEnergyUsages["H01"]?.get(5)) { "Incorrect energy usage" } }, - { assertEquals(9000.0, monitor.hostEnergyUsages["H01"]?.get(10)) { "Incorrect energy usage" } }, - { - assertEquals( - (2 * 296 * 150.0) + (8 * 200.0) + (600 * 100.0) + - (120 * 150.0) + (200.0), + (300 * 150.0) + (300 * 100.0) + (300 * 150.0) + (300 * 100.0) + (121 * 150.0) , monitor.hostEnergyUsages["H01"]?.sum(), ) { "Incorrect energy usage" } }, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt index 8c94f0737..fb9bd95ac 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt @@ -63,86 +63,6 @@ import java.util.UUID * Testing suite containing tests that specifically test the scaling of trace fragments */ class FragmentScalingTest { - /** - * The monitor used to keep track of the metrics. - */ - private lateinit var monitor: TestComputeMonitor - - /** - * The [FilterScheduler] to use for all experiments. 
- */ - private lateinit var computeScheduler: FilterScheduler - - /** - * The [ComputeWorkloadLoader] responsible for loading the traces. - */ - private lateinit var workloadLoader: ComputeWorkloadLoader - - private val basePath = "src/test/resources/FragmentScaling" - - /** - * Set up the experimental environment. - */ - @BeforeEach - fun setUp() { - monitor = TestComputeMonitor() - computeScheduler = - FilterScheduler( - filters = listOf(ComputeFilter(), VCpuFilter(16.0), RamFilter(1.0)), - weighers = listOf(CoreRamWeigher(multiplier = 1.0)), - ) - workloadLoader = ComputeWorkloadLoader(File("$basePath/traces"), 0L, 0L, 0.0) - } - - private fun createTestTask( - name: String, - cpuCount: Int = 1, - cpuCapacity: Double = 0.0, - memCapacity: Long = 0L, - submissionTime: String = "1970-01-01T00:00", - duration: Long = 0L, - fragments: ArrayList<TraceFragment>, - scalingPolicy: ScalingPolicy = NoDelayScaling(), - ): Task { - return Task( - UUID.nameUUIDFromBytes(name.toByteArray()), - name, - cpuCount, - cpuCapacity, - memCapacity, - 1800000.0, - LocalDateTime.parse(submissionTime).atZone(ZoneId.systemDefault()).toInstant(), - duration, - TraceWorkload( - fragments, - 0L, 0L, 0.0, - scalingPolicy - ), - ) - } - - private fun runTest( - topology: List<ClusterSpec>, - workload: ArrayList<Task> - ): TestComputeMonitor { - - val monitor = TestComputeMonitor() - runSimulation { - val seed = 0L - Provisioner(dispatcher, seed).use { provisioner -> - provisioner.runSteps( - setupComputeService(serviceDomain = "compute.opendc.org", { computeScheduler }), - registerComputeMonitor(serviceDomain = "compute.opendc.org", monitor, exportInterval = Duration.ofMinutes(1)), - setupHosts(serviceDomain = "compute.opendc.org", topology), - ) - - val service = provisioner.registry.resolve("compute.opendc.org", ComputeService::class.java)!! 
- service.replay(timeSource, workload) - } - } - return monitor - } - /** * Scaling test 1: A single fitting task * In this test, a single task is scheduled that should fit the system. @@ -181,8 +101,8 @@ class FragmentScalingTest { val monitorPerfect = runTest(topology, workloadPerfect) assertAll( - { assertEquals(1200000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(1200000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(1200000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(1200000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } }, { assertEquals(2000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, @@ -237,8 +157,8 @@ class FragmentScalingTest { val monitorPerfect = runTest(topology, workloadPerfect) assertAll( - { assertEquals(600000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(1200000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(1200000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." 
} }, { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, @@ -292,8 +212,8 @@ class FragmentScalingTest { val monitorPerfect = runTest(topology, workloadPerfect) assertAll( - { assertEquals(1800000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(2400000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(1800000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(2400000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } }, { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, @@ -371,8 +291,8 @@ class FragmentScalingTest { val monitorPerfect = runTest(topology, workloadPerfect) assertAll( - { assertEquals(600000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(600000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, + { assertEquals(600000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." 
} }, { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, @@ -443,8 +363,8 @@ class FragmentScalingTest { val monitorPerfect = runTest(topology, workloadPerfect) // assertAll( -// { assertEquals(600000, monitorNoDelay.finalTimestamp) { "The workload took longer to finish than expected." } }, -// { assertEquals(900000, monitorPerfect.finalTimestamp) { "The workload took longer to finish than expected." } }, +// { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, +// { assertEquals(900000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } }, // // { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, // { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, @@ -457,47 +377,4 @@ class FragmentScalingTest { // { assertEquals(3000.0, monitorPerfect.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, // ) } - /** - * Obtain the topology factory for the test. 
- */ - private fun createTopology(name: String): List<ClusterSpec> { - val stream = checkNotNull(object {}.javaClass.getResourceAsStream("/FragmentScaling/topologies/$name")) - return stream.use { clusterTopology(stream) } - } - - class TestComputeMonitor : ComputeMonitor { - var finalTimestamp: Long = 0L; - - - override fun record(reader: ServiceTableReader) { - finalTimestamp = reader.timestamp.toEpochMilli(); - - super.record(reader) - } - - - - var hostCpuDemands = ArrayList<Double>() - var hostCpuSupplied = ArrayList<Double>() - - override fun record(reader: HostTableReader) { - hostCpuDemands.add(reader.cpuDemand) - hostCpuSupplied.add(reader.cpuUsage) - } - - var taskCpuDemands = mutableMapOf<String, ArrayList<Double>>() - var taskCpuSupplied = mutableMapOf<String, ArrayList<Double>>() - - override fun record(reader: TaskTableReader) { - val taskName: String = reader.taskInfo.name - - if (taskName in taskCpuDemands) { - taskCpuDemands[taskName]?.add(reader.cpuDemand) - taskCpuSupplied[taskName]?.add(reader.cpuUsage) - } else { - taskCpuDemands[taskName] = arrayListOf(reader.cpuDemand) - taskCpuSupplied[taskName] = arrayListOf(reader.cpuUsage) - } - } - } } diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt index 6d80ce564..f9a20c68b 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/SchedulerTest.kt @@ -30,7 +30,7 @@ import org.opendc.compute.simulator.scheduler.filters.ComputeFilter import org.opendc.compute.simulator.scheduler.filters.RamFilter import org.opendc.compute.simulator.scheduler.filters.VCpuFilter import org.opendc.compute.workload.Task -import org.opendc.simulator.compute.workload.TraceFragment +import 
org.opendc.simulator.compute.workload.trace.TraceFragment import java.util.ArrayList class SchedulerTest { From 0972b5da1f9ba430526d9b88afc690246ec163a5 Mon Sep 17 00:00:00 2001 From: DanteNiewenhuis <d.niewenhuis@hotmail.com> Date: Fri, 24 Jan 2025 12:36:14 +0100 Subject: [PATCH 4/6] Updated the checkpointing tests to reflect the changes made --- .../compute/simulator/internal/Guest.kt | 2 +- .../compute/workload/ComputeWorkloadLoader.kt | 14 +++- .../base/experiment/specs/WorkloadSpec.kt | 23 ++++-- .../experiments/base/runner/ScenarioRunner.kt | 6 +- .../base/FailuresAndCheckpointingTest.kt | 63 ++++++++-------- .../experiments/base/FragmentScalingTest.kt | 71 ++++--------------- .../opendc/experiments/base/TestingUtils.kt | 4 +- .../workload/trace/SimTraceWorkload.java | 22 +++--- .../compute/workload/trace/TraceWorkload.java | 39 +++++++--- .../trace/scaling/NoDelayScaling.java | 22 ++++++ .../trace/scaling/PerfectScaling.java | 22 ++++++ .../workload/trace/scaling/ScalingPolicy.java | 49 +++++++++++++ .../engine/graph/FlowDistributor.java | 4 +- 13 files changed, 214 insertions(+), 127 deletions(-) diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt index 2151bcadb..7f5f09eb3 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/internal/Guest.kt @@ -110,7 +110,7 @@ public class Guest( 0, 0, 0.0, - scalingPolicy + scalingPolicy, ) if (task.workload is TraceWorkload) { diff --git a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt index e01054179..2b8b589f3 100644 --- 
a/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt +++ b/opendc-compute/opendc-compute-workload/src/main/kotlin/org/opendc/compute/workload/ComputeWorkloadLoader.kt @@ -55,7 +55,7 @@ public class ComputeWorkloadLoader( private val checkpointInterval: Long = 0L, private val checkpointDuration: Long = 0L, private val checkpointIntervalScaling: Double = 1.0, - private val scalingPolicy: ScalingPolicy = NoDelayScaling() + private val scalingPolicy: ScalingPolicy = NoDelayScaling(), ) : WorkloadLoader(subMissionTime) { /** * The logger for this instance. @@ -87,7 +87,10 @@ public class ComputeWorkloadLoader( val cores = reader.getInt(coresCol) val cpuUsage = reader.getDouble(usageCol) - val builder = fragments.computeIfAbsent(id) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy) } + val builder = + fragments.computeIfAbsent( + id, + ) { Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy) } builder.add(durationMs, cpuUsage, cores) } @@ -181,7 +184,12 @@ public class ComputeWorkloadLoader( /** * A builder for a VM trace. */ - private class Builder(checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double, scalingPolicy: ScalingPolicy) { + private class Builder( + checkpointInterval: Long, + checkpointDuration: Long, + checkpointIntervalScaling: Double, + scalingPolicy: ScalingPolicy, + ) { /** * The total load of the trace. 
*/ diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt index 87c7abe9a..01a45788e 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt @@ -25,6 +25,9 @@ package org.opendc.experiments.base.experiment.specs import kotlinx.serialization.Serializable import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.WorkloadLoader +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling +import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy import java.io.File /** @@ -41,6 +44,7 @@ public data class WorkloadSpec( val type: WorkloadTypes, val sampleFraction: Double = 1.0, val submissionTime: String? 
= null, + val scalingPolicy: ScalingPolicyEnum = ScalingPolicyEnum.NoDelay ) { public val name: String = File(pathToFile).nameWithoutExtension @@ -56,11 +60,6 @@ public data class WorkloadSpec( * @constructor Create empty Workload types */ public enum class WorkloadTypes { - /** - * Compute workload - * - * @constructor Create empty Compute workload - */ ComputeWorkload, } @@ -74,6 +73,7 @@ public fun getWorkloadLoader( checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double, + scalingPolicy: ScalingPolicy ): WorkloadLoader { return when (type) { WorkloadTypes.ComputeWorkload -> @@ -83,6 +83,19 @@ public fun getWorkloadLoader( checkpointInterval, checkpointDuration, checkpointIntervalScaling, + scalingPolicy ) } } + +public enum class ScalingPolicyEnum { + NoDelay, + Perfect +} + +public fun getScalingPolicy(scalingPolicyEnum: ScalingPolicyEnum): ScalingPolicy { + return when (scalingPolicyEnum) { + ScalingPolicyEnum.NoDelay -> NoDelayScaling() + ScalingPolicyEnum.Perfect -> PerfectScaling() + } +} diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index 22c0b3ec0..e706280c1 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -33,10 +33,9 @@ import org.opendc.compute.simulator.service.ComputeService import org.opendc.compute.simulator.telemetry.parquet.ParquetComputeMonitor import org.opendc.compute.topology.clusterTopology import org.opendc.experiments.base.experiment.Scenario +import org.opendc.experiments.base.experiment.specs.getScalingPolicy import org.opendc.experiments.base.experiment.specs.getWorkloadLoader -import 
org.opendc.experiments.base.experiment.specs.getWorkloadType import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling -import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling import org.opendc.simulator.kotlin.runSimulation import java.io.File import java.time.Duration @@ -83,8 +82,7 @@ public fun runScenario( val checkpointDuration = scenario.checkpointModelSpec?.checkpointDuration ?: 0L val checkpointIntervalScaling = scenario.checkpointModelSpec?.checkpointIntervalScaling ?: 1.0 - val scalingPolicy = NoDelayScaling(); -// val scalingPolicy = PerfectScaling(); + val scalingPolicy = getScalingPolicy(scenario.workloadSpec.scalingPolicy) val workloadLoader = getWorkloadLoader( diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt index 8867f2fde..3231f533f 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FailuresAndCheckpointingTest.kt @@ -227,13 +227,10 @@ class FailuresAndCheckpointingTest { /** * Checkpointing test 1: Single Task with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. - * Because there is no checkpointing, the full task has to be rerun. - * - * This means the final runtime is 20 minutes + * The system is using checkpointing, taking snapshots every minute. * - * When the task is running, it is using 50% of the cpu. - * This means that half of the time is active, and half is idle. - * When the task is failed, all time is idle. + * This means that after failure, only 6 minutes of the task is left. 
+ * However, taking a snapshot takes 1 second, which means 9 seconds have to be added to the total runtime. */ @Test fun testCheckpoints1() { @@ -261,15 +258,15 @@ class FailuresAndCheckpointingTest { } /** - * Checkpointing test 1: Single Task with checkpointing + * Checkpointing test 2: Single Task with checkpointing, higher cpu demand * In this test, a single task is scheduled that is interrupted by a failure after 5 min. - * Because there is no checkpointing, the full task has to be rerun. + * The system is using checkpointing, taking snapshots every minute. * - * This means the final runtime is 20 minutes + * This means that after failure, only 16 minutes of the task is left. + * However, taking a snapshot takes 1 second, which means 19 seconds have to be added to the total runtime. * - * When the task is running, it is using 50% of the cpu. - * This means that half of the time is active, and half is idle. - * When the task is failed, all time is idle. + * This is similar to the previous test, but the cpu demand of taking a snapshot is higher. + * The cpu demand of taking a snapshot is as high as the highest fragment */ @Test fun testCheckpoints2() { @@ -293,20 +290,25 @@ class FailuresAndCheckpointingTest { assertAll( { assertEquals((20 * 60000) + (19 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, - { assertEquals((10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + { + assertEquals( + (10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0), + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect energy usage" } + }, ) } /** - * Checkpointing test 1: Single Task with checkpointing + * Checkpointing test 3: Single Task with checkpointing, higher cpu demand * In this test, a single task is scheduled that is interrupted by a failure after 5 min. - * Because there is no checkpointing, the full task has to be rerun. 
+ * The system is using checkpointing, taking snapshots every minute. * - * This means the final runtime is 20 minutes + * This means that after failure, only 16 minutes of the task is left. + * However, taking a snapshot takes 1 second, which means 19 seconds have to be added to the total runtime. + * + * This is similar to the previous test, but the fragments are reversed * - * When the task is running, it is using 50% of the cpu. - * This means that half of the time is active, and half is idle. - * When the task is failed, all time is idle. */ @Test fun testCheckpoints3() { @@ -330,20 +332,21 @@ class FailuresAndCheckpointingTest { assertAll( { assertEquals((20 * 60000) + (19 * 1000), monitor.maxTimestamp) { "Total runtime incorrect" } }, - { assertEquals((10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0), monitor.hostEnergyUsages["H01"]?.sum()) { "Incorrect energy usage" } }, + { + assertEquals( + (10 * 60 * 200.0) + (10 * 60 * 150.0) + (19 * 200.0), + monitor.hostEnergyUsages["H01"]?.sum(), + ) { "Incorrect energy usage" } + }, ) } /** - * Checkpointing test 2: Single Task with scaling checkpointing - * In this test, a single task is scheduled that is interrupted by a failure after 5 min. - * Because there is no checkpointing, the full task has to be rerun. + * Checkpointing test 4: Single Task with scaling checkpointing + * In this test, checkpointing is used, with a scaling factor of 1.5 * - * This means the final runtime is 20 minutes + * This means that the interval between checkpoints starts at 1 min, but is multiplied by 1.5 every snapshot. * - * When the task is running, it is using 50% of the cpu. - * This means that half of the time is active, and half is idle. - * When the task is failed, all time is idle. 
*/ @Test fun testCheckpoints4() { @@ -372,7 +375,7 @@ class FailuresAndCheckpointingTest { } /** - * Checkpointing test 3: Single Task, single failure with checkpointing + * Checkpointing test 5: Single Task, single failure with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. * @@ -414,7 +417,7 @@ class FailuresAndCheckpointingTest { } /** - * Checkpointing test 4: Single Task, repeated failure with checkpointing + * Checkpointing test 6: Single Task, repeated failure with checkpointing * In this test, a single task is scheduled that is interrupted by a failure after 5 min. * Because there is no checkpointing, the full task has to be rerun. * @@ -448,7 +451,7 @@ class FailuresAndCheckpointingTest { { assertEquals((22 * 60000) + 1000, monitor.maxTimestamp) { "Total runtime incorrect" } }, { assertEquals( - (300 * 150.0) + (300 * 100.0) + (300 * 150.0) + (300 * 100.0) + (121 * 150.0) , + (300 * 150.0) + (300 * 100.0) + (300 * 150.0) + (300 * 100.0) + (121 * 150.0), monitor.hostEnergyUsages["H01"]?.sum(), ) { "Incorrect energy usage" } }, diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt index fb9bd95ac..b0aa3555d 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/FragmentScalingTest.kt @@ -23,41 +23,13 @@ package org.opendc.experiments.base import org.junit.jupiter.api.Assertions.assertEquals -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertAll -import org.opendc.compute.simulator.provisioner.Provisioner -import 
org.opendc.compute.simulator.provisioner.registerComputeMonitor -import org.opendc.compute.simulator.provisioner.setupComputeService -import org.opendc.compute.simulator.provisioner.setupHosts -import org.opendc.compute.simulator.scheduler.FilterScheduler -import org.opendc.compute.simulator.scheduler.filters.ComputeFilter -import org.opendc.compute.simulator.scheduler.filters.RamFilter -import org.opendc.compute.simulator.scheduler.filters.VCpuFilter -import org.opendc.compute.simulator.scheduler.weights.CoreRamWeigher -import org.opendc.compute.simulator.service.ComputeService -import org.opendc.compute.simulator.telemetry.ComputeMonitor -import org.opendc.compute.simulator.telemetry.table.HostTableReader -import org.opendc.compute.simulator.telemetry.table.ServiceTableReader -import org.opendc.compute.simulator.telemetry.table.TaskTableReader -import org.opendc.compute.topology.clusterTopology -import org.opendc.compute.topology.specs.ClusterSpec -import org.opendc.compute.workload.ComputeWorkloadLoader import org.opendc.compute.workload.Task -import org.opendc.experiments.base.runner.replay import org.opendc.simulator.compute.workload.trace.TraceFragment -import org.opendc.simulator.compute.workload.trace.TraceWorkload import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling import org.opendc.simulator.compute.workload.trace.scaling.PerfectScaling -import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy -import org.opendc.simulator.kotlin.runSimulation -import java.io.File -import java.time.Duration -import java.time.Instant -import java.time.LocalDateTime -import java.time.ZoneId import java.util.ArrayList -import java.util.UUID /** * Testing suite containing tests that specifically test the scaling of trace fragments @@ -79,7 +51,7 @@ class FragmentScalingTest { TraceFragment(10 * 60 * 1000, 2000.0, 1), TraceFragment(10 * 60 * 1000, 1000.0, 1), ), - scalingPolicy = NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), ) @@ 
-92,7 +64,7 @@ class FragmentScalingTest { TraceFragment(10 * 60 * 1000, 2000.0, 1), TraceFragment(10 * 60 * 1000, 1000.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), ) val topology = createTopology("single_1_2000.json") @@ -103,16 +75,12 @@ class FragmentScalingTest { assertAll( { assertEquals(1200000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, { assertEquals(1200000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(2000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, - { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, ) @@ -136,7 +104,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 4000.0, 1), ), - scalingPolicy = NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), ) @@ -148,7 +116,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 4000.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), ) val topology = createTopology("single_1_2000.json") @@ -159,10 +127,8 @@ class FragmentScalingTest { assertAll( { assertEquals(600000, 
monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, { assertEquals(1200000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, ) @@ -189,7 +155,7 @@ class FragmentScalingTest { TraceFragment(10 * 60 * 1000, 4000.0, 1), TraceFragment(10 * 60 * 1000, 1500.0, 1), ), - scalingPolicy = NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), ) @@ -203,7 +169,7 @@ class FragmentScalingTest { TraceFragment(10 * 60 * 1000, 4000.0, 1), TraceFragment(10 * 60 * 1000, 1500.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), ) val topology = createTopology("single_1_2000.json") @@ -214,25 +180,18 @@ class FragmentScalingTest { assertAll( { assertEquals(1800000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, { assertEquals(2400000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." 
} }, - { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, - { assertEquals(4000.0, monitorNoDelay.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(9)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(2000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(9)) { "The cpu supplied to task 0 is incorrect" } }, - { assertEquals(1500.0, monitorNoDelay.taskCpuDemands["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(4000.0, monitorPerfect.taskCpuDemands["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } }, - { assertEquals(1500.0, monitorNoDelay.taskCpuSupplied["0"]?.get(19)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(2000.0, monitorPerfect.taskCpuSupplied["0"]?.get(19)) { "The cpu supplied to task 0 is incorrect" } }, - { assertEquals(1500.0, monitorPerfect.taskCpuDemands["0"]?.get(29)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(1500.0, monitorPerfect.taskCpuSupplied["0"]?.get(29)) { "The cpu supplied to task 0 is incorrect" } }, ) @@ -254,7 +213,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 1000.0, 1), ), - scalingPolicy = NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), createTestTask( name = "1", @@ -262,7 +221,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 3000.0, 1), ), - scalingPolicy = 
NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), ) @@ -274,7 +233,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 1000.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), createTestTask( name = "1", @@ -282,7 +241,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 3000.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), ) val topology = createTopology("single_2_2000.json") @@ -293,12 +252,10 @@ class FragmentScalingTest { assertAll( { assertEquals(600000, monitorNoDelay.maxTimestamp) { "The workload took longer to finish than expected." } }, { assertEquals(600000, monitorPerfect.maxTimestamp) { "The workload took longer to finish than expected." } }, - { assertEquals(1000.0, monitorNoDelay.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitorNoDelay.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuDemands["0"]?.get(0)) { "The cpu demanded by task 0 is incorrect" } }, { assertEquals(3000.0, monitorPerfect.taskCpuDemands["1"]?.get(0)) { "The cpu demanded by task 1 is incorrect" } }, - { assertEquals(1000.0, monitorNoDelay.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, { assertEquals(3000.0, monitorNoDelay.taskCpuSupplied["1"]?.get(0)) { "The cpu supplied to task 1 is incorrect" } }, { assertEquals(1000.0, monitorPerfect.taskCpuSupplied["0"]?.get(0)) { "The cpu supplied to task 0 is incorrect" } }, @@ -326,7 +283,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 2000.0, 1), ), - scalingPolicy = NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), createTestTask( name = "1", @@ -334,7 +291,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 4000.0, 1), ), - scalingPolicy = NoDelayScaling() + scalingPolicy = NoDelayScaling(), ), ) @@ 
-346,7 +303,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 2000.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), createTestTask( name = "1", @@ -354,7 +311,7 @@ class FragmentScalingTest { arrayListOf( TraceFragment(10 * 60 * 1000, 4000.0, 1), ), - scalingPolicy = PerfectScaling() + scalingPolicy = PerfectScaling(), ), ) val topology = createTopology("single_2_2000.json") diff --git a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt index 6f91ec43f..df45f3743 100644 --- a/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt +++ b/opendc-experiments/opendc-experiments-base/src/test/kotlin/org/opendc/experiments/base/TestingUtils.kt @@ -71,7 +71,7 @@ fun createTestTask( checkpointInterval: Long = 0L, checkpointDuration: Long = 0L, checkpointIntervalScaling: Double = 1.0, - scalingPolicy: ScalingPolicy = NoDelayScaling() + scalingPolicy: ScalingPolicy = NoDelayScaling(), ): Task { return Task( UUID.nameUUIDFromBytes(name.toByteArray()), @@ -87,7 +87,7 @@ fun createTestTask( checkpointInterval, checkpointDuration, checkpointIntervalScaling, - scalingPolicy + scalingPolicy, ), ) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java index a194aea5e..93733268f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/SimTraceWorkload.java @@ -23,16 +23,15 @@ package org.opendc.simulator.compute.workload.trace; import 
java.util.LinkedList; +import org.opendc.simulator.compute.workload.SimWorkload; +import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; +import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; import org.opendc.simulator.engine.graph.FlowConsumer; import org.opendc.simulator.engine.graph.FlowEdge; import org.opendc.simulator.engine.graph.FlowGraph; import org.opendc.simulator.engine.graph.FlowNode; import org.opendc.simulator.engine.graph.FlowSupplier; -import org.opendc.simulator.compute.workload.SimWorkload; -import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; -import org.opendc.simulator.compute.workload.trace.scaling.ScalingPolicy; - public class SimTraceWorkload extends SimWorkload implements FlowConsumer { private LinkedList<TraceFragment> remainingFragments; private int fragmentIndex; @@ -122,7 +121,8 @@ public long onUpdate(long now) { this.cpuFreqSupplied = this.newCpuFreqSupplied; // The amount of time required to finish the fragment at this speed - long remainingDuration = this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork); + long remainingDuration = this.scalingPolicy.getRemainingDuration( + this.cpuFreqDemand, this.newCpuFreqSupplied, this.remainingWork); return now + remainingDuration; } @@ -139,7 +139,6 @@ public TraceFragment getNextFragment() { private void startNextFragment() { - // TODO: turn this into a loop, should not be needed, but might be safer TraceFragment nextFragment = this.getNextFragment(); if (nextFragment == null) { this.stopWorkload(); @@ -150,8 +149,6 @@ private void startNextFragment() { this.pushOutgoingDemand(this.machineEdge, demand); } - - @Override public void stopWorkload() { if (this.machineEdge == null) { @@ -193,7 +190,8 @@ public void makeSnapshot(long now) { this.remainingWork -= finishedWork; // The amount of time required to finish the fragment at this speed - long remainingTime = 
this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork); + long remainingTime = + this.scalingPolicy.getRemainingDuration(this.cpuFreqDemand, this.cpuFreqDemand, this.remainingWork); // If this is the end of the Task, don't make a snapshot if (remainingTime <= 0 && remainingFragments.isEmpty()) { @@ -202,7 +200,7 @@ public void makeSnapshot(long now) { // Create a new fragment based on the current fragment and remaining duration TraceFragment newFragment = - new TraceFragment(remainingTime, currentFragment.cpuUsage(), currentFragment.coreCount()); + new TraceFragment(remainingTime, currentFragment.cpuUsage(), currentFragment.coreCount()); // Alter the snapshot by removing finished fragments this.snapshot.removeFragments(this.fragmentIndex); @@ -211,8 +209,8 @@ public void makeSnapshot(long now) { this.remainingFragments.addFirst(newFragment); // Create and add a fragment for processing the snapshot process - // TODO: improve the implementation of cpuUsage and coreCount - TraceFragment snapshotFragment = new TraceFragment(this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount()); + TraceFragment snapshotFragment = new TraceFragment( + this.checkpointDuration, this.snapshot.getMaxCpuDemand(), this.snapshot.getMaxCoreCount()); this.remainingFragments.addFirst(snapshotFragment); this.fragmentIndex = -1; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java index ab0c8e4a0..47292a7b8 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/TraceWorkload.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import 
java.util.Comparator; import java.util.List; - import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.compute.workload.Workload; import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling; @@ -47,11 +46,11 @@ public ScalingPolicy getScalingPolicy() { private final ScalingPolicy scalingPolicy; public TraceWorkload( - ArrayList<TraceFragment> fragments, - long checkpointInterval, - long checkpointDuration, - double checkpointIntervalScaling, - ScalingPolicy scalingPolicy) { + ArrayList<TraceFragment> fragments, + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { this.fragments = fragments; this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; @@ -59,8 +58,14 @@ public TraceWorkload( this.scalingPolicy = scalingPolicy; // TODO: remove if we decide not to use it. - this.maxCpuDemand = fragments.stream().max(Comparator.comparing(TraceFragment::cpuUsage)).get().cpuUsage(); - this.maxCoreCount = fragments.stream().max(Comparator.comparing(TraceFragment::coreCount)).get().coreCount(); + this.maxCpuDemand = fragments.stream() + .max(Comparator.comparing(TraceFragment::cpuUsage)) + .get() + .cpuUsage(); + this.maxCoreCount = fragments.stream() + .max(Comparator.comparing(TraceFragment::coreCount)) + .get() + .coreCount(); } public TraceWorkload(ArrayList<TraceFragment> fragments) { @@ -114,7 +119,11 @@ public static Builder builder() { return builder(0L, 0L, 0.0, new NoDelayScaling()); } - public static Builder builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy) { + public static Builder builder( + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { return new Builder(checkpointInterval, checkpointDuration, checkpointIntervalScaling, scalingPolicy); } @@ -158,7 +167,11 @@ public static 
final class Builder { /** * Construct a new {@link Builder} instance. */ - private Builder(long checkpointInterval, long checkpointDuration, double checkpointIntervalScaling, ScalingPolicy scalingPolicy) { + private Builder( + long checkpointInterval, + long checkpointDuration, + double checkpointIntervalScaling, + ScalingPolicy scalingPolicy) { this.fragments = new ArrayList<>(); this.checkpointInterval = checkpointInterval; this.checkpointDuration = checkpointDuration; @@ -182,7 +195,11 @@ public void add(long duration, double usage, int cores) { */ public TraceWorkload build() { return new TraceWorkload( - this.fragments, this.checkpointInterval, this.checkpointDuration, this.checkpointIntervalScaling, this.scalingPolicy); + this.fragments, + this.checkpointInterval, + this.checkpointDuration, + this.checkpointIntervalScaling, + this.scalingPolicy); } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java index c0ec750d7..4230bb55f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/NoDelayScaling.java @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright 
notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package org.opendc.simulator.compute.workload.trace.scaling; /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java index 281068880..7eae70e6c 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/PerfectScaling.java @@ -1,3 +1,25 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package org.opendc.simulator.compute.workload.trace.scaling; /** diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java index ee734aeec..a0f473ba9 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/trace/scaling/ScalingPolicy.java @@ -1,10 +1,59 @@ +/* + * Copyright (c) 2025 AtLarge Research + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + package org.opendc.simulator.compute.workload.trace.scaling; +/** + * Interface for the scaling policy. + * A scaling decides how a TaskFragment should scale when it is not getting the demanded capacity + */ public interface ScalingPolicy { + /** + * Calculate how much work was finished based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param cpuFreqSupplied + * @param passedTime + * @return + */ double getFinishedWork(double cpuFreqDemand, double cpuFreqSupplied, long passedTime); + /** + * Calculate the remaining duration of this fragment based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param cpuFreqSupplied + * @param remainingWork + * @return + */ long getRemainingDuration(double cpuFreqDemand, double cpuFreqSupplied, double remainingWork); + /** + * Calculate how much work is remaining based on the demanded and supplied cpu + * + * @param cpuFreqDemand + * @param duration + * @return + */ double getRemainingWork(double cpuFreqDemand, long duration); } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java index 54eb6636a..ff7ff1996 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java +++ 
b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/engine/graph/FlowDistributor.java @@ -39,13 +39,13 @@ public class FlowDistributor extends FlowNode implements FlowSupplier, FlowConsu private double currentIncomingSupply; // The current supply provided by the supplier private boolean outgoingDemandUpdateNeeded = false; - private final Set<Integer> updatedDemands = new HashSet<>(); // Array of consumers that updated their demand in this cycle + private final Set<Integer> updatedDemands = + new HashSet<>(); // Array of consumers that updated their demand in this cycle private boolean overloaded = false; private double capacity; // What is the max capacity. Can probably be removed - public FlowDistributor(FlowGraph graph) { super(graph); } From 1e19d37a66ee5ecc3dc31800484130695fc17cf0 Mon Sep 17 00:00:00 2001 From: DanteNiewenhuis <d.niewenhuis@hotmail.com> Date: Fri, 24 Jan 2025 13:10:28 +0100 Subject: [PATCH 5/6] updated wrapper-validation-action --- .github/workflows/build.yml | 2 +- .github/workflows/publish.yml | 2 +- .github/workflows/release.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 807c6d26a..71fb15a85 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,7 +22,7 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - name: Validate Gradle wrapper - uses: gradle/wrapper-validation-action@v1 + uses: gradle/wrapper-validation-action@v3 - name: Set up JDK uses: actions/setup-java@v4 with: diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e2b095503..660b7899b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -90,7 +90,7 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - name: Validate Gradle wrapper - uses: gradle/wrapper-validation-action@v1 + uses: gradle/wrapper-validation-action@v3 - name: Set up JDK uses: actions/setup-java@v3 
with: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5b3357f44..3079d25a0 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,7 +13,7 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - name: Validate Gradle wrapper - uses: gradle/wrapper-validation-action@v1 + uses: gradle/wrapper-validation-action@v3 - name: Set up JDK uses: actions/setup-java@v3 with: From df3cdff6294fa6c19f8f3027b1ee0eceb42e8e5b Mon Sep 17 00:00:00 2001 From: DanteNiewenhuis <d.niewenhuis@hotmail.com> Date: Fri, 24 Jan 2025 13:13:01 +0100 Subject: [PATCH 6/6] Applied spotless --- .../experiments/base/experiment/specs/WorkloadSpec.kt | 8 ++++---- .../org/opendc/experiments/base/runner/ScenarioRunner.kt | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt index 01a45788e..cf40d88d2 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/experiment/specs/WorkloadSpec.kt @@ -44,7 +44,7 @@ public data class WorkloadSpec( val type: WorkloadTypes, val sampleFraction: Double = 1.0, val submissionTime: String? 
= null, - val scalingPolicy: ScalingPolicyEnum = ScalingPolicyEnum.NoDelay + val scalingPolicy: ScalingPolicyEnum = ScalingPolicyEnum.NoDelay, ) { public val name: String = File(pathToFile).nameWithoutExtension @@ -73,7 +73,7 @@ public fun getWorkloadLoader( checkpointInterval: Long, checkpointDuration: Long, checkpointIntervalScaling: Double, - scalingPolicy: ScalingPolicy + scalingPolicy: ScalingPolicy, ): WorkloadLoader { return when (type) { WorkloadTypes.ComputeWorkload -> @@ -83,14 +83,14 @@ public fun getWorkloadLoader( checkpointInterval, checkpointDuration, checkpointIntervalScaling, - scalingPolicy + scalingPolicy, ) } } public enum class ScalingPolicyEnum { NoDelay, - Perfect + Perfect, } public fun getScalingPolicy(scalingPolicyEnum: ScalingPolicyEnum): ScalingPolicy { diff --git a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt index e706280c1..56278bf24 100644 --- a/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt +++ b/opendc-experiments/opendc-experiments-base/src/main/kotlin/org/opendc/experiments/base/runner/ScenarioRunner.kt @@ -35,7 +35,6 @@ import org.opendc.compute.topology.clusterTopology import org.opendc.experiments.base.experiment.Scenario import org.opendc.experiments.base.experiment.specs.getScalingPolicy import org.opendc.experiments.base.experiment.specs.getWorkloadLoader -import org.opendc.simulator.compute.workload.trace.scaling.NoDelayScaling import org.opendc.simulator.kotlin.runSimulation import java.io.File import java.time.Duration @@ -92,7 +91,7 @@ public fun runScenario( checkpointInterval, checkpointDuration, checkpointIntervalScaling, - scalingPolicy + scalingPolicy, ) val workload = workloadLoader.sampleByLoad(scenario.workloadSpec.sampleFraction)