Skip to content

Commit

Permalink
feat: add aws self-managed kafka event source (#686)
Browse files Browse the repository at this point in the history
  • Loading branch information
brettdebt authored Aug 7, 2024
1 parent 04fe1f5 commit 6e3a7f2
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ serverlessExpress({
'AWS_KINESIS_DATA_STREAM': '/kinesis',
'AWS_S3': '/s3',
'AWS_STEP_FUNCTIONS': '/step-functions',
'AWS_SELF_MANAGED_KAFKA': '/self-managed-kafka',
}
})
```
Expand Down
20 changes: 20 additions & 0 deletions __tests__/unit.self-managed-kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const eventSources = require('../src/event-sources')
const testUtils = require('./utils')

const selfManagedKafkaEventSource = eventSources.getEventSource({
eventSourceName: 'AWS_SELF_MANAGED_KAFKA'
})

test('request is correct', () => {
const req = getReq()
expect(typeof req).toEqual('object')
expect(req.headers).toEqual({ host: 'self-managed-kafka' })
expect(req.body).toEqual(testUtils.selfManagedKafkaEvent)
expect(req.method).toEqual('POST')
})

function getReq () {
const event = testUtils.selfManagedKafkaEvent
const request = selfManagedKafkaEventSource.getRequest({ event })
return request
}
44 changes: 43 additions & 1 deletion __tests__/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,42 @@ const kinesisDataStreamEvent = {
]
}

// Sample event from https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
const selfManagedKafkaEvent = {
eventSource: 'SelfManagedKafka',
bootstrapServers: 'b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092',
records: {
'mytopic-0': [
{
topic: 'mytopic',
partition: 0,
offset: 15,
timestamp: 1545084650987,
timestampType: 'CREATE_TIME',
key: 'abcDEFghiJKLmnoPQRstuVWXyz1234==',
value: 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==',
headers: [
{
headerKey: [
104,
101,
97,
100,
101,
114,
86,
97,
108,
117,
101
]
}
]
}
]
}
}

describe('getEventSourceNameBasedOnEvent', () => {
test('throws error on empty event', () => {
expect(() => getEventSourceNameBasedOnEvent({ event: {} })).toThrow(
Expand Down Expand Up @@ -263,6 +299,11 @@ describe('getEventSourceNameBasedOnEvent', () => {
expect(result).toEqual('AWS_KINESIS_DATA_STREAM')
})

test('recognises self managed kafka event', () => {
const result = getEventSourceNameBasedOnEvent({ event: selfManagedKafkaEvent })
expect(result).toEqual('AWS_SELF_MANAGED_KAFKA')
})

test('recognizes eventbridge event', () => {
const result = getEventSourceNameBasedOnEvent({ event: eventbridgeEvent })
expect(result).toEqual('AWS_EVENTBRIDGE')
Expand All @@ -287,5 +328,6 @@ module.exports = {
eventbridgeEvent,
eventbridgeScheduledEvent,
eventbridgeCustomerEvent,
kinesisDataStreamEvent
kinesisDataStreamEvent,
selfManagedKafkaEvent
}
2 changes: 1 addition & 1 deletion src/configure.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Handler } from 'aws-lambda';
import { Logger } from './logger';
import Framework from './frameworks';

type EventSources = 'AWS_SNS' | 'AWS_DYNAMODB' | 'AWS_EVENTBRIDGE' | 'AWS_SQS' | 'AWS_KINESIS_DATA_STREAM' | 'AWS_S3' | 'AWS_STEP_FUNCTIONS';
type EventSources = 'AWS_SNS' | 'AWS_DYNAMODB' | 'AWS_EVENTBRIDGE' | 'AWS_SQS' | 'AWS_KINESIS_DATA_STREAM' | 'AWS_S3' | 'AWS_STEP_FUNCTIONS' | 'AWS_SELF_MANAGED_KAFKA';

interface EventSource {
getRequest?: any; // TODO:
Expand Down
17 changes: 17 additions & 0 deletions src/event-sources/aws/self-managed-kafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
const { emptyResponseMapper } = require('../utils')

const getRequestValuesFromSelfManagedKafka = ({ event }) => {
const method = 'POST'
const headers = { host: 'self-managed-kafka' }
const body = event

return {
method,
headers,
body
}
}
module.exports = {
getRequest: getRequestValuesFromSelfManagedKafka,
getResponse: emptyResponseMapper
}
3 changes: 3 additions & 0 deletions src/event-sources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const awsEventBridgeEventSource = require('./aws/eventbridge')
const awsKinesisEventSource = require('./aws/kinesis')
const awsS3 = require('./aws/s3')
const awsStepFunctionsEventSource = require('./aws/step-functions')
const awsSelfManagedKafkaEventSource = require('./aws/self-managed-kafka')

function getEventSource ({ eventSourceName }) {
switch (eventSourceName) {
Expand Down Expand Up @@ -40,6 +41,8 @@ function getEventSource ({ eventSourceName }) {
return awsS3
case 'AWS_STEP_FUNCTIONS':
return awsStepFunctionsEventSource
case 'AWS_SELF_MANAGED_KAFKA':
return awsSelfManagedKafkaEventSource
default:
throw new Error('Couldn\'t detect valid event source.')
}
Expand Down
1 change: 1 addition & 0 deletions src/event-sources/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ function getEventSourceNameBasedOnEvent ({
event
}) {
if (event.requestContext && event.requestContext.elb) return 'AWS_ALB'
if (event.eventSource === 'SelfManagedKafka') return 'AWS_SELF_MANAGED_KAFKA'
if (event.Records) {
const eventSource = event.Records[0] ? event.Records[0].EventSource || event.Records[0].eventSource : undefined
if (eventSource === 'aws:sns') {
Expand Down

0 comments on commit 6e3a7f2

Please sign in to comment.