diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 3803e07f88e9..d706b2bf582d 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -101,7 +101,7 @@ public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO
 
   public static CloseableIterable<Pair<String, Long>> readPathsWithSnapshotId(
       ManifestFile manifest, FileIO io) {
     return CloseableIterable.transform(
-        read(manifest, io, null).select(ImmutableList.of("file_path", "snapshot_id")).liveEntries(),
+        open(manifest, io, null).select(ImmutableList.of("file_path", "snapshot_id")).liveEntries(),
         entry -> Pair.of(entry.file().path().toString(), entry.snapshotId()));
   }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseCopyTableSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseCopyTableSparkAction.java
index f68304a1482f..e1f0e129502c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseCopyTableSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/BaseCopyTableSparkAction.java
@@ -21,14 +21,19 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestEntry;
 import org.apache.iceberg.ManifestFile;
@@ -37,6 +42,7 @@
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
@@ -48,10 +54,26 @@
 import org.apache.iceberg.TableMetadataUtil;
 import org.apache.iceberg.actions.BaseCopyTableActionResult;
 import org.apache.iceberg.actions.CopyTable;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.DeleteSchemaUtil;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -186,8 +208,11 @@ private void validateInputs() {
     }
 
     if (stagingDir.isEmpty()) {
-      stagingDir = getMetadataLocation(table) + "copy-table-staging-" + UUID.randomUUID() + "/";
-    } else if (!stagingDir.endsWith("/")) {
+      String stagingDirName = "copy-table-staging-" + UUID.randomUUID();
+      stagingDir = Paths.get(table.location(), stagingDirName).toString();
+    }
+
+    if (!stagingDir.endsWith("/")) {
       stagingDir = stagingDir + "/";
     }
   }
