Skip to content

Commit

Permalink
[FLINK-29114][connector][filesystem] Fix issue of file overwriting ca…
Browse files Browse the repository at this point in the history
…used by multiple writes to the same sink table and shared staging directory

This closes #24390

* Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
* Moves the staging dir configuration into builder for easier testing

---------

Co-authored-by: Matthias Pohl <[email protected]>
  • Loading branch information
LadyForest and XComp authored Mar 13, 2024
1 parent 398bb50 commit 7d0111d
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -679,17 +679,18 @@ Method <org.apache.flink.connector.file.table.DynamicPartitionWriter.write(java.
Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.convertStringToInternalValue(java.lang.String, org.apache.flink.table.types.DataType)> in (FileInfoExtractorBulkFormat.java:156)
Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileInfoExtractorBulkFormat.java:140)
Method <org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileSystemCommitter.java:146)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:288)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:289)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:290)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:291)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:292)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:324)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:325)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:326)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:327)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:328)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSystemOutputFormat.java:0)
Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:566)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FileSystemOutputFormat.java:291)
Method <org.apache.flink.connector.file.table.FileSystemOutputFormat.createStagingDirectory(org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (FileSystemOutputFormat.java:109)
Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:553)
Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSystemTableSink.java:208)
Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:189)
Method <org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(org.apache.flink.table.connector.ProviderContext, org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:233)
Method <org.apache.flink.connector.file.table.FileSystemTableSink.toStagingPath()> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (FileSystemTableSink.java:380)
Method <org.apache.flink.connector.file.table.FileSystemTableSource.listPartitions()> calls method <org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, int)> in (FileSystemTableSource.java:328)
Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> has return type <[Lorg.apache.flink.core.fs.Path;> in (FileSystemTableSource.java:0)
Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> references method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSource.java:295)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.connector.file.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
Expand All @@ -28,12 +29,14 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -56,7 +59,7 @@ public class FileSystemOutputFormat<T>
private final TableMetaStoreFactory msFactory;
private final boolean overwrite;
private final boolean isToLocal;
private final Path tmpPath;
private final Path stagingPath;
private final String[] partitionColumns;
private final boolean dynamicGrouped;
private final LinkedHashMap<String, String> staticPartitions;
Expand All @@ -74,7 +77,7 @@ private FileSystemOutputFormat(
TableMetaStoreFactory msFactory,
boolean overwrite,
boolean isToLocal,
Path tmpPath,
Path stagingPath,
String[] partitionColumns,
boolean dynamicGrouped,
LinkedHashMap<String, String> staticPartitions,
Expand All @@ -87,7 +90,7 @@ private FileSystemOutputFormat(
this.msFactory = msFactory;
this.overwrite = overwrite;
this.isToLocal = isToLocal;
this.tmpPath = tmpPath;
this.stagingPath = stagingPath;
this.partitionColumns = partitionColumns;
this.dynamicGrouped = dynamicGrouped;
this.staticPartitions = staticPartitions;
Expand All @@ -96,6 +99,22 @@ private FileSystemOutputFormat(
this.outputFileConfig = outputFileConfig;
this.identifier = identifier;
this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;

createStagingDirectory(this.stagingPath);
}

private static void createStagingDirectory(Path stagingPath) {
try {
final FileSystem stagingFileSystem = stagingPath.getFileSystem();
Preconditions.checkState(
!stagingFileSystem.exists(stagingPath),
"Staging dir %s already exists",
stagingPath);
stagingFileSystem.mkdirs(stagingPath);
} catch (IOException e) {
throw new RuntimeException(
"An IO error occurred while accessing the staging FileSystem.", e);
}
}

@Override
Expand All @@ -108,7 +127,7 @@ public void finalizeGlobal(FinalizationContext context) {
Thread.currentThread().getContextClassLoader(),
() -> {
try {
return fsFactory.create(tmpPath.toUri());
return fsFactory.create(stagingPath.toUri());
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -120,7 +139,7 @@ public void finalizeGlobal(FinalizationContext context) {
fsFactory,
msFactory,
overwrite,
tmpPath,
stagingPath,
partitionColumns.length,
isToLocal,
identifier,
Expand All @@ -141,7 +160,7 @@ public void finalizeGlobal(FinalizationContext context) {
throw new TableException("Exception in finalizeGlobal", e);
} finally {
try {
fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
fsFactory.create(stagingPath.toUri()).delete(stagingPath, true);
} catch (IOException ignore) {
}
}
Expand All @@ -158,7 +177,7 @@ public void open(InitializationContext context) throws IOException {
PartitionTempFileManager fileManager =
new PartitionTempFileManager(
fsFactory,
tmpPath,
stagingPath,
context.getTaskNumber(),
context.getAttemptNumber(),
outputFileConfig);
Expand Down Expand Up @@ -203,7 +222,7 @@ public static class Builder<T> {
private String[] partitionColumns;
private OutputFormatFactory<T> formatFactory;
private TableMetaStoreFactory metaStoreFactory;
private Path tmpPath;
private Path stagingPath;

private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
private boolean dynamicGrouped = false;
Expand Down Expand Up @@ -258,11 +277,23 @@ public Builder<T> setIsToLocal(boolean isToLocal) {
return this;
}

public Builder<T> setTempPath(Path tmpPath) {
this.tmpPath = tmpPath;
public Builder<T> setPath(Path parentPath) {
this.stagingPath = toStagingPath(parentPath);
return this;
}

@VisibleForTesting
Builder<T> setStagingPath(Path stagingPath) {
this.stagingPath = stagingPath;
return this;
}

private Path toStagingPath(Path parentPath) {
return new Path(
parentPath,
String.format(".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID()));
}

public Builder<T> setPartitionComputer(PartitionComputer<T> computer) {
this.computer = computer;
return this;
Expand All @@ -288,15 +319,15 @@ public FileSystemOutputFormat<T> build() {
checkNotNull(partitionColumns, "partitionColumns should not be null");
checkNotNull(formatFactory, "formatFactory should not be null");
checkNotNull(metaStoreFactory, "metaStoreFactory should not be null");
checkNotNull(tmpPath, "tmpPath should not be null");
checkNotNull(stagingPath, "stagingPath should not be null");
checkNotNull(computer, "partitionComputer should not be null");

return new FileSystemOutputFormat<>(
fileSystemFactory,
metaStoreFactory,
overwrite,
isToLocal,
tmpPath,
stagingPath,
partitionColumns,
dynamicGrouped,
staticPartitions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private DataStreamSink<RowData> createBatchSink(
.setMetaStoreFactory(new EmptyMetaStoreFactory(path))
.setOverwrite(overwrite)
.setStaticPartitions(staticPartitions)
.setTempPath(toStagingPath())
.setPath(path)
.setOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID())
Expand Down Expand Up @@ -373,19 +373,6 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
};
}

private Path toStagingPath() {
Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
try {
FileSystem fs = stagingDir.getFileSystem();
Preconditions.checkState(
fs.exists(stagingDir) || fs.mkdirs(stagingDir),
"Failed to create staging dir " + stagingDir);
return stagingDir;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@SuppressWarnings("unchecked")
private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
Object writer = createWriter(sinkContext);
Expand Down
Loading

0 comments on commit 7d0111d

Please sign in to comment.