groupTuple randomly returns sorted elements #5579

Open
VasLem opened this issue Dec 5, 2024 · 6 comments

Comments

@VasLem

VasLem commented Dec 5, 2024

Bug report

This is entirely for posterity, as the error occurred in 24.09.2-edge and has been fixed in the latest version, 24.11.0-edge. However, I believe this bug is quite strange in nature, and it may not have been observed before. It cannot be reproduced outside of Google Cloud, and it only affects data coming from the output of another workflow.

For a very specific example:

Script within the workflow:

    aggregated_dmps_tsvs_by_case.toList().view{"aggregated_dmps_tsvs_by_case :\n\t ${it.join('\n\t')}"}
    aggregated_dmps_tsvs_by_case_input = aggregated_dmps_tsvs_by_case.groupTuple()
    aggregated_dmps_tsvs_by_case_input.toList().view{"aggregated_dmps_tsvs_by_case_input:\n\t ${it.join('\n\t')}"}

Program output:

aggregated_dmps_tsvs_by_case :
         [rundir, NET, gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz]
        [rundir, PNET, gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz]
        [rundir, LNET, gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz]
        [rundir, SINET, gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz]
aggregated_dmps_tsvs_by_case_input:
         [rundir, [LNET, NET, PNET, SINET], [gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz]]

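In the output, the second list (the group names) is sorted, but the third list (the file paths) keeps the input order, so the two lists no longer correspond.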

Environment

  • Nextflow version: 24.09.2-edge
  • Java version: 17.0.13
  • Operating system: Linux (GCP)
@bentsherman
Member

I think it is a limitation of groupTuple. If you have multiple grouped lists and sorting enabled, each list is sorted independently of the others, so the sorted lists might not correspond to each other.

I think the fix you're referring to is to disable sorting, in which case the lists will be consistent with each other. Previously this would cause issues with resume, but I think this was addressed in 24.10.
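To make the point above concrete, here is a small model of the grouping step. It is written in Python rather than Nextflow so it runs on its own, and it is not Nextflow's implementation: group_tuple, sort_each and sort_jointly are hypothetical names, and the numeric values are made up, standing in for any per-item attribute such as the file paths. It shows three cases: with no sorting the lists stay paired, sorting each list independently can pair the wrong items, and sorting whole rows gives a deterministic order while keeping the pairing.

```python
def group_tuple(items, sort_each=False):
    """Group tuples by their first element; each remaining position becomes a list.
    With sort_each=True every list is sorted on its own (the limitation described above)."""
    groups = {}  # dicts preserve insertion (arrival) order
    for key, *rest in items:
        cols = groups.setdefault(key, [[] for _ in rest])
        for col, value in zip(cols, rest):
            col.append(value)
    result = []
    for key, cols in groups.items():
        if sort_each:
            # Sorting each list independently can break the pairing between lists.
            cols = [sorted(col) for col in cols]
        result.append((key, *cols))
    return result


def sort_jointly(group):
    """Sort a grouped tuple by whole rows so that all lists stay aligned."""
    key, *cols = group
    rows = sorted(zip(*cols))  # rows compare by the first list, then the next, ...
    return (key, *[list(col) for col in zip(*rows)])


# Hypothetical per-item values (e.g. a file size) standing in for the file paths.
items = [
    ("rundir", "NET", 30),
    ("rundir", "PNET", 10),
    ("rundir", "LNET", 40),
    ("rundir", "SINET", 20),
]

unsorted = group_tuple(items)                     # arrival order, lists aligned
independent = group_tuple(items, sort_each=True)  # each list sorted on its own
joint = [sort_jointly(g) for g in unsorted]       # sorted by name, still aligned

for label, groups in [("unsorted", unsorted), ("independent", independent), ("joint", joint)]:
    key, names, values = groups[0]
    print(label, list(zip(names, values)))
```

In the "independent" case LNET ends up paired with 10 instead of 40, which is the kind of mismatch that sorting lists separately can produce.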

@VasLem
Author

VasLem commented Dec 5, 2024

Hi Ben, thanks for the quick reply. I tested again on the new version, and the problem is still there. How do I disable sorting? Isn't it false by default? If I set ".groupTuple(sort:false)", I get an invalid value error for sort:

ERROR ~ Not a valid sort argument: false

@bentsherman
Member

Sort is disabled by default, so it should work if you don't specify the sort option.

@VasLem
Author

VasLem commented Dec 5, 2024

Even weirder:

    aggregated_dmps_tsvs_by_case_input = aggregated_dmps_tsvs_by_case
        .groupTuple()
        .map { it -> [it[0], (it[2] as List)] }
        .map { it -> [
            it[0],
            it[1].collect { x -> {
                def r = x.split("/").last().split("_").first()
                println "${x} becomes ${r}"
                r
            }},
            it[1]
        ] }
    aggregated_dmps_tsvs_by_case_input.toList().view{"aggregated_dmps_tsvs_by_case_input:\n\t ${it.join('\n\t')}"}

This produces:

gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz becomes PNET
gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz becomes LNET
gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz becomes SINET
gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz becomes NET

[rundir, [LNET, NET, PNET, SINET], [gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz]]

... What is going on? :D I feel like I'm in the twilight zone.

Hahah, and if I do this:

    aggregated_dmps_tsvs_by_case_input = aggregated_dmps_tsvs_by_case
        .groupTuple()
        .map { it -> [it[0], (it[2] as List)] }
        .map { it -> [
            it[0],
            it[1].collect { x -> {
                def r = x.split("/").last().split("_").first()
                println "${x} becomes ${r}"
                r
            }},
            it[1].collect { x -> {
                def r = x.split("/").last().split("_").first()
                println "${x} becomes ${r}"
                r
            }},
            it[1]
        ] }
    aggregated_dmps_tsvs_by_case_input.toList().view{"aggregated_dmps_tsvs_by_case_input:\n\t ${it.join('\n\t')}"}

I am getting:


[rundir, [LNET, NET, PNET, SINET], [NET, PNET, LNET, SINET], [gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz, gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz]]

Still, the third element is an instance of nextflow.util.ArrayBag, which is not supposed to be randomly accessed, and in any case I am converting it to a list (with it[2] as List).
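A quick way to tell whether the grouped lists still correspond is to re-derive each name from its file path, the same way the snippets above do (last path segment, text before the first underscore), and compare position by position. The sketch below does this in Python on the values printed in the output above. name_from_path and misaligned_positions are hypothetical helpers, and the check assumes every file name starts with its group name followed by "_".

```python
def name_from_path(path):
    """Same derivation as the Groovy snippet: last path segment, text before the first '_'."""
    return path.split("/")[-1].split("_")[0]


def misaligned_positions(names, paths):
    """Return the indices where a grouped name does not match the name derived from its path."""
    if len(names) != len(paths):
        raise ValueError("grouped lists have different lengths")
    return [
        i
        for i, (name, path) in enumerate(zip(names, paths))
        if name_from_path(path) != name
    ]


# Values copied from the output printed above.
names = ["LNET", "NET", "PNET", "SINET"]
paths = [
    "gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz",
    "gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz",
    "gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz",
    "gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz",
]

print(misaligned_positions(names, paths))
```

An empty result means every name lines up with its file. For the output above, every position is flagged.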

@bentsherman
Member

I created this minimal example from your snippets:

inputs = Channel.of(
    ['rundir', 'PNET', file('gs://my-bucket/rundir/aggregated_dmps/PNET_by_case.tsv.gz')],
    ['rundir', 'LNET', file('gs://my-bucket/rundir/aggregated_dmps/LNET_by_case.tsv.gz')],
    ['rundir', 'SINET', file('gs://my-bucket/rundir/aggregated_dmps/SINET_by_case.tsv.gz')],
    ['rundir', 'NET', file('gs://my-bucket/rundir/aggregated_dmps/NET_by_case.tsv.gz')]
)

inputs
    .groupTuple()
    .view()
    .transpose()
    .view()

But I am not seeing any issues with the grouped lists:

$ NXF_VER=24.11.0-edge nextflow main.nf

[rundir, [PNET, LNET, SINET, NET], [/rundir/aggregated_dmps/PNET_by_case.tsv.gz, /rundir/aggregated_dmps/LNET_by_case.tsv.gz, /rundir/aggregated_dmps/SINET_by_case.tsv.gz, /rundir/aggregated_dmps/NET_by_case.tsv.gz]]

[rundir, PNET, /rundir/aggregated_dmps/PNET_by_case.tsv.gz]
[rundir, LNET, /rundir/aggregated_dmps/LNET_by_case.tsv.gz]
[rundir, SINET, /rundir/aggregated_dmps/SINET_by_case.tsv.gz]
[rundir, NET, /rundir/aggregated_dmps/NET_by_case.tsv.gz]
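The groupTuple followed by transpose pattern in the minimal example works as a round-trip check. If the grouped lists are still aligned, transposing them gives back the original tuples. If one list has been reordered on its own, each name is paired with another entry's file. Below is a rough Python model of the two operators (hypothetical group_tuple and transpose helpers, not Nextflow's code, assuming all grouped lists have the same length) that applies this check to the inputs from the example above.

```python
def group_tuple(items):
    """Group tuples by their first element; each remaining position becomes a list.
    Items are appended in arrival order (no sorting)."""
    groups = {}  # dicts preserve insertion order
    for key, *rest in items:
        cols = groups.setdefault(key, [[] for _ in rest])
        for col, value in zip(cols, rest):
            col.append(value)
    return [(key, *cols) for key, cols in groups.items()]


def transpose(groups):
    """Emit one flat tuple per position across the grouped lists (equal lengths assumed)."""
    return [(key, *row) for key, *cols in groups for row in zip(*cols)]


prefix = "gs://my-bucket/rundir/aggregated_dmps/"
items = [
    ("rundir", name, f"{prefix}{name}_by_case.tsv.gz")
    for name in ["PNET", "LNET", "SINET", "NET"]
]

grouped = group_tuple(items)
round_trip = transpose(grouped)

# Reorder only the names list, as in the reported output, and transpose again.
key, names, paths = grouped[0]
scrambled = transpose([(key, sorted(names), paths)])
mismatched = [row for row in scrambled if not row[2].endswith(f"/{row[1]}_by_case.tsv.gz")]

print(round_trip == items)
print(mismatched)
```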

@VasLem
Author

VasLem commented Dec 5, 2024

Yes indeed, thanks for trying to reproduce it. I tried the same on my local machine and could not reproduce it there either. It only happens with this example, after the channel has been produced by a sub-workflow, and only on Google Cloud. The producing workflow looks like this:

workflow mergeDMs {
    take:
        combined_tsvs
        kind
        suffix
        OUTPUT_ROOT
    main:
...
    already_aggregated_dms_tsv_by_case = combined_tsvs.filter { it ->
        file("${OUTPUT_ROOT}/${it[0]}/aggregated_${kind}${suffix}/${it[1]}_by_case.tsv.gz").exists() && 
        file("${OUTPUT_ROOT}/${it[0]}/aggregated_${kind}${suffix}/${it[1]}_by_case.tsv.gz").size() > 10
    }
    aggregated_dms_tsv_by_case_input = combined_tsvs.filter { it ->
        !(file("${OUTPUT_ROOT}/${it[0]}/aggregated_${kind}${suffix}/${it[1]}_by_case.tsv.gz").exists() &&
         file("${OUTPUT_ROOT}/${it[0]}/aggregated_${kind}${suffix}/${it[1]}_by_case.tsv.gz").size() > 10)
    }

    aggregated_dms_tsv_by_case = aggregateCombinedDmsTsvByCase(
        aggregated_dms_tsv_by_case_input.map { it -> [[it[0], it[1]], it[2]] },
        "case",
        params.max_allowed_beta_difference,
        aggregated_dms_tsv_by_case_input.map { it -> "${OUTPUT_ROOT}/${it[0]}/aggregated_${kind}${suffix}/${it[1]}_by_case.tsv.gz" }
    )
    aggregated_dms_tsv_by_case = aggregated_dms_tsv_by_case
        .map { it.flatten() }
        .mix(already_aggregated_dms_tsv_by_case)
        .map { it ->
            [it[0], it[1], "${OUTPUT_ROOT}/${it[0]}/aggregated_${kind}${suffix}/${it[1]}_by_case.tsv.gz"]
        }

    emit:
        aggregated_dms_tsv_by_case = aggregated_dms_tsv_by_case

Then I retrieve that output in the parent workflow like this:

    ret = mergeDMs(
        combined_tsvs, "dmps", "", OUTPUT_ROOT)
    aggregated_dmps_tsvs_by_case = ret.aggregated_dms_tsv_by_case


2 participants