@@ -347,7 +372,7 @@ private Set<Long> getDiffSnapshotIds(Set<Long> allSnapshotIds) {
 
   private Set<Long> rewriteVersionFiles(TableMetadata metadata) {
     Set<Long> allSnapshotIds = Sets.newHashSet();
-    String stagingPath = stagingPath(endVersion, stagingDir);
+    String stagingPath = stagingPath(relativize(endVersion, sourcePrefix), stagingDir);
     metadata.snapshots().forEach(snapshot -> allSnapshotIds.add(snapshot.snapshotId()));
     rewriteVersionFile(metadata, stagingPath);
 
@@ -361,7 +386,7 @@ private Set<Long> rewriteVersionFiles(TableMetadata metadata) {
       Preconditions.checkArgument(
           fileExist(versionFilePath),
           String.format("Version file %s doesn't exist", versionFilePath));
-      String newPath = stagingPath(versionFilePath, stagingDir);
+      String newPath = stagingPath(relativize(versionFilePath, sourcePrefix), stagingDir);
 
       TableMetadata tableMetadata =
           new StaticTableOperations(versionFilePath, table.io()).current();
@@ -393,7 +418,7 @@ private void rewriteVersionFile(TableMetadata metadata, String stagingPath) {
   private void rewriteManifestList(Snapshot snapshot, TableMetadata tableMetadata) {
     List<ManifestFile> manifestFiles = manifestFilesInSnapshot(snapshot);
     String path = snapshot.manifestListLocation();
-    String stagingPath = stagingPath(path, stagingDir);
+    String stagingPath = stagingPath(relativize(path, sourcePrefix), stagingDir);
     OutputFile outputFile = table.io().newOutputFile(stagingPath);
     try (FileAppender<ManifestFile> writer =
         ManifestLists.write(
@@ -492,28 +517,54 @@ private static MapPartitionsFunction<ManifestFile, ManifestFile> toManifests(
     return rows -> {
       List<ManifestFile> manifests = Lists.newArrayList();
       while (rows.hasNext()) {
-        manifests.add(
-            writeManifest(
-                rows.next(), io, stagingLocation, format, specsById, sourcePrefix, targetPrefix));
+        ManifestFile manifestFile = rows.next();
+
+        switch (manifestFile.content()) {
+          case DATA:
+            manifests.add(
+                writeDataManifest(
+                    manifestFile,
+                    stagingLocation,
+                    io,
+                    format,
+                    specsById,
+                    sourcePrefix,
+                    targetPrefix));
+            break;
+          case DELETES:
+            manifests.add(
+                writeDeleteManifest(
+                    manifestFile,
+                    stagingLocation,
+                    io,
+                    format,
+                    specsById,
+                    sourcePrefix,
+                    targetPrefix));
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported manifest type: " + manifestFile.content());
+        }
       }
       return manifests.iterator();
     };
   }
 
-  private static ManifestFile writeManifest(
+  private static ManifestFile writeDataManifest(
       ManifestFile manifestFile,
-      Broadcast<FileIO> io,
       String stagingLocation,
+      Broadcast<FileIO> io,
       int format,
       Map<Integer, PartitionSpec> specsById,
       String sourcePrefix,
       String targetPrefix)
       throws IOException {
-
-    String stagingPath = stagingPath(manifestFile.path(), stagingLocation);
-    OutputFile outputFile = io.value().newOutputFile(stagingPath);
     PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    String stagingPath =
+        stagingPath(relativize(manifestFile.path(), sourcePrefix), stagingLocation);
+    OutputFile outputFile = io.value().newOutputFile(stagingPath);
     ManifestWriter<DataFile> writer =
         ManifestFiles.write(format, spec, outputFile, manifestFile.snapshotId());
 
@@ -521,37 +572,214 @@ private static ManifestFile writeManifest(
         ManifestFiles.read(manifestFile, io.getValue(), specsById).select(Arrays.asList("*"))) {
       reader
           .entries()
-          .forEach(entry -> appendEntry(entry, writer, spec, sourcePrefix, targetPrefix));
+          .forEach(
+              entry ->
+                  appendEntryWithFile(
+                      entry, writer, newDataFile(entry.file(), spec, sourcePrefix, targetPrefix)));
     } finally {
       writer.close();
     }
+    return writer.toManifestFile();
+  }
+
+  private static DataFile newDataFile(
+      DataFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) {
+    DataFile transformedFile = file;
+    String filePath = file.path().toString();
+    if (filePath.startsWith(sourcePrefix)) {
+      filePath = newPath(filePath, sourcePrefix, targetPrefix);
+      transformedFile = DataFiles.builder(spec).copy(file).withPath(filePath).build();
+    }
+    return transformedFile;
+  }
+
+  private static ManifestFile writeDeleteManifest(
+      ManifestFile manifestFile,
+      String stagingLocation,
+      Broadcast<FileIO> io,
+      int format,
+      Map<Integer, PartitionSpec> specsById,
+      String sourcePrefix,
+      String targetPrefix)
+      throws IOException {
+    PartitionSpec spec = specsById.get(manifestFile.partitionSpecId());
+    String manifestStagingPath =
+        stagingPath(relativize(manifestFile.path(), sourcePrefix), stagingLocation);
+    OutputFile manifestOutputFile = io.value().newOutputFile(manifestStagingPath);
+    ManifestWriter<DeleteFile> writer =
+        ManifestFiles.writeDeleteManifest(
+            format, spec, manifestOutputFile, manifestFile.snapshotId());
+
+    try (ManifestReader<DeleteFile> reader =
+        ManifestFiles.readDeleteManifest(manifestFile, io.getValue(), specsById)
+            .select(Arrays.asList("*"))) {
+
+      for (ManifestEntry<DeleteFile> entry : reader.entries()) {
+        DeleteFile file = entry.file();
+
+        switch (file.content()) {
+          case POSITION_DELETES:
+            String deleteFileStagingPath =
+                stagingPath(relativize(file.path().toString(), sourcePrefix), stagingLocation);
+            rewritePositionDeleteFile(
+                io, file, deleteFileStagingPath, spec, sourcePrefix, targetPrefix);
+            appendEntryWithFile(
+                entry, writer, newDeleteFile(file, spec, sourcePrefix, targetPrefix));
+            break;
+          case EQUALITY_DELETES:
+            appendEntryWithFile(
+                entry, writer, newDeleteFile(file, spec, sourcePrefix, targetPrefix));
+            break;
+          default:
+            throw new UnsupportedOperationException(
+                "Unsupported delete file type: " + file.content());
+        }
+      }
+    } finally {
+      writer.close();
+    }
     return writer.toManifestFile();
   }
 
-  private static void appendEntry(
-      ManifestEntry<DataFile> entry,
-      ManifestWriter<DataFile> writer,
+  private static DeleteFile newDeleteFile(
+      DeleteFile file, PartitionSpec spec, String sourcePrefix, String targetPrefix) {
+    DeleteFile transformedFile = file;
+    String filePath = file.path().toString();
+    if (filePath.startsWith(sourcePrefix)) {
+      filePath = newPath(filePath, sourcePrefix, targetPrefix);
+      transformedFile = FileMetadata.deleteFileBuilder(spec).copy(file).withPath(filePath).build();
+    }
+    return transformedFile;
+  }
+
+  private static PositionDelete newPositionDeleteRecord(
+      Record record, String sourcePrefix, String targetPrefix) {
+    PositionDelete delete = PositionDelete.create();
+    delete.set(
+        newPath((String) record.get(0), sourcePrefix, targetPrefix),
+        (Long) record.get(1),
+        record.get(2));
+    return delete;
+  }
+
+  private static DeleteFile rewritePositionDeleteFile(
+      Broadcast<FileIO> io,
+      DeleteFile current,
+      String path,
       PartitionSpec spec,
       String sourcePrefix,
-      String targetPrefix) {
-    DataFile dataFile = entry.file();
-    String dataFilePath = dataFile.path().toString();
-    if (dataFilePath.startsWith(sourcePrefix)) {
-      dataFilePath = newPath(dataFilePath, sourcePrefix, targetPrefix);
-      dataFile = DataFiles.builder(spec).copy(entry.file()).withPath(dataFilePath).build();
+      String targetPrefix)
+      throws IOException {
+    OutputFile targetFile = io.value().newOutputFile(path);
+    InputFile sourceFile = io.value().newInputFile(current.path().toString());
+
+    try (CloseableIterable<Record> reader =
+        positionDeletesReader(sourceFile, current.format(), spec)) {
+      Record record = null;
+      Schema rowSchema = null;
+      CloseableIterator<Record> recordIt = reader.iterator();
+
+      if (recordIt.hasNext()) {
+        record = recordIt.next();
+        rowSchema = record.get(2) != null ? spec.schema() : null;
+      }
+
+      PositionDeleteWriter writer =
+          positionDeletesWriter(targetFile, current.format(), spec, current.partition(), rowSchema);
+
+      try {
+        if (record != null) {
+          writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
+        }
+
+        while (recordIt.hasNext()) {
+          record = recordIt.next();
+          writer.write(newPositionDeleteRecord(record, sourcePrefix, targetPrefix));
+        }
+      } finally {
+        writer.close();
+      }
+      return writer.toDeleteFile();
+    }
+  }
+
+  private static CloseableIterable<Record> positionDeletesReader(
+      InputFile inputFile, FileFormat format, PartitionSpec spec) throws IOException {
+    Schema deleteSchema = DeleteSchemaUtil.posDeleteSchema(spec.schema());
+    switch (format) {
+      case AVRO:
+        return Avro.read(inputFile)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(DataReader::create)
+            .build();
+
+      case PARQUET:
+        return Parquet.read(inputFile)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(
+                fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema))
+            .build();
+
+      case ORC:
+        return ORC.read(inputFile)
+            .project(deleteSchema)
+            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(deleteSchema, fileSchema))
+            .build();
+
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
+    }
+  }
+
+  private static PositionDeleteWriter positionDeletesWriter(
+      OutputFile outputFile,
+      FileFormat format,
+      PartitionSpec spec,
+      StructLike partition,
+      Schema rowSchema)
+      throws IOException {
+    switch (format) {
+      case AVRO:
+        return Avro.writeDeletes(outputFile)
+            .createWriterFunc(DataWriter::create)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      case PARQUET:
+        return Parquet.writeDeletes(outputFile)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      case ORC:
+        return ORC.writeDeletes(outputFile)
+            .createWriterFunc(GenericOrcWriter::buildWriter)
+            .withPartition(partition)
+            .rowSchema(rowSchema)
+            .withSpec(spec)
+            .buildPositionWriter();
+      default:
+        throw new UnsupportedOperationException("Unsupported file format: " + format);
     }
+  }
 
+  private static <F extends ContentFile<F>> void appendEntryWithFile(
+      ManifestEntry<F> entry, ManifestWriter<F> writer, F file) {
     switch (entry.status()) {
       case ADDED:
-        writer.add(dataFile);
+        writer.add(file);
         break;
       case EXISTING:
         writer.existing(
-            dataFile, entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber());
+            file, entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber());
         break;
       case DELETED:
-        writer.delete(dataFile, entry.dataSequenceNumber(), entry.fileSequenceNumber());
+        writer.delete(file, entry.dataSequenceNumber(), entry.fileSequenceNumber());
         break;
     }
   }
@@ -579,18 +807,26 @@ private boolean fileExist(String path) {
     return table.io().newInputFile(path).exists();
   }
 
+  private static String relativize(String path, String prefix) {
+    if (!path.startsWith(prefix)) {
+      throw new IllegalArgumentException(
+          String.format("Path %s does not start with %s", path, prefix));
+    }
+    return path.replaceFirst(prefix, "");
+  }
+
+  private static String stagingPath(String originalPath, String stagingLocation) {
+    return Paths.get(stagingLocation, originalPath).toString();
+  }
+
   private static String newPath(String path, String sourcePrefix, String targetPrefix) {
-    return path.replaceFirst(sourcePrefix, targetPrefix);
+    return Paths.get(targetPrefix, relativize(path, sourcePrefix)).toString();
   }
 
   private void addToRebuiltFiles(String path) {
     metadataFilesToMove.add(path);
   }
 
-  private static String stagingPath(String originalPath, String stagingLocation) {
-    return stagingLocation + fileName(originalPath);
-  }
-
   private String currentMetadataPath(Table tbl) {
     return ((HasTableOperations) tbl).operations().current().metadataFileLocation();
   }
@@ -603,18 +839,4 @@ private static String fileName(String path) {
     }
     return filename;
   }
-
-  private String getMetadataLocation(Table tbl) {
-    String currentMetadataPath =
-        ((HasTableOperations) tbl).operations().current().metadataFileLocation();
-    int lastIndex = currentMetadataPath.lastIndexOf(File.separator);
-    String metadataDir = "";
-    if (lastIndex != -1) {
-      metadataDir = currentMetadataPath.substring(0, lastIndex + 1);
-    }
-
-    Preconditions.checkArgument(
-        !metadataDir.isEmpty(), "Failed to get the metadata file root directory");
-    return metadataDir;
-  }
 }
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCopyTableAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCopyTableAction.java
index 7d44ff0e3e33..f79b04c5b648 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCopyTableAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestCopyTableAction.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -38,6 +39,7 @@
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.actions.CopyTable;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -46,6 +48,7 @@
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -264,6 +267,50 @@ public void testDeleteDataFile() throws Exception {
         resultDF.as(Encoders.bean(ThreeColumnRecord.class)).count());
   }
 
+  @Test
+  public void testWithDeleteManifests() throws Exception {
+    String location = newTableLocation();
+    Table sourceTable = createATableWith2Snapshots(location);
+    String targetLocation = newTableLocation();
+
+    List<Pair<CharSequence, Long>> deletes =
+        Lists.newArrayList(
+            Pair.of(
+                sourceTable
+                    .currentSnapshot()
+                    .addedDataFiles(sourceTable.io())
+                    .iterator()
+                    .next()
+                    .path(),
+                0L));
+
+    File file = new File(removePrefix(sourceTable.location()) + "/data/deeply/nested/file.parquet");
+    DeleteFile positionDeletes =
+        FileHelpers.writeDeleteFile(
+                table, sourceTable.io().newOutputFile(file.toURI().toString()), deletes)
+            .first();
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(location);
+    sourceTable.newRowDelta().addDeletes(positionDeletes).commit();
+
+    CopyTable.Result result =
+        actions().copyTable(sourceTable).rewriteLocationPrefix(location, targetLocation).execute();
+
+    // We have one more snapshot, an additional manifest list, and a new (delete) manifest
+    checkMetadataFileNum(4, 3, 3, result);
+    // We have one additional file for positional deletes
+    checkDataFileNum(3, result);
+
+    // copy the metadata files and data files
+    moveTableFiles(location, targetLocation, stagingDir(result));
+
+    // Positional delete affects a single row, so only one row must remain
+    Assert.assertEquals(
+        "The number of rows should be",
+        1,
+        spark.read().format("iceberg").load(targetLocation).count());
+  }
+
   @Test
   public void testFullTableCopyWithDeletedVersionFiles() throws Exception {
     String location = newTableLocation();
@@ -690,8 +737,8 @@ private void moveTableFiles(String sourceDir, String targetDir, String stagingDi
       throws Exception {
     FileUtils.copyDirectory(
         new File(removePrefix(sourceDir) + "data/"), new File(removePrefix(targetDir) + "/data/"));
-    FileUtils.copyDirectory(
-        new File(removePrefix(stagingDir)), new File(removePrefix(targetDir) + "/metadata/"));
+    // Copy staged metadata files, overwrite previously copied data files with staged ones
+    FileUtils.copyDirectory(new File(removePrefix(stagingDir)), new File(removePrefix(targetDir)));
   }
 
   private String removePrefix(String path) {