Skip to content

Commit

Permalink
feat(construct): Update file transformation changes in constructs (#44)
Browse files Browse the repository at this point in the history
* update qa to match documentation and decouple from previous workflow

* update ingestion pipeline to name files differently

* update summarization to follow the same model

* add try/catch around filename handling

* update doc for summarization

* remove unused variable from embeddings function

* bug fixes for multiple files

* fix bucket permissions when an existing resource is provided, since CDK does not fail in that case

* propagate fix to qa

* eslint

* bug fixes for multiple files

---------

Co-authored-by: Alain Krok <[email protected]>
Co-authored-by: Dinesh Sajwan <[email protected]>
  • Loading branch information
3 people authored Oct 20, 2023
1 parent 0c8f9d1 commit 87ce0f6
Show file tree
Hide file tree
Showing 20 changed files with 281 additions and 184 deletions.
2 changes: 1 addition & 1 deletion docs/emerging_tech_cdk_constructs.drawio

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def read_file_from_s3(bucket, key):

@tracer.capture_method
def check_file_exists(bucket,key):
logger.info(f"Checking if file exists: bucket: {bucket}, key: {key}")
s3_client = boto3.client('s3')
try:
resp = s3_client.head_object(Bucket=bucket, Key=key)
Expand All @@ -50,22 +51,26 @@ def check_file_exists(bucket,key):
@tracer.capture_method
def get_file_transformation(transformed_asset_bucket,transformed_file_name,
input_asset_bucket,original_file_name):

response = {
'status':'File transformation Pending',
'name':original_file_name,
'summary':''
}
if (check_file_exists(transformed_asset_bucket, transformed_file_name) == False):
if (check_file_exists(transformed_asset_bucket, transformed_file_name) is False):
logger.info("Starting file transformation")
loader = S3FileLoaderInMemory(input_asset_bucket, original_file_name)
document_content = loader.load()
if not document_content:
response['status'] = 'Error'
response['name'] = ''
response['summary'] = 'Not able to transform the file.'
return response
encoded_string = document_content.encode("utf-8")
s3.Bucket(transformed_asset_bucket).put_object(Key=transformed_file_name, Body=encoded_string)
response['status'] = 'File transformed'
response['name'] = transformed_file_name
else:
response['status'] = 'File already exists'
response['summary']=''
else:
logger.info("File already exists,skip transformation.")

return response
57 changes: 30 additions & 27 deletions lambda/aws-summarization-appsync-stepfn/document_reader/lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ def handler(event, context: LambdaContext):
logger.info(f"{event=}")

original_file_name = event["name"]
job_id = event["jobid"]

job_id = event["jobid"]
response = {
"is_summary_available": False,
"summary_job_id": job_id,
Expand Down Expand Up @@ -65,9 +64,10 @@ def handler(event, context: LambdaContext):
)
else:
metrics.add_metric(name="summary_llm_hit", unit=MetricUnit.Count, value=1)
transformed_file_name = original_file_name.replace(".pdf", "_transformed.txt")
transformed_file_name = original_file_name.replace(".pdf", ".txt")

if(is_file_tranformation_required):
logger.info("File transformation required")
transformed_file = get_file_transformation(transformed_bucket_name,
transformed_file_name,
input_bucket_name,
Expand All @@ -76,53 +76,56 @@ def handler(event, context: LambdaContext):
{
"file_name": original_file_name,
"status": transformed_file['status'],
"summary": '',
"summary": transformed_file['summary'],
"transformed_file_name":transformed_file_name,
"is_summary_available": False
}
)
else:
pdf_transformed_file = check_file_exists(transformed_bucket_name,
transformed_file_name)
if not pdf_transformed_file:
if pdf_transformed_file is False:
response.update(
{
"file_name": original_file_name,
"status": "Error",
"summary": f"No file {transformed_file_name} available to generate the summary.",
"status": "Error",
"summary": f"Error occured. No file {transformed_file_name} available to generate the summary."
}
)
logger.exception({"No file {transformed_file_name} available to generate the summary."})
return response


logger.info({"document reader response:": response})
updateSummaryJobStatus({'jobid': job_id,
'file_name':response["file_name"]
,'status':response['status'] ,
'summary':response["summary"]})
logger.info({"document reader response:::": response})
updateSummaryJobStatus({'jobid': job_id, 'files':
[{ 'status':response["status"],
'name':response['file_name'] ,
'summary':response["summary"] }]})
return response

@tracer.capture_method
def get_summary_from_cache(file_name):

logger.info({"Searching Redis for cached summary file: "+file_name})
redis_host = os.environ.get("REDIS_HOST", "N/A")
redis_port = os.environ.get("REDIS_PORT", "N/A")
redis_host = os.environ.get("REDIS_HOST")
redis_port = os.environ.get("REDIS_PORT")

logger.info({"Redis host: "+redis_host})
logger.info({"Redis port: "+redis_port})

try:
redis_client = redis.Redis(host=redis_host, port=redis_port)
fileSummary = redis_client.get(file_name)
except (ValueError, redis.ConnectionError) as e:
logger.exception({"An error occured while connecting to Redis" : e})
return
logger.info(f"Redis host: {redis_host}")
logger.info(f"Redis port: {redis_port}")

if redis_host is None or redis_port is None:
logger.exception({"Redis host or port is not set"})
else:
try:
logger.info({"Connecting Redis......"})
redis_client = redis.Redis(host=redis_host, port=redis_port)
fileSummary = redis_client.get(file_name)
except (ValueError, redis.ConnectionError) as e:
logger.exception({"An error occured while connecting to Redis" : e})
return

if fileSummary:
logger.info({"File summary found in cache: ": fileSummary})
return fileSummary.decode()
if fileSummary:
logger.info({"File summary found in cache: ": fileSummary})
return fileSummary.decode()


logger.info("File summary not found in cache, generating it from llm")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def load(self) -> str:
# read S3
try:
s3 = boto3.resource('s3')
logger.info(f"Reading file from s3: bucket: {self.bucket}, key: {self.key}")
obj = s3.Object(self.bucket, self.key)
encodedpdf = obj.get()['Body'].read()
pdfFile = PdfReader(BytesIO(encodedpdf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,24 @@ def get_credentials(secret_id: str, region_name: str) -> str:
def updateSummaryJobStatus(variables):

logger.info(f"send status variables :: {variables}")
query = """mutation updateSummaryJobStatus {
updateSummaryJobStatus(summary_job_id: \"$jobid\",file_name: \"$file_name\", status: \"$status\", summary: \"$summary\") {
summary_job_id
file_name
query = """
mutation updateSummaryJobStatus {
updateSummaryJobStatus(files: $files, summary_job_id: \"$jobid\") {
files {
name
status
summary
}
summary_job_id
}
}
"""

query = query.replace("$jobid", variables['jobid'])
query = query.replace("$file_name", variables['file_name'])
query = query.replace("$status", variables['status'])
query = query.replace("$summary", variables['summary'])
query = query.replace("$files", str(variables['files']).replace("\'", "\""))
query = query.replace("\"name\"", "name")
query = query.replace("\"status\"", "status")
query = query.replace("\"summary\"", "summary")


# query = query.replace("\"file_name\"", "file_name")
Expand Down
49 changes: 32 additions & 17 deletions lambda/aws-summarization-appsync-stepfn/input_validator/lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,39 +34,54 @@ def handler(event, context: LambdaContext)-> dict:

input_files = summary_input['files']

response = process_files(input_files)

updateFileStatus({'jobid': job_id, 'files': response['files']})

response = process_files(input_files,job_id)

response_transformed = add_job_id_to_response(response, job_id)

logger.info({"response": response_transformed})
return response_transformed


@tracer.capture_method
def process_files(input_files):
def process_files(input_files,job_id):
files_list = []
files_to_process = []
files_to_reject = []
valid = True

for i in range(len(input_files)):
filename = input_files[i]['name']
status = "Unsupported"
if filename.lower().endswith(('.pdf')):
status = "Supported"
metrics.add_metric(name="SupportedFile", unit=MetricUnit.Count, value=1)
file_list = {
'status':"Pending",
'name':filename ,
'summary':""
}
if filename.lower().endswith(('.pdf')) or filename.lower().endswith(('.txt')):
metrics.add_metric(name="SupportedFile", unit=MetricUnit.Count, value=1)
file_list.update({'status':'Supported'})
else:
logger.info("file {filename} extension is currently not supported")
file_list.update({'status':"Error"})
file_list.update({'summary':"Invalid file format"})
logger.info(f"file {filename} extension is currently not supported, skipping this file from summary generation")
metrics.add_metric(name="UnsupportedFile", unit=MetricUnit.Count, value=1)
file_to_process = {
'status':status,
'name':filename,
'summary':''
}
files_to_process.append(file_to_process)

files_list.append(file_list)

for file in files_list:
if file['status'] == "Error":
files_to_reject.append(file)
logger.info({" Rejected file :: file_name":file['name'],"status":file['status']})
else:
files_to_process.append(file)
logger.info({"Valid file :: file_name":file['name'],"status":file['status']})


updateFileStatus({'jobid': job_id, 'files': files_list})

if not files_to_process:
valid = False

logger.info("No valid file to process. Stopping the job.")

response = {
'isValid':valid,
'files':files_to_process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def updateFileStatus(variables):

print(f"send status variables :: {variables}")
query = """
mutation updateFileStatus {
updateFileStatus(files: $files, summary_job_id: \"$jobid\") {
mutation updateSummaryJobStatus {
updateSummaryJobStatus(files: $files, summary_job_id: \"$jobid\") {
files {
name
status
Expand Down
57 changes: 30 additions & 27 deletions lambda/aws-summarization-appsync-stepfn/summary_generator/lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
@metrics.log_metrics(capture_cold_start_metric=True)
def handler(event, context: LambdaContext)-> dict:
logger.info("Starting summary agent with input", event)

processed_files = []

job_id = event["summary_job_id"]

Expand All @@ -60,16 +60,11 @@ def handler(event, context: LambdaContext)-> dict:
original_file_name = event["file_name"]
transformed_file_name = event["transformed_file_name"]

# create response

response = {
"summary_job_id": job_id,
"file_name": original_file_name,
"status": "Pending",
"summary": ""
"files": processed_files,
}



summary_llm = Bedrock(
model_id="anthropic.claude-v2",
client=bedrock_client,
Expand All @@ -86,23 +81,36 @@ def handler(event, context: LambdaContext)-> dict:

inputFile = read_file_from_s3(transformed_bucket_name, transformed_file_name)
if inputFile is None:
response["status"] = "Failed to load file from S3"
processed_file = {
'status':"Error",
'name':original_file_name,
'summary':"No file available to read."
}
processed_files.append(processed_file)
logger.exception(f"Error occured:: {response}")
return response

finalsummary = generate_summary(summary_llm,chain_type,inputFile)

llm_answer_bytes = finalsummary.encode("utf-8")
base64_bytes = base64.b64encode(llm_answer_bytes)
llm_answer_base64_string = base64_bytes.decode("utf-8")
logger.info(finalsummary)
logger.info("Summarization done")

response.update({
'file_name':original_file_name,
'status':"Completed",
'summary':llm_answer_base64_string
}
)
if not finalsummary:
logger.exception("Error occured while generating summary")
processed_file = {
'status':"Error",
'name':original_file_name,
'summary':"Something went wrong while generating summary!"
}
else:
llm_answer_bytes = finalsummary.encode("utf-8")
base64_bytes = base64.b64encode(llm_answer_bytes)
llm_answer_base64_string = base64_bytes.decode("utf-8")
logger.info(f" Generated summary is:: {finalsummary}")
processed_file = {
'status':"Completed",
'name':original_file_name,
'summary':llm_answer_base64_string
}
processed_files.append(processed_file)


logger.info("Saving respone in Redis :: ",response)
try:
Expand All @@ -115,10 +123,7 @@ def handler(event, context: LambdaContext)-> dict:
f'Host: "{redis_host}", Port: "{redis_port}".\n'
f"Exception: {e}"
)
updateSummaryJobStatus({'jobid': job_id,
'file_name':response["file_name"]
,'status':response['status'] ,
'summary':response["summary"]})
updateSummaryJobStatus({'jobid': job_id, 'files': processed_files})
return response


Expand All @@ -128,8 +133,6 @@ def generate_summary(_summary_llm,chain_type,inputFile)-> str:

logger.info(f" Using chain_type as {chain_type} for the document")
docs = [Document(page_content=inputFile)]
# run LLM
# prompt = load_prompt("prompt.json")
chain = load_summarize_chain(
_summary_llm,
chain_type=chain_type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,24 @@ def get_credentials(secret_id: str, region_name: str) -> str:
def updateSummaryJobStatus(variables):

logger.info(f"send status variables :: {variables}")
query = """mutation updateSummaryJobStatus {
updateSummaryJobStatus(summary_job_id: \"$jobid\",file_name: \"$file_name\", status: \"$status\", summary: \"$summary\") {
summary_job_id
file_name
query = """
mutation updateSummaryJobStatus {
updateSummaryJobStatus(files: $files, summary_job_id: \"$jobid\") {
files {
name
status
summary
}
summary_job_id
}
}
"""

query = query.replace("$jobid", variables['jobid'])
query = query.replace("$file_name", variables['file_name'])
query = query.replace("$status", variables['status'])
query = query.replace("$summary", variables['summary'])
query = query.replace("$files", str(variables['files']).replace("\'", "\""))
query = query.replace("\"name\"", "name")
query = query.replace("\"status\"", "status")
query = query.replace("\"summary\"", "summary")


query = query.replace("\n", "")
Expand Down
Loading

0 comments on commit 87ce0f6

Please sign in to comment.