diff --git a/frontend/package.json b/frontend/package.json index 791fbe4..4668b37 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -13,8 +13,10 @@ "author": "Botium GmbH", "license": "MIT", "dependencies": { - "@aws-sdk/client-polly": "^3.47.1", - "@aws-sdk/client-transcribe-streaming": "^3.47.1", + "@aws-sdk/client-polly": "^3.48.0", + "@aws-sdk/client-s3": "^3.48.0", + "@aws-sdk/client-transcribe": "^3.48.0", + "@aws-sdk/client-transcribe-streaming": "^3.48.0", "@google-cloud/speech": "^4.10.0", "@google-cloud/storage": "^5.18.0", "@google-cloud/text-to-speech": "^3.4.0", diff --git a/frontend/resources/.env b/frontend/resources/.env index 5d63e9f..fcb8049 100644 --- a/frontend/resources/.env +++ b/frontend/resources/.env @@ -57,6 +57,7 @@ BOTIUM_SPEECH_AZURE_REGION= BOTIUM_SPEECH_AWS_REGION= BOTIUM_SPEECH_AWS_ACCESS_KEY_ID= BOTIUM_SPEECH_AWS_SECRET_ACCESS_KEY= +BOTIUM_SPEECH_AWS_S3_BUCKET= # WAV Conversion Command Line BOTIUM_SPEECH_CONVERT_PROFILE_WAVTOMONOWAV_CMD=sox -t wav - -r 16k -t wav -c 1 -b 16 -e signed {{{output}}} diff --git a/frontend/src/stt/awstranscribe.js b/frontend/src/stt/awstranscribe.js index 00b0136..517751b 100644 --- a/frontend/src/stt/awstranscribe.js +++ b/frontend/src/stt/awstranscribe.js @@ -1,11 +1,15 @@ const _ = require('lodash') -const { TranscribeStreamingClient, StartStreamTranscriptionCommand } = require('@aws-sdk/client-transcribe-streaming') +const { v1: uuidv1 } = require('uuid') +const axios = require('axios').default +const { TranscribeStreamingClient, StartStreamTranscriptionCommand, MediaEncoding } = require('@aws-sdk/client-transcribe-streaming') +const { TranscribeClient, StartTranscriptionJobCommand, GetTranscriptionJobCommand, DeleteTranscriptionJobCommand, TranscriptionJobStatus } = require('@aws-sdk/client-transcribe') +const { S3Client, PutObjectCommand, DeleteObjectCommand } = require('@aws-sdk/client-s3') const { PassThrough } = require('stream') const EventEmitter = require('events') const debug = require('debug')('botium-speech-processing-awstranscribe-stt') -const { awstranscribeOptions } = require('../utils') +const { awstranscribeOptions, applyIfExists } = require('../utils') const languageCodes = [ 'af-ZA', @@ -67,19 +71,18 @@ class AwsTranscribeSTT { const request = { LanguageCode: language, - MediaEncoding: 'pcm', + MediaEncoding: MediaEncoding.PCM, MediaSampleRateHertz: 16000, AudioStream: audioStream() } - if (req.body && req.body.awstranscribe && req.body.awstranscribe.config) { - Object.assign(request, req.body.awstranscribe.config) - } + applyIfExists(request, req, 'req.body.awstranscribe.config.streaming') const events = new EventEmitter() try { const cmdResponse = await transcribeClient.send(new StartStreamTranscriptionCommand(request)) setTimeout(async () => { try { + debug('Starting to listen to TranscriptResultStream ') for await (const event of cmdResponse.TranscriptResultStream) { const results = _.get(event, 'TranscriptEvent.Transcript.Results') if (results && results.length > 0) { @@ -96,15 +99,17 @@ class AwsTranscribeSTT { } } } catch (err) { + debug(`TranscriptResultStream failure: ${err.Message || err.message || err}`) events.emit('data', { - err: `${err.message}` + err: `${err.message || err}` }) } events.emit('close') + debug('Ready listening to TranscriptResultStream ') }, 0) } catch (err) { - debug(err) - throw new Error(`AWS Transcribe STT streaming failed: ${err.message}`) + debug(`StartStreamTranscriptionCommand failure: ${err.Message || err.message || err}`) + throw new Error(`AWS Transcribe STT streaming failed: ${err.Message || err.message || err}`) } return { events, @@ -126,7 +131,86 @@ class AwsTranscribeSTT { } async stt (req, { language, buffer, hint }) { + const transcribeClient = new TranscribeClient(awstranscribeOptions(req)) + const s3Client = new S3Client(awstranscribeOptions(req)) + + const jobId = uuidv1() + + const putRequest = { + Bucket: _.get(req, 'body.awstranscribe.credentials.bucket') || process.env.BOTIUM_SPEECH_AWS_S3_BUCKET || 'botium-speech-processing', + Key: `botium-transcribe-source-${jobId}` + } + applyIfExists(putRequest, req, 'req.body.awstranscribe.config.s3') + const transcribeJobRequest = { + TranscriptionJobName: `botium-transcribe-job-${jobId}`, + LanguageCode: language, + Media: { + MediaFileUri: `s3://${putRequest.Bucket}/${putRequest.Key}` + } + } + applyIfExists(putRequest, req, 'req.body.awstranscribe.config.transcribe') + + try { + await s3Client.send(new PutObjectCommand({ + ...putRequest, + Body: buffer + })) + } catch (err) { + throw new Error(`S3 Upload to ${putRequest.Bucket}/${putRequest.Key} failure: ${err.message || err}`) + } + + try { + let transcriptionJob = null + try { + const transcribeJobResponse = await transcribeClient.send(new StartTranscriptionJobCommand(transcribeJobRequest)) + transcriptionJob = transcribeJobResponse.TranscriptionJob + } catch (err) { + throw new Error(`Creating Transcription Job for ${transcribeJobRequest.Media.MediaFileUri} failure: ${err.message || err}`) + } + + while (true) { + try { + const jobStatus = await transcribeClient.send(new GetTranscriptionJobCommand({ + TranscriptionJobName: transcriptionJob.TranscriptionJobName + })) + debug(`Checking Transcription Job for ${transcribeJobRequest.Media.MediaFileUri} status: ${JSON.stringify(jobStatus.TranscriptionJob)}`) + if (jobStatus.TranscriptionJob.TranscriptionJobStatus === TranscriptionJobStatus.COMPLETED) { + try { + const transcriptionFile = await axios.get(jobStatus.TranscriptionJob.Transcript.TranscriptFileUri) + return { + text: _.get(transcriptionFile.data, 'results.transcripts[0].transcript'), + debug: transcriptionFile.data + } + } catch (err) { + throw new Error(`Downloading Transcription Result for ${transcribeJobRequest.Media.MediaFileUri} failure: ${err.message || err}`) + } + } else if (jobStatus.TranscriptionJob.TranscriptionJobStatus === TranscriptionJobStatus.FAILED) { + throw new Error(`Transcription Job for ${transcribeJobRequest.Media.MediaFileUri} failed, reason: ${jobStatus.TranscriptionJob.FailureReason}`) + } else { + await new Promise(resolve => setTimeout(resolve, 1000)) + } + } catch (err) { + throw new Error(`Checking Transcription Job Status for ${transcribeJobRequest.Media.MediaFileUri} failure: ${err.message || err}`) + } + } + } finally { + try { + await s3Client.send(new DeleteObjectCommand({ + Bucket: putRequest.Bucket, + Key: putRequest.Key + })) + } catch (err) { + debug(`Deleting S3 Object ${putRequest.Bucket}/${putRequest.Key} failure: ${err.message || err}`) + } + try { + await transcribeClient.send(new DeleteTranscriptionJobCommand({ + TranscriptionJobName: transcribeJobRequest.TranscriptionJobName + })) + } catch (err) { + debug(`Deleting Transcription Job ${transcribeJobRequest.TranscriptionJobName} failure: ${err.message || err}`) + } + } } } diff --git a/frontend/src/utils.js b/frontend/src/utils.js index 5324211..df6ea7f 100644 --- a/frontend/src/utils.js +++ b/frontend/src/utils.js @@ -166,6 +166,10 @@ const readBaseUrls = (req) => { } } +const applyIfExists = (target, src, p) => { + return Object.assign(target, _.get(src, p) || {}) +} + module.exports = { asJson, enumValueToName, @@ -179,5 +183,6 @@ module.exports = { azureSpeechConfig, applyExtraAzureSpeechConfig, getAzureErrorDetails, - readBaseUrls + readBaseUrls, + applyIfExists }