Skip to content

Commit

Permalink
[HiveSink]Fix the risk of resource leakage. (apache#6721)
Browse files Browse the repository at this point in the history
Co-authored-by: lightzhao <[email protected]>
  • Loading branch information
lightzhao and lightzhao authored Apr 19, 2024
1 parent 94afa89 commit c23804f
Showing 1 changed file with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,29 @@ public HiveSinkAggregatedCommitter(
@Override
public List<FileAggregatedCommitInfo> commit(
List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
if (errorCommitInfos.isEmpty()) {
for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
Map<String, List<String>> partitionDirAndValuesMap =
aggregatedCommitInfo.getPartitionDirAndValuesMap();
List<String> partitions =
partitionDirAndValuesMap.keySet().stream()
.map(partition -> partition.replaceAll("\\\\", "/"))
.collect(Collectors.toList());
try {
hiveMetaStore.addPartitions(dbName, tableName, partitions);
log.info("Add these partitions {}", partitions);
} catch (TException e) {
log.error("Failed to add these partitions {}", partitions, e);
errorCommitInfos.add(aggregatedCommitInfo);
HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(pluginConfig);
try {
for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
Map<String, List<String>> partitionDirAndValuesMap =
aggregatedCommitInfo.getPartitionDirAndValuesMap();
List<String> partitions =
partitionDirAndValuesMap.keySet().stream()
.map(partition -> partition.replaceAll("\\\\", "/"))
.collect(Collectors.toList());
try {
hiveMetaStore.addPartitions(dbName, tableName, partitions);
log.info("Add these partitions {}", partitions);
} catch (TException e) {
log.error("Failed to add these partitions {}", partitions, e);
errorCommitInfos.add(aggregatedCommitInfo);
}
}
} finally {
hiveMetaStore.close();
}
}
hiveMetaStore.close();
return errorCommitInfos;
}

Expand Down

0 comments on commit c23804f

Please sign in to comment.