From 7825997f76727bfc04636d47a1aa604af792181a Mon Sep 17 00:00:00 2001
From: Saksham Garg <[email protected]>
Date: Sat, 4 Jan 2020 14:58:06 +0530
Subject: [PATCH] Add PBS Pro as an external resource manager
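
PBS Professional joins standalone, YARN, Mesos, and Kubernetes as a
fifth cluster manager: SparkSubmit accepts a "pbs" master, routes the
usual executor/driver options to it, and in cluster mode hands the
application to org.apache.spark.deploy.pbs.PbsClusterApplication,
which ships in the external connector rather than in this patch.

Typical usage, assuming the connector is on the classpath (the
application class and jar below are placeholders):

    ./build/mvn -Ppbs -DskipTests package
    ./bin/spark-submit --master pbs --deploy-mode cluster \
        --class org.example.MyApp my-app.jar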
---
assembly/pom.xml | 10 ++++
.../org/apache/spark/deploy/SparkSubmit.scala | 55 ++++++++++++++++++----
.../spark/launcher/AbstractCommandBuilder.java | 1 +
pom.xml | 7 +++
4 files changed, 64 insertions(+), 9 deletions(-)
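
Editorial note, not part of the diff: PbsClusterApplication is
referenced but not defined here. A minimal Scala sketch of the
contract PBS_CLUSTER_SUBMIT_CLASS must satisfy, assuming Spark's
SparkApplication trait (the same one ClientApp and
KubernetesClientApplication implement); the body is a placeholder:

    package org.apache.spark.deploy.pbs

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkApplication

    private[spark] class PbsClusterApplication extends SparkApplication {
      // Receives the flags SparkSubmit assembles below: --main-class,
      // --primary-java-resource / --primary-py-file, --arg, etc., and
      // is expected to submit a PBS job that launches the driver.
      override def start(args: Array[String], conf: SparkConf): Unit = {
        throw new UnsupportedOperationException("sketch only")
      }
    }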
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 68ebfad..86115ad 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -149,6 +149,16 @@
</dependencies>
</profile>
<profile>
+ <id>pbs</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-pbs_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
<id>kubernetes</id>
<dependencies>
<dependency>
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8a03af5..720ee4b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -234,8 +234,9 @@ private[spark] class SparkSubmit extends Logging {
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
+ case m if m.startsWith("pbs") => PBS
case _ =>
- error("Master must either be yarn or start with spark, mesos, k8s, or local")
+ error("Master must either be yarn or start with spark, mesos, k8s, pbs or local")
-1
}
@@ -267,6 +268,14 @@ private[spark] class SparkSubmit extends Logging {
}
}
+ if (clusterManager == PBS) {
+ if (!Utils.classIsLoadable(PBS_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
+ error(
+ "Could not load PBS classes. " +
+ "This copy of Spark may not have been compiled with PBS support.")
+ }
+ }
+
// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
@@ -299,6 +308,8 @@ private[spark] class SparkSubmit extends Logging {
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
+ val isPbsClient = clusterManager == PBS && deployMode == CLIENT
+ val isPbsCluster = clusterManager == PBS && deployMode == CLUSTER
if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -596,20 +607,20 @@ private[spark] class SparkSubmit extends Logging {
// Other options
OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_INSTANCES.key),
- OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES | PBS, ALL_DEPLOY_MODES,
confKey = EXECUTOR_CORES.key),
- OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES | PBS, ALL_DEPLOY_MODES,
confKey = EXECUTOR_MEMORY.key),
- OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES | PBS, ALL_DEPLOY_MODES,
confKey = CORES_MAX.key),
- OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES | PBS, ALL_DEPLOY_MODES,
confKey = FILES.key),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
- OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+ OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES | PBS, ALL_DEPLOY_MODES,
confKey = JARS.key),
- OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
+ OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES | PBS, CLUSTER,
confKey = DRIVER_MEMORY.key),
- OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
+ OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES | PBS, CLUSTER,
confKey = DRIVER_CORES.key),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = DRIVER_SUPERVISE.key),
@@ -775,6 +786,29 @@ private[spark] class SparkSubmit extends Logging {
}
}
+ if (isPbsCluster) {
+ childMainClass = PBS_CLUSTER_SUBMIT_CLASS
+ if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
+ if (args.isPython) {
+ childArgs ++= Array("--primary-py-file", args.primaryResource)
+ childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+ if (args.pyFiles != null) {
+ childArgs ++= Array("--other-py-files", args.pyFiles)
+ }
+ } else {
+ childArgs ++= Array("--primary-java-resource", args.primaryResource)
+ childArgs ++= Array("--main-class", args.mainClass)
+ }
+ } else {
+ childArgs ++= Array("--main-class", args.mainClass)
+ }
+ if (args.childArgs != null) {
+ args.childArgs.foreach { arg =>
+ childArgs += ("--arg", arg)
+ }
+ }
+ }
+
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
@@ -958,7 +992,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val MESOS = 4
private val LOCAL = 8
private val KUBERNETES = 16
- private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES
+ private val PBS = 32
+ private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | PBS
// Deploy modes
private val CLIENT = 1
@@ -981,6 +1016,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
+ private[deploy] val PBS_CLUSTER_SUBMIT_CLASS =
+ "org.apache.spark.deploy.pbs.PbsClusterApplication"
override def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 3ae4633..e9ec28f 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -163,6 +163,7 @@ abstract class AbstractCommandBuilder {
"repl",
"resource-managers/mesos",
"resource-managers/yarn",
+ "resource-managers/pbs",
"sql/catalyst",
"sql/core",
"sql/hive",
diff --git a/pom.xml b/pom.xml
index e72fcd9..6b62a7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2999,6 +2999,13 @@
</profile>
<profile>
+ <id>pbs</id>
+ <modules>
+ <module>resource-managers/pbs</module>
+ </modules>
+ </profile>
+
+ <profile>
<id>hive-thriftserver</id>
<modules>
<module>sql/hive-thriftserver</module>
--
1.8.3.1
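
Editorial note on the constants hunk: each cluster manager is a
power-of-two flag, so the OptionAssigner masks compose with bitwise
OR and are tested with bitwise AND; PBS = 32 simply takes the next
free bit. A small illustration (the YARN and STANDALONE values are
assumed from the surrounding Spark source, which this diff omits):

    // One-hot flags mirroring SparkSubmit's private constants.
    val YARN = 1        // assumed, not shown in the hunk
    val STANDALONE = 2  // assumed, not shown in the hunk
    val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16; val PBS = 32

    // A mask naming the managers an option applies to:
    val mask = STANDALONE | YARN | KUBERNETES | PBS
    assert((mask & PBS) != 0)    // applied for --master pbs
    assert((mask & MESOS) == 0)  // skipped for Mesos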