Context propagation lost using parallel streams #12000

Open
claudio-canellada-edo opened this issue Aug 12, 2024 · 8 comments

@claudio-canellada-edo

Hi, it seems that context propagation does not happen when using parallel streams: only one of the child threads gets the context. I'm testing this with version 2.6.0 of the opentelemetry-instrumentation-api artifact.

This is the test:

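import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import java.util.stream.IntStream;
// JUnit imports for @Test and assertEquals are omitted: the JUnit version is not stated.

public class StreamsTest {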

    private static final ContextKey<Double> ANY_DOUBLE_KEY = ContextKey.named("any-double-key");
    private static final ContextKey<String> ANY_STRING_KEY = ContextKey.named("any-string-key");
    private static final Double ANY_DOUBLE_VALUE = 17.0d;
    private static final String ANY_STRING_VALUE = "any-string-value";
    
    @Test
    public void contextTest() throws Exception {
        // The scope is auto closable
        try (Scope scope = Context.current() // Get the current context to add values
                .with(ANY_DOUBLE_KEY, ANY_DOUBLE_VALUE) // Add any double value
                .makeCurrent()) {  // A scope is created when we set the new context as current

            // If we want to add another key/value we need to create a new scope
            try (Scope innerScope = Context.current()
                    .with(ANY_STRING_KEY, ANY_STRING_VALUE) // Add any string value
                    .makeCurrent()) {

                executeSomethingWithStreams();
            }
        }
    }

    private void executeSomethingWithStreams() {
        Integer sum = IntStream.rangeClosed(0, 3)
                .parallel()
                .peek(this::assertContext)
                .reduce(0, Integer::sum);

        assertEquals(Integer.valueOf(6), sum); // expected value first, actual second
    }

    private void assertContext(int i) {
        System.out.printf("I'm process %d and my double value from context is %s%n", i, Context.current().get(ANY_DOUBLE_KEY));
    }
}

And the output:

I'm process 2 and my double value from context is 17.0
I'm process 0 and my double value from context is null
I'm process 1 and my double value from context is null
I'm process 3 and my double value from context is null
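
A likely reading of this output, assuming the default thread-local context storage: a parallel stream runs its elements on the calling thread plus ForkJoinPool common-pool workers, and only the calling thread has the context attached. The sketch below reproduces that visibility rule with a plain ThreadLocal and an explicitly started thread, so the result does not depend on how the stream schedules work. `ThreadLocalVisibilityDemo`, `CURRENT`, and `readFromOtherThread` are hypothetical names for illustration, not part of the OpenTelemetry API.

```java
import java.util.concurrent.atomic.AtomicReference;

class ThreadLocalVisibilityDemo {
    // Hypothetical stand-in for a thread-bound context; not the OpenTelemetry API.
    static final ThreadLocal<Double> CURRENT = new ThreadLocal<>();

    /** Reads CURRENT from a freshly started thread and returns what that thread saw. */
    static Double readFromOtherThread() {
        AtomicReference<Double> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(CURRENT.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        CURRENT.set(17.0d);
        try {
            System.out.println("caller sees: " + CURRENT.get());          // 17.0
            System.out.println("worker sees: " + readFromOtherThread());  // null
        } finally {
            CURRENT.remove();
        }
    }
}
```

The value set on the calling thread is not inherited by the worker, which matches the single non-null line in the output above.
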
@heyams
Contributor

heyams commented Aug 16, 2024

can you try this:

 try (Scope ignored = Context.current().makeCurrent()) { // **makeCurrent()**
   assert Context.current() == ctx;
   ...
 }

@trask
Member

trask commented Aug 19, 2024

> I'm testing this using version 2.6.0 of the opentelemetry-instrumentation-api artifact.

Are you also using the Java agent?

@ajaparicio

Yes, we are using the Java agent too.

@ajaparicio

> can you try this:
>
>  try (Scope ignored = Context.current().makeCurrent()) { // **makeCurrent()**
>    assert Context.current() == ctx;
>    ...
>  }

Hi @heyams, sorry, I don't know what "ctx" refers to, so I can't write the assert.

@heyams
Contributor

heyams commented Aug 20, 2024

@ajaparicio don't worry about that assert statement, can you try this instead:

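    private void executeSomethingWithStreams() {
        // Capture the context on the calling thread, before the stream goes parallel
        Context context = Context.current();
        Integer sum = IntStream.rangeClosed(0, 3)
                .parallel()
                .peek(i -> {
                    // Make the captured context current on whichever thread handles this element
                    try (Scope scope = context.makeCurrent()) {
                        assertContext(i);
                    }
                })
                .reduce(0, Integer::sum);
        assertEquals(Integer.valueOf(6), sum);
    }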

please let me know if that works.
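
If the same capture-and-restore pattern is needed in several streams, it can be factored into a small wrapper. The following is only a sketch of that idea using a plain ThreadLocal so that it runs without any dependency; `CapturedContextDemo`, `CURRENT`, `withCaptured`, and `observeInParallel` are hypothetical names, not OpenTelemetry API. The set/restore pair in `withCaptured` plays the role that `context.makeCurrent()` and closing the returned `Scope` play in the snippet above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

class CapturedContextDemo {
    // Hypothetical stand-in for a thread-bound context; not the OpenTelemetry API.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Wraps delegate so it runs with `captured` installed, then restores the previous value. */
    static IntConsumer withCaptured(String captured, IntConsumer delegate) {
        return i -> {
            String previous = CURRENT.get();
            CURRENT.set(captured);
            try {
                delegate.accept(i);
            } finally {
                // Restore whatever this thread had before, like closing a scope
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        };
    }

    /** Runs a parallel stream over 0..3 and records what each element observed. */
    static Map<Integer, String> observeInParallel() {
        Map<Integer, String> seen = new ConcurrentHashMap<>();
        // Capture on the calling thread, before the stream goes parallel
        String captured = CURRENT.get();
        IntStream.rangeClosed(0, 3)
                .parallel()
                .peek(withCaptured(captured, i -> seen.put(i, String.valueOf(CURRENT.get()))))
                .sum();
        return seen;
    }

    public static void main(String[] args) {
        CURRENT.set("any-string-value");
        try {
            System.out.println(observeInParallel());
        } finally {
            CURRENT.remove();
        }
    }
}
```

Because every element installs the captured value on its own thread before reading it, the result no longer depends on which thread the stream happens to pick.
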

@ajaparicio

@heyams yes, setting the scope and then calling the method works, thanks! We can use this workaround. Do you think propagation will be handled automatically in the future?

@heyams
Contributor

heyams commented Aug 21, 2024

Context propagation is done automatically in these instrumentations

It is not handled automatically in this case. I don't know if it can be implemented in the future. It will require some investigation.

@ajaparicio

Thank you for the response, @heyams.
In that case, we should keep this issue open until we find an implementation solution.
