diff --git a/scripts/start_cluster.py b/scripts/start_cluster.py
index 1c7fece..193cfc0 100644
--- a/scripts/start_cluster.py
+++ b/scripts/start_cluster.py
@@ -95,6 +95,15 @@ def _get_mem_from_cmd(cmd: str, divide_by: int) -> float:
     return min(host_mem_gb, container_mem_gb)
 
 
+def mem_to_str(mem_gb: float) -> str:
+    """
+    Convert memory in GB to a string in GB or MB with the appropriate symbol.
+    """
+    if round(mem_gb) > 0 and mem_gb % round(mem_gb) <= 0.01:
+        return f"{int(mem_gb)}GB"
+    return f"{int(mem_gb * 1024)}MB"
+
+
 @dataclass
 class MemoryConfig:
     """
@@ -155,6 +164,13 @@ def as_gb(self) -> float:
             return self.min
         return max(self.pct * total_mem_gb(), self.min)
 
+    def to_str(self) -> str:
+        """
+        The memory configuration formatted as a string with the "GB" symbol
+        when the value is a whole number of gigabytes, or "MB" otherwise.
+        """
+        return mem_to_str(self.as_gb())
+
 
 class ClusterSpec(TypedDict):
     """
@@ -222,21 +238,19 @@ def calculate(self) -> ClusterSpec:
 
         return ClusterSpec(
             driver_cores=self.driver_cores,
-            driver_memory=f"{round(self.driver_mem.as_gb(), 1)}G",
+            driver_memory=self.driver_mem.to_str(),
             worker_cores=max(remaining_cores, executor_cores),
-            worker_memory=f"{round(remaining_mem, 1)}G",
+            worker_memory=mem_to_str(remaining_mem),
             executor_instances=num_executors,
             executor_cores=executor_cores,
-            executor_memory=f"{round(executor_mem, 1)}G",
+            executor_memory=mem_to_str(executor_mem),
         )
 
-    def _apply_to_conf(self) -> None:
+    def _apply_to_conf(self, spark_conf: ClusterSpec) -> None:
         """
         Idempotently apply the cluster specifications to the conf file.
         """
-        # Calculated configs plus the default auto scale configs, if set.
-        spark_conf = self.calculate()
         if self.auto_scale:
             max_executors = spark_conf["executor_instances"]
             spark_conf = {
@@ -264,31 +278,31 @@ def start(self) -> None:
         """
         Start the calculated cluster.
        """
-        self._apply_to_conf()
+        conf = self.calculate()
+        self._apply_to_conf(conf)
+
         start_prefix = "/opt/spark/sbin/start"
         print(f"Starting master...")
         subprocess.run([f"{start_prefix}-master.sh"], check=True)
-        print(f"Starting worker...")
-        subprocess.run([f"{start_prefix}-worker.sh", "spark://0.0.0.0:7077"], check=True)
-        conf = self.calculate()
 
-        def fmt_mem(
-            name: Literal['driver_memory', 'worker_memory', 'executor_memory'],
-        ) -> str:
-            return f"{conf[name].replace('G', '')} GB RAM"
+        print(f"Starting worker...")
+        worker_command = [f"{start_prefix}-worker.sh", "spark://0.0.0.0:7077"]
+        worker_mem = ["--memory", conf["worker_memory"]]
+        worker_cores = ["--cores", str(conf["worker_cores"])]
+        subprocess.run([*worker_command, *worker_mem, *worker_cores], check=True)
 
         print(
             f"# \n"
             f"# Spark standalone cluster started with success.\n"
             f"# The cluster have been configured to preserve {self.free_cores} core(s) "
-            f"and {round(self.free_mem.as_gb(), 1)}GB of memory for \n"
+            f"and {self.free_mem.to_str()} of memory for \n"
             f"# the OS and other apps, using the following resources:\n"
             f"# \n"
-            f"# * Driver: {conf['driver_cores']} cores / {fmt_mem('driver_memory')}\n"
-            f"# * Worker: {conf['worker_cores']} cores / {fmt_mem('worker_memory')}\n"
+            f"# * Driver: {conf['driver_cores']} cores / {conf['driver_memory']} RAM\n"
+            f"# * Worker: {conf['worker_cores']} cores / {conf['worker_memory']} RAM\n"
             f"# \n"
             f"# The worker will spawn {conf['executor_instances']} executor(s) with "
-            f"{conf['executor_cores']} cores and {fmt_mem('executor_memory')}. "
+            f"{conf['executor_cores']} cores and {conf['executor_memory']} RAM. "
             f"{'Auto scale is enabled.' if self.auto_scale else ''}\n"
             f"# "
         )
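
A quick sanity check of the new formatter (a hypothetical snippet, not part of the diff; it assumes scripts/start_cluster.py is importable as start_cluster): values within ~0.01 of a whole number of gigabytes keep the "GB" suffix, everything else is rendered in whole megabytes.

    # Hypothetical usage check for mem_to_str; the import path is an assumption.
    from start_cluster import mem_to_str

    assert mem_to_str(8.0) == "8GB"     # whole gigabytes keep the GB suffix
    assert mem_to_str(1.5) == "1536MB"  # fractional values fall back to MB
    assert mem_to_str(0.5) == "512MB"   # sub-gigabyte values are always MB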