Skip to content

Commit

Permalink
Updated code as per the comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Kanamarlapudi authored and praveen-kanamarlapudi committed May 16, 2017
1 parent e94f507 commit 75d8511
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ class InteractiveSession(
heartbeat()

private val app = mockApp.orElse {
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
.map(new LineBufferedProcess(_, livyConf.getInt(LivyConf.SPARK_LOGS_SIZE)))
Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
driverProcess.map(_ => SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
}

if (client.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ class LineBufferedStream(inputStream: InputStream, logSize: Int) extends Logging
private val thread = new Thread {
override def run() = {
val lines = Source.fromInputStream(inputStream).getLines()
for (line <- lines) {
for (line <- lines) {
info(s"stdout: $line")
_lock.lock()
try {
info(s"stdout: $line")
if(logSize > 0) _lines add line
try {
_lines add line
_condition.signalAll()
} finally {
_lock.unlock()
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/scala/com/cloudera/livy/utils/SparkApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ object SparkApp {
if (livyConf.isRunningOnYarn()) {
new SparkYarnApp(uniqueAppTag, appId, process, listener, livyConf)
} else {
// process is None in recovery mode
// require(process.isDefined, "process must not be None when Livy master is not YARN.")
require(process.isDefined, "process must not be None when Livy master is not YARN.")
new SparkProcApp(process.get, listener)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class SparkProcApp (

override def log(): IndexedSeq[String] =
("stdout: " +: process.inputLines) ++ ("\nstderr: " +: process.errorLines)

private def changeState(newState: SparkApp.State.Value) = {
if (state != newState) {
listener.foreach(_.stateChanged(state, newState))
Expand Down

0 comments on commit 75d8511

Please sign in to comment.