Skip to content

Commit

Permalink
Made the cached log size configuration-driven.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kanamarlapudi committed May 15, 2017
1 parent 61b206e commit 7d76aae
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 25 deletions.
3 changes: 3 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true

# Maximum number of log lines Livy caches for each session/batch. 0 means don't cache the logs.
# livy.spark.logs.size = 200

# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
# time of sessions on YARN can be reduced.
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ object LivyConf {
*/
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")

// Maximum number of log lines Livy will cache per process stream. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.spark.logs.size", 200)

// If Livy can't find the yarn app within this time, consider it lost.
val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,17 +380,9 @@ class InteractiveSession(
heartbeat()

private val app = mockApp.orElse {
if (livyConf.isRunningOnYarn()) {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_))
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
// When Livy is running with other cluster manager, SparkApp doesn't provide any
// additional benefit over controlling RSCDriver using RSCClient. Don't use it.
None
}
}

if (client.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package com.cloudera.livy.util

import com.cloudera.livy.{Logging, Utils}

class LineBufferedProcess(process: Process) extends Logging {
class LineBufferedProcess(process: Process, logSize: Int) extends Logging {

private[this] val _inputStream = new LineBufferedStream(process.getInputStream)
private[this] val _errorStream = new LineBufferedStream(process.getErrorStream)
private[this] val _inputStream = new LineBufferedStream(process.getInputStream, logSize)
private[this] val _errorStream = new LineBufferedStream(process.getErrorStream, logSize)

def inputLines: IndexedSeq[String] = _inputStream.lines
def errorLines: IndexedSeq[String] = _errorStream.lines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import java.util.concurrent.locks.ReentrantLock

import scala.io.Source

import com.google.common.collect.EvictingQueue

import com.cloudera.livy.Logging

class LineBufferedStream(inputStream: InputStream) extends Logging {
class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging {

private[this] var _lines: IndexedSeq[String] = IndexedSeq()
private[this] val _lines: EvictingQueue[String] = EvictingQueue.create[String](logSize)

private[this] val _lock = new ReentrantLock()
private[this] val _condition = _lock.newCondition()
Expand All @@ -36,17 +38,17 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
private val thread = new Thread {
override def run() = {
val lines = Source.fromInputStream(inputStream).getLines()
for (line <- lines) {
for (line <- lines) {
_lock.lock()
try {
_lines = _lines :+ line
info(s"stdout: $line")
if(logSize > 0) _lines add line
_condition.signalAll()
} finally {
_lock.unlock()
}
}

_lines.map { line => info("stdout: ", line) }
_lock.lock()
try {
_finished = true
Expand All @@ -59,7 +61,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
thread.setDaemon(true)
thread.start()

def lines: IndexedSeq[String] = _lines
def lines: IndexedSeq[String] = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])

def iterator: Iterator[String] = {
new LinesIterator
Expand All @@ -71,7 +73,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
private[this] var index = 0

override def hasNext: Boolean = {
if (index < _lines.length) {
if (_lines.size > 0) {
true
} else {
// Otherwise we might still have more data.
Expand All @@ -81,7 +83,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
false
} else {
_condition.await()
index < _lines.length
_lines.size > 0
}
} finally {
_lock.unlock()
Expand All @@ -90,7 +92,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
}

override def next(): String = {
val line = _lines(index)
val line = _lines.poll()
index += 1
line
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object LivySparkUtils extends Logging {
pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path"))
}

val process = new LineBufferedProcess(pb.start())
val process = new LineBufferedProcess(pb.start(), 200)
val exitCode = process.waitFor()
val output = process.inputIterator.mkString("\n")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class SparkProcApp (
}
}

override def log(): IndexedSeq[String] = process.inputLines

override def log(): IndexedSeq[String] =
("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines)
private def changeState(newState: SparkApp.State.Value) = {
if (state != newState) {
listener.foreach(_.stateChanged(state, newState))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
_redirectError.foreach(pb.redirectError)
_redirectErrorStream.foreach(pb.redirectErrorStream)

new LineBufferedProcess(pb.start())
new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
}

}

0 comments on commit 7d76aae

Please sign in to comment.