[SPARK-17613] S3A base paths with no '/' at the end return empty DataFrames

## What changes were proposed in this pull request?

Consider a bucket `s3a://some-bucket` that contains the files:
```
s3a://some-bucket/file1.parquet
s3a://some-bucket/file2.parquet
```
Getting the parent path of `s3a://some-bucket/file1.parquet` yields
`s3a://some-bucket/` (note the trailing slash), and the `ListingFileCatalog` uses this as the key in its hash map.

When `catalog.allFiles` is called, we look up that map with `s3a://some-bucket` (no slash at the end), and we're left with an empty list!
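For illustration, a hypothetical user-facing repro (names are the ones from the example above; S3A credentials are assumed to be configured, and this snippet is not part of the patch):

```
import org.apache.spark.sql.SparkSession

// Hypothetical repro: reading the bucket root without a trailing '/'
// returned an empty DataFrame before this fix.
val spark = SparkSession.builder().appName("spark-17613-repro").getOrCreate()

val dfNoSlash = spark.read.parquet("s3a://some-bucket")   // was empty
val dfSlash = spark.read.parquet("s3a://some-bucket/")    // presumably listed both files
```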

This PR fixes the issue by appending a `/` to the end of the `URI` iff the given `Path` doesn't have a parent, i.e. is the root. This is a no-op if the path already ends with a `/`, which is handled through Hadoop `Path`'s path-merging semantics.
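As a rough sketch of the key mismatch and the fix, using only Hadoop `Path` arithmetic (no real S3 access; the guard mirrors the change in `PartitioningAwareFileCatalog` below):

```
import org.apache.hadoop.fs.Path

// The catalog keys its map by the leaf file's parent, which keeps the trailing '/':
val parentKey = new Path("s3a://some-bucket/file1.parquet").getParent  // s3a://some-bucket/

// The user-supplied root has no trailing '/', so looking up the map with it misses:
val userPath = new Path("s3a://some-bucket")

// The fix: for a root path without the separator, merge Path.SEPARATOR back in,
// producing the same form as the parent-derived key; already-slashed roots are untouched.
val fixedPath =
  if (userPath.isRoot && !userPath.isAbsolute) new Path(userPath, Path.SEPARATOR)
  else userPath
```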

## How was this patch tested?

Unit test in `FileCatalogSuite`.

Author: Burak Yavuz <[email protected]>

Closes apache#15169 from brkyvz/SPARK-17613.
brkyvz authored and JoshRosen committed Sep 22, 2016
1 parent 9f24a17 commit 85d609c
Showing 2 changed files with 53 additions and 2 deletions.
`PartitioningAwareFileCatalog.scala`

```
@@ -76,7 +76,15 @@ abstract class PartitioningAwareFileCatalog(
     paths.flatMap { path =>
       // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
       val fs = path.getFileSystem(hadoopConf)
-      val qualifiedPath = fs.makeQualified(path)
+      val qualifiedPathPre = fs.makeQualified(path)
+      val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
+        // SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
+        // because the `leafFile.getParent` would have returned an absolute path with the
+        // separator at the end.
+        new Path(qualifiedPathPre, Path.SEPARATOR)
+      } else {
+        qualifiedPathPre
+      }
 
       // There are three cases possible with each path
       // 1. The path is a directory and has children files in it. Then it must be present in
```
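For reference, a small sketch of why the guard distinguishes the two spellings of the bucket root (assumed Hadoop `Path` behavior, consistent with the assertions in the test below):

```
import org.apache.hadoop.fs.Path

val noSlash = new Path("s3a://some-bucket")     // URI path component is empty
val withSlash = new Path("s3a://some-bucket/")  // URI path component is "/"

// Both are roots (no parent), but only the slash-less form is not "absolute",
// so only it gets Path.SEPARATOR appended; the slashed form is a no-op.
assert(noSlash.isRoot && !noSlash.isAbsolute)
assert(withSlash.isRoot && withSlash.isAbsolute)
```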
`FileCatalogSuite.scala`

```
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.File
+import java.net.URI
 
+import scala.collection.mutable
 import scala.language.reflectiveCalls
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -78,4 +80,45 @@ class FileCatalogSuite extends SharedSQLContext {
       assert(catalog1.listLeafFiles(catalog1.paths).isEmpty)
     }
   }
+
+  test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+    class MockCatalog(
+        override val paths: Seq[Path]) extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+
+      override def refresh(): Unit = {}
+
+      override def leafFiles: mutable.LinkedHashMap[Path, FileStatus] = mutable.LinkedHashMap(
+        new Path("mockFs://some-bucket/file1.json") -> new FileStatus()
+      )
+
+      override def leafDirToChildrenFiles: Map[Path, Array[FileStatus]] = Map(
+        new Path("mockFs://some-bucket/") -> Array(new FileStatus())
+      )
+
+      override def partitionSpec(): PartitionSpec = {
+        PartitionSpec.emptySpec
+      }
+    }
+
+    withSQLConf(
+        "fs.mockFs.impl" -> classOf[FakeParentPathFileSystem].getName,
+        "fs.mockFs.impl.disable.cache" -> "true") {
+      val pathWithSlash = new Path("mockFs://some-bucket/")
+      assert(pathWithSlash.getParent === null)
+      val pathWithoutSlash = new Path("mockFs://some-bucket")
+      assert(pathWithoutSlash.getParent === null)
+      val catalog1 = new MockCatalog(Seq(pathWithSlash))
+      val catalog2 = new MockCatalog(Seq(pathWithoutSlash))
+      assert(catalog1.allFiles().nonEmpty)
+      assert(catalog2.allFiles().nonEmpty)
+    }
+  }
 }
+
+class FakeParentPathFileSystem extends RawLocalFileSystem {
+  override def getScheme: String = "mockFs"
+
+  override def getUri: URI = {
+    URI.create("mockFs://some-bucket")
+  }
+}
```
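A note on the fixture: `FakeParentPathFileSystem` exposes a filesystem whose root URI has no trailing slash, which is exactly the shape that triggered the bug. Outside of `withSQLConf`, the same wiring could be sketched with plain Hadoop configuration (standard `fs.<scheme>.impl` registration; this snippet is illustrative, not part of the patch):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Register the fake scheme so Path resolution picks up the test filesystem.
val conf = new Configuration()
conf.set("fs.mockFs.impl", classOf[FakeParentPathFileSystem].getName)
conf.set("fs.mockFs.impl.disable.cache", "true")

// getFileSystem returns a FakeParentPathFileSystem whose getUri is
// "mockFs://some-bucket" -- a root with no trailing '/', just like the S3A case.
val fs = new Path("mockFs://some-bucket/file1.json").getFileSystem(conf)
```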
