Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIVY-990][SERVER] Persisting Livy Statements #423

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,6 @@
# Enable to allow custom classpath by proxy user in cluster mode
# The below configuration parameter is disabled by default.
# livy.server.session.allow-custom-classpath = true

# livy.server.statements.persistent.enabled = false
# livy.server.statements.persistent.path = /tmp/livy/statements
4 changes: 4 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ object LivyConf {

val SESSION_ALLOW_CUSTOM_CLASSPATH = Entry("livy.server.session.allow-custom-classpath", false)

val STATEMENTS_PERSISTENT_ENABLED = Entry("livy.server.statements.persistent.enabled", false)
val STATEMENTS_PERSISTENT_PATH = Entry("livy.server.statements.persistent.path",
"/tmp/livy/statements")

val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
val SPARK_JARS = "spark.jars"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ class InteractiveSession(
import InteractiveSession._

private var serverSideState: SessionState = initialState
private val statementStore = new StatementStore(livyConf)

override protected val heartbeatTimeout: FiniteDuration = {
val heartbeatTimeoutInSecond = heartbeatTimeoutS
Expand Down Expand Up @@ -537,6 +538,10 @@ class InteractiveSession(

override def stopSession(): Unit = {
try {
statementStore.save(appTag + "_statements", Map(
"total_statements" -> statements.length,
"statements" -> statements
))
transition(SessionState.ShuttingDown)
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server.interactive

import java.net.URI
import java.util

import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.sessions.SessionKindModule

class StatementStore(livyConf: LivyConf, mockFileContext: Option[FileContext] = None)
extends Logging {

protected val mapper: ObjectMapper = new ObjectMapper()
.registerModule(DefaultScalaModule)
.registerModule(new SessionKindModule())

def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value)

private val fsUri = {
val fsPath = livyConf.get(LivyConf.STATEMENTS_PERSISTENT_PATH)
require(fsPath != null && fsPath.nonEmpty,
s"Please config ${LivyConf.STATEMENTS_PERSISTENT_PATH.key}.")
new URI(fsPath)
}

private val fileContext: FileContext = mockFileContext.getOrElse {
FileContext.getFileContext(fsUri)
}

def save(fileName: String, value: Object): Unit = {
if (!livyConf.getBoolean(LivyConf.STATEMENTS_PERSISTENT_ENABLED)) {
return
}
logger.info(s"Writing statements info $fileName.")
// Write to a temp file then rename to avoid file corruption if livy-server crashes
// in the middle of the write.
val tmpPath = absPath(s"$fileName.tmp")
val createFlag = util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
try {
val tmpFile = fileContext.create(tmpPath, createFlag, CreateOpts.createParent())
tmpFile.write(serializeToBytes(value))
tmpFile.close()
// Assume rename is atomic.
fileContext.rename(tmpPath, absPath(fileName), Rename.OVERWRITE)
} catch {
case e: Exception => logger.error(s"Failed to write statements into $fileName.", e)
}

try {
val crcPath = new Path(tmpPath.getParent, s".${tmpPath.getName}.crc")
fileContext.delete(crcPath, false)
} catch {
case NonFatal(_) => // Swallow the exception.
}
}

private def absPath(fileName: String): Path = new Path(fsUri.getPath, fileName)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server.interactive

import java.io.IOException
import java.util

import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.hamcrest.Description
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, anyBoolean, argThat, eq => equal}
import org.mockito.Mockito.{atLeastOnce, never, verify, when}
import org.mockito.internal.matchers.Equals
import org.scalatest.FunSpec
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}

class StatementStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
describe("StatementStore") {
def pathEq(wantedPath: String): Path = argThat(new ArgumentMatcher[Path] {
private val matcher = new Equals(wantedPath)

override def matches(path: Any): Boolean = matcher.matches(path.toString)

override def describeTo(d: Description): Unit = {
matcher.describeTo(d)
}
})

def makeConf(): LivyConf = {
val conf = new LivyConf()
conf.set(LivyConf.STATEMENTS_PERSISTENT_ENABLED, true)
conf.set(LivyConf.STATEMENTS_PERSISTENT_PATH, "/tmp/")
conf
}

def mockFileContext(rootDirPermission: String): FileContext = {
val fileContext = mock[FileContext]
val rootDirStatus = mock[FileStatus]
when(fileContext.getFileStatus(any())).thenReturn(rootDirStatus)
when(rootDirStatus.getPermission).thenReturn(new FsPermission(rootDirPermission))
fileContext
}

it("save should write with an intermediate file") {
val fileContext = mockFileContext("700")
val outputStream = mock[FSDataOutputStream]
when(fileContext.create(pathEq("/tmp/statements.tmp"), any[util.EnumSet[CreateFlag]],
any[CreateOpts])).thenReturn(outputStream)

val stateStore = new StatementStore(makeConf(), Some(fileContext))
stateStore.save("statements", "value")

verify(outputStream).write(""""value"""".getBytes)
verify(outputStream, atLeastOnce).close()
verify(fileContext).rename(pathEq("/tmp/statements.tmp"), pathEq("/tmp/statements"),
equal(Rename.OVERWRITE))
verify(fileContext).delete(pathEq("/tmp/.statements.tmp.crc"), equal(false))
}

it("save should not write with an intermediate file") {
val fileContext = mockFileContext("700")
val outputStream = mock[FSDataOutputStream]
when(fileContext.create(pathEq("/tmp/statements.tmp"), any[util.EnumSet[CreateFlag]],
any[CreateOpts])).thenReturn(outputStream)

val stateStore = new StatementStore(new LivyConf(), Some(fileContext))
stateStore.save("statements", "value")

verify(outputStream, never).write(any[Byte])
}

it("save throws exception during file write") {
val fileContext = mockFileContext("700")
val outputStream = mock[FSDataOutputStream]
when(outputStream.close()).thenThrow(new IOException())

val stateStore = new StatementStore(makeConf(), Some(fileContext))
stateStore.save("statements", "value")

verify(outputStream, never).close()
}

it("save throws exception during file delete") {
val fileContext = mockFileContext("700")
val outputStream = mock[FSDataOutputStream]
when(fileContext.create(pathEq("/tmp/statements.tmp"), any[util.EnumSet[CreateFlag]],
any[CreateOpts])).thenReturn(outputStream)
when(fileContext.delete(any[Path], anyBoolean())).thenThrow(new IOException())

val stateStore = new StatementStore(makeConf(), Some(fileContext))
stateStore.save("statements", "value")

verify(outputStream).write(""""value"""".getBytes)
}
}
}