-
Notifications
You must be signed in to change notification settings - Fork 858
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[enhance][pulsar] add apache-pulsar client support #5926
Merged
Merged
Changes from 50 commits
Commits
Show all changes
51 commits
Select commit
Hold shift + click to select a range
693b89b
add apache-pulsar support
dao-jun 7086bd8
remove repositories
dao-jun 29cc849
checkstyle
dao-jun 1ed1686
checkstyle
dao-jun e347fa8
bug fix
dao-jun 883c401
bug fix
dao-jun 47e9f1c
bug fix
dao-jun 9cc4f45
fix pulsar producer send back
dao-jun 09052da
review fix
dao-jun 2e127e6
checkstyle fix
dao-jun 975671c
checkstyle fix
dao-jun f22dd03
Merge branch 'main' of https://github.com/tjiuming/opentelemetry-java…
dao-jun 0845f0e
merge master into current
dao-jun 34c425a
codestyle fix
dao-jun 1e6aa3d
codestyle fix
dao-jun d23932b
complete tests
dao-jun 57c7b74
Merge branch 'main' into dev/pulsar
dao-jun 79ca3a2
Refactor with Instrument API
dao-jun 6755ae6
Fix tests
dao-jun 2f5ab6f
Fix checkstyle
dao-jun a7f3da7
Fix checkstyle
dao-jun 1f6faed
Fix checkstyle
dao-jun a2fe9a3
Fix checkstyle
dao-jun b87b21a
Fix checkstyle
dao-jun 92f5c72
Fix instrumentation
dao-jun 63438b6
review fix
dao-jun 40c1115
fix checkstyle
dao-jun 7184126
fix tests
dao-jun 333f636
fix tests
dao-jun db57a7b
fix tests
dao-jun 3de32af
fix tests
dao-jun 01b616a
fix tests
dao-jun 5207cc9
fix tests
dao-jun 1f307c6
fix checkstyle
dao-jun fbeb11e
fix instrumentation & test
dao-jun ab5f0c5
fix instrumentation
dao-jun e0f214d
fix test `test send non-partitioned topic`
dao-jun 8328ca1
fix test `test send non-partitioned topic`
dao-jun 5a6917b
fix test `test send non-partitioned topic`
dao-jun c8b7551
fix all tests
dao-jun 7fe7329
fix all tests
dao-jun 8dd1207
fix tests
dao-jun b704cd8
fix tests
dao-jun 90268c2
fix tests
dao-jun 2e02517
fix tests
dao-jun 5c05c29
fix tests codenarc
dao-jun cdc53bd
review fix
dao-jun 399d63f
fix code format
dao-jun cbd934f
Merge branch 'main' into dev/pulsar
dao-jun 2d44361
fix deprecated semantic attributes
dao-jun 9d0af62
review fix
dao-jun File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
20 changes: 20 additions & 0 deletions
20
instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/build.gradle.kts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
plugins { | ||
id("otel.javaagent-instrumentation") | ||
} | ||
|
||
muzzle { | ||
pass { | ||
group.set("org.apache.pulsar") | ||
module.set("pulsar-client") | ||
versions.set("[2.8.0,)") | ||
assertInverse.set(true) | ||
} | ||
} | ||
|
||
dependencies { | ||
library("org.apache.pulsar:pulsar-client:2.8.0") | ||
|
||
testImplementation("javax.annotation:javax.annotation-api:1.3.2") | ||
testImplementation("org.testcontainers:pulsar:1.17.1") | ||
testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0") | ||
} |
165 changes: 165 additions & 0 deletions
165
...va/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ConsumerImplInstrumentation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.pulsar.v28; | ||
|
||
import static io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons.startAndEndConsumerReceive; | ||
import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
import static net.bytebuddy.matcher.ElementMatchers.isProtected; | ||
import static net.bytebuddy.matcher.ElementMatchers.isPublic; | ||
import static net.bytebuddy.matcher.ElementMatchers.named; | ||
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; | ||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
import java.util.concurrent.CompletableFuture; | ||
import net.bytebuddy.asm.Advice; | ||
import net.bytebuddy.description.type.TypeDescription; | ||
import net.bytebuddy.matcher.ElementMatcher; | ||
import org.apache.pulsar.client.api.Consumer; | ||
import org.apache.pulsar.client.api.Message; | ||
import org.apache.pulsar.client.api.PulsarClient; | ||
import org.apache.pulsar.client.impl.PulsarClientImpl; | ||
|
||
public class ConsumerImplInstrumentation implements TypeInstrumentation { | ||
|
||
@Override | ||
public ElementMatcher<TypeDescription> typeMatcher() { | ||
return namedOneOf( | ||
"org.apache.pulsar.client.impl.ConsumerImpl", | ||
"org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"); | ||
} | ||
|
||
@Override | ||
public void transform(TypeTransformer transformer) { | ||
String klassName = ConsumerImplInstrumentation.class.getName(); | ||
|
||
transformer.applyAdviceToMethod(isConstructor(), klassName + "$ConsumerConstructorAdviser"); | ||
|
||
// internalReceive will apply to Consumer#receive(long,TimeUnit) | ||
// and called before MessageListener#receive. | ||
transformer.applyAdviceToMethod( | ||
isMethod() | ||
.and(isProtected()) | ||
.and(named("internalReceive")) | ||
.and(takesArguments(2)) | ||
.and(takesArgument(1, named("java.util.concurrent.TimeUnit"))), | ||
klassName + "$ConsumerInternalReceiveAdviser"); | ||
// receive/batchReceive will apply to Consumer#receive()/Consumer#batchReceive() | ||
transformer.applyAdviceToMethod( | ||
isMethod() | ||
.and(isPublic()) | ||
.and(namedOneOf("receive", "batchReceive")) | ||
.and(takesArguments(0)), | ||
klassName + "$ConsumerSyncReceiveAdviser"); | ||
// receiveAsync/batchReceiveAsync will apply to | ||
// Consumer#receiveAsync()/Consumer#batchReceiveAsync() | ||
transformer.applyAdviceToMethod( | ||
isMethod() | ||
.and(isPublic()) | ||
.and(namedOneOf("receiveAsync", "batchReceiveAsync")) | ||
.and(takesArguments(0)), | ||
klassName + "$ConsumerAsyncReceiveAdviser"); | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class ConsumerConstructorAdviser { | ||
private ConsumerConstructorAdviser() {} | ||
|
||
@Advice.OnMethodExit(suppress = Throwable.class) | ||
public static void after( | ||
@Advice.This Consumer<?> consumer, @Advice.Argument(value = 0) PulsarClient client) { | ||
|
||
PulsarClientImpl pulsarClient = (PulsarClientImpl) client; | ||
String url = pulsarClient.getLookup().getServiceUrl(); | ||
VirtualFieldStore.inject(consumer, url); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class ConsumerInternalReceiveAdviser { | ||
private ConsumerInternalReceiveAdviser() {} | ||
|
||
@Advice.OnMethodEnter | ||
public static void before(@Advice.Local(value = "startTime") long startTime) { | ||
startTime = System.currentTimeMillis(); | ||
} | ||
|
||
@Advice.OnMethodExit(onThrowable = Throwable.class) | ||
public static void after( | ||
@Advice.This Consumer<?> consumer, | ||
@Advice.Return Message<?> message, | ||
@Advice.Thrown Throwable t, | ||
@Advice.Local(value = "startTime") long startTime) { | ||
if (t != null) { | ||
return; | ||
} | ||
|
||
Context parent = Context.current(); | ||
Context current = startAndEndConsumerReceive(parent, message, startTime, consumer); | ||
if (current != null) { | ||
// ConsumerBase#internalReceive(long,TimeUnit) will be called before | ||
// ConsumerListener#receive(Consumer,Message), so, need to inject Context into Message. | ||
VirtualFieldStore.inject(message, current); | ||
} | ||
} | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class ConsumerSyncReceiveAdviser { | ||
private ConsumerSyncReceiveAdviser() {} | ||
|
||
@Advice.OnMethodEnter | ||
public static void before(@Advice.Local(value = "startTime") long startTime) { | ||
startTime = System.currentTimeMillis(); | ||
} | ||
|
||
@Advice.OnMethodExit(onThrowable = Throwable.class) | ||
public static void after( | ||
@Advice.This Consumer<?> consumer, | ||
@Advice.Return Message<?> message, | ||
@Advice.Thrown Throwable t, | ||
@Advice.Local(value = "startTime") long startTime) { | ||
if (t != null) { | ||
return; | ||
} | ||
|
||
Context parent = Context.current(); | ||
startAndEndConsumerReceive(parent, message, startTime, consumer); | ||
// No need to inject context to message. | ||
} | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class ConsumerAsyncReceiveAdviser { | ||
private ConsumerAsyncReceiveAdviser() {} | ||
|
||
@Advice.OnMethodEnter | ||
public static void before(@Advice.Local(value = "startTime") long startTime) { | ||
startTime = System.currentTimeMillis(); | ||
} | ||
|
||
@Advice.OnMethodExit(onThrowable = Throwable.class) | ||
public static void after( | ||
@Advice.This Consumer<?> consumer, | ||
@Advice.Return CompletableFuture<Message<?>> future, | ||
@Advice.Local(value = "startTime") long startTime) { | ||
future.whenComplete( | ||
(message, t) -> { | ||
if (t != null) { | ||
return; | ||
} | ||
|
||
Context parent = Context.current(); | ||
startAndEndConsumerReceive(parent, message, startTime, consumer); | ||
// No need to inject context to message. | ||
}); | ||
} | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
...in/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageInstrumentation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.pulsar.v28; | ||
|
||
import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
import static net.bytebuddy.matcher.ElementMatchers.isPublic; | ||
import static net.bytebuddy.matcher.ElementMatchers.named; | ||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
import net.bytebuddy.asm.Advice; | ||
import net.bytebuddy.description.type.TypeDescription; | ||
import net.bytebuddy.matcher.ElementMatcher; | ||
import org.apache.pulsar.client.api.Message; | ||
|
||
public class MessageInstrumentation implements TypeInstrumentation { | ||
@Override | ||
public ElementMatcher<TypeDescription> typeMatcher() { | ||
return named("org.apache.pulsar.client.impl.MessageImpl"); | ||
} | ||
|
||
@Override | ||
public void transform(TypeTransformer transformer) { | ||
transformer.applyAdviceToMethod( | ||
isMethod().and(isPublic()).and(named("recycle")).and(takesArguments(0)), | ||
MessageInstrumentation.class.getName() + "$MessageRecycleAdvice"); | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class MessageRecycleAdvice { | ||
private MessageRecycleAdvice() {} | ||
|
||
@Advice.OnMethodExit | ||
public static void after(@Advice.This Message<?> message) { | ||
// Clean context to prevent memory leak. | ||
VirtualFieldStore.inject(message, null); | ||
} | ||
} | ||
} |
96 changes: 96 additions & 0 deletions
96
...io/opentelemetry/javaagent/instrumentation/pulsar/v28/MessageListenerInstrumentation.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.pulsar.v28; | ||
|
||
import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
import static net.bytebuddy.matcher.ElementMatchers.isPublic; | ||
import static net.bytebuddy.matcher.ElementMatchers.named; | ||
|
||
import io.opentelemetry.api.common.Attributes; | ||
import io.opentelemetry.context.Context; | ||
import io.opentelemetry.context.Scope; | ||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; | ||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; | ||
import io.opentelemetry.javaagent.instrumentation.pulsar.v28.telemetry.PulsarSingletons; | ||
import net.bytebuddy.asm.Advice; | ||
import net.bytebuddy.description.type.TypeDescription; | ||
import net.bytebuddy.implementation.bytecode.assign.Assigner; | ||
import net.bytebuddy.matcher.ElementMatcher; | ||
import org.apache.pulsar.client.api.Consumer; | ||
import org.apache.pulsar.client.api.Message; | ||
import org.apache.pulsar.client.api.MessageListener; | ||
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; | ||
|
||
public class MessageListenerInstrumentation implements TypeInstrumentation { | ||
|
||
@Override | ||
public ElementMatcher<TypeDescription> typeMatcher() { | ||
// return hasSuperType(named("org.apache.pulsar.client.api.MessageListener")); | ||
// can't enhance MessageListener here like above due to jvm can't enhance lambda. | ||
return named("org.apache.pulsar.client.impl.conf.ConsumerConfigurationData"); | ||
} | ||
|
||
@Override | ||
public void transform(TypeTransformer transformer) { | ||
transformer.applyAdviceToMethod( | ||
isMethod().and(isPublic()).and(named("getMessageListener")), | ||
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser"); | ||
} | ||
|
||
@SuppressWarnings("unused") | ||
public static class ConsumerConfigurationDataMethodAdviser { | ||
private ConsumerConfigurationDataMethodAdviser() {} | ||
|
||
@Advice.OnMethodExit | ||
public static void after( | ||
@Advice.This ConsumerConfigurationData<?> data, | ||
@Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC) | ||
MessageListener<?> listener) { | ||
if (null == listener) { | ||
tjiuming marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return; | ||
} | ||
|
||
listener = new MessageListenerWrapper<>(listener); | ||
} | ||
} | ||
|
||
public static class MessageListenerWrapper<T> implements MessageListener<T> { | ||
private static final long serialVersionUID = 1L; | ||
|
||
private final MessageListener<T> delegator; | ||
|
||
public MessageListenerWrapper(MessageListener<T> messageListener) { | ||
this.delegator = messageListener; | ||
} | ||
|
||
@Override | ||
public void received(Consumer<T> consumer, Message<T> msg) { | ||
Context parent = VirtualFieldStore.extract(msg); | ||
|
||
Instrumenter<Message<?>, Attributes> instrumenter = | ||
PulsarSingletons.consumerListenerInstrumenter(); | ||
if (!instrumenter.shouldStart(parent, msg)) { | ||
this.delegator.received(consumer, msg); | ||
return; | ||
} | ||
|
||
Context current = instrumenter.start(parent, msg); | ||
try (Scope scope = current.makeCurrent()) { | ||
this.delegator.received(consumer, msg); | ||
instrumenter.end(current, msg, null, null); | ||
} catch (Throwable t) { | ||
instrumenter.end(current, msg, null, t); | ||
throw t; | ||
} | ||
} | ||
|
||
@Override | ||
public void reachedEndOfTopic(Consumer<T> consumer) { | ||
this.delegator.reachedEndOfTopic(consumer); | ||
} | ||
} | ||
} |
20 changes: 20 additions & 0 deletions
20
...ent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v28/ProducerData.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright The OpenTelemetry Authors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package io.opentelemetry.javaagent.instrumentation.pulsar.v28; | ||
|
||
public class ProducerData { | ||
public final String url; | ||
public final String topic; | ||
|
||
private ProducerData(String url, String topic) { | ||
this.url = url; | ||
this.topic = topic; | ||
} | ||
|
||
public static ProducerData create(String url, String topic) { | ||
return new ProducerData(url, topic); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
did you try MessageListener and it didn't work? there is some magic that should enhance lambdas: #4182
hasSuperType
is a more expensive matcher compared tonamed
however, so the current implementation could still be betterThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so, too. The current implementation works fine, so it needn't to change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many thanks for your review!