[FLINK-34324][test] Makes all S3-related operations declared and called in a single location #24463

Closed
wants to merge 4 commits into from

Changes from all commits
16 changes: 8 additions & 8 deletions flink-end-to-end-tests/run-nightly-tests.sh
@@ -189,14 +189,14 @@ function run_group_2 {
run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
run_test "Sql Jdbc Driver end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions"

if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-21400
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"

run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
fi
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
run_test "Streaming File Sink (Minio) end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh minio StreamingFileSink" "skip_check_exceptions"
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
run_test "New File Sink end-to-end (Minio) test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh minio FileSink" "skip_check_exceptions"
run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"

run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"

run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"

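Note on the hunk above: the FLINK-21400 guard that skipped the file sink tests under the adaptive-scheduler profile is dropped, and Minio-backed variants of both sinks are added next to the existing local and s3 runs. The Minio variants reuse test_file_sink.sh; only the first positional argument differs. A minimal sketch of the assumed dispatch (the real script uses an if/elif chain, shown further down):

#!/usr/bin/env bash
# Illustrative sketch only: how the first argument of test_file_sink.sh selects the backend.
OUT_TYPE="${1:-local}"

case "${OUT_TYPE}" in
  local)  ;;                                             # data stays on the local filesystem
  s3)     source "$(dirname "$0")"/common_s3.sh ;;       # real S3 bucket, needs IT_CASE_S3_BUCKET and credentials
  minio)  source "$(dirname "$0")"/common_s3_minio.sh ;; # Minio container emulating S3 locally
  *)      echo "[ERROR] Unknown out type: ${OUT_TYPE}"; exit 1 ;;
esac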
4 changes: 2 additions & 2 deletions flink-end-to-end-tests/test-scripts/common_s3_minio.sh
@@ -54,7 +54,7 @@ function s3_start {
while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do
sleep 0.1
done
export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed s'/0\.0\.0\.0/localhost/')"
export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
echo "Started minio @ $S3_ENDPOINT"
on_exit s3_stop
}
@@ -115,4 +115,4 @@ function s3_setup_with_provider {
set_config_key "s3.path-style-access" "true"
}

source "$(dirname "$0")"/common_s3_operations.sh
source "$(dirname "$0")"/common_s3_operations.sh
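The functional change in common_s3_minio.sh is the extra grep in the endpoint lookup (the second hunk only adds a trailing newline). On hosts where Docker publishes the port for both IPv4 and IPv6, `docker port` prints two lines, so the plain sed would have produced a multi-line (and therefore unusable) S3_ENDPOINT; filtering for the 0.0.0.0 mapping first presumably keeps only the IPv4 line. A small sketch of the assumed behaviour (the port number is illustrative):

# Assumed output of `docker port "$MINIO_CONTAINER_ID" 9000` on a dual-stack host:
#   0.0.0.0:32768
#   [::]:32768
docker port "$MINIO_CONTAINER_ID" 9000 \
    | grep -F '0.0.0.0' \
    | sed 's/0\.0\.0\.0/localhost/'
# prints: localhost:32768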
111 changes: 59 additions & 52 deletions flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,53 +20,16 @@
OUT_TYPE="${1:-local}" # other types: s3, minio
SINK_TO_TEST="${2:-"StreamingFileSink"}"

S3_PREFIX=temp/test_file_sink-$(uuidgen)
OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
source "$(dirname "$0")"/common.sh

if [ "${OUT_TYPE}" == "s3" ]; then
source "$(dirname "$0")"/common_s3.sh
else
echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)."
fi

# randomly set up openSSL with dynamically/statically linked libraries
OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"

s3_setup hadoop
set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
set_config_key "metrics.fetcher.update-interval" "2000"
# this test relies on global failovers
set_config_key "jobmanager.execution.failover-strategy" "full"

# OUTPUT_PATH is a local folder that can be used as a download folder for remote data
# the helper functions will access this folder
RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
mkdir -p $OUTPUT_PATH

if [ "${OUT_TYPE}" == "local" ]; then
echo "Use local output"
JOB_OUTPUT_PATH=${OUTPUT_PATH}
elif [ "${OUT_TYPE}" == "s3" ]; then
echo "Use s3 output"
JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
mkdir -p "$OUTPUT_PATH-chk"
else
echo "Unknown output type: ${OUT_TYPE}"
exit 1
fi

# make sure we delete the file at the end
function out_cleanup {
s3_delete_by_full_path_prefix "$S3_PREFIX"
s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
rollback_openssl_lib
}
if [ "${OUT_TYPE}" == "s3" ]; then
on_exit out_cleanup
fi

TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
# JOB_OUTPUT_PATH is the location where the job writes its data to
JOB_OUTPUT_PATH="${OUTPUT_PATH}"

###################################
# Get all lines in part files and sort them numerically.
@@ -79,30 +42,74 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
# sorted content of part files
###################################
function get_complete_result {
if [ "${OUT_TYPE}" == "s3" ]; then
s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true
fi
find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}

###################################
# Get total number of lines in part files.
#
# Globals:
# S3_PREFIX
# OUTPUT_PATH
# Arguments:
# None
# Returns:
# line number in part files
###################################
function get_total_number_of_valid_lines {
if [ "${OUT_TYPE}" == "local" ]; then
get_complete_result | wc -l | tr -d '[:space:]'
elif [ "${OUT_TYPE}" == "s3" ]; then
s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
fi
get_complete_result | wc -l | tr -d '[:space:]'
}

if [ "${OUT_TYPE}" == "local" ]; then
echo "[INFO] Test run in local environment: No S3 environment is not loaded."
elif [ "${OUT_TYPE}" == "s3" ] || [ "${OUT_TYPE}" == "minio" ]; then
if [ "${OUT_TYPE}" == "s3" ]; then
source "$(dirname "$0")"/common_s3.sh
else
source "$(dirname "$0")"/common_s3_minio.sh
fi

s3_setup hadoop

# overwrites JOB_OUTPUT_PATH to point to S3
S3_DATA_PREFIX="${RANDOM_PREFIX}"
S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"

# overwrites implementation for local runs
function get_complete_result {
# copies the data from S3 to the local OUTPUT_PATH
s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "${S3_DATA_PREFIX}" "part-" true

# and prints the sorted output
find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}

# overwrites implementation for local runs
function get_total_number_of_valid_lines {
s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-"
}

# make sure we delete the file at the end
function out_cleanup {
s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
}

on_exit out_cleanup
else
echo "[ERROR] Unknown out type: ${OUT_TYPE}"
exit 1
fi
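The branch above relies on bash's last-definition-wins semantics: get_complete_result and get_total_number_of_valid_lines are first defined with local-filesystem behaviour and then redefined for the s3/minio case, while out_cleanup is registered via the on_exit helper so that the bucket prefixes are removed when the test exits. A minimal, self-contained illustration of the override pattern (not taken from the PR):

#!/usr/bin/env bash
# The most recent definition of a shell function is the one that gets invoked.
OUT_TYPE="${1:-local}"

function get_complete_result {
    echo "local implementation"
}

if [ "${OUT_TYPE}" != "local" ]; then
    # shadows the default defined above
    function get_complete_result {
        echo "remote implementation for ${OUT_TYPE}"
    }
fi

get_complete_result   # prints "remote implementation for minio" when called as: ./script.sh minio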

# randomly set up openSSL with dynamically/statically linked libraries
OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"

set_config_key "metrics.fetcher.update-interval" "2000"
# this test relies on global failovers
set_config_key "jobmanager.execution.failover-strategy" "full"

TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"

###################################
# Waits until a number of values have been written within a timeout.
# If the timeout expires, exit with return code 1.
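The function whose documentation header ends the excerpt is truncated here. Under the assumption that it polls get_total_number_of_valid_lines until the expected count is reached, a rough sketch of such a wait loop could look like this (function and variable names are hypothetical, not copied from the PR):

function wait_for_expected_number_of_values {
    local expected_number_of_values=$1
    local timeout_in_seconds=$2
    local polling_interval_in_seconds=1
    local seconds_elapsed=0

    local current_number_of_values
    current_number_of_values=$(get_total_number_of_valid_lines)
    while [[ ${current_number_of_values} -lt ${expected_number_of_values} ]]; do
        if [[ ${seconds_elapsed} -ge ${timeout_in_seconds} ]]; then
            echo "[ERROR] Timeout: only ${current_number_of_values} of ${expected_number_of_values} values were written."
            exit 1
        fi

        sleep ${polling_interval_in_seconds}
        seconds_elapsed=$((seconds_elapsed + polling_interval_in_seconds))
        current_number_of_values=$(get_total_number_of_valid_lines)
    done
}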