I can't use two Kafka clients in one Nest.js server to consume different topics. #13421

Closed
3 of 15 tasks
8471919 opened this issue Apr 12, 2024 · 1 comment

8471919 commented Apr 12, 2024

Is there an existing issue for this?

  • I have searched the existing issues

Current behavior

Hi, I have a problem with the @nestjs/microservices package.
I run two Kafka consumers against the same Kafka broker in a single Nest.js server, and I want each Kafka client to consume a different topic on that broker.
Instead, both clients consume every topic on that broker.

So when Nest.js handles a topic through the @MessagePattern(topic) decorator, both Kafka clients consume that topic's message at the same time, and the handler fires twice for a single message.

How can I make each Kafka client consume a different topic on the same broker?

P.S. Using the same group ID for both clients would be one solution, but I want the two Kafka clients to use different group IDs.
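
For context on the P.S.: Kafka delivers each message once per consumer group, so two clients with different group IDs that both subscribe to a topic each receive their own copy. A minimal model of that rule, with no Kafka involved, is sketched below. The `Subscription` type and `groupsReceiving` function are hypothetical names used only for illustration.

```typescript
// Simplified model of Kafka consumer-group delivery, not real client code.
// Assumption: one message is handed to exactly one member of each group
// that subscribes to the message's topic.
interface Subscription {
  groupId: string;
  topics: string[];
}

// Returns the distinct group IDs that receive one message published to `topic`.
function groupsReceiving(topic: string, subs: Subscription[]): string[] {
  const groups: string[] = [];
  for (const sub of subs) {
    const subscribed = sub.topics.indexOf(topic) !== -1;
    if (subscribed && groups.indexOf(sub.groupId) === -1) {
      groups.push(sub.groupId);
    }
  }
  return groups;
}

// Two clients with different group IDs on the same topic: two deliveries.
const differentGroups = groupsReceiving('topic', [
  { groupId: 'consumer1', topics: ['topic'] },
  { groupId: 'consumer2', topics: ['topic'] },
]);

// Two clients sharing one group ID: a single delivery.
const sameGroup = groupsReceiving('topic', [
  { groupId: 'shared', topics: ['topic'] },
  { groupId: 'shared', topics: ['topic'] },
]);

console.log(differentGroups.length, sameGroup.length);
```

This is why sharing a group ID would hide the duplicate, and why keeping separate group IDs requires each client to subscribe to a different topic.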

this is my example code

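// main.ts
import { NestFactory } from '@nestjs/core';
import { ConfigService } from '@nestjs/config';
import { MicroserviceOptions } from '@nestjs/microservices';
// AppModule, setUpSwagger and the two option objects come from the project's own files.

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const configService = app.get<ConfigService>(ConfigService);
  // Attach two Kafka microservices to the same HTTP application.
  app.connectMicroservice<MicroserviceOptions>(KAFKA_CONSUMER1_OPTION);
  app.connectMicroservice<MicroserviceOptions>(KAFKA_CONSUMER2_OPTION);

  setUpSwagger(app);

  const port = configService.get<string>('PORT') ?? 3000;

  // Start both Kafka consumers before accepting HTTP traffic.
  await app.startAllMicroservices();

  await app.listen(port);
}
bootstrap();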

// kafka.options.ts
import { KafkaOptions, Transport } from '@nestjs/microservices';

// First consumer: group ID 'consumer1'.
export const KAFKA_CONSUMER1_OPTION: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      ssl: true,
      sasl: {
        mechanism: 'scram-sha-512',
        username: process.env.KAFKA_SASL_USERNAME!,
        password: process.env.KAFKA_SASL_PASSWORD!,
      },
      // kafkajs expects an array of broker addresses.
      brokers: ['broker1-address'],
    },
    consumer: {
      groupId: 'consumer1',
      allowAutoTopicCreation: false,
    },
    postfixId: '',
  },
};

// Second consumer: same broker and credentials, group ID 'consumer2'.
export const KAFKA_CONSUMER2_OPTION: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      ssl: true,
      sasl: {
        mechanism: 'scram-sha-512',
        username: process.env.KAFKA_SASL_USERNAME!,
        password: process.env.KAFKA_SASL_PASSWORD!,
      },
      brokers: ['broker1-address'],
    },
    consumer: {
      groupId: 'consumer2',
      allowAutoTopicCreation: false,
    },
    postfixId: '',
  },
};

// kafka.controller.ts
...
@MessagePattern('topic')
async consume(@Payload() message: unknown) {
  console.log(message);

  return 1;
}

Then start the server and publish a message to "topic".
The console.log(message) call runs twice.
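
A likely explanation, stated as an assumption rather than a confirmed description of Nest internals: every microservice attached with connectMicroservice registers all @MessagePattern handlers found in the app, so both Kafka servers subscribe to 'topic'. The toy model below shows the effect. `FakeKafkaServer` and its methods are hypothetical stand-ins, not Nest classes.

```typescript
// Toy model only: FakeKafkaServer is a hypothetical stand-in, not a Nest class.
type Handler = (message: unknown) => void;

class FakeKafkaServer {
  private handlers: { [topic: string]: Handler } = {};

  constructor(public readonly groupId: string) {}

  // Assumed behaviour: each server is given every pattern handler in the app.
  addHandler(topic: string, handler: Handler): void {
    this.handlers[topic] = handler;
  }

  // Simulates this server's consumer group receiving one message.
  receive(topic: string, message: unknown): void {
    const handler = this.handlers[topic];
    if (handler) {
      handler(message);
    }
  }
}

const calls: string[] = [];
// Stands in for the controller method decorated with @MessagePattern('topic').
const consume: Handler = (message) => {
  calls.push(String(message));
};

const servers = [new FakeKafkaServer('consumer1'), new FakeKafkaServer('consumer2')];

// The same controller method is bound on both servers.
for (const server of servers) {
  server.addHandler('topic', consume);
}

// One published message reaches both groups, so the handler runs once per server.
for (const server of servers) {
  server.receive('topic', 'hello');
}

console.log(calls.length);
```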

These pictures show what I want:
[Screenshot 2024-04-12 2:13:16 PM]

[Screenshot 2024-04-12 2:14:45 PM]

This is a screenshot of the terminal:

[Screenshot 2024-04-12 2:43:25 PM]

When I emit one message:

[image]

the consuming server consumes it twice.
Is there an option to prevent this?
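
One possible workaround, sketched under assumptions rather than offered as a Nest feature: skip connectMicroservice for these topics and drive two kafkajs consumers directly, each with its own group ID and its own topic list. `ConsumerLike` below is a hypothetical interface intended to mirror the `connect`, `subscribe({ topics })`, and `run({ eachMessage })` methods of a kafkajs consumer, so the sketch runs without a broker. `startTopicConsumer`, `FakeConsumer`, and `demo` are likewise illustrative names.

```typescript
// Hypothetical minimal shape, modelled on the kafkajs consumer methods used here.
interface MessagePayload {
  topic: string;
  message: { value: { toString(): string } | null };
}
type EachMessage = (payload: MessagePayload) => Promise<void>;
interface ConsumerLike {
  connect(): Promise<void>;
  subscribe(args: { topics: string[] }): Promise<void>;
  run(config: { eachMessage: EachMessage }): Promise<void>;
}

// Connects one consumer and subscribes it only to the topics it owns.
async function startTopicConsumer(
  consumer: ConsumerLike,
  topics: string[],
  handle: (topic: string, value: string | null) => void,
): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topics });
  await consumer.run({
    eachMessage: async ({ topic, message }) => {
      handle(topic, message.value === null ? null : message.value.toString());
    },
  });
}

// In-memory fake so the sketch can run without Kafka.
class FakeConsumer implements ConsumerLike {
  private subscribed: string[] = [];
  private handler: EachMessage | null = null;

  async connect(): Promise<void> {}

  async subscribe(args: { topics: string[] }): Promise<void> {
    this.subscribed = this.subscribed.concat(args.topics);
  }

  async run(config: { eachMessage: EachMessage }): Promise<void> {
    this.handler = config.eachMessage;
  }

  // Delivers a message only if this consumer subscribed to the topic.
  async publish(topic: string, value: string): Promise<void> {
    if (this.handler && this.subscribed.indexOf(topic) !== -1) {
      await this.handler({ topic, message: { value } });
    }
  }
}

async function demo(): Promise<string[]> {
  const seen: string[] = [];
  const consumer1 = new FakeConsumer(); // stands in for groupId 'consumer1'
  const consumer2 = new FakeConsumer(); // stands in for groupId 'consumer2'

  await startTopicConsumer(consumer1, ['topic1'], (topic, value) => {
    seen.push('consumer1:' + topic + ':' + value);
  });
  await startTopicConsumer(consumer2, ['topic2'], (topic, value) => {
    seen.push('consumer2:' + topic + ':' + value);
  });

  // Offer the message to both fakes, as two consumer groups would each see it.
  for (const consumer of [consumer1, consumer2]) {
    await consumer.publish('topic1', 'hello');
  }
  return seen;
}

demo().then((seen) => console.log(seen));
```

With the real library, the first argument would presumably be something like `kafka.consumer({ groupId: 'consumer1' })`. The trade-off is that handlers wired this way bypass @MessagePattern, so Nest pipes, guards, and interceptors would not apply to them.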

Minimum reproduction code

No response (please refer to the code above)

Steps to reproduce

No response

Expected behavior

No response (please refer to the code above)

Related Issue

I think my problem is related to this issue.
So I suggest letting @MessagePattern take an identifying ID, like @MessagePattern('topic', identifyingId), and adding a matching identifyingId to the microservice options.

Or
how about adding a topic option to KafkaOptions?
For example:

export const KAFKA_OPTION: KafkaOptions = {
  transport: Transport.KAFKA,
  options: {
    ...
    consumer: {
      allowAutoTopicCreation: false,
      topic: 'topic1'
    },
    postfixId: '',
  },
};
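
To make the second proposal concrete: the `topic` field above is a suggestion, not an existing KafkaOptions setting, so the following is only a sketch of the filtering it implies. `topicsToSubscribe` is a hypothetical helper, and the sketch assumes that an absent allow-list keeps the current behaviour of subscribing to every handled topic.

```typescript
// Hypothetical helper illustrating the proposed option; not part of @nestjs/microservices.
// handlerTopics: every topic that has a @MessagePattern handler in the app.
// allowed: the proposed per-consumer allow-list; undefined means "no filter".
function topicsToSubscribe(handlerTopics: string[], allowed?: string[]): string[] {
  if (allowed === undefined) {
    return handlerTopics.slice();
  }
  const allowList: string[] = allowed;
  return handlerTopics.filter((topic) => allowList.indexOf(topic) !== -1);
}

const handlerTopics = ['topic1', 'topic2'];

// Each consumer would subscribe only to the topics named in its own options.
const forConsumer1 = topicsToSubscribe(handlerTopics, ['topic1']);
const forConsumer2 = topicsToSubscribe(handlerTopics, ['topic2']);

// Without the option, a consumer still subscribes to everything.
const unfiltered = topicsToSubscribe(handlerTopics);

console.log(forConsumer1, forConsumer2, unfiltered);
```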

Package

  • I don't know. Or some 3rd-party package
  • @nestjs/common
  • @nestjs/core
  • @nestjs/microservices
  • @nestjs/platform-express
  • @nestjs/platform-fastify
  • @nestjs/platform-socket.io
  • @nestjs/platform-ws
  • @nestjs/testing
  • @nestjs/websockets
  • Other (see below)

Other package

No response

NestJS version

10.0.0

Packages versions

{
    "@nestjs/common": "^10.0.0",
    "@nestjs/config": "^3.2.0",
    "@nestjs/core": "^10.0.0",
    "@nestjs/microservices": "^10.3.5",
    "@nestjs/platform-express": "^10.0.0",
    "@nestjs/swagger": "^7.3.0",
    "class-transformer": "^0.5.1",
    "class-validator": "^0.14.1",
    "cross-env": "^7.0.3",
    "dotenv": "^16.4.5",
    "kafkajs": "^2.2.4",
    "lodash": "^4.17.21",
    "mime": "^4.0.1",
    "reflect-metadata": "^0.1.13",
    "rxjs": "^7.8.1",
    "swagger-ui-express": "^5.0.0"
  },

Node.js version

20.11.0

In which operating systems have you tested?

  • macOS
  • Windows
  • Linux

Other

No response

8471919 added the needs triage label Apr 12, 2024
@kamilmysliwiec
Member

Duplicate #11298
