Hello, we need some advice on how to increase throughput in our Pulsar consumers. Here are some details:
- We run one consumer per pod in a Kubernetes cluster.
- We use the Shared subscription mode; strict ordering does not matter for us.
- To keep debugging simple, we have not enabled batching.
- We have been using the listener pattern.
We've found that our messages are being processed sequentially, which leads to poor throughput, and we need to speed things up. We are wondering what the recommended way to do so is. I've attached the two options we are considering below:
1. Running multiple consumers per pod: increase the number of consumers and keep the listener pattern.
2. Increasing concurrency per consumer: use the receiver pattern with multiple workers.
We'd like to understand what the community considers best practice and why. Thank you :)
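As a rough, idealised model of why sequential processing limits throughput: if each worker (a consumer whose listener handles one message at a time, or one `receive()` loop) processes messages one after another, the time to drain a backlog is the largest per-worker total. The sketch below assumes messages are dealt round-robin to the workers and ignores network latency, acknowledgement cost and uneven prefetching; `drainTimeMs` is a hypothetical helper for illustration, not part of `pulsar-client`.

```typescript
// Hypothetical helper: idealised time to drain a backlog when messages are
// dealt round-robin to `workers` workers that each process one at a time.
// Ignores network/ack latency and prefetch skew, so treat it as optimistic.
function drainTimeMs(durationsMs: number[], workers: number): number {
  if (workers < 1) {
    throw new Error('workers must be >= 1');
  }
  // Total busy time accumulated by each worker
  const busy = new Array<number>(workers).fill(0);
  durationsMs.forEach((durationMs, i) => {
    busy[i % workers] += durationMs; // message i goes to worker i mod N
  });
  // The backlog is drained when the busiest worker finishes
  return Math.max(...busy);
}

// Numbers from the examples: SEND_NUMBER = 10 messages, 1000 ms of work each
const durations = new Array<number>(10).fill(1000);
console.log(drainTimeMs(durations, 1)); // 10000
console.log(drainTimeMs(durations, 5)); // 2000
```

With one sequential worker the ten test messages take about 10 s; spreading them over five workers brings that down to about 2 s in the ideal case. In practice, messages already prefetched into one consumer's receiver queue (`receiverQueueSize`) wait for that consumer, so the real split can be less even than this model suggests.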
Running multiple consumers per pod
import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';
process.env.ENVIRONMENT = 'development';
process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650';
// Random topic name so every run starts from an empty topic
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;
const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;
const CONCURRENCY = 5; // number of consumers (and listener threads)
const SEND_NUMBER = 10; // number of messages to publish
// Simulates 1 s of work per message, then acknowledges it
async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  console.log('Received message: ', message.getData().toString());
  await new Promise((resolve) => setTimeout(resolve, 1000));
  await consumer.acknowledge(message);
}
async function main() {
  const client = new Pulsar.Client({
    serviceUrl: process.env.PULSAR_SERVICE_URL as string,
    log: logconfig(),
    // Number of threads the native client uses to run message listeners
    messageListenerThreads: CONCURRENCY,
  });
  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);
  // Create CONCURRENCY consumers on the same Shared subscription
  const consumers: Pulsar.Consumer[] = [];
  const counter = new Map<string, number>();
  const subscriptionType = 'Shared';
  const ackTimeoutMs = 10_000;
  const nAckRedeliverTimeoutMs = 2_000;
  const batchIndexAckEnabled = false;
  for (let i = 0; i < CONCURRENCY; i += 1) {
    const consumer = await client.subscribe({
      topic: PULSAR_TOPIC,
      subscription: PULSAR_SUBSCRIPTION,
      subscriptionType,
      ackTimeoutMs,
      nAckRedeliverTimeoutMs,
      receiverQueueSize: 10,
      batchIndexAckEnabled,
      // Each consumer hands its messages to handleMessage
      listener: (message, consumer) => handleMessage(message, consumer),
    });
    consumers.push(consumer);
  }
  // Send messages
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });
  for (let i = 0; i < SEND_NUMBER; i += 1) {
    const msg = `test-message-${i}`;
    counter.set(msg, 0);
    await producer.send({ data: Buffer.from(msg) });
  }
  // Wait 50 seconds for the messages to be processed
  await new Promise((resolve) => setTimeout(resolve, 50_000));
  await producer.close();
  for (const consumer of consumers) {
    await consumer.close();
  }
  await client.close();
  process.exit(0);
}
void main();
// Forwards native client log lines to the matching console method
function logconfig() {
  return (level: any, _file: any, _line: any, message: any) => {
    switch (level) {
      case Pulsar.LogLevel.DEBUG:
        console.debug(message);
        break;
      case Pulsar.LogLevel.INFO:
        console.info(message);
        break;
      case Pulsar.LogLevel.WARN:
        console.warn(message);
        break;
      case Pulsar.LogLevel.ERROR:
        console.error(message);
        break;
    }
  };
}
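In the first option each listener returns the promise from `handleMessage`, and nothing in the example handles a rejection (for instance if processing or `acknowledge` fails). A minimal sketch of a wrapper that logs the error and negatively acknowledges the message follows. `safeListener`, `NackableConsumer` and `Handler` are hypothetical names, and the structural types are an assumption chosen so the sketch compiles without `pulsar-client`; it also assumes your `pulsar-client` version exposes `Consumer.negativeAcknowledge(message)`.

```typescript
// Structural stand-in for the part of Pulsar.Consumer this sketch needs
// (assumption: Pulsar.Consumer provides negativeAcknowledge(message): void)
interface NackableConsumer<M> {
  negativeAcknowledge(message: M): void;
}

// Same shape as handleMessage(message, consumer) in the example
type Handler<M, C> = (message: M, consumer: C) => Promise<void>;

// Hypothetical helper: wraps an async handler so a rejected promise is
// reported and the message is negatively acknowledged for redelivery,
// instead of surfacing as an unhandled promise rejection.
function safeListener<M, C extends NackableConsumer<M>>(
  handler: Handler<M, C>,
  onError: (err: unknown, message: M) => void = (err) => console.error(err),
): (message: M, consumer: C) => Promise<void> {
  return async (message, consumer) => {
    try {
      await handler(message, consumer);
    } catch (err) {
      onError(err, message);
      // Ask for redelivery after nAckRedeliverTimeoutMs rather than
      // waiting for the ack timeout
      consumer.negativeAcknowledge(message);
    }
  };
}
```

Used in the subscribe call, this would read `listener: safeListener(handleMessage)`. Whether negative acknowledgement or a retry inside the handler is preferable depends on how your processing fails.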
Increasing concurrency per consumer
import { faker } from '@faker-js/faker';
import Pulsar from 'pulsar-client';
import logger from '../../utils/logger';
process.env.ENVIRONMENT = 'development';
process.env.PULSAR_SERVICE_URL = 'pulsar://localhost:6650';
// Random topic name so every run starts from an empty topic
const PULSAR_TOPIC = `test-${faker.string.alpha(10)}`;
const PULSAR_SUBSCRIPTION = `sub-${PULSAR_TOPIC}`;
const CONCURRENCY = 5; // number of receive() workers on the single consumer
const SEND_NUMBER = 10; // number of messages to publish
// Simulates 1 s of work per message, then acknowledges it
async function handleMessage(
  message: Pulsar.Message,
  consumer: Pulsar.Consumer,
): Promise<void> {
  console.log('Received message: ', message.getData().toString());
  await new Promise((resolve) => setTimeout(resolve, 1000));
  await consumer.acknowledge(message);
}
async function main() {
  const client = new Pulsar.Client({
    serviceUrl: process.env.PULSAR_SERVICE_URL as string,
    log: logconfig(),
  });
  console.log('Topic: ', PULSAR_TOPIC);
  console.log('Subscription: ', PULSAR_SUBSCRIPTION);
  // Create a single consumer; concurrency comes from the receive() workers
  const counter = new Map<string, number>();
  const subscriptionType = 'Shared';
  const ackTimeoutMs = 10_000;
  const nAckRedeliverTimeoutMs = 2_000;
  const batchIndexAckEnabled = false;
  const consumer = await client.subscribe({
    topic: PULSAR_TOPIC,
    subscription: PULSAR_SUBSCRIPTION,
    subscriptionType,
    ackTimeoutMs,
    nAckRedeliverTimeoutMs,
    receiverQueueSize: 10,
    batchIndexAckEnabled,
  });
  // Start the workers without awaiting them: listen() only settles once
  // every worker has stopped, so awaiting it here would block forever
  // before any message is sent
  void listen(
    consumer,
    async (consumer, message) => handleMessage(message, consumer),
    CONCURRENCY,
  );
  // Send messages
  const producer = await client.createProducer({ topic: PULSAR_TOPIC });
  for (let i = 0; i < SEND_NUMBER; i += 1) {
    const msg = `test-message-${i}`;
    counter.set(msg, 0);
    await producer.send({ data: Buffer.from(msg) });
  }
  // Wait 50 seconds for the messages to be processed
  await new Promise((resolve) => setTimeout(resolve, 50_000));
  await producer.close();
  await consumer.close();
  await client.close();
  process.exit(0);
}
void main();
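/**
 * Receive messages from a Pulsar consumer and process them concurrently.
 *
 * @param consumer - Pulsar consumer to receive messages from.
 * @param listener - Message handler function.
 * @param concurrency - Maximum number of messages to process at a time.
 * @returns A promise that settles once every worker has stopped.
 */
export async function listen(
  consumer: Pulsar.Consumer,
  listener: (
    consumer: Pulsar.Consumer,
    message: Pulsar.Message,
  ) => Promise<void>,
  concurrency = 1,
): Promise<void> {
  const workers = new Array<Promise<void>>();
  for (let i = 0; i < concurrency; i++) {
    // Each worker handles one message at a time, so at most `concurrency`
    // messages are in flight
    const worker = async () => {
      for (;;) {
        let message: Pulsar.Message;
        try {
          message = await consumer.receive();
        } catch (err: any) {
          // receive() rejects, for example, once the consumer is closed;
          // stop this worker instead of retrying in a tight loop
          logger.error(`Receive error, stopping worker: ${err.message}`);
          return;
        }
        try {
          await listener(consumer, message);
        } catch (err: any) {
          logger.error(`Message processing error: ${err.message}`);
          // Request redelivery instead of waiting for the ack timeout
          consumer.negativeAcknowledge(message);
        }
      }
    };
    workers.push(worker());
  }
  await Promise.all(workers);
}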
// Forwards native client log lines to the matching console method
function logconfig() {
  return (level: any, _file: any, _line: any, message: any) => {
    switch (level) {
      case Pulsar.LogLevel.DEBUG:
        console.debug(message);
        break;
      case Pulsar.LogLevel.INFO:
        console.info(message);
        break;
      case Pulsar.LogLevel.WARN:
        console.warn(message);
        break;
      case Pulsar.LogLevel.ERROR:
        console.error(message);
        break;
    }
  };
}
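The worker-pool idea behind the second option can be exercised without a broker. The sketch below is a hypothetical variant, `runWorkers`, written against a small structural interface (an assumption standing in for `Pulsar.Consumer`) so it can be tested with an in-memory fake. It adds an explicit stop condition and performs the acknowledgement itself, so the handler passed to it should only do the work.

```typescript
// Structural stand-in for the parts of Pulsar.Consumer used here (assumption)
interface ReceivingConsumer<M> {
  receive(): Promise<M>;
  acknowledge(message: M): Promise<unknown>;
}

// Hypothetical variant of `listen`: `concurrency` workers each loop
// receive() -> handle -> acknowledge, so at most `concurrency` messages are
// in flight. A worker exits when shouldStop() is true or receive() rejects.
async function runWorkers<M>(
  consumer: ReceivingConsumer<M>,
  handle: (message: M) => Promise<void>,
  concurrency: number,
  shouldStop: () => boolean,
): Promise<void> {
  const worker = async (): Promise<void> => {
    while (!shouldStop()) {
      let message: M;
      try {
        message = await consumer.receive();
      } catch {
        return; // consumer closed or unusable: stop this worker
      }
      try {
        await handle(message);
        await consumer.acknowledge(message);
      } catch (err) {
        // Left unacknowledged, so the ack timeout will trigger redelivery
        console.error('Message processing error:', err);
      }
    }
  };
  // Start all workers and resolve once every one of them has exited
  await Promise.all(Array.from({ length: concurrency }, () => worker()));
}
```

With a real consumer, a call would look like `void runWorkers(consumer, work, CONCURRENCY, () => stopping)`, where `work` is a handler that does not acknowledge and `stopping` is a flag you set before `consumer.close()`. A worker blocked in `receive()` only re-checks the flag after `receive()` returns, so, assuming `receive()` rejects once the consumer is closed, closing the consumer is what actually releases it.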