Skip to content

Commit

Permalink
BOT-2463 add support for AWS Transcribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Florian Treml committed Jan 24, 2022
1 parent 99c0072 commit 3ff96c8
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 12 deletions.
6 changes: 4 additions & 2 deletions frontend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
"author": "Botium GmbH",
"license": "MIT",
"dependencies": {
"@aws-sdk/client-polly": "^3.47.1",
"@aws-sdk/client-transcribe-streaming": "^3.47.1",
"@aws-sdk/client-polly": "^3.48.0",
"@aws-sdk/client-s3": "^3.48.0",
"@aws-sdk/client-transcribe": "^3.48.0",
"@aws-sdk/client-transcribe-streaming": "^3.48.0",
"@google-cloud/speech": "^4.10.0",
"@google-cloud/storage": "^5.18.0",
"@google-cloud/text-to-speech": "^3.4.0",
Expand Down
1 change: 1 addition & 0 deletions frontend/resources/.env
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ BOTIUM_SPEECH_AZURE_REGION=
BOTIUM_SPEECH_AWS_REGION=
BOTIUM_SPEECH_AWS_ACCESS_KEY_ID=
BOTIUM_SPEECH_AWS_SECRET_ACCESS_KEY=
BOTIUM_SPEECH_AWS_S3_BUCKET=

# WAV Conversion Command Line
BOTIUM_SPEECH_CONVERT_PROFILE_WAVTOMONOWAV_CMD=sox -t wav - -r 16k -t wav -c 1 -b 16 -e signed {{{output}}}
Expand Down
102 changes: 93 additions & 9 deletions frontend/src/stt/awstranscribe.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
const _ = require('lodash')
const { TranscribeStreamingClient, StartStreamTranscriptionCommand } = require('@aws-sdk/client-transcribe-streaming')
const { v1: uuidv1 } = require('uuid')
const axios = require('axios').default
const { TranscribeStreamingClient, StartStreamTranscriptionCommand, MediaEncoding } = require('@aws-sdk/client-transcribe-streaming')
const { TranscribeClient, StartTranscriptionJobCommand, GetTranscriptionJobCommand, DeleteTranscriptionJobCommand, TranscriptionJobStatus } = require('@aws-sdk/client-transcribe')
const { S3Client, PutObjectCommand, DeleteObjectCommand } = require('@aws-sdk/client-s3')
const { PassThrough } = require('stream')
const EventEmitter = require('events')

const debug = require('debug')('botium-speech-processing-awstranscribe-stt')

const { awstranscribeOptions } = require('../utils')
const { awstranscribeOptions, applyIfExists } = require('../utils')

const languageCodes = [
'af-ZA',
Expand Down Expand Up @@ -67,19 +71,18 @@ class AwsTranscribeSTT {

const request = {
LanguageCode: language,
MediaEncoding: 'pcm',
MediaEncoding: MediaEncoding.PCM,
MediaSampleRateHertz: 16000,
AudioStream: audioStream()
}
if (req.body && req.body.awstranscribe && req.body.awstranscribe.config) {
Object.assign(request, req.body.awstranscribe.config)
}
applyIfExists(request, req, 'req.body.awstranscribe.config.streaming')

const events = new EventEmitter()
try {
const cmdResponse = await transcribeClient.send(new StartStreamTranscriptionCommand(request))
setTimeout(async () => {
try {
debug('Starting to listen to TranscriptResultStream ')
for await (const event of cmdResponse.TranscriptResultStream) {
const results = _.get(event, 'TranscriptEvent.Transcript.Results')
if (results && results.length > 0) {
Expand All @@ -96,15 +99,17 @@ class AwsTranscribeSTT {
}
}
} catch (err) {
debug(`TranscriptResultStream failure: ${err.Message || err.message || err}`)
events.emit('data', {
err: `${err.message}`
err: `${err.message || err}`
})
}
events.emit('close')
debug('Ready listening to TranscriptResultStream ')
}, 0)
} catch (err) {
debug(err)
throw new Error(`AWS Transcribe STT streaming failed: ${err.message}`)
debug(`StartStreamTranscriptionCommand failure: ${err.Message || err.message || err}`)
throw new Error(`AWS Transcribe STT streaming failed: ${err.Message || err.message || err}`)
}
return {
events,
Expand All @@ -126,7 +131,86 @@ class AwsTranscribeSTT {
}

async stt (req, { language, buffer, hint }) {
const transcribeClient = new TranscribeClient(awstranscribeOptions(req))
const s3Client = new S3Client(awstranscribeOptions(req))

const jobId = uuidv1()

const putRequest = {
Bucket: _.get(req, 'body.awstranscribe.credentials.bucket') || process.env.BOTIUM_SPEECH_AWS_S3_BUCKET || 'botium-speech-processing',
Key: `botium-transcribe-source-${jobId}`
}
applyIfExists(putRequest, req, 'req.body.awstranscribe.config.s3')

const transcribeJobRequest = {
TranscriptionJobName: `botium-transcribe-job-${jobId}`,
LanguageCode: language,
Media: {
MediaFileUri: `s3://${putRequest.Bucket}/${putRequest.Key}`
}
}
applyIfExists(putRequest, req, 'req.body.awstranscribe.config.transcribe')

try {
await s3Client.send(new PutObjectCommand({
...putRequest,
Body: buffer
}))
} catch (err) {
throw new Error(`S3 Upload to ${putRequest.Bucket}/${putRequest.Key} failure: ${err.message || err}`)
}

try {
let transcriptionJob = null
try {
const transcribeJobResponse = await transcribeClient.send(new StartTranscriptionJobCommand(transcribeJobRequest))
transcriptionJob = transcribeJobResponse.TranscriptionJob
} catch (err) {
throw new Error(`Creating Transcription Job for ${transcribeJobRequest.Media.MediaFileUri} failure: ${err.message || err}`)
}

while (true) {
try {
const jobStatus = await transcribeClient.send(new GetTranscriptionJobCommand({
TranscriptionJobName: transcriptionJob.TranscriptionJobName
}))
debug(`Checking Transcription Job for ${transcribeJobRequest.Media.MediaFileUri} status: ${JSON.stringify(jobStatus.TranscriptionJob)}`)
if (jobStatus.TranscriptionJob.TranscriptionJobStatus === TranscriptionJobStatus.COMPLETED) {
try {
const transcriptionFile = await axios.get(jobStatus.TranscriptionJob.Transcript.TranscriptFileUri)
return {
text: _.get(transcriptionFile.data, 'results.transcripts[0].transcript'),
debug: transcriptionFile.data
}
} catch (err) {
throw new Error(`Downloading Transcription Result for ${transcribeJobRequest.Media.MediaFileUri} failure: ${err.message || err}`)
}
} else if (jobStatus.TranscriptionJob.TranscriptionJobStatus === TranscriptionJobStatus.FAILED) {
throw new Error(`Transcription Job for ${transcribeJobRequest.Media.MediaFileUri} failed, reason: ${jobStatus.TranscriptionJob.FailureReason}`)
} else {
await new Promise(resolve => setTimeout(resolve, 1000))
}
} catch (err) {
throw new Error(`Checking Transcription Job Status for ${transcribeJobRequest.Media.MediaFileUri} failure: ${err.message || err}`)
}
}
} finally {
try {
await s3Client.send(new DeleteObjectCommand({
Bucket: putRequest.Bucket,
Key: putRequest.Key
}))
} catch (err) {
debug(`Deleting S3 Object ${putRequest.Bucket}/${putRequest.Key} failure: ${err.message || err}`)
}
try {
await transcribeClient.send(new DeleteTranscriptionJobCommand({
TranscriptionJobName: transcribeJobRequest.TranscriptionJobName
}))
} catch (err) {
debug(`Deleting Transcription Job ${transcribeJobRequest.TranscriptionJobName} failure: ${err.message || err}`)
}
}
}
}

Expand Down
7 changes: 6 additions & 1 deletion frontend/src/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ const readBaseUrls = (req) => {
}
}

const applyIfExists = (target, src, p) => {
return Object.assign(target, _.get(src, p) || {})
}

module.exports = {
asJson,
enumValueToName,
Expand All @@ -179,5 +183,6 @@ module.exports = {
azureSpeechConfig,
applyExtraAzureSpeechConfig,
getAzureErrorDetails,
readBaseUrls
readBaseUrls,
applyIfExists
}

0 comments on commit 3ff96c8

Please sign in to comment.