Skip to content

Commit

Permalink
feat: finish up the rdkafka event pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaufmaner committed May 6, 2024
1 parent 1219546 commit cabfd40
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 157 deletions.
92 changes: 46 additions & 46 deletions integration/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,52 +1,52 @@
version: '3'

services:
# redis:
# container_name: test-redis
# image: redis
# ports:
# - "6379:6379"
# restart: always
# nats:
# container_name: test-nats
# image: nats
# ports:
# - "8222:8222"
# - "4222:4222"
# - "6222:6222"
# restart: always
# mqtt:
# container_name: test-mqtt
# image: eclipse-mosquitto
# volumes:
# - ./mosquitto.conf:/mosquitto/config/mosquitto.conf
# ports:
# - "1883:1883"
# - "9001:9001"
# restart: always
# mysql:
# image: mysql:8.3.0
# environment:
# MYSQL_ROOT_PASSWORD: root
# MYSQL_DATABASE: test
# ports:
# - "3306:3306"
# restart: always
# mongodb:
# container_name: test-mongodb
# image: mongo:latest
# environment:
# - MONGODB_DATABASE="test"
# ports:
# - 27017:27017
# rabbit:
# container_name: test-rabbit
# hostname: rabbit
# image: "rabbitmq:management"
# ports:
# - "15672:15672"
# - "5672:5672"
# tty: true
redis:
container_name: test-redis
image: redis
ports:
- '6379:6379'
restart: always
nats:
container_name: test-nats
image: nats
ports:
- '8222:8222'
- '4222:4222'
- '6222:6222'
restart: always
mqtt:
container_name: test-mqtt
image: eclipse-mosquitto
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf
ports:
- '1883:1883'
- '9001:9001'
restart: always
mysql:
image: mysql:8.3.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: test
ports:
- '3306:3306'
restart: always
mongodb:
container_name: test-mongodb
image: mongo:latest
environment:
- MONGODB_DATABASE="test"
ports:
- 27017:27017
rabbit:
container_name: test-rabbit
hostname: rabbit
image: 'rabbitmq:management'
ports:
- '15672:15672'
- '5672:5672'
tty: true
zookeeper:
container_name: test-zookeeper
hostname: zookeeper
Expand Down
60 changes: 48 additions & 12 deletions integration/microservices/e2e/sum-rd-kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,54 @@ describe('RdKafka transport', function () {
await app.init();
});

it(`/POST (async event notification)`, done => {
request(server)
.post('/notify')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification with key)`, done => {
request(server)
.post('/notifyWithKey')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED_WITH_KEY).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification with key and headers)`, done => {
request(server)
.post('/notifyWithKeyAndHeaders')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED_WITH_KEY_AND_HEADERS).to.be.true;
done();
}, 1000);
});
});

it(`/POST (async event notification with key and many headers)`, done => {
request(server)
.post('/notifyWithKeyAndManyHeaders')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED_WITH_KEY_AND_MANY_HEADERS).to.be.true;
done();
}, 1000);
});
});

