Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIVY-359. Cache livy logs as config driven #332

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true

# Maximum number of log lines Livy caches for each session/batch. 0 means don't cache the logs.
# livy.cache-log.size = 200

# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
# time of sessions on YARN can be reduced.
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ object LivyConf {
*/
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")

// Maximum number of log lines Livy caches per session/batch. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)

// If Livy can't find the yarn app within this time, consider it lost.
val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,17 +380,9 @@ class InteractiveSession(
heartbeat()

private val app = mockApp.orElse {
if (livyConf.isRunningOnYarn()) {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_))
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
// When Livy is running with other cluster manager, SparkApp doesn't provide any
// additional benefit over controlling RSCDriver using RSCClient. Don't use it.
None
}
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
driverProcess.map { _ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)) }
}

if (client.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ package com.cloudera.livy.util

import com.cloudera.livy.{Logging, Utils}

class LineBufferedProcess(process: Process) extends Logging {
class LineBufferedProcess(process: Process, logSize: Int) extends Logging {

private[this] val _inputStream = new LineBufferedStream(process.getInputStream)
private[this] val _errorStream = new LineBufferedStream(process.getErrorStream)
private[this] val _inputStream = new LineBufferedStream(process.getInputStream, logSize)
private[this] val _errorStream = new LineBufferedStream(process.getErrorStream, logSize)

def inputLines: IndexedSeq[String] = _inputStream.lines
def errorLines: IndexedSeq[String] = _errorStream.lines
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import java.util.concurrent.locks.ReentrantLock

import scala.io.Source

import com.google.common.collect.EvictingQueue

import com.cloudera.livy.Logging

class LineBufferedStream(inputStream: InputStream) extends Logging {
class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging {

private[this] var _lines: IndexedSeq[String] = IndexedSeq()
private[this] val _lines: EvictingQueue[String] = EvictingQueue.create[String](logSize)

private[this] val _lock = new ReentrantLock()
private[this] val _condition = _lock.newCondition()
Expand All @@ -37,16 +39,16 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
override def run() = {
val lines = Source.fromInputStream(inputStream).getLines()
for (line <- lines) {
info(s"stdout: $line")
_lock.lock()
try {
_lines = _lines :+ line
_lines.add(line)
_condition.signalAll()
} finally {
_lock.unlock()
}
}

_lines.map { line => info("stdout: ", line) }
_lock.lock()
try {
_finished = true
Expand All @@ -59,7 +61,12 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
thread.setDaemon(true)
thread.start()

def lines: IndexedSeq[String] = _lines
def lines: IndexedSeq[String] = {
_lock.lock()
val lines = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])
_lock.unlock()
lines
}

def iterator: Iterator[String] = {
new LinesIterator
Expand All @@ -68,10 +75,9 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
def waitUntilClose(): Unit = thread.join()

private class LinesIterator extends Iterator[String] {
private[this] var index = 0

override def hasNext: Boolean = {
if (index < _lines.length) {
if (_lines.size > 0) {
true
} else {
// Otherwise we might still have more data.
Expand All @@ -81,7 +87,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
false
} else {
_condition.await()
index < _lines.length
_lines.size > 0
}
} finally {
_lock.unlock()
Expand All @@ -90,8 +96,9 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
}

override def next(): String = {
val line = _lines(index)
index += 1
_lock.lock()
val line = _lines.poll()
_lock.unlock()
line
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ object LivySparkUtils extends Logging {
pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path"))
}

val process = new LineBufferedProcess(pb.start())
val process = new LineBufferedProcess(pb.start(), 200)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using configuration instead of hard-coded number.

Copy link
Contributor Author

@praveen-kanamarlapudi praveen-kanamarlapudi May 16, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it runs only when livy server is started. Do we need to make it config driven?
If the number is set too low by mistake, the Livy server may not start.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see.

val exitCode = process.waitFor()
val output = process.inputIterator.mkString("\n")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class SparkProcApp (
}
}

override def log(): IndexedSeq[String] = process.inputLines
override def log(): IndexedSeq[String] =
("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines)

private def changeState(newState: SparkApp.State.Value) = {
if (state != newState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
_redirectError.foreach(pb.redirectError)
_redirectErrorStream.foreach(pb.redirectErrorStream)

new LineBufferedProcess(pb.start())
new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
}

}