\x20\t\r\n\f]*)[^>]*)\/>/gi,Aa=/
+
+
+ {title}
+
+
+ def navBar(pageName: String): Seq[Node] =
+
+
+ def createPage(pageName: String, pageContents: Seq[Node]): Seq[Node] =
+
+ {getHeader("Livy - " + pageName)}
+
+
+ {navBar(pageName)}
+ {pageContents}
+
+
+
+
+ get("/") {
+ val content =
+
+
+ createPage("Sessions", content)
+ }
+}
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
index faf342262..a0d0f8b10 100644
--- a/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
+++ b/server/src/main/scala/com/cloudera/livy/sessions/Session.scala
@@ -197,7 +197,6 @@ abstract class Session(val id: Int, val owner: String, val livyConf: LivyConf)
}
}
- val timeout: Long = TimeUnit.HOURS.toNanos(1)
override def toString(): String = s"${this.getClass.getSimpleName} $id"
diff --git a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
index ac0561fb6..edf61c704 100644
--- a/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
+++ b/server/src/main/scala/com/cloudera/livy/sessions/SessionManager.scala
@@ -29,14 +29,13 @@ import scala.util.control.NonFatal
import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
-import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession}
+import com.cloudera.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession, SessionHeartbeatWatchdog}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.Session.RecoveryMetadata
object SessionManager {
val SESSION_RECOVERY_MODE_OFF = "off"
val SESSION_RECOVERY_MODE_RECOVERY = "recovery"
- val SESSION_TIMEOUT = LivyConf.Entry("livy.server.session.timeout", "1h")
}
class BatchSessionManager(
@@ -56,9 +55,13 @@ class InteractiveSessionManager(
sessionStore,
"interactive",
mockSessions)
+ with SessionHeartbeatWatchdog[InteractiveSession, InteractiveRecoveryMetadata]
+ {
+ start()
+ }
class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
- livyConf: LivyConf,
+ protected val livyConf: LivyConf,
sessionRecovery: R => S,
sessionStore: SessionStore,
sessionType: String,
@@ -72,8 +75,11 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
protected[this] final val idCounter = new AtomicInteger(0)
protected[this] final val sessions = mutable.LinkedHashMap[Int, S]()
+ private[this] final val sessionTimeoutCheck = livyConf.getBoolean(LivyConf.SESSION_TIMEOUT_CHECK)
private[this] final val sessionTimeout =
- TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(SessionManager.SESSION_TIMEOUT))
+ TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_TIMEOUT))
+ private[this] final val sessionStateRetainedInSec =
+ TimeUnit.MILLISECONDS.toNanos(livyConf.getTimeAsMs(LivyConf.SESSION_STATE_RETAIN_TIME))
mockSessions.getOrElse(recover()).foreach(register)
new GarbageCollector().start()
@@ -128,8 +134,20 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
def collectGarbage(): Future[Iterable[Unit]] = {
def expired(session: Session): Boolean = {
- val currentTime = System.nanoTime()
- currentTime - session.lastActivity > math.max(sessionTimeout, session.timeout)
+ session.state match {
+ case s: FinishedSessionState =>
+ val currentTime = System.nanoTime()
+ currentTime - s.time > sessionStateRetainedInSec
+ case _ =>
+ if (!sessionTimeoutCheck) {
+ false
+ } else if (session.isInstanceOf[BatchSession]) {
+ false
+ } else {
+ val currentTime = System.nanoTime()
+ currentTime - session.lastActivity > sessionTimeout
+ }
+ }
}
Future.sequence(all().filter(expired).map(delete))
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
index d54bfb0bf..a8949aff0 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
@@ -39,7 +39,6 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
for (line <- lines) {
_lock.lock()
try {
- trace("stdout: ", line)
_lines = _lines :+ line
_condition.signalAll()
} finally {
@@ -47,6 +46,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
}
}
+ _lines.map { line => info("stdout: ", line) }
_lock.lock()
try {
_finished = true
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala b/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala
index 2a7391f1f..766eb91ee 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala
@@ -32,6 +32,8 @@ object LivySparkUtils extends Logging {
// For each Spark version we supported, we need to add this mapping relation in case Scala
// version cannot be detected from "spark-submit --version".
private val _defaultSparkScalaVersion = SortedMap(
+ // Spark 2.1 + Scala 2.11
+ (2, 1) -> "2.11",
// Spark 2.0 + Scala 2.11
(2, 0) -> "2.11",
// Spark 1.6 + Scala 2.10
@@ -40,7 +42,7 @@ object LivySparkUtils extends Logging {
// Supported Spark version
private val MIN_VERSION = (1, 6)
- private val MAX_VERSION = (2, 1)
+ private val MAX_VERSION = (2, 2)
private val sparkVersionRegex = """version (.*)""".r.unanchored
private val scalaVersionRegex = """Scala version (.*), Java""".r.unanchored
@@ -143,11 +145,11 @@ object LivySparkUtils extends Logging {
* @return Two element tuple, one is major version and the other is minor version
*/
def formatSparkVersion(version: String): (Int, Int) = {
- val versionPattern = """(\d)+\.(\d)+(?:[\.-]\d*)*""".r
- version match {
- case versionPattern(major, minor) =>
- (major.toInt, minor.toInt)
- case _ =>
+ val versionPattern = """^(\d+)\.(\d+)(\..*)?$""".r
+ versionPattern.findFirstMatchIn(version) match {
+ case Some(m) =>
+ (m.group(1).toInt, m.group(2).toInt)
+ case None =>
throw new IllegalArgumentException(s"Fail to parse Spark version from $version")
}
}
diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
index 28ed82d10..fb47e5e4b 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
@@ -37,6 +37,15 @@ import com.cloudera.livy.{LivyConf, Logging, Utils}
import com.cloudera.livy.util.LineBufferedProcess
object SparkYarnApp extends Logging {
+
+ def init(livyConf: LivyConf): Unit = {
+ sessionLeakageCheckInterval = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_INTERVAL)
+ sessionLeakageCheckTimeout = livyConf.getTimeAsMs(LivyConf.YARN_APP_LEAKAGE_CHECK_TIMEOUT)
+ leakedAppsGCThread.setDaemon(true)
+ leakedAppsGCThread.setName("LeakedAppsGCThread")
+ leakedAppsGCThread.start()
+ }
+
// YarnClient is thread safe. Create once, share it across threads.
lazy val yarnClient = {
val c = YarnClient.createYarnClient()
@@ -50,6 +59,47 @@ object SparkYarnApp extends Logging {
private def getYarnPollInterval(livyConf: LivyConf): FiniteDuration =
livyConf.getTimeAsMs(LivyConf.YARN_POLL_INTERVAL) milliseconds
+
+ private val appType = Set("SPARK").asJava
+
+ private val leakedAppTags = new java.util.concurrent.ConcurrentHashMap[String, Long]()
+
+ private var sessionLeakageCheckTimeout: Long = _
+
+ private var sessionLeakageCheckInterval: Long = _
+
+ private val leakedAppsGCThread = new Thread() {
+ override def run(): Unit = {
+ while (true) {
+ if (!leakedAppTags.isEmpty) {
+ // kill the app if found it and remove it if exceeding a threashold
+ val iter = leakedAppTags.entrySet().iterator()
+ var isRemoved = false
+ val now = System.currentTimeMillis()
+ val apps = yarnClient.getApplications(appType).asScala
+ while(iter.hasNext) {
+ val entry = iter.next()
+ apps.find(_.getApplicationTags.contains(entry.getKey))
+ .foreach({ e =>
+ info(s"Kill leaked app ${e.getApplicationId}")
+ yarnClient.killApplication(e.getApplicationId)
+ iter.remove()
+ isRemoved = true
+ })
+ if (!isRemoved) {
+ if ((entry.getValue - now) > sessionLeakageCheckTimeout) {
+ iter.remove()
+ info(s"Remove leaked yarn app tag ${entry.getKey}")
+ }
+ }
+ }
+ }
+ Thread.sleep(sessionLeakageCheckInterval)
+ }
+ }
+ }
+
+
}
/**
@@ -78,7 +128,9 @@ class SparkYarnApp private[utils] (
private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
override def log(): IndexedSeq[String] =
- process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ yarnDiagnostics
+ ("stdout: " +: process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String])) ++
+ ("\nstderr: " +: process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String])) ++
+ ("\nYARN Diagnostics: " +: yarnDiagnostics)
override def kill(): Unit = synchronized {
if (isRunning) {
@@ -122,13 +174,16 @@ class SparkYarnApp private[utils] (
// FIXME Should not loop thru all YARN applications but YarnClient doesn't offer an API.
// Consider calling rmClient in YarnClient directly.
- val appType = Set("SPARK").asJava
yarnClient.getApplications(appType).asScala.find(_.getApplicationTags.contains(appTagLowerCase))
match {
case Some(app) => app.getApplicationId
case None =>
if (deadline.isOverdue) {
- throw new Exception(s"No YARN application is tagged with $appTagLowerCase.")
+ process.foreach(_.destroy())
+ leakedAppTags.put(appTag, System.currentTimeMillis())
+ throw new Exception(s"No YARN application is found with tag $appTagLowerCase in " +
+ livyConf.getTimeAsMs(LivyConf.YARN_APP_LOOKUP_TIMEOUT)/1000 + " seconds. " +
+ "Please check your cluster status, it is may be very busy.")
} else {
Clock.sleep(pollInterval.toMillis)
getAppIdFromTag(appTagLowerCase, pollInterval, deadline)
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala
index 4aa16c1ce..f08e09ea4 100644
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/server/interactive/BaseInteractiveServletSpec.scala
@@ -50,7 +50,7 @@ abstract class BaseInteractiveServletSpec
}
super.createConf()
.set(LivyConf.SESSION_STAGING_DIR, tempDir.toURI().toString())
- .set(InteractiveSession.LIVY_REPL_JARS, "")
+ .set(LivyConf.REPL_JARS, "dummy.jar")
.set(LivyConf.LIVY_SPARK_VERSION, "1.6.0")
.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5")
}
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala
index 0a3fb99fc..63d605d98 100644
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionServletSpec.scala
@@ -19,12 +19,11 @@
package com.cloudera.livy.server.interactive
import java.util.concurrent.atomic.AtomicInteger
-import javax.servlet.http.HttpServletRequest
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.collection.JavaConverters._
import scala.concurrent.Future
-import org.json4s.JsonAST._
import org.json4s.jackson.Json4sScalaModule
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -79,6 +78,14 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
statement
}
})
+ when(session.cancelStatement(anyInt())).thenAnswer(
+ new Answer[Unit] {
+ override def answer(args: InvocationOnMock): Unit = {
+ statements = IndexedSeq(
+ new Statement(statementCounter.get(), StatementState.Cancelled, null))
+ }
+ }
+ )
session
}
@@ -114,6 +121,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
jpost[Map[String, Any]]("/0/statements", ExecuteRequest("foo")) { data =>
data("id") should be (0)
+ data("progress") should be (0.0)
data("output") shouldBe 1
}
@@ -122,6 +130,15 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
data("statements").asInstanceOf[Seq[Map[String, Any]]](0)("id") should be (0)
}
+ jpost[Map[String, Any]]("/0/statements/0/cancel", null, HttpServletResponse.SC_OK) { data =>
+ data should equal(Map("msg" -> "canceled"))
+ }
+
+ jget[Map[String, Any]]("/0/statements") { data =>
+ data("total_statements") should be (1)
+ data("statements").asInstanceOf[Seq[Map[String, Any]]](0)("state") should be ("cancelled")
+ }
+
jdelete[Map[String, Any]]("/0") { data =>
data should equal (Map("msg" -> "deleted"))
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
index c5fd5925d..28d715781 100644
--- a/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/server/interactive/InteractiveSessionSpec.scala
@@ -45,7 +45,7 @@ class InteractiveSessionSpec extends FunSpec
with Matchers with BeforeAndAfterAll with LivyBaseUnitTestSuite {
private val livyConf = new LivyConf()
- livyConf.set(InteractiveSession.LIVY_REPL_JARS, "")
+ livyConf.set(LivyConf.REPL_JARS, "dummy.jar")
.set(LivyConf.LIVY_SPARK_VERSION, "1.6.0")
.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10.5")
@@ -76,7 +76,7 @@ class InteractiveSessionSpec extends FunSpec
val id = session.executeStatement(ExecuteRequest(code)).id
eventually(timeout(30 seconds), interval(100 millis)) {
val s = session.getStatement(id).get
- s.state shouldBe StatementState.Available
+ s.state.get() shouldBe StatementState.Available
parse(s.output)
}
}
@@ -100,6 +100,61 @@ class InteractiveSessionSpec extends FunSpec
}
describe("A spark session") {
+
+ it("should get scala version matched jars with livy.repl.jars") {
+ val testedJars = Seq(
+ "test_2.10-0.1.jar",
+ "local://dummy-path/test/test1_2.10-1.0.jar",
+ "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar",
+ "hdfs:///dummy-path/test/test3.jar",
+ "non-jar",
+ "dummy.jar"
+ )
+ val livyConf = new LivyConf(false)
+ .set(LivyConf.REPL_JARS, testedJars.mkString(","))
+ .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2")
+ .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
+ val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf)
+ assert(properties(LivyConf.SPARK_JARS).split(",").toSet === Set("test_2.10-0.1.jar",
+ "local://dummy-path/test/test1_2.10-1.0.jar",
+ "hdfs:///dummy-path/test/test3.jar",
+ "dummy.jar"))
+
+ livyConf.set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.11")
+ val properties1 = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf)
+ assert(properties1(LivyConf.SPARK_JARS).split(",").toSet === Set(
+ "file:///dummy-path/test/test2_2.11-1.0-SNAPSHOT.jar",
+ "hdfs:///dummy-path/test/test3.jar",
+ "dummy.jar"))
+ }
+
+
+ it("should set rsc jars through livy conf") {
+ val rscJars = Set(
+ "dummy.jar",
+ "local:///dummy-path/dummy1.jar",
+ "file:///dummy-path/dummy2.jar",
+ "hdfs:///dummy-path/dummy3.jar")
+ val livyConf = new LivyConf(false)
+ .set(LivyConf.REPL_JARS, "dummy.jar")
+ .set(LivyConf.RSC_JARS, rscJars.mkString(","))
+ .set(LivyConf.LIVY_SPARK_VERSION, "1.6.2")
+ .set(LivyConf.LIVY_SPARK_SCALA_VERSION, "2.10")
+ val properties = InteractiveSession.prepareBuilderProp(Map.empty, Spark(), livyConf)
+ // if livy.rsc.jars is configured in LivyConf, it should be passed to RSCConf.
+ properties(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars
+
+ val rscJars1 = Set(
+ "foo.jar",
+ "local:///dummy-path/foo1.jar",
+ "file:///dummy-path/foo2.jar",
+ "hdfs:///dummy-path/foo3.jar")
+ val properties1 = InteractiveSession.prepareBuilderProp(
+ Map(RSCConf.Entry.LIVY_JARS.key() -> rscJars1.mkString(",")), Spark(), livyConf)
+ // if rsc jars are configured both in LivyConf and RSCConf, RSCConf should take precedence.
+ properties1(RSCConf.Entry.LIVY_JARS.key()).split(",").toSet === rscJars1
+ }
+
it("should start in the idle state") {
session = createSession()
session.state should (be(a[SessionState.Starting]) or be(a[SessionState.Idle]))
@@ -149,15 +204,32 @@ class InteractiveSessionSpec extends FunSpec
))
result should equal (expectedResult)
- session.state shouldBe a[SessionState.Idle]
+ eventually(timeout(10 seconds), interval(30 millis)) {
+ session.state shouldBe a[SessionState.Idle]
+ }
+ }
+
+ withSession("should get statement progress along with statement result") { session =>
+ val code =
+ """
+ |from time import sleep
+ |sleep(3)
+ """.stripMargin
+ val statement = session.executeStatement(ExecuteRequest(code))
+ statement.progress should be (0.0)
+
+ eventually(timeout(10 seconds), interval(100 millis)) {
+ val s = session.getStatement(statement.id).get
+ s.state.get() shouldBe StatementState.Available
+ s.progress should be (1.0)
+ }
}
withSession("should error out the session if the interpreter dies") { session =>
- executeStatement("import os; os._exit(666)")
- (session.state match {
- case SessionState.Error(_) => true
- case _ => false
- }) should equal(true)
+ session.executeStatement(ExecuteRequest("import os; os._exit(666)"))
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ session.state shouldBe a[SessionState.Error]
+ }
}
}
@@ -168,7 +240,8 @@ class InteractiveSessionSpec extends FunSpec
val mockClient = mock[RSCClient]
when(mockClient.submit(any(classOf[PingJob]))).thenReturn(mock[JobHandle[Void]])
val m =
- InteractiveRecoveryMetadata(78, None, "appTag", Spark(), null, None, Some(URI.create("")))
+ InteractiveRecoveryMetadata(
+ 78, None, "appTag", Spark(), 0, null, None, Some(URI.create("")))
val s = InteractiveSession.recover(m, conf, sessionStore, None, Some(mockClient))
s.state shouldBe a[SessionState.Recovering]
@@ -181,7 +254,8 @@ class InteractiveSessionSpec extends FunSpec
it("should recover session to dead state if rscDriverUri is unknown") {
val conf = new LivyConf()
val sessionStore = mock[SessionStore]
- val m = InteractiveRecoveryMetadata(78, Some("appId"), "appTag", Spark(), null, None, None)
+ val m = InteractiveRecoveryMetadata(
+ 78, Some("appId"), "appTag", Spark(), 0, null, None, None)
val s = InteractiveSession.recover(m, conf, sessionStore, None)
s.state shouldBe a[SessionState.Dead]
diff --git a/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala
new file mode 100644
index 000000000..0526a2a9a
--- /dev/null
+++ b/server/src/test/scala/com/cloudera/livy/server/interactive/SessionHeartbeatSpec.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.server.interactive
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+import org.mockito.Mockito.{never, verify, when}
+import org.scalatest.{FunSpec, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.mock.MockitoSugar.mock
+
+import com.cloudera.livy.LivyConf
+import com.cloudera.livy.server.recovery.SessionStore
+import com.cloudera.livy.sessions.{Session, SessionManager}
+import com.cloudera.livy.sessions.Session.RecoveryMetadata
+
+class SessionHeartbeatSpec extends FunSpec with Matchers {
+ describe("SessionHeartbeat") {
+ class TestHeartbeat(override val heartbeatTimeout: FiniteDuration) extends SessionHeartbeat {}
+
+ it("should not expire if heartbeat was never called.") {
+ val t = new TestHeartbeat(Duration.Zero)
+ t.heartbeatExpired shouldBe false
+ }
+
+ it("should expire if time has elapsed.") {
+ val t = new TestHeartbeat(Duration.fromNanos(1))
+ t.heartbeat()
+ eventually(timeout(2 nano), interval(1 nano)) {
+ t.heartbeatExpired shouldBe true
+ }
+ }
+
+ it("should not expire if time hasn't elapsed.") {
+ val t = new TestHeartbeat(Duration.create(1, DAYS))
+ t.heartbeat()
+ t.heartbeatExpired shouldBe false
+ }
+ }
+
+ describe("SessionHeartbeatWatchdog") {
+ abstract class TestSession extends Session(0, null, null) with SessionHeartbeat {}
+ class TestWatchdog(conf: LivyConf)
+ extends SessionManager[TestSession, RecoveryMetadata](
+ conf,
+ { _ => assert(false).asInstanceOf[TestSession] },
+ mock[SessionStore],
+ "test",
+ Some(Seq.empty))
+ with SessionHeartbeatWatchdog[TestSession, RecoveryMetadata] {}
+
+ it("should delete only expired sessions") {
+ val expiredSession: TestSession = mock[TestSession]
+ when(expiredSession.id).thenReturn(0)
+ when(expiredSession.heartbeatExpired).thenReturn(true)
+
+ val nonExpiredSession: TestSession = mock[TestSession]
+ when(nonExpiredSession.id).thenReturn(1)
+ when(nonExpiredSession.heartbeatExpired).thenReturn(false)
+
+ val n = new TestWatchdog(new LivyConf())
+
+ n.register(expiredSession)
+ n.register(nonExpiredSession)
+ n.deleteExpiredSessions()
+
+ verify(expiredSession).stop()
+ verify(nonExpiredSession, never).stop()
+ }
+ }
+}
diff --git a/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala b/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala
index 935911b02..2b90a52e4 100644
--- a/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/server/recovery/FileSystemStateStoreSpec.scala
@@ -82,9 +82,9 @@ class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
}
test("600")
test("400")
- test("777")
- test("770")
- test("707")
+ test("677")
+ test("670")
+ test("607")
}
it("set should write with an intermediate file") {
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala b/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala
index dba60bf06..c276a5eb4 100644
--- a/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala
+++ b/server/src/test/scala/com/cloudera/livy/sessions/MockSession.scala
@@ -31,7 +31,5 @@ class MockSession(id: Int, owner: String, conf: LivyConf) extends Session(id, ow
override def state: SessionState = SessionState.Idle()
- override val timeout: Long = 0L
-
override def recoveryMetadata: RecoveryMetadata = RecoveryMetadata(0)
}
diff --git a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
index 8195bf8f6..79f248269 100644
--- a/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/sessions/SessionManagerSpec.scala
@@ -23,21 +23,24 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.{Failure, Try}
-import org.mockito.Mockito.{never, verify, when}
+import org.mockito.Mockito.{doReturn, never, verify, when}
import org.scalatest.{FunSpec, Matchers}
import org.scalatest.concurrent.Eventually._
import org.scalatest.mock.MockitoSugar.mock
import com.cloudera.livy.{LivyBaseUnitTestSuite, LivyConf}
import com.cloudera.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
+import com.cloudera.livy.server.interactive.InteractiveSession
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.Session.RecoveryMetadata
class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuite {
+ implicit def executor: ExecutionContext = ExecutionContext.global
+
describe("SessionManager") {
it("should garbage collect old sessions") {
val livyConf = new LivyConf()
- livyConf.set(SessionManager.SESSION_TIMEOUT, "100ms")
+ livyConf.set(LivyConf.SESSION_TIMEOUT, "100ms")
val manager = new SessionManager[MockSession, RecoveryMetadata](
livyConf,
{ _ => assert(false).asInstanceOf[MockSession] },
@@ -51,6 +54,59 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
manager.get(session.id) should be(None)
}
}
+
+ it("batch session should not be gc-ed until application is finished") {
+ val sessionId = 24
+ val session = mock[BatchSession]
+ when(session.id).thenReturn(sessionId)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+ val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+ testSessionGC(session, sm)
+ }
+
+ it("interactive session should not gc-ed if session timeout check is off") {
+ val sessionId = 24
+ val session = mock[InteractiveSession]
+ when(session.id).thenReturn(sessionId)
+ when(session.stop()).thenReturn(Future {})
+ when(session.lastActivity).thenReturn(System.nanoTime())
+
+ val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
+ .set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
+ val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
+ testSessionGC(session, sm)
+ }
+
+ def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {
+
+ def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
+ doReturn(s).when(session).state
+ Await.result(sm.collectGarbage(), Duration.Inf)
+ fn(sm)
+ }
+
+ // Batch session should not be gc-ed when alive
+ for (s <- Seq(SessionState.Running(),
+ SessionState.Idle(),
+ SessionState.Recovering(),
+ SessionState.NotStarted(),
+ SessionState.Busy(),
+ SessionState.ShuttingDown())) {
+ changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
+ }
+
+ // Stopped session should be gc-ed after retained timeout
+ for (s <- Seq(SessionState.Error(),
+ SessionState.Success(),
+ SessionState.Dead())) {
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
+ }
+ }
+ }
}
describe("BatchSessionManager") {
diff --git a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
index 528ea6524..9981fa241 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/LivySparkUtilsSuite.scala
@@ -47,6 +47,7 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
test("should support Spark 1.6") {
testSparkVersion("1.6.0")
testSparkVersion("1.6.1")
+ testSparkVersion("1.6.1-SNAPSHOT")
testSparkVersion("1.6.2")
testSparkVersion("1.6")
testSparkVersion("1.6.3.2.5.0-12")
@@ -56,15 +57,19 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
testSparkVersion("2.0.0")
testSparkVersion("2.0.1")
testSparkVersion("2.0.2")
+ testSparkVersion("2.0.3-SNAPSHOT")
testSparkVersion("2.0.0.2.5.1.0-56") // LIVY-229
testSparkVersion("2.0")
+ testSparkVersion("2.1.0")
+ testSparkVersion("2.1.1")
}
- test("should not support Spark older than 1.6 or newer than 2.0") {
+ test("should not support Spark older than 1.6") {
intercept[IllegalArgumentException] { testSparkVersion("1.4.0") }
intercept[IllegalArgumentException] { testSparkVersion("1.5.0") }
intercept[IllegalArgumentException] { testSparkVersion("1.5.1") }
intercept[IllegalArgumentException] { testSparkVersion("1.5.2") }
+ intercept[IllegalArgumentException] { testSparkVersion("1.5.0-cdh5.6.1") }
}
test("should fail on bad version") {
@@ -131,5 +136,6 @@ class LivySparkUtilsSuite extends FunSuite with Matchers with LivyBaseUnitTestSu
sparkScalaVersion(formatSparkVersion("1.6.2"), None, livyConf) shouldBe "2.10"
sparkScalaVersion(formatSparkVersion("2.0.0"), None, livyConf) shouldBe "2.11"
sparkScalaVersion(formatSparkVersion("2.0.1"), None, livyConf) shouldBe "2.11"
+ sparkScalaVersion(formatSparkVersion("2.1.0"), None, livyConf) shouldBe "2.11"
}
}
diff --git a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
index 2f786213d..1d903677f 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
@@ -136,8 +136,12 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val mockSparkSubmit = mock[LineBufferedProcess]
- val sparkSubmitLog = IndexedSeq("SPARK-SUBMIT", "LOG")
- when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitLog)
+ val sparkSubmitInfoLog = IndexedSeq("SPARK-SUBMIT", "LOG")
+ val sparkSubmitErrorLog = IndexedSeq("SPARK-SUBMIT", "error log")
+ val sparkSubmitLog = ("stdout: " +: sparkSubmitInfoLog) ++
+ ("\nstderr: " +: sparkSubmitErrorLog) :+ "\nYARN Diagnostics: "
+ when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitInfoLog)
+ when(mockSparkSubmit.errorLines).thenReturn(sparkSubmitErrorLog)
val waitForCalledLatch = new CountDownLatch(1)
when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
override def answer(invocation: InvocationOnMock): Int = {
@@ -168,15 +172,18 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
it("can kill spark-submit while it's running") {
Clock.withSleepMethod(mockSleep) {
+ val livyConf = new LivyConf()
+ livyConf.set(LivyConf.YARN_APP_LOOKUP_TIMEOUT, "0")
+
val mockYarnClient = mock[YarnClient]
val mockSparkSubmit = mock[LineBufferedProcess]
- when(mockSparkSubmit.exitValue()).thenReturn(1)
val sparkSubmitRunningLatch = new CountDownLatch(1)
// Simulate a running spark-submit
- when(mockSparkSubmit.inputLines).thenAnswer(new Answer[Unit]() {
- override def answer(invocation: InvocationOnMock): Unit = {
+ when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
+ override def answer(invocation: InvocationOnMock): Int = {
sparkSubmitRunningLatch.await()
+ 0
}
})
@@ -190,6 +197,7 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
cleanupThread(app.yarnAppMonitorThread) {
app.kill()
verify(mockSparkSubmit, times(1)).destroy()
+ sparkSubmitRunningLatch.countDown()
}
}
}
diff --git a/test-lib/pom.xml b/test-lib/pom.xml
index 0fd9e9033..b9a943243 100644
--- a/test-lib/pom.xml
+++ b/test-lib/pom.xml
@@ -21,12 +21,12 @@
com.cloudera.livy
livy-main
- 0.3.0-SNAPSHOT
+ 0.4.0-SNAPSHOT
com.cloudera.livy
livy-test-lib
- 0.3.0-SNAPSHOT
+ 0.4.0-SNAPSHOT
jar
@@ -85,7 +85,37 @@
true
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+
+
+ parse-spark-version
+ process-sources
+
+ parse-version
+
+
+ spark
+ ${spark.version}
+
+
+
+ add-spark2-source-code
+ process-sources
+
+ add-source
+
+
+
+
+
+
+
+
+
+
-
diff --git a/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java b/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java
new file mode 100644
index 000000000..086609874
--- /dev/null
+++ b/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/DatasetTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.test.jobs.spark2;
+
+import java.util.Arrays;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import com.cloudera.livy.Job;
+import com.cloudera.livy.JobContext;
+
+public class DatasetTest implements Job {
+
+ @Override
+ public Long call(JobContext jc) throws Exception {
+ SparkSession spark = jc.sparkSession();
+
+ JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3)).map(
+ new Function() {
+ public Row call(Integer integer) throws Exception {
+ return RowFactory.create(integer);
+ }
+ });
+ StructType schema = DataTypes.createStructType(new StructField[] {
+ DataTypes.createStructField("value", DataTypes.IntegerType, false)
+ });
+
+ Dataset ds = spark.createDataFrame(rdd, schema);
+
+ return ds.filter(new FilterFunction() {
+ @Override
+ public boolean call(Row row) throws Exception {
+ return row.getInt(0) >= 2;
+ }
+ }).count();
+ }
+}
diff --git a/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java b/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java
new file mode 100644
index 000000000..f6d1f2645
--- /dev/null
+++ b/test-lib/src/main/spark2/java/com/cloudera/livy/test/jobs/spark2/SparkSessionTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Cloudera, Inc. under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Cloudera, Inc. licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.test.jobs.spark2;
+
+import java.util.Arrays;
+
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import com.cloudera.livy.Job;
+import com.cloudera.livy.JobContext;
+
+public class SparkSessionTest implements Job {
+
+ @Override
+ public Long call(JobContext jc) throws Exception {
+ // Make sure SparkSession and SparkContext is callable
+ SparkSession session = jc.sparkSession();
+
+ JavaSparkContext sc = new JavaSparkContext(session.sparkContext());
+ return sc.parallelize(Arrays.asList(1, 2, 3)).count();
+ }
+}