Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add rdkafka microservice #13535 #13538

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
28 changes: 14 additions & 14 deletions integration/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,36 +1,36 @@
version: "3"
version: '3'

services:
redis:
container_name: test-redis
image: redis
ports:
- "6379:6379"
- '6379:6379'
restart: always
nats:
container_name: test-nats
image: nats
ports:
- "8222:8222"
- "4222:4222"
- "6222:6222"
- '8222:8222'
- '4222:4222'
- '6222:6222'
restart: always
mqtt:
container_name: test-mqtt
image: eclipse-mosquitto
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf
ports:
- "1883:1883"
- "9001:9001"
- '1883:1883'
- '9001:9001'
restart: always
mysql:
image: mysql:8.3.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: test
ports:
- "3306:3306"
- '3306:3306'
restart: always
mongodb:
container_name: test-mongodb
Expand All @@ -42,17 +42,17 @@ services:
rabbit:
container_name: test-rabbit
hostname: rabbit
image: "rabbitmq:management"
image: 'rabbitmq:management'
ports:
- "15672:15672"
- "5672:5672"
- '15672:15672'
- '5672:5672'
tty: true
zookeeper:
container_name: test-zookeeper
hostname: zookeeper
image: confluentinc/cp-zookeeper:7.5.3
ports:
- "2181:2181"
- '2181:2181'
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
Expand All @@ -63,8 +63,8 @@ services:
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
- '29092:29092'
- '9092:9092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
Expand Down
2 changes: 1 addition & 1 deletion integration/microservices/e2e/sum-kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { KafkaMessagesController } from '../src/kafka/kafka.messages.controller'
* Skip this flaky test in CI/CD pipeline as it frequently
* fails to connect to Kafka container in the cloud.
*/
describe.skip('Kafka transport', function () {
describe('Kafka transport', function () {
let server: any;
let app: INestApplication;

Expand Down
170 changes: 170 additions & 0 deletions integration/microservices/e2e/sum-rd-kafka.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { BusinessDto } from '../src/rd-kafka/dtos/business.dto';
import { UserDto } from '../src/rd-kafka/dtos/user.dto';
import { UserEntity } from '../src/rd-kafka/entities/user.entity';
import { RdKafkaController } from '../src/rd-kafka/rd-kafka.controller';
import { RdKafkaMessagesController } from '../src/rd-kafka/rd-kafka.messages.controller';

describe('RdKafka transport', function () {
let server: any;
let app: INestApplication;

// set timeout to be longer (especially for the after hook)
this.timeout(50000);
this.retries(10);

before(`Start Kafka app`, async () => {
const module = await Test.createTestingModule({
controllers: [RdKafkaController, RdKafkaMessagesController],
}).compile();

app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();

app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RD_KAFKA,
options: {
client: {
'metadata.broker.list': 'localhost:9092'
}
},
});
app.enableShutdownHooks();
await app.startAllMicroservices();
await app.init();
});

it(`/POST (async event notification)`, done => {
request(server)
.post('/notify')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification with key)`, done => {
request(server)
.post('/notifyWithKey')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED_WITH_KEY).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification with key and headers)`, done => {
request(server)
.post('/notifyWithKeyAndHeaders')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED_WITH_KEY_AND_HEADERS).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification with key and many headers)`, done => {
request(server)
.post('/notifyWithKeyAndManyHeaders')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED_WITH_KEY_AND_MANY_HEADERS).to.be.true;
done();
}, 1000);
});
});

// it(`/POST (sync sum kafka message)`, function () {
// return request(server)
// .post('/mathSumSyncKafkaMessage')
// .send([1, 2, 3, 4, 5])
// .expect(200)
// .expect(200, '15');
// });

// it(`/POST (sync sum kafka(ish) message without key and only the value)`, () => {
// return request(server)
// .post('/mathSumSyncWithoutKey')
// .send([1, 2, 3, 4, 5])
// .expect(200)
// .expect(200, '15');
// });

// it(`/POST (sync sum plain object)`, () => {
// return request(server)
// .post('/mathSumSyncPlainObject')
// .send([1, 2, 3, 4, 5])
// .expect(200)
// .expect(200, '15');
// });

// it(`/POST (sync sum array)`, () => {
// return request(server)
// .post('/mathSumSyncArray')
// .send([1, 2, 3, 4, 5])
// .expect(200)
// .expect(200, '15');
// });

// it(`/POST (sync sum string)`, () => {
// return request(server)
// .post('/mathSumSyncString')
// .send([1, 2, 3, 4, 5])
// .expect(200)
// .expect(200, '15');
// });

// it(`/POST (sync sum number)`, () => {
// return request(server)
// .post('/mathSumSyncNumber')
// .send([12345])
// .expect(200)
// .expect(200, '15');
// });

// const userDto: UserDto = {
// email: '[email protected]',
// name: 'Ben',
// phone: '1112223331',
// years: 33,
// };
// const newUser: UserEntity = new UserEntity(userDto);
// const businessDto: BusinessDto = {
// name: 'Example',
// phone: '2233441122',
// user: newUser,
// };
// it(`/POST (sync command create user)`, () => {
// return request(server).post('/user').send(userDto).expect(200);
// });

// it(`/POST (sync command create business`, () => {
// return request(server).post('/business').send(businessDto).expect(200);
// });

// it(`/POST (sync command create user) Concurrency Test`, async () => {
// const promises = [];
// for (let concurrencyKey = 0; concurrencyKey < 100; concurrencyKey++) {
// const innerUserDto = JSON.parse(JSON.stringify(userDto));
// innerUserDto.name += `+${concurrencyKey}`;
// promises.push(request(server).post('/user').send(userDto).expect(200));
// }
// await Promise.all(promises);
// });

after(`Stopping Kafka app`, async () => {
await app.close();
});
});
7 changes: 7 additions & 0 deletions integration/microservices/src/rd-kafka/dtos/business.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { UserEntity } from '../entities/user.entity';

export class BusinessDto {
name: string;
phone: string;
user: UserEntity;
}
6 changes: 6 additions & 0 deletions integration/microservices/src/rd-kafka/dtos/user.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export class UserDto {
name: string;
email: string;
phone: string;
years: number;
}
19 changes: 19 additions & 0 deletions integration/microservices/src/rd-kafka/entities/business.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { UserEntity } from './user.entity';
import { BusinessDto } from '../dtos/business.dto';

export class BusinessEntity {
constructor(business: BusinessDto) {
this.id = Math.random() * 99999999;
this.name = business.name;
this.phone = business.phone;
this.createdBy = {
id: business.user.id,
};
this.created = new Date();
}
id: number;
name: string;
phone: string;
createdBy: Partial<UserEntity>;
created: Date;
}
18 changes: 18 additions & 0 deletions integration/microservices/src/rd-kafka/entities/user.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { UserDto } from '../dtos/user.dto';

export class UserEntity {
constructor(user: UserDto) {
this.id = Math.random() * 99999999;
this.name = user.name;
this.email = user.email;
this.phone = user.phone;
this.years = user.years;
this.created = new Date();
}
id: number;
name: string;
email: string;
phone: string;
years: number;
created: Date;
}
Loading
Loading