diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 8bef995c0..140c651b8 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -31,6 +31,9 @@
 # If livy should impersonate the requesting users when creating a new session.
 # livy.impersonation.enabled = true
 
+# Maximum number of log lines Livy caches for each session/batch. 0 means don't cache the logs.
+# livy.spark.logs.size = 200
+
 # Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
 # directory every time a session is started. By caching these files in HDFS, for example, startup
 # time of sessions on YARN can be reduced.
diff --git a/server/src/main/scala/com/cloudera/livy/LivyConf.scala b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
index 233aa6315..7d3e6cfd6 100644
--- a/server/src/main/scala/com/cloudera/livy/LivyConf.scala
+++ b/server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -115,6 +115,9 @@ object LivyConf {
    */
   val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")
 
+  // Maximum number of log lines Livy caches per session/batch. 0 means don't cache the logs.
+  val SPARK_LOGS_SIZE = Entry("livy.spark.logs.size", 200)
+
   // If Livy can't find the yarn app within this time, consider it lost.
   val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")
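
Note: a minimal sketch of how the new entry is expected to resolve at runtime. The Entry default and the getInt(LivyConf.SPARK_LOGS_SIZE) accessor come from the diff above; constructing LivyConf directly and overriding via set() are illustrative assumptions, not part of this change.

import com.cloudera.livy.LivyConf

val conf = new LivyConf()
// Unset keys are expected to fall back to the Entry default declared above (200).
val defaultSize = conf.getInt(LivyConf.SPARK_LOGS_SIZE)

// An operator override in livy.conf (livy.spark.logs.size = 500) is read through the same accessor.
conf.set("livy.spark.logs.size", "500")
val tunedSize = conf.getInt(LivyConf.SPARK_LOGS_SIZE)  // expected: 500
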
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
index 6d10c573c..43b830a74 100644
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
@@ -380,17 +380,9 @@ class InteractiveSession(
   heartbeat()
 
   private val app = mockApp.orElse {
-    if (livyConf.isRunningOnYarn()) {
       val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
-        .map(new LineBufferedProcess(_))
-      // When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
-      // (e.g. Reflect YARN application state to session state).
+        .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
       Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
-    } else {
-      // When Livy is running with other cluster manager, SparkApp doesn't provide any
-      // additional benefit over controlling RSCDriver using RSCClient. Don't use it.
-      None
-    }
   }
 
   if (client.isEmpty) {
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
index 9f6b614b3..422e08e09 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedProcess.scala
@@ -20,10 +20,10 @@ package com.cloudera.livy.util
 
 import com.cloudera.livy.{Logging, Utils}
 
-class LineBufferedProcess(process: Process) extends Logging {
+class LineBufferedProcess(process: Process, logSize: Int) extends Logging {
 
-  private[this] val _inputStream = new LineBufferedStream(process.getInputStream)
-  private[this] val _errorStream = new LineBufferedStream(process.getErrorStream)
+  private[this] val _inputStream = new LineBufferedStream(process.getInputStream, logSize)
+  private[this] val _errorStream = new LineBufferedStream(process.getErrorStream, logSize)
 
   def inputLines: IndexedSeq[String] = _inputStream.lines
   def errorLines: IndexedSeq[String] = _errorStream.lines
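
Note: the LineBufferedStream change below swaps the unbounded IndexedSeq buffer for Guava's EvictingQueue. A small standalone sketch of the eviction semantics this relies on (only the Guava API is assumed here):

import com.google.common.collect.EvictingQueue

// A bounded FIFO: once the capacity is reached, adding a new element silently
// evicts the oldest one, so at most `capacity` lines are ever retained.
val queue = EvictingQueue.create[String](3)
Seq("line1", "line2", "line3", "line4").foreach(line => queue.add(line))
println(queue.toArray(Array.empty[String]).mkString(", "))  // line2, line3, line4

This is what lets livy.spark.logs.size cap per-session memory without any explicit trimming logic in the stream reader.
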
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
index a8949aff0..e296b4572 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
@@ -23,11 +23,13 @@ import java.util.concurrent.locks.ReentrantLock
 
 import scala.io.Source
 
+import com.google.common.collect.EvictingQueue
+
 import com.cloudera.livy.Logging
 
-class LineBufferedStream(inputStream: InputStream) extends Logging {
+class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging {
 
-  private[this] var _lines: IndexedSeq[String] = IndexedSeq()
+  private[this] val _lines: EvictingQueue[String] = EvictingQueue.create[String](logSize)
 
   private[this] val _lock = new ReentrantLock()
   private[this] val _condition = _lock.newCondition()
@@ -36,17 +38,17 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
   private val thread = new Thread {
     override def run() = {
       val lines = Source.fromInputStream(inputStream).getLines()
-      for (line <- lines) {
+      for (line <- lines) {
         _lock.lock()
         try {
-          _lines = _lines :+ line
+          info(s"stdout: $line")
+          if (logSize > 0) _lines.add(line)
           _condition.signalAll()
         } finally {
           _lock.unlock()
         }
       }
-      _lines.map { line => info("stdout: ", line) }
 
       _lock.lock()
       try {
         _finished = true
@@ -59,7 +61,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
   thread.setDaemon(true)
   thread.start()
 
-  def lines: IndexedSeq[String] = _lines
+  def lines: IndexedSeq[String] = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])
 
   def iterator: Iterator[String] = {
     new LinesIterator
@@ -71,7 +73,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
     private[this] var index = 0
 
     override def hasNext: Boolean = {
-      if (index < _lines.length) {
+      if (_lines.size > 0) {
        true
      } else {
        // Otherwise we might still have more data.
@@ -81,7 +83,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
            false
          } else {
            _condition.await()
-           index < _lines.length
+           _lines.size > 0
          }
        } finally {
          _lock.unlock()
@@ -90,7 +92,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
 
     override def next(): String = {
-      val line = _lines(index)
+      val line = _lines.poll()
      index += 1
      line
    }
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala b/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala
index 766eb91ee..71d57c254 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LivySparkUtils.scala
@@ -100,7 +100,7 @@ object LivySparkUtils extends Logging {
       pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path"))
     }
 
-    val process = new LineBufferedProcess(pb.start())
+    val process = new LineBufferedProcess(pb.start(), 200)
     val exitCode = process.waitFor()
     val output = process.inputIterator.mkString("\n")
diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala
index ca27ab050..bec56d135 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/SparkProcApp.scala
@@ -40,8 +40,8 @@ class SparkProcApp (
     }
   }
 
-  override def log(): IndexedSeq[String] = process.inputLines
-
+  override def log(): IndexedSeq[String] =
+    ("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines)
   private def changeState(newState: SparkApp.State.Value) = {
     if (state != newState) {
       listener.foreach(_.stateChanged(state, newState))
diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala
index a38a76177..c239946b4 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/SparkProcessBuilder.scala
@@ -214,7 +214,7 @@ class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
     _redirectError.foreach(pb.redirectError)
     _redirectErrorStream.foreach(pb.redirectErrorStream)
 
-    new LineBufferedProcess(pb.start())
+    new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
   }
 }
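
Note: for readers wondering what the reshaped SparkProcApp.log() above now returns, a standalone sketch of the composed sequence using plain Scala collections and stand-in data (no Livy classes involved):

// Mirrors the expression in SparkProcApp.log(): a "stdout: " marker, the cached
// stdout lines, then a "\nstderr: " marker followed by the cached stderr lines.
val inputLines = IndexedSeq("starting driver", "driver started")
val errorLines = IndexedSeq("WARN low memory")
val log = ("stdout: " +: inputLines) ++ ("\nstderr: " +: errorLines)
// IndexedSeq("stdout: ", "starting driver", "driver started", "\nstderr: ", "WARN low memory")
log.foreach(println)
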