LIVY-359. Cache livy logs as config driven #332

Open: wants to merge 2 commits into master (showing changes from 1 commit)
3 changes: 3 additions & 0 deletions conf/livy.conf.template
@@ -31,6 +31,9 @@
# If livy should impersonate the requesting users when creating a new session.
# livy.impersonation.enabled = true

+# Logs size livy can cache for each session/batch. 0 means don't cache the logs.
+# livy.spark.logs.size = 200
Contributor: I would suggest renaming this to "livy.session.cache-log.size".

Contributor (Author): Sure. Let me do that.

Contributor (Author): Can we name it something like livy.cache-log.size, since it applies to both interactive and batch?

Contributor: OK, I'm fine with it.


# Comma-separated list of Livy RSC jars. By default Livy will upload jars from its installation
# directory every time a session is started. By caching these files in HDFS, for example, startup
# time of sessions on YARN can be reduced.
3 changes: 3 additions & 0 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -115,6 +115,9 @@ object LivyConf {
*/
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")

+  // Livy will cache at most the specified number of log lines. 0 means don't cache the logs.
+  val SPARK_LOGS_SIZE = Entry("livy.spark.logs.size", 200)

// If Livy can't find the yarn app within this time, consider it lost.
val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")

@@ -380,17 +380,9 @@ class InteractiveSession(
heartbeat()

   private val app = mockApp.orElse {
-    if (livyConf.isRunningOnYarn()) {
       val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
-        .map(new LineBufferedProcess(_))
         // When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
         // (e.g. Reflect YARN application state to session state).
+        .map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
Contributor (Author), @praveen-kanamarlapudi, May 16, 2017: @jerryshao Earlier we returned None when the master was local (not YARN), so the driver logs didn't make it into the Livy logs. I think this change should solve that problem. Let me know if we are OK with this change?

Contributor: Yes, that's what I want.

       Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
-    } else {
-      // When Livy is running with other cluster manager, SparkApp doesn't provide any
-      // additional benefit over controlling RSCDriver using RSCClient. Don't use it.
-      None
-    }
   }
}

if (client.isEmpty) {
@@ -20,10 +20,10 @@ package com.cloudera.livy.util

import com.cloudera.livy.{Logging, Utils}

-class LineBufferedProcess(process: Process) extends Logging {
+class LineBufferedProcess(process: Process, logSize: Int) extends Logging {

-  private[this] val _inputStream = new LineBufferedStream(process.getInputStream)
-  private[this] val _errorStream = new LineBufferedStream(process.getErrorStream)
+  private[this] val _inputStream = new LineBufferedStream(process.getInputStream, logSize)
+  private[this] val _errorStream = new LineBufferedStream(process.getErrorStream, logSize)

def inputLines: IndexedSeq[String] = _inputStream.lines
def errorLines: IndexedSeq[String] = _errorStream.lines
@@ -23,11 +23,13 @@ import java.util.concurrent.locks.ReentrantLock

import scala.io.Source

+import com.google.common.collect.EvictingQueue
+
 import com.cloudera.livy.Logging

-class LineBufferedStream(inputStream: InputStream) extends Logging {
+class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging {

-  private[this] var _lines: IndexedSeq[String] = IndexedSeq()
+  private[this] val _lines: EvictingQueue[String] = EvictingQueue.create[String](logSize)

private[this] val _lock = new ReentrantLock()
private[this] val _condition = _lock.newCondition()
@@ -36,17 +38,17 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
private val thread = new Thread {
override def run() = {
val lines = Source.fromInputStream(inputStream).getLines()
-      for (line <- lines) {
+      for (line <- lines) {
Contributor: nit: remove unnecessary white space.

         _lock.lock()
         try {
-          _lines = _lines :+ line
+          info(s"stdout: $line")
Contributor (Author): @jerryshao In client mode we were seeing Spark driver logs in the Livy log after a long delay, because the log statement was after the loop. I agree that logging inside the lock is heavy. Let me know if there is a better way to handle this scenario.

Contributor: I think we can move this line out of the try/finally block.

+          if(logSize > 0) _lines add line
Contributor: Do we need the if check? I assume that if logSize is zero, we cannot add a line to this queue.
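For reference, Guava's EvictingQueue created with maxSize 0 evicts elements immediately on add (and add still returns true), which would make the guard redundant. Here is a JDK-only stand-in sketching those semantics (EvictingBuffer is hypothetical, not part of Livy or Guava):

```scala
import java.util.ArrayDeque

// Hypothetical JDK-only stand-in for Guava's EvictingQueue: keeps at most
// `maxSize` lines, evicting the oldest on overflow. With maxSize == 0 every
// added line is evicted immediately, so no `if (logSize > 0)` guard is
// needed at the call site.
class EvictingBuffer(maxSize: Int) {
  private val deque = new ArrayDeque[String]()

  def add(line: String): Boolean = {
    deque.addLast(line)
    while (deque.size > maxSize) deque.removeFirst() // evict oldest
    true // mirrors EvictingQueue.add, which always returns true
  }

  def toSeq: Seq[String] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[String]
    val it = deque.iterator()
    while (it.hasNext) out += it.next()
    out.toSeq
  }
}
```

With capacity 2, adding three lines keeps only the two most recent; with capacity 0 the buffer stays empty no matter how many lines are added.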

_condition.signalAll()
} finally {
_lock.unlock()
}
}

-      _lines.map { line => info("stdout: ", line) }
_lock.lock()
try {
_finished = true
@@ -59,7 +61,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
thread.setDaemon(true)
thread.start()

-  def lines: IndexedSeq[String] = _lines
+  def lines: IndexedSeq[String] = IndexedSeq.empty[String] ++ _lines.toArray(Array.empty[String])
Contributor: I think this will introduce a threading issue when copying to the new array.

Contributor (Author): Shall we add a lock here?

Contributor: I think so.
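A sketch of what the lock-guarded snapshot could look like, with illustrative names (LockedLogBuffer is hypothetical; an ArrayDeque stands in for EvictingQueue, which is likewise not thread-safe):

```scala
import java.util.ArrayDeque
import java.util.concurrent.locks.ReentrantLock

// Hypothetical sketch, mirroring the _lock/_lines fields from the diff.
class LockedLogBuffer {
  private val _lock = new ReentrantLock()
  private val _lines = new ArrayDeque[String]()

  def add(line: String): Unit = {
    _lock.lock()
    try _lines.addLast(line)
    finally _lock.unlock()
  }

  // Copy to the array while holding the same lock the reader thread takes
  // in add(), so a concurrent add cannot race with the copy.
  def lines: IndexedSeq[String] = {
    _lock.lock()
    try _lines.toArray(Array.empty[String]).toIndexedSeq
    finally _lock.unlock()
  }
}
```

The snapshot returned by `lines` is an immutable IndexedSeq, so callers can iterate it without further synchronization.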


def iterator: Iterator[String] = {
new LinesIterator
@@ -71,7 +73,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
private[this] var index = 0

override def hasNext: Boolean = {
-      if (index < _lines.length) {
+      if (_lines.size > 0) {
true
} else {
// Otherwise we might still have more data.
@@ -81,7 +83,7 @@
false
} else {
_condition.await()
-          index < _lines.length
+          _lines.size > 0
}
} finally {
_lock.unlock()
@@ -90,7 +92,7 @@
}

override def next(): String = {
-      val line = _lines(index)
+      val line = _lines.poll()
index += 1
Contributor: This index seems to be unused now. Also, I was thinking this poll may have a threading issue with add; I'm not sure EvictingQueue is a thread-safe queue.

Contributor (Author): Hm, it's not thread safe. Added a lock to it.
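A minimal sketch of the locked handoff the author describes, with illustrative names (LockedLineQueue is hypothetical; an ArrayDeque stands in for EvictingQueue): the poll happens under the same lock and condition used by add, so the two cannot race.

```scala
import java.util.ArrayDeque
import java.util.concurrent.locks.ReentrantLock

// Hypothetical producer/consumer handoff for log lines, not Livy's real code.
class LockedLineQueue {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private val q = new ArrayDeque[String]()
  private var finished = false

  def add(line: String): Unit = {
    lock.lock()
    try { q.addLast(line); cond.signalAll() }
    finally lock.unlock()
  }

  def finish(): Unit = {
    lock.lock()
    try { finished = true; cond.signalAll() }
    finally lock.unlock()
  }

  // Block until a line is available or the producer finished; poll under the
  // lock so it cannot interleave badly with a concurrent add.
  def poll(): Option[String] = {
    lock.lock()
    try {
      while (q.isEmpty && !finished) cond.await()
      Option(q.pollFirst()) // null (empty + finished) becomes None
    } finally lock.unlock()
  }
}
```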

line
}
@@ -100,7 +100,7 @@ object LivySparkUtils extends Logging {
pb.environment().put("LIVY_TEST_CLASSPATH", sys.props("java.class.path"))
}

-    val process = new LineBufferedProcess(pb.start())
+    val process = new LineBufferedProcess(pb.start(), 200)
Contributor: Use the configuration instead of a hard-coded number.

Contributor (Author), @praveen-kanamarlapudi, May 16, 2017: I think this runs only when the Livy server is started. Do we need to make it config driven? If the number is set too low by mistake, the Livy server may not start.

Contributor: OK, I see.

val exitCode = process.waitFor()
val output = process.inputIterator.mkString("\n")

3 changes: 2 additions & 1 deletion server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala
@@ -92,7 +92,8 @@ object SparkApp {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
-      require(process.isDefined, "process must not be None when Livy master is not YARN.")
+      // process is None in recovery mode
+      // require(process.isDefined, "process must not be None when Livy master is not YARN.")
new SparkProcApp(process.get, listener)
Contributor: If process can be None, will this process.get work?

}
}
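The reviewer's concern is well founded: with the require commented out, `process.get` throws NoSuchElementException whenever `process` is None. A hedged sketch of the Option-safe alternative, using hypothetical simplified stand-ins (these are not Livy's real SparkProcApp or recovery types):

```scala
// Hypothetical, simplified stand-ins just to show the shape of the Option
// handling; not Livy's actual API.
sealed trait App { def describe: String }
final case class ProcApp(process: String) extends App {
  def describe: String = s"proc-app($process)"
}
case object RecoveredApp extends App {
  def describe: String = "recovered-app"
}

// process.get would throw when process is None (the recovery case);
// mapping over the Option handles both branches explicitly.
def create(process: Option[String]): App =
  process.map(ProcApp(_)).getOrElse(RecoveredApp)
```

This keeps the non-YARN path total: a present process yields a ProcApp, while the recovery case falls through to an explicit alternative instead of crashing.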
@@ -40,8 +40,8 @@ class SparkProcApp (
}
}

-  override def log(): IndexedSeq[String] = process.inputLines
-
+  override def log(): IndexedSeq[String] =
+    ("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines)
Contributor: Add an empty line after it.
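To illustrate the shape of the combined log, here is the same expression evaluated on hypothetical stand-in values for process.inputLines and process.errorLines:

```scala
// Stand-in values; the real ones come from LineBufferedProcess.
val inputLines: IndexedSeq[String] = IndexedSeq("starting session", "session ready")
val errorLines: IndexedSeq[String] = IndexedSeq("WARN: low memory")

// Same expression shape as log() in the diff: each stream is prefixed with a
// header element before the two sequences are concatenated.
val log: IndexedSeq[String] =
  ("stdout: " +: inputLines) ++ ("\nstderr: " +: errorLines)
// log == IndexedSeq("stdout: ", "starting session", "session ready",
//                   "\nstderr: ", "WARN: low memory")
```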

private def changeState(newState: SparkApp.State.Value) = {
if (state != newState) {
listener.foreach(_.stateChanged(state, newState))
@@ -214,7 +214,7 @@ class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
_redirectError.foreach(pb.redirectError)
_redirectErrorStream.foreach(pb.redirectErrorStream)

-    new LineBufferedProcess(pb.start())
+    new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
}

}