From c3089499b7d34b92becf9e53e3f2f5404d60389d Mon Sep 17 00:00:00 2001
From: Sun Shun
Date: Fri, 23 Sep 2022 17:25:33 +0800
Subject: [PATCH] Enable customized and isolated python environment for Pyspark

Expose spark.yarn.dist.archives, spark.pyspark.python and
spark.pyspark.driver.python as Spark engine configuration keys, and drop the
site-specific pyspark.python3.path default.

SparkPythonExecutor.initGateway now resolves the two interpreters separately:

  * driver:   spark.pyspark.driver.python, else the PYSPARK_DRIVER_PYTHON
              setting, else the interpreter name from spark.python.version
              ("python" when that option is blank).
  * executor: spark.pyspark.python, else the PYSPARK_PYTHON setting; it is
              exported as PYSPARK_PYTHON only when non-blank.

The driver fallback to spark.python.version is required: without it the driver
command is blank whenever neither driver setting is present, and
CommandLine.parse rejects an empty command line.

---
 linkis-dist/package/db/linkis_dml.sql         |  4 ++
 .../spark/config/SparkConfiguration.scala     |  3 --
 .../spark/executor/SparkPythonExecutor.scala  | 51 ++++++++-----------
 3 files changed, 24 insertions(+), 34 deletions(-)

diff --git a/linkis-dist/package/db/linkis_dml.sql b/linkis-dist/package/db/linkis_dml.sql
index b2472e81a09..e5dba789a27 100644
--- a/linkis-dist/package/db/linkis_dml.sql
+++ b/linkis-dist/package/db/linkis_dml.sql
@@ -77,6 +77,10 @@ INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, 
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', '取值范围:python2,python3', 'python版本','python2', 'OFT', '[\"python3\",\"python2\"]', '0', '0', '1', 'spark引擎设置', 'spark');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.yarn.dist.archives', 'Comma separated list of archives to be extracted into the working directory of each executor.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.pyspark.python', 'Python binary executable to use for PySpark in both driver and executors.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.pyspark.driver.python', 'Python binary executable to use for PySpark in driver.', NULL, NULL, 'None', NULL, '0', '0', '1', 'spark引擎设置', 'spark');
+
 -- hive
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'hive引擎最大并发数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'hive');
 INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', '取值范围:1-10,单位:G', 'hive引擎初始化内存大小','1g', 'Regex', '^([1-9]|10)(G|g)$', '0', '0', '1', 'hive引擎设置', 'hive');
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index 6faf30ce348..fb6f6ab5c4c 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -108,9 +108,6 @@ object SparkConfiguration extends Logging {
   val ENGINE_SHUTDOWN_LOGS =
     CommonVars("wds.linkis.spark.engineconn.fatal.log", "error writing class;OutOfMemoryError")
 
-  val PYSPARK_PYTHON3_PATH =
-    CommonVars[String]("pyspark.python3.path", "/appcom/Install/anaconda3/bin/python")
-
   val ENABLE_REPLACE_PACKAGE_NAME =
     CommonVars("wds.linkis.spark.engine.scala.replace_package_header.enable", true)
 
diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
index 1b421ad3c02..7ca4dd734d3 100644
--- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
+++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SparkPythonExecutor.scala
@@ -135,26 +135,25 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
 
   private def initGateway = {
     // If the python version set by the user is obtained from the front end as python3, the environment variable of python3 is taken; otherwise, the default is python2
-    logger.info(
-      s"spark.python.version => ${engineCreationContext.getOptions.get("spark.python.version")}"
-    )
-    val userDefinePythonVersion = engineCreationContext.getOptions
-      .getOrDefault("spark.python.version", "python")
-      .toString
-      .toLowerCase()
-    val sparkPythonVersion =
-      if (
-          StringUtils
-            .isNotBlank(userDefinePythonVersion) && userDefinePythonVersion.equals("python3")
-      ) {
-        SparkConfiguration.PYSPARK_PYTHON3_PATH.getValue
-      } else {
-        userDefinePythonVersion
-      }
-    val pythonExec = CommonVars("PYSPARK_DRIVER_PYTHON", sparkPythonVersion).getValue
+    logger.info(s"spark.python.version => ${engineCreationContext.getOptions.get("spark.python.version")}")
+    val userDefinePythonVersion = engineCreationContext.getOptions.getOrDefault("spark.python.version", "python").toString.toLowerCase()
+    val sparkPythonVersion = if (StringUtils.isNotBlank(userDefinePythonVersion)) userDefinePythonVersion else "python"
+
+    // Driver Python: spark.pyspark.driver.python, else the PYSPARK_DRIVER_PYTHON setting, else spark.python.version (never blank)
+    val pySparkDriverPythonConf = "spark.pyspark.driver.python"
+    val userDefinePySparkDriverPython = sc.getConf.getOption(pySparkDriverPythonConf).getOrElse("")
+    val defaultPySparkDriverPython = CommonVars("PYSPARK_DRIVER_PYTHON", sparkPythonVersion).getValue
+    val pySparkDriverPython = if (StringUtils.isNotBlank(userDefinePySparkDriverPython)) userDefinePySparkDriverPython else defaultPySparkDriverPython
+    logger.info(s"PYSPARK_DRIVER_PYTHON => $pySparkDriverPython")
+
+    // Executor Python: spark.pyspark.python, else the PYSPARK_PYTHON setting; exported below only when non-blank
+    val pySparkPythonConf = "spark.pyspark.python"
+    val userDefinePySparkPython = sc.getConf.getOption(pySparkPythonConf).getOrElse("")
+    val defaultPySparkPython = CommonVars("PYSPARK_PYTHON", "").getValue
+    val pySparkPython = if (StringUtils.isNotBlank(userDefinePySparkPython)) userDefinePySparkPython else defaultPySparkPython
+    logger.info(s"PYSPARK_PYTHON => $pySparkPython")
 
     val pythonScriptPath = CommonVars("python.script.path", "python/mix_pyspark.py").getValue
-
     val port: Int = EngineUtils.findAvailPort
     gatewayServer = gwBuilder.entryPoint(this).javaPort(port).build()
     gatewayServer.start()
@@ -166,6 +165,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     )
     val pythonClasspath = new StringBuilder(pythonPath)
 
+    // extra spark files
     val files = sc.getConf.get("spark.files", "")
     logger.info(s"output spark files ${files}")
     if (StringUtils.isNotEmpty(files)) {
@@ -184,7 +184,7 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
       .filter(_.endsWith(".zip"))
       .foreach(pythonClasspath ++= File.pathSeparator ++= _)
 
-    val cmd = CommandLine.parse(pythonExec)
+    val cmd = CommandLine.parse(pySparkDriverPython)
     cmd.addArgument(createFakeShell(pythonScriptPath).getAbsolutePath, false)
     cmd.addArgument(port.toString, false)
     cmd.addArgument(EngineUtils.sparkSubmitVersion().replaceAll("\\.", ""), false)
@@ -193,19 +193,8 @@ class SparkPythonExecutor(val sparkEngineSession: SparkEngineSession, val id: In
     cmd.addArgument(pyFiles, false)
 
     val builder = new ProcessBuilder(cmd.toStrings.toSeq.toList.asJava)
-
     val env = builder.environment()
-    if (StringUtils.isBlank(sc.getConf.get("spark.pyspark.python", ""))) {
-      logger.info("spark.pyspark.python is null")
-      if (userDefinePythonVersion.equals("python3")) {
-        logger.info(s"userDefinePythonVersion is $pythonExec will be set to PYSPARK_PYTHON")
-        env.put("PYSPARK_PYTHON", pythonExec)
-      }
-    } else {
-      val executorPython = sc.getConf.get("spark.pyspark.python")
-      logger.info(s"set PYSPARK_PYTHON spark.pyspark.python is $executorPython")
-      env.put("PYSPARK_PYTHON", executorPython)
-    }
+    if (StringUtils.isNotBlank(pySparkPython)) env.put("PYSPARK_PYTHON", pySparkPython)
     env.put("PYTHONPATH", pythonClasspath.toString())
     env.put("PYTHONUNBUFFERED", "YES")
     env.put("PYSPARK_GATEWAY_PORT", "" + port)