Skip to content

Commit

Permalink
Attempt to mitigate KafkaDslTests without timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
artembilan committed Dec 17, 2024
1 parent 5e3d8d8 commit af4277e
Showing 1 changed file with 2 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,6 @@ void testKafkaAdapters() throws Exception {
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(i + 1);
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo((long) i);
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048633L);
assertThat(headers.get("foo")).isEqualTo("bar");
}

Expand All @@ -211,8 +209,6 @@ void testKafkaAdapters() throws Exception {
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(i + 1);
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo((long) i);
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048644L);
}

Message<String> message = MessageBuilder.withPayload("BAR").setHeader(KafkaHeaders.TOPIC, TEST_TOPIC2).build();
Expand Down Expand Up @@ -360,8 +356,7 @@ public IntegrationFlow sendToKafkaFlow(
.enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo"))
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
.timestampExpression("T(Long).valueOf('1487694048633')"),
kafkaMessageHandler(producerFactory(), TEST_TOPIC1),
e -> e.id("kafkaProducer1")))
.subscribe(sf -> sf.handle(kafkaMessageHandlerTopic2, e -> e.id("kafkaProducer2")))
);
Expand All @@ -370,8 +365,7 @@ public IntegrationFlow sendToKafkaFlow(
@Bean
public KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandlerTopic2() {
return kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
.flush(msg -> true)
.timestamp(m -> 1487694048644L);
.flush(msg -> true);
}

private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
Expand Down

0 comments on commit af4277e

Please sign in to comment.