diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 57f4fa22f7b74..6d08150e478f2 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -189,14 +189,14 @@ function run_group_2 {
     run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
     run_test "Sql Jdbc Driver end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions"
 
-    if [[ ${PROFILE} != *"enable-adaptive-scheduler"* ]]; then # FLINK-21400
-        run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
-        run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
-        run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
-        run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"
-
-        run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
-    fi
+    run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
+    run_test "Streaming File Sink (Minio) end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh minio StreamingFileSink" "skip_check_exceptions"
+    run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
+    run_test "New File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local FileSink" "skip_check_exceptions"
+    run_test "New File Sink (Minio) end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh minio FileSink" "skip_check_exceptions"
+    run_test "New File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 FileSink" "skip_check_exceptions"
+
+    run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
     run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"

diff --git a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
index a56ce4e9410d3..d3d08392caac7 100644
--- a/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
+++ b/flink-end-to-end-tests/test-scripts/common_s3_minio.sh
@@ -54,7 +54,7 @@ function s3_start {
   while [[ "$(docker inspect -f {{.State.Running}} "$MINIO_CONTAINER_ID")" -ne "true" ]]; do
     sleep 0.1
   done
-  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | sed s'/0\.0\.0\.0/localhost/')"
+  export S3_ENDPOINT="http://$(docker port "$MINIO_CONTAINER_ID" 9000 | grep -F '0.0.0.0' | sed s'/0\.0\.0\.0/localhost/')"
   echo "Started minio @ $S3_ENDPOINT"
   on_exit s3_stop
 }
@@ -115,4 +115,4 @@ function s3_setup_with_provider {
   set_config_key "s3.path-style-access" "true"
 }
 
-source "$(dirname "$0")"/common_s3_operations.sh
\ No newline at end of file
+source "$(dirname "$0")"/common_s3_operations.sh
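A note on the `common_s3_minio.sh` change above: on a Docker host with IPv6 enabled, `docker port` can print one mapping per address family, so the unfiltered output may contain two lines and `S3_ENDPOINT` would embed both. A minimal sketch of the fixed pipeline, assuming a running container id in `$MINIO_CONTAINER_ID` (the mapped port below is made up):

```bash
# `docker port` may print both an IPv4 and an IPv6 mapping, e.g.:
#   0.0.0.0:32768
#   :::32768
# The added `grep -F '0.0.0.0'` keeps only the IPv4 line before the
# sed rewrite turns it into a usable localhost endpoint.
docker port "$MINIO_CONTAINER_ID" 9000 \
    | grep -F '0.0.0.0' \
    | sed 's/0\.0\.0\.0/localhost/'
# prints: localhost:32768
```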
diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
index 711f74b66729f..79b97495b7ef5 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_file_sink.sh
@@ -20,53 +20,16 @@
 OUT_TYPE="${1:-local}" # other type: s3
 SINK_TO_TEST="${2:-"StreamingFileSink"}"
 
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
-OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
-S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
-
 source "$(dirname "$0")"/common.sh
 
-if [ "${OUT_TYPE}" == "s3" ]; then
-  source "$(dirname "$0")"/common_s3.sh
-else
-  echo "S3 environment is not loaded for non-s3 test runs (test run type: $OUT_TYPE)."
-fi
-
-# randomly set up openSSL with dynamically/statically linked libraries
-OPENSSL_LINKAGE=$(if (( RANDOM % 2 )) ; then echo "dynamic"; else echo "static"; fi)
-echo "Executing test with ${OPENSSL_LINKAGE} openSSL linkage (random selection between 'dynamic' and 'static')"
-
-s3_setup hadoop
-set_conf_ssl "mutual" "OPENSSL" "${OPENSSL_LINKAGE}"
-set_config_key "metrics.fetcher.update-interval" "2000"
-# this test relies on global failovers
-set_config_key "jobmanager.execution.failover-strategy" "full"
-
+# OUTPUT_PATH is a local folder that can be used as a download folder for remote data
+# the helper functions will access this folder
+RANDOM_PREFIX="temp/test_file_sink-$(uuidgen)"
+OUTPUT_PATH="$TEST_DATA_DIR/${RANDOM_PREFIX}"
 mkdir -p $OUTPUT_PATH
 
-if [ "${OUT_TYPE}" == "local" ]; then
-  echo "Use local output"
-  JOB_OUTPUT_PATH=${OUTPUT_PATH}
-elif [ "${OUT_TYPE}" == "s3" ]; then
-  echo "Use s3 output"
-  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
-  set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/$S3_PREFIX-chk"
-  mkdir -p "$OUTPUT_PATH-chk"
-else
-  echo "Unknown output type: ${OUT_TYPE}"
-  exit 1
-fi
-
-# make sure we delete the file at the end
-function out_cleanup {
-  s3_delete_by_full_path_prefix "$S3_PREFIX"
-  s3_delete_by_full_path_prefix "${S3_PREFIX}-chk"
-  rollback_openssl_lib
-}
-if [ "${OUT_TYPE}" == "s3" ]; then
-  on_exit out_cleanup
-fi
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+# JOB_OUTPUT_PATH is the location where the job writes its data to
+JOB_OUTPUT_PATH="${OUTPUT_PATH}"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -79,9 +42,6 @@ TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.
 #   sorted content of part files
 ###################################
 function get_complete_result {
-  if [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_PREFIX" "part-" true
-  fi
   find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
 }
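The restructuring in the next hunk relies on Bash's rule that a later `function` definition silently replaces an earlier one: the script first defines local-mode helpers, then the s3/minio branch redefines them, while all call sites stay unchanged. A self-contained sketch of that pattern (names are illustrative, not taken from the script):

```bash
#!/usr/bin/env bash
OUT_TYPE="${1:-local}"

# default implementation, sufficient for local runs
function get_complete_result {
  echo "reading results from the local filesystem"
}

if [ "${OUT_TYPE}" != "local" ]; then
  # redefinition: the most recent definition wins, so callers
  # transparently pick up the remote implementation
  function get_complete_result {
    echo "downloading results from ${OUT_TYPE} first"
  }
fi

get_complete_result
```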
@@ -89,20 +49,65 @@ function get_complete_result {
 ###################################
 # Get total number of lines in part files.
 #
 # Globals:
-#   S3_PREFIX
+#   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
 #   line number in part files
 ###################################
 function get_total_number_of_valid_lines {
-  if [ "${OUT_TYPE}" == "local" ]; then
-    get_complete_result | wc -l | tr -d '[:space:]'
-  elif [ "${OUT_TYPE}" == "s3" ]; then
-    s3_get_number_of_lines_by_prefix "${S3_PREFIX}" "part-"
-  fi
+  get_complete_result | wc -l | tr -d '[:space:]'
 }
 
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "[INFO] Test run in local environment: No S3 environment is loaded."
+elif [ "${OUT_TYPE}" == "s3" ] || [ "${OUT_TYPE}" == "minio" ]; then
+  if [ "${OUT_TYPE}" == "s3" ]; then
+    source "$(dirname "$0")"/common_s3.sh
+  else
+    source "$(dirname "$0")"/common_s3_minio.sh
+  fi
+
+  s3_setup hadoop
+
+  # overwrites JOB_OUTPUT_PATH to point to S3
+  S3_DATA_PREFIX="${RANDOM_PREFIX}"
+  S3_CHECKPOINT_PREFIX="${RANDOM_PREFIX}-chk"
+  JOB_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/${S3_DATA_PREFIX}"
+  set_config_key "state.checkpoints.dir" "s3://$IT_CASE_S3_BUCKET/${S3_CHECKPOINT_PREFIX}"
+
+  # overwrites implementation for local runs
+  function get_complete_result {
+    # copies the data from S3 to the local OUTPUT_PATH
+    s3_get_by_full_path_and_filename_prefix "$OUTPUT_PATH" "$S3_DATA_PREFIX" "part-" true
+
+    # and prints the sorted output
+    find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  }
+
+  # overwrites implementation for local runs
+  function get_total_number_of_valid_lines {
+    s3_get_number_of_lines_by_prefix "${S3_DATA_PREFIX}" "part-"
+  }
+
+  # make sure we delete the file at the end
+  function out_cleanup {
+    s3_delete_by_full_path_prefix "${S3_DATA_PREFIX}"
+    s3_delete_by_full_path_prefix "${S3_CHECKPOINT_PREFIX}"
+  }
+
+  on_exit out_cleanup
+else
+  echo "[ERROR] Unknown output type: ${OUT_TYPE}"
+  exit 1
+fi
+
+set_config_key "metrics.fetcher.update-interval" "2000"
+# this test relies on global failovers
+set_config_key "jobmanager.execution.failover-strategy" "full"
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+
 ###################################
 # Waits until a number of values have been written within a timeout.
 # If the timeout expires, exit with return code 1.
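For reference, these are the three output modes the nightly script now exercises, assuming the argument handling above (`OUT_TYPE` then `SINK_TO_TEST`); the real-S3 run additionally needs `IT_CASE_S3_BUCKET` and matching S3 credentials in the environment:

```bash
# local filesystem run with the legacy sink
"$END_TO_END_DIR"/test-scripts/test_file_sink.sh local StreamingFileSink

# Minio-backed run; common_s3_minio.sh starts a minio container via Docker
"$END_TO_END_DIR"/test-scripts/test_file_sink.sh minio FileSink

# real S3 run; requires IT_CASE_S3_BUCKET (and S3 credentials) to be set
"$END_TO_END_DIR"/test-scripts/test_file_sink.sh s3 FileSink
```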