From 6e3a7f20d037da50e03060855525d3132c079b6e Mon Sep 17 00:00:00 2001 From: brett <54417064+brettdebt@users.noreply.github.com> Date: Wed, 7 Aug 2024 01:52:47 -0700 Subject: [PATCH] feat: add aws self-managed kafka event source (#686) --- README.md | 1 + __tests__/unit.self-managed-kafka.js | 20 ++++++++++ __tests__/utils.js | 44 ++++++++++++++++++++- src/configure.d.ts | 2 +- src/event-sources/aws/self-managed-kafka.js | 17 ++++++++ src/event-sources/index.js | 3 ++ src/event-sources/utils.js | 1 + 7 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 __tests__/unit.self-managed-kafka.js create mode 100644 src/event-sources/aws/self-managed-kafka.js diff --git a/README.md b/README.md index ca63951c..169f1140 100644 --- a/README.md +++ b/README.md @@ -283,6 +283,7 @@ serverlessExpress({ 'AWS_KINESIS_DATA_STREAM': '/kinesis', 'AWS_S3': '/s3', 'AWS_STEP_FUNCTIONS': '/step-functions', + 'AWS_SELF_MANAGED_KAFKA': '/self-managed-kafka', } }) ``` diff --git a/__tests__/unit.self-managed-kafka.js b/__tests__/unit.self-managed-kafka.js new file mode 100644 index 00000000..4720c0a2 --- /dev/null +++ b/__tests__/unit.self-managed-kafka.js @@ -0,0 +1,20 @@ +const eventSources = require('../src/event-sources') +const testUtils = require('./utils') + +const selfManagedKafkaEventSource = eventSources.getEventSource({ + eventSourceName: 'AWS_SELF_MANAGED_KAFKA' +}) + +test('request is correct', () => { + const req = getReq() + expect(typeof req).toEqual('object') + expect(req.headers).toEqual({ host: 'self-managed-kafka' }) + expect(req.body).toEqual(testUtils.selfManagedKafkaEvent) + expect(req.method).toEqual('POST') +}) + +function getReq () { + const event = testUtils.selfManagedKafkaEvent + const request = selfManagedKafkaEventSource.getRequest({ event }) + return request +} diff --git a/__tests__/utils.js b/__tests__/utils.js index 5a668e1e..c85a5e4b 100644 --- a/__tests__/utils.js +++ b/__tests__/utils.js @@ -231,6 +231,42 @@ const kinesisDataStreamEvent = { ] } +// Sample event from https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html +const selfManagedKafkaEvent = { + eventSource: 'SelfManagedKafka', + bootstrapServers: 'b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092', + records: { + 'mytopic-0': [ + { + topic: 'mytopic', + partition: 0, + offset: 15, + timestamp: 1545084650987, + timestampType: 'CREATE_TIME', + key: 'abcDEFghiJKLmnoPQRstuVWXyz1234==', + value: 'SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==', + headers: [ + { + headerKey: [ + 104, + 101, + 97, + 100, + 101, + 114, + 86, + 97, + 108, + 117, + 101 + ] + } + ] + } + ] + } +} + describe('getEventSourceNameBasedOnEvent', () => { test('throws error on empty event', () => { expect(() => getEventSourceNameBasedOnEvent({ event: {} })).toThrow( @@ -263,6 +299,11 @@ describe('getEventSourceNameBasedOnEvent', () => { expect(result).toEqual('AWS_KINESIS_DATA_STREAM') }) + test('recognises self managed kafka event', () => { + const result = getEventSourceNameBasedOnEvent({ event: selfManagedKafkaEvent }) + expect(result).toEqual('AWS_SELF_MANAGED_KAFKA') + }) + test('recognizes eventbridge event', () => { const result = getEventSourceNameBasedOnEvent({ event: eventbridgeEvent }) expect(result).toEqual('AWS_EVENTBRIDGE') @@ -287,5 +328,6 @@ module.exports = { eventbridgeEvent, eventbridgeScheduledEvent, eventbridgeCustomerEvent, - kinesisDataStreamEvent + kinesisDataStreamEvent, + selfManagedKafkaEvent } diff --git a/src/configure.d.ts b/src/configure.d.ts index 05891a80..a51d3c25 100644 --- a/src/configure.d.ts +++ b/src/configure.d.ts @@ -3,7 +3,7 @@ import { Handler } from 'aws-lambda'; import { Logger } from './logger'; import Framework from './frameworks'; -type EventSources = 'AWS_SNS' | 'AWS_DYNAMODB' | 'AWS_EVENTBRIDGE' | 'AWS_SQS' | 'AWS_KINESIS_DATA_STREAM' | 'AWS_S3' | 'AWS_STEP_FUNCTIONS'; +type EventSources = 'AWS_SNS' | 'AWS_DYNAMODB' | 'AWS_EVENTBRIDGE' | 'AWS_SQS' | 'AWS_KINESIS_DATA_STREAM' | 'AWS_S3' | 'AWS_STEP_FUNCTIONS' | 'AWS_SELF_MANAGED_KAFKA'; interface EventSource { getRequest?: any; // TODO: diff --git a/src/event-sources/aws/self-managed-kafka.js b/src/event-sources/aws/self-managed-kafka.js new file mode 100644 index 00000000..3ef9ad1c --- /dev/null +++ b/src/event-sources/aws/self-managed-kafka.js @@ -0,0 +1,17 @@ +const { emptyResponseMapper } = require('../utils') + +const getRequestValuesFromSelfManagedKafka = ({ event }) => { + const method = 'POST' + const headers = { host: 'self-managed-kafka' } + const body = event + + return { + method, + headers, + body + } +} +module.exports = { + getRequest: getRequestValuesFromSelfManagedKafka, + getResponse: emptyResponseMapper +} diff --git a/src/event-sources/index.js b/src/event-sources/index.js index ef7e6749..a83a672a 100644 --- a/src/event-sources/index.js +++ b/src/event-sources/index.js @@ -11,6 +11,7 @@ const awsEventBridgeEventSource = require('./aws/eventbridge') const awsKinesisEventSource = require('./aws/kinesis') const awsS3 = require('./aws/s3') const awsStepFunctionsEventSource = require('./aws/step-functions') +const awsSelfManagedKafkaEventSource = require('./aws/self-managed-kafka') function getEventSource ({ eventSourceName }) { switch (eventSourceName) { @@ -40,6 +41,8 @@ function getEventSource ({ eventSourceName }) { return awsS3 case 'AWS_STEP_FUNCTIONS': return awsStepFunctionsEventSource + case 'AWS_SELF_MANAGED_KAFKA': + return awsSelfManagedKafkaEventSource default: throw new Error('Couldn\'t detect valid event source.') } diff --git a/src/event-sources/utils.js b/src/event-sources/utils.js index 2c5d5a53..da3c80f5 100644 --- a/src/event-sources/utils.js +++ b/src/event-sources/utils.js @@ -71,6 +71,7 @@ function getEventSourceNameBasedOnEvent ({ event }) { if (event.requestContext && event.requestContext.elb) return 'AWS_ALB' + if (event.eventSource === 'SelfManagedKafka') return 'AWS_SELF_MANAGED_KAFKA' if (event.Records) { const eventSource = event.Records[0] ? event.Records[0].EventSource || event.Records[0].eventSource : undefined if (eventSource === 'aws:sns') {