Skip to content

Commit

Permalink
Effect tries to reconnect 3 times when an error occurs. After 3 times…
Browse files Browse the repository at this point in the history
…, writes the failing batch to another file

Former-commit-id: aa7fd61
  • Loading branch information
Cristina Yenyxe Gonzalez Garcia committed Nov 28, 2012
1 parent 89136f0 commit a69dbf1
Showing 1 changed file with 70 additions and 42 deletions.
112 changes: 70 additions & 42 deletions src/effect/effect_runner.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ int run_effect(char **urls, shared_options_data_t *shared_options, effect_option
{
// Enable nested parallelism and set the number of threads the user has chosen
omp_set_nested(1);
omp_set_num_threads(shared_options->num_threads);

LOG_DEBUG_F("Thread %d processes data\n", omp_get_thread_num());

Expand All @@ -139,20 +138,29 @@ int run_effect(char **urls, shared_options_data_t *shared_options, effect_option
if (shared_options->chain != NULL) {
filters = sort_filter_chain(shared_options->chain, &num_filters);
}
FILE *passed_file = NULL, *failed_file = NULL;
FILE *passed_file = NULL, *failed_file = NULL, *non_processed_file = NULL;
get_output_files(shared_options, &passed_file, &failed_file);

// Filename structure outdir/vcfname.errors
char *prefix_filename = calloc(strlen(shared_options->vcf_filename), sizeof(char));
get_filename_from_path(shared_options->vcf_filename, prefix_filename);
char *non_processed_filename = malloc((strlen(shared_options->output_directory) + strlen(prefix_filename) + 8) * sizeof(char));
sprintf(non_processed_filename, "%s/%s.errors", shared_options->output_directory, prefix_filename);
non_processed_file = fopen(non_processed_filename, "w");
free(non_processed_filename);

start = omp_get_wtime();

int i = 0;
vcf_batch_t *batch = NULL;

int ret_ws_0 = 0, ret_ws_1 = 0, ret_ws_2 = 0;

start = omp_get_wtime();

while ((batch = fetch_vcf_batch(file)) != NULL) {
if (i == 0) {
// Write file format, header entries and delimiter
if (passed_file != NULL) { write_vcf_header(file, passed_file); }
if (failed_file != NULL) { write_vcf_header(file, failed_file); }
if (non_processed_file != NULL) { write_vcf_header(file, non_processed_file); }

LOG_DEBUG("VCF header written\n");
}
Expand All @@ -175,60 +183,71 @@ int run_effect(char **urls, shared_options_data_t *shared_options, effect_option
passed_records = run_filter_chain(input_records, failed_records, filters, num_filters);
}

int reconnections = 0;
int max_reconnections = 3; // TODO allow to configure?

// Write records that passed to a separate file, and query the WS with them as args
if (passed_records->size > 0) {
// Divide the list of passed records in ranges of size defined in config file
int num_chunks;
int *chunk_sizes;
int *chunk_starts = create_chunks(passed_records->size, shared_options->entries_per_thread, &num_chunks, &chunk_sizes);


// OpenMP: Launch a thread for each range
#pragma omp parallel for
for (int j = 0; j < num_chunks; j++) {
LOG_DEBUG_F("[%d] WS invocation\n", omp_get_thread_num());
LOG_DEBUG_F("[%d] -- effect WS\n", omp_get_thread_num());
// printf("batch loaded = '%.*s'\n", 50, batch->text);
// printf("[%d] first chromosome pos = %lu\n", omp_get_thread_num(), ((vcf_record_t*) (passed_records->items + chunk_starts[j])[0])->position);
ret_ws_0 = invoke_effect_ws(urls[0], (vcf_record_t**) (passed_records->items + chunk_starts[j]), chunk_sizes[j], options_data->excludes);
if (!options_data->no_phenotypes) {
LOG_DEBUG_F("[%d] -- snp WS\n", omp_get_thread_num());
ret_ws_1 = invoke_snp_phenotype_ws(urls[1], (vcf_record_t**) (passed_records->items + chunk_starts[j]), chunk_sizes[j]);
LOG_DEBUG_F("[%d] -- mutation WS\n", omp_get_thread_num());
ret_ws_2 = invoke_mutation_phenotype_ws(urls[2], (vcf_record_t**) (passed_records->items + chunk_starts[j]), chunk_sizes[j]);
}
}

free(chunk_starts);
free(chunk_sizes);

LOG_DEBUG_F("*** %dth web services invocation finished\n", i);

if (ret_ws_0 || ret_ws_1 || ret_ws_2) {
if (ret_ws_0) {
LOG_ERROR_F("Effect web service error: %s\n", get_last_http_error(ret_ws_0));
}
if (ret_ws_1) {
LOG_ERROR_F("SNP phenotype web service error: %s\n", get_last_http_error(ret_ws_1));
}
if (ret_ws_2) {
LOG_ERROR_F("Mutations phenotype web service error: %s\n", get_last_http_error(ret_ws_2));
do {
// OpenMP: Launch a thread for each range
#pragma omp parallel for num_threads(shared_options->num_threads)
for (int j = 0; j < num_chunks; j++) {
LOG_DEBUG_F("[%d] WS invocation\n", omp_get_thread_num());
LOG_DEBUG_F("[%d] -- effect WS\n", omp_get_thread_num());
if (!reconnections || ret_ws_0) {
ret_ws_0 = invoke_effect_ws(urls[0], (vcf_record_t**) (passed_records->items + chunk_starts[j]), chunk_sizes[j], options_data->excludes);
}

if (!options_data->no_phenotypes) {
if (!reconnections || ret_ws_1) {
LOG_DEBUG_F("[%d] -- snp WS\n", omp_get_thread_num());
ret_ws_1 = invoke_snp_phenotype_ws(urls[1], (vcf_record_t**) (passed_records->items + chunk_starts[j]), chunk_sizes[j]);
}

if (!reconnections || ret_ws_2) {
LOG_DEBUG_F("[%d] -- mutation WS\n", omp_get_thread_num());
ret_ws_2 = invoke_mutation_phenotype_ws(urls[2], (vcf_record_t**) (passed_records->items + chunk_starts[j]), chunk_sizes[j]);
}
}
}

LOG_FATAL("Can not continue execution after a web service error occurred");
break;
}
LOG_DEBUG_F("*** %dth web services invocation finished\n", i);

if (ret_ws_0 || ret_ws_1 || ret_ws_2) {
if (ret_ws_0) {
LOG_ERROR_F("Effect web service error: %s\n", get_last_http_error(ret_ws_0));
}
if (ret_ws_1) {
LOG_ERROR_F("SNP phenotype web service error: %s\n", get_last_http_error(ret_ws_1));
}
if (ret_ws_2) {
LOG_ERROR_F("Mutations phenotype web service error: %s\n", get_last_http_error(ret_ws_2));
}

// In presence of errors, wait 5 seconds before retrying
reconnections++;
LOG_ERROR_F("Some errors ocurred, reconnection #%d\n", reconnections);
sleep(5);
} else {
free(chunk_starts);
free(chunk_sizes);
}
} while (reconnections < max_reconnections && (ret_ws_0 || ret_ws_1 || ret_ws_2));
}

// Write records that passed and failed to separate files
// Write records that passed and failed filters to separate files
if (passed_file != NULL && failed_file != NULL) {
if (passed_records != NULL && passed_records->size > 0) {
#pragma omp critical
{
for (int r = 0; r < passed_records->size; r++) {
write_vcf_record(passed_records->items[r], passed_file);
}
// write_batch(passed_records, passed_file);
}
}
if (failed_records != NULL && failed_records->size > 0) {
Expand All @@ -237,11 +256,19 @@ int run_effect(char **urls, shared_options_data_t *shared_options, effect_option
for (int r = 0; r < passed_records->size; r++) {
write_vcf_record(failed_records->items[r], failed_file);
}
// write_batch(failed_records, failed_file);
}
}
}

// If the maximum number of reconnections was reached still with errors,
// write the non-processed batch to the corresponding file
if (reconnections == max_reconnections && (ret_ws_0 || ret_ws_1 || ret_ws_2)) {
#pragma omp critical
{
write_vcf_batch(batch, non_processed_file);
}
}

// Free items in both lists (not their internal data)
if (passed_records != input_records) {
LOG_DEBUG_F("[Batch %d] %zu passed records\n", i, passed_records->size);
Expand All @@ -267,6 +294,7 @@ int run_effect(char **urls, shared_options_data_t *shared_options, effect_option
// Free resources
if (passed_file) { fclose(passed_file); }
if (failed_file) { fclose(failed_file); }
if (non_processed_file) { fclose(non_processed_file); }

// Free filters
for (i = 0; i < num_filters; i++) {
Expand Down

0 comments on commit a69dbf1

Please sign in to comment.