// it(`/POST (sync sum kafka message)`, function () {
// return request(server)
// .post('/mathSumSyncKafkaMessage')
Expand Down Expand Up @@ -86,18 +134,6 @@ describe('RdKafka transport', function () {
// .expect(200, '15');
// });

it(`/POST (async event notification)`, done => {
request(server)
.post('/notify')
.send()
.end(() => {
setTimeout(() => {
expect(RdKafkaController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});

// const userDto: UserDto = {
// email: '[email protected]',
// name: 'Ben',
Expand Down
57 changes: 51 additions & 6 deletions integration/microservices/src/rd-kafka/rd-kafka.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import { UserDto } from './dtos/user.dto';
export class RdKafkaController implements OnModuleInit, OnModuleDestroy {
protected readonly logger = new Logger(RdKafkaController.name);
static IS_NOTIFIED = false;
static IS_NOTIFIED_WITH_KEY = false;
static IS_NOTIFIED_WITH_KEY_AND_HEADERS = false;
static IS_NOTIFIED_WITH_KEY_AND_MANY_HEADERS = false;

static MATH_SUM = 0;

@Client({
Expand Down Expand Up @@ -51,6 +55,53 @@ export class RdKafkaController implements OnModuleInit, OnModuleDestroy {
await this.client.close();
}

// async notify
@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit('notify', { notify: true });
}

// async notify with key
@Post('notifyWithKey')
async sendNotificationWithKey(): Promise<any> {
return this.client.emit('notify.with.key', {
key: 'unique-key',
value: {
notify: true
}
});
}

// async notify with key and headers
@Post('notifyWithKeyAndHeaders')
async sendNotificationWithKeyAndHeaders(): Promise<any> {
return this.client.emit('notify.with.key.and.headers', {
key: 'unique-key-with-header',
headers: {
'custom': 'something'
},
value: {
notify: true
}
});
}

// async notify with key and headers
@Post('notifyWithKeyAndManyHeaders')
async sendNotificationWithKeyAndManyHeaders(): Promise<any> {
return this.client.emit('notify.with.key.and.many.headers', {
key: 'unique-key-with-many-headers',
headers: {
'custom': 'something',
'custom2': 'something2',
'int': 123
},
value: {
notify: true
}
});
}

// sync send kafka message
@Post('mathSumSyncKafkaMessage')
@HttpCode(200)
Expand Down Expand Up @@ -127,12 +178,6 @@ export class RdKafkaController implements OnModuleInit, OnModuleDestroy {
return result;
}

// async notify
@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit('notify', { notify: true });
}

// Complex data to send.
@Post('/user')
@HttpCode(200)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Controller, Logger } from '@nestjs/common';
import { EventPattern, MessagePattern } from '@nestjs/microservices';
import { Ctx, EventPattern, MessagePattern, Payload, RdKafkaContext } from '@nestjs/microservices';
import { BusinessDto } from './dtos/business.dto';
import { UserDto } from './dtos/user.dto';
import { BusinessEntity } from './entities/business.entity';
Expand All @@ -11,6 +11,36 @@ export class RdKafkaMessagesController {
protected readonly logger = new Logger(RdKafkaMessagesController.name);
static IS_NOTIFIED = false;

@EventPattern('notify')
notify(data: any) {
// console.log('notify data', data);
RdKafkaController.IS_NOTIFIED = data.notify;
}

@EventPattern('notify.with.key')
notifyWithKey(@Payload() data: any, @Ctx() context: RdKafkaContext) {
// console.log('notifyWithKey data', data);
// console.log('notifyWithKey context', context);
RdKafkaController.IS_NOTIFIED_WITH_KEY = data.notify;
}

@EventPattern('notify.with.key.and.headers')
notifyWithKeyAndHeaders(@Payload() data: any, @Ctx() context: RdKafkaContext) {
// console.log('notifyWithKeyAndHeaders data', data);
// console.log('notifyWithKeyAndHeaders context', context);
// console.log('notifyWithKeyAndHeaders context message', context.getMessage());
RdKafkaController.IS_NOTIFIED_WITH_KEY_AND_HEADERS = data.notify;
}


@EventPattern('notify.with.key.and.many.headers')
notifyWithKeyAndManyHeaders(@Payload() data: any, @Ctx() context: RdKafkaContext) {
// console.log('notifyWithKeyAndHeaders data', data);
// console.log('notifyWithKeyAndHeaders context', context);
// console.log('notifyWithKeyAndHeaders context message', context.getMessage());
RdKafkaController.IS_NOTIFIED_WITH_KEY_AND_MANY_HEADERS = data.notify;
}

@MessagePattern('math.sum.sync.kafka.message')
mathSumSyncKafkaMessage(data: any) {
return (data.value.numbers || []).reduce((a, b) => a + b);
Expand Down Expand Up @@ -51,11 +81,6 @@ export class RdKafkaMessagesController {
.reduce((a, b) => a + b);
}

@EventPattern('notify')
eventHandler(data: any) {
RdKafkaController.IS_NOTIFIED = data.notify;
}

// Complex data to send.
@MessagePattern('user.create')
async createUser(params: { value: { user: UserDto } }) {
Expand Down
Loading

0 comments on commit cabfd40

Please sign in to comment.