[SPARK-17640][SQL] Avoid using -1 as the default batchId for FileStreamSource.FileEntry

## What changes were proposed in this pull request?

Avoid using -1 as the default batchId for FileStreamSource.FileEntry, so that we can be sure no FileEntry(..., batchId = -1) is ever written into the log. This also prevents similar misuse in the future (apache#15203 is an example).
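In code terms, the change removes the `NOT_SET` sentinel and the default argument from `FileEntry`, mirroring the diff below (the `Before`/`After` wrappers here are just for illustration):

```scala
object Before {
  type Timestamp = Long

  // A -1 sentinel, available as a default argument, meant a careless caller
  // could persist FileEntry(..., batchId = -1) into the metadata log.
  val NOT_SET = -1L

  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
    extends Serializable
}

object After {
  type Timestamp = Long

  // batchId must now be supplied explicitly at every construction site.
  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable
}
```

With the default gone, a call like `FileEntry(path, ts)` no longer compiles, so the only way to write an entry into the log is with the batch id it actually belongs to.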

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <[email protected]>

Closes apache#15206 from zsxwing/cleanup.
zsxwing committed Sep 23, 2016
1 parent 947b8c6 commit 62ccf27
Showing 2 changed files with 31 additions and 30 deletions.
FileStreamSource.scala
@@ -59,7 +59,7 @@ class FileStreamSource(
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)

metadataLog.allFiles().foreach { entry =>
- seenFiles.add(entry)
+ seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()

@@ -73,14 +73,16 @@
*/
private def fetchMaxOffset(): LongOffset = synchronized {
// All the new files found - ignore aged files and files that we have seen.
- val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+ val newFiles = fetchAllFiles().filter {
+   case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+ }

// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles

batchFiles.foreach { file =>
- seenFiles.add(file)
+ seenFiles.add(file._1, file._2)
logDebug(s"New file: $file")
}
val numPurged = seenFiles.purge()
@@ -95,7 +97,9 @@

if (batchFiles.nonEmpty) {
maxBatchId += 1
- metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = maxBatchId)).toArray)
+ metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
+   FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+ }.toArray)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

@@ -140,12 +144,12 @@ class FileStreamSource(
/**
* Returns a list of files found, sorted by their timestamp.
*/
- private def fetchAllFiles(): Seq[FileEntry] = {
+ private def fetchAllFiles(): Seq[(String, Long)] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
- FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+ (status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
@@ -172,10 +176,7 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

- val NOT_SET = -1L
-
- case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = NOT_SET)
-   extends Serializable
+ case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
@@ -196,21 +197,21 @@ object FileStreamSource {
private var lastPurgeTimestamp: Timestamp = 0L

/** Add a new file to the map. */
- def add(file: FileEntry): Unit = {
-   map.put(file.path, file.timestamp)
-   if (file.timestamp > latestTimestamp) {
-     latestTimestamp = file.timestamp
+ def add(path: String, timestamp: Timestamp): Unit = {
+   map.put(path, timestamp)
+   if (timestamp > latestTimestamp) {
+     latestTimestamp = timestamp
}
}

/**
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is recent enough that we are still tracking it, and we have not seen it before.
*/
- def isNewFile(file: FileEntry): Boolean = {
+ def isNewFile(path: String, timestamp: Timestamp): Boolean = {
// Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
// is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
- file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+ timestamp >= lastPurgeTimestamp && !map.containsKey(path)
}

/** Removes aged entries and returns the number of files removed. */
@@ -230,8 +231,8 @@

def size: Int = map.size()

- def allEntries: Seq[FileEntry] = {
-   map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
+ def allEntries: Seq[(String, Timestamp)] = {
+   map.asScala.toSeq
}
}
}
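The `purge()` body is collapsed in this view. For readers following along, here is a minimal standalone sketch of the `SeenFilesMap` contract that the new signatures suggest — assumed code for illustration, not the verbatim Spark source:

```scala
// Sketch (assumed) of SeenFilesMap semantics: path -> modification timestamp.
class SeenFilesMapSketch(maxAgeMs: Long) {
  private val map = new java.util.HashMap[String, Long]()
  private var latestTimestamp: Long = 0L
  private var lastPurgeTimestamp: Long = 0L

  /** Record a file and track the newest timestamp seen so far. */
  def add(path: String, timestamp: Long): Unit = {
    map.put(path, timestamp)
    if (timestamp > latestTimestamp) latestTimestamp = timestamp
  }

  /** New = not older than the last purge cutoff, and never seen before. */
  def isNewFile(path: String, timestamp: Long): Boolean =
    timestamp >= lastPurgeTimestamp && !map.containsKey(path)

  /** Drop entries older than (latestTimestamp - maxAgeMs); return how many were removed. */
  def purge(): Int = {
    lastPurgeTimestamp = latestTimestamp - maxAgeMs
    val iter = map.entrySet().iterator()
    var removed = 0
    while (iter.hasNext) {
      if (iter.next().getValue < lastPurgeTimestamp) {
        iter.remove()
        removed += 1
      }
    }
    removed
  }

  def size: Int = map.size()
}
```

The updated tests below exercise exactly this contract.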
FileStreamSourceSuite.scala
@@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)

map.add(FileEntry("a", 5))
map.add("a", 5)
assert(map.size == 1)
map.purge()
assert(map.size == 1)

// Adding a new entry and then purging should be a no-op, since the gap is exactly 10 ms.
map.add(FileEntry("b", 15))
map.add("b", 15)
assert(map.size == 2)
map.purge()
assert(map.size == 2)

// Add a new entry that's more than 10 ms newer than the first entry. We should be able to purge now.
map.add(FileEntry("c", 16))
map.add("c", 16)
assert(map.size == 3)
map.purge()
assert(map.size == 2)

// Overriding an existing entry shouldn't change the size
map.add(FileEntry("c", 25))
map.add("c", 25)
assert(map.size == 2)

// Not a new file because we have seen c before
assert(!map.isNewFile(FileEntry("c", 20)))
assert(!map.isNewFile("c", 20))

// Not a new file because timestamp is too old
assert(!map.isNewFile(FileEntry("d", 5)))
assert(!map.isNewFile("d", 5))

// Finally a new file: never seen and not too old
assert(map.isNewFile(FileEntry("e", 20)))
assert(map.isNewFile("e", 20))
}

test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
val map = new SeenFilesMap(maxAgeMs = 10)

map.add(FileEntry("a", 20))
map.add("a", 20)
assert(map.size == 1)

// Timestamps 9 and 10 should still be considered new files because the purge time is still 0
assert(map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))
assert(map.isNewFile("b", 9))
assert(map.isNewFile("b", 10))

// Once purged, the purge time becomes 10 (20 - maxAgeMs), so b is an old file if its timestamp is less than 10.
map.purge()
assert(!map.isNewFile(FileEntry("b", 9)))
assert(map.isNewFile(FileEntry("b", 10)))
assert(!map.isNewFile("b", 9))
assert(map.isNewFile("b", 10))
}

testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
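One subtlety the comment in `isNewFile` calls out: the check is against `lastPurgeTimestamp`, not `latestTimestamp - maxAgeMs`, so a file that is old but has not yet been purged is never silently dropped. A worked example using the sketch class above (hypothetical timestamps):

```scala
// maxAgeMs = 10. Files arrive at timestamps 5 and 25 with no purge in between,
// so latestTimestamp = 25 but lastPurgeTimestamp is still 0.
val seen = new SeenFilesMapSketch(maxAgeMs = 10)
seen.add("a", 5)
seen.add("b", 25)

// A file at timestamp 12 is older than latestTimestamp - maxAgeMs (= 15), but
// nothing has been purged yet, so it is still accepted rather than silently lost.
assert(seen.isNewFile("c", 12))

// After a purge the cutoff moves to 25 - 10 = 15, and the same file is now too old.
seen.purge()
assert(!seen.isNewFile("c", 12))
```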
