Commit ed7bbab

Merge pull request #7 from DaBeIDS/feature/2023-10_issue_29

Feature/2023 10 issue 29

RishavIDS authored Oct 25, 2023
2 parents 4793d75 + fbfd2af commit ed7bbab
Showing 19 changed files with 220 additions and 119 deletions.
25 changes: 7 additions & 18 deletions data_extractor/code/esg_data_pipeline/Dockerfile
@@ -1,22 +1,11 @@
# Modified mmdetection Dockerfile
ARG PYTORCH="1.4"
ARG CUDA="10.1"
ARG CUDNN="7"
FROM ubuntu:22.04
SHELL ["/bin/bash", "-c"]

FROM pytorch/pytorch:${PYTORCH}-cuda${CUDA}-cudnn${CUDNN}-devel

ENV TORCH_CUDA_ARCH_LIST="3.7 6.0 6.1 7.0+PTX"
ENV TORCH_NVCC_FLAGS="-Xfatbin -compress-all"
ENV CMAKE_PREFIX_PATH="$(dirname $(which conda))/../"

RUN apt-key del 3bf863cc
RUN apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/3bf863cc.pub

RUN apt-key del 7fa2af80
RUN apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64/7fa2af80.pub
# no prompt during installation:
ARG DEBIAN_FRONTEND=noninteractive

# Added poppler-utils, default-jre installations
RUN apt-get update && apt-get install -y git wget vim ninja-build poppler-utils default-jre \
RUN apt-get update && apt-get install -y apt-utils git wget python3 python3-pip vim ninja-build poppler-utils default-jre \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

@@ -31,9 +20,9 @@ WORKDIR /app/code/esg_data_pipeline
RUN pip install -e .

RUN mkdir -p /app/server_logs
RUN chmod -R 777 /app/server_logs

RUN mkdir -p /app/data

RUN chmod -R 777 /app/server_logs
RUN chmod -R 777 /app/data

CMD ./entry.sh
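
The base image moves from the CUDA-enabled pytorch/pytorch image to plain ubuntu:22.04, with Python installed through apt. A minimal post-build smoke test, as a sketch: it assumes python3 is available inside the container (installed above) and checks only the tools this Dockerfile installs (pdftotext ships with poppler-utils, java with default-jre).

# smoke_test.py - hedged sketch, not part of this commit
import shutil

for tool in ("git", "wget", "pdftotext", "java", "pip3"):
    # shutil.which returns None if the executable is not on PATH
    print(f"{tool}: {'ok' if shutil.which(tool) else 'MISSING'}")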
@@ -1,6 +1,5 @@
from .components import Extractor, PDFTextExtractor, \
TextCurator

from .components import Extractor, PDFTextExtractor, TextCurator
from .extraction_server import run_extraction
import logging
from .config import logging_config, config

@@ -1,4 +1,3 @@
import re
from abc import abstractmethod
import re

@@ -3,7 +3,6 @@
import json
import logging
import os
import re
from pathlib import Path

import pandas as pd
@@ -80,25 +79,26 @@ def extract_pdf_by_page(self, pdf_file):
_logger.warning("{}: Unable to process {}".format(e, pdf_file))
return {}

fp = open(pdf_file, 'rb')
pdf_content = {}

# Create a PDF resource manager
rsrcmgr = PDFResourceManager()
retstr = io.BytesIO()
codec = 'utf-8'
laparams = LAParams()
device = TextConverter(rsrcmgr, retstr, codec=codec, laparams=laparams)
interpreter = PDFPageInterpreter(rsrcmgr, device)

pdf_content = {}
for page_number, page in enumerate(PDFPage.get_pages(fp, check_extractable=False)):
interpreter.process_page(page)
data = retstr.getvalue().decode('utf-8')
data_paragraphs = self.process_page(data)
if len(data_paragraphs) == 0:
continue
pdf_content[page_number] = data_paragraphs
retstr.truncate(0)
retstr.seek(0)
fp.close()
retstr = io.StringIO()

with open(pdf_file, 'rb') as fp:
# Create a PDF page interpreter
device = TextConverter(rsrcmgr, retstr, laparams=laparams)
interpreter = PDFPageInterpreter(rsrcmgr, device)
for page_number, page in enumerate(PDFPage.get_pages(fp)):
interpreter.process_page(page)
data = retstr.getvalue()
data_paragraphs = self.process_page(data)
if len(data_paragraphs) == 0:
continue
pdf_content[page_number] = data_paragraphs
retstr.truncate(0)
retstr.seek(0)

return pdf_content
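
The rewrite above swaps the manually closed file handle and the BytesIO-plus-utf-8-decode pattern for a with-block and a text-mode StringIO sink, so no explicit decode is needed and the handle is closed even if extraction raises. A self-contained sketch of the same pattern, assuming pdfminer.six; pages_as_text is a hypothetical helper name:

import io

from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
from pdfminer.pdfpage import PDFPage


def pages_as_text(pdf_path):
    """Yield (page_number, raw_text) for each page of pdf_path."""
    rsrcmgr = PDFResourceManager()
    retstr = io.StringIO()
    device = TextConverter(rsrcmgr, retstr, laparams=LAParams())
    interpreter = PDFPageInterpreter(rsrcmgr, device)
    with open(pdf_path, 'rb') as fp:
        for page_number, page in enumerate(PDFPage.get_pages(fp)):
            interpreter.process_page(page)
            yield page_number, retstr.getvalue()
            # Reset the buffer so each page starts from an empty string
            retstr.truncate(0)
            retstr.seek(0)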

@@ -9,7 +9,7 @@

import pandas as pd

import esg_data_pipeline.utils.kpi_mapping as kpi_mapping
import data_extractor.code.utils.kpi_mapping as kpi_mapping
from .base_curator import BaseCurator

logger = logging.getLogger(__name__)
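
The kpi_mapping import now uses the absolute data_extractor.code.utils path, which assumes the repository root (the directory containing data_extractor/) is on sys.path when the pipeline runs. A defensive sketch, not part of this commit, in case that assumption does not always hold:

try:
    import data_extractor.code.utils.kpi_mapping as kpi_mapping
except ImportError:
    # Hypothetical fallback for runs started from inside data_extractor/code
    import utils.kpi_mapping as kpi_mapping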
@@ -1,2 +1 @@


@@ -1,4 +1,3 @@
import os
import pathlib

# General config
@@ -9,15 +8,16 @@
ROOT = CONFIG_FOLDER.parent.parent.parent.parent
DATA_FOLDER = ROOT / "data"

#Extraction inputs

# Extraction inputs
PDFTextExtractor_kwargs = {'min_paragraph_length': 30,
#Set to ANNOTATION_FOLDER if you want to extract just pdfs mentioned in the annotations
#Set to None to extract all pdfs in pdf folder (for production stage)
# Set to ANNOTATION_FOLDER if you want to extract just pdfs mentioned in the annotations
# Set to None to extract all pdfs in pdf folder (for production stage)
'annotation_folder': None,
'skip_extracted_files': False
}

#Curation inputs
# Curation inputs
TextCurator_kwargs = {
'retrieve_paragraph': False,
'neg_pos_ratio': 1,
@@ -7,11 +7,12 @@
from flask import Flask, Response, request
import shutil
import traceback
from s3_communication import S3Communication
from data_extractor.code.utils.s3_communication import S3Communication

from .components import Extractor
from .config import config
from .components import Curator

from esg_data_pipeline.components import Extractor
from esg_data_pipeline.config import config
from esg_data_pipeline.components import Curator

app = Flask(__name__)

@@ -33,21 +34,35 @@ def liveness():

@app.route('/extract/')
def run_extraction():
args = json.loads(request.args['payload'])
# args = json.loads(request.args['payload'])

# Local args dictionary
args = {"project_name": 'TEST'}
args.update({"s3_usage": False})
extraction_settings = {}
extraction_settings.update({"use_extractions": False})
extraction_settings.update({"seed": 42})
extraction_settings.update({"min_paragraph_length": 20})
extraction_settings.update({"annotation_folder": None})
extraction_settings.update({"skip_extracted_files": False})
extraction_settings.update({"store_extractions": True})
args.update({"extraction": extraction_settings})

# Load the extraction settings from args
project_name = args["project_name"]

extraction_settings = args['extraction']

BASE_DATA_PROJECT_FOLDER = config.DATA_FOLDER / project_name
config.PDF_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'pdfs'
BASE_INTERIM_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'ml'
config.EXTRACTION_FOLDER = BASE_INTERIM_FOLDER / 'extraction'
config.ANNOTATION_FOLDER = BASE_INTERIM_FOLDER / 'annotations'
config.STAGE = 'extract'

create_directory(config.EXTRACTION_FOLDER)
create_directory(config.ANNOTATION_FOLDER)
create_directory(config.PDF_FOLDER)

# Create path strings for the different folders
base_data_project_folder = config.DATA_FOLDER / project_name
pdf_folder = base_data_project_folder / 'interim' / 'pdfs'
base_interim_folder = base_data_project_folder / 'interim' / 'ml'
extraction_folder = base_interim_folder / 'extraction'
annotation_folder = base_interim_folder / 'annotations'

# Create the folders if they do not already exist
os.makedirs(extraction_folder, exist_ok=True)
os.makedirs(annotation_folder, exist_ok=True)
os.makedirs(pdf_folder, exist_ok=True)

s3_usage = args["s3_usage"]
if s3_usage:
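
For reference, a sketch of the request the now-commented payload parsing expected, reconstructed from the keys read above and from the hard-coded local defaults; the host and port are hypothetical:

import json

import requests

payload = {
    "project_name": "TEST",
    "s3_usage": False,
    "extraction": {
        "use_extractions": False,
        "seed": 42,
        "min_paragraph_length": 20,
        "annotation_folder": None,
        "skip_extracted_files": False,
        "store_extractions": True,
    },
}
# Hypothetical host/port; the service address is not part of this diff.
resp = requests.get("http://localhost:8000/extract/",
                    params={"payload": json.dumps(payload)})
print(resp.status_code, resp.text)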
@@ -68,48 +83,53 @@ def run_extraction():
)
if extraction_settings['use_extractions']:
s3c_main.download_files_in_prefix_to_dir(project_prefix + '/output/TEXT_EXTRACTION',
config.EXTRACTION_FOLDER)
extraction_folder)
s3c_interim.download_files_in_prefix_to_dir(project_prefix + '/interim/ml/annotations',
config.ANNOTATION_FOLDER)
annotation_folder)
if args['mode'] == 'train':
s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/pdfs/training',
config.PDF_FOLDER)
pdf_folder)
else:
s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/pdfs/inference',
config.PDF_FOLDER)
pdf_folder)

pdfs = glob.glob(os.path.join(config.PDF_FOLDER, "*.pdf"))
pdfs = glob.glob(os.path.join(pdf_folder, "*.pdf"))
if len(pdfs) == 0:
msg = "No pdf files found in the pdf directory ({})".format(config.PDF_FOLDER)
msg = "No pdf files found in the pdf directory ({})".format(pdf_folder)
return Response(msg, status=500)

annotation_files = glob.glob(os.path.join(config.ANNOTATION_FOLDER, "*.csv"))

"""
# TODO Why do we need annotation at all? Actually extraction does not need that!
annotation_files = glob.glob(os.path.join(annotation_folder, "*.csv"))
if len(annotation_files) == 0:
msg = "No annotations.csv file found on S3."
return Response(msg, status=500)
elif len(annotation_files) > 2:
msg = "Multiple annotations.csv files found on S3."
return Response(msg, status=500)

"""

# Update the settings in config to the user settings
config.STAGE = 'extract'
config.SEED = extraction_settings["seed"]
config.PDFTextExtractor_kwargs['min_paragraph_length'] = extraction_settings["min_paragraph_length"]
config.PDFTextExtractor_kwargs['annotation_folder'] = extraction_settings["annotation_folder"]
config.PDFTextExtractor_kwargs['skip_extracted_files'] = extraction_settings["skip_extracted_files"]

# Create an extractor class element with the newly updated extraction settings
ext = Extractor(config.EXTRACTORS)

try:
t1 = time.time()
ext.run_folder(config.PDF_FOLDER, config.EXTRACTION_FOLDER)
ext.run_folder(pdf_folder, extraction_folder)
t2 = time.time()
except Exception as e:
msg = "Error during extraction\nException:" + str(e)
return Response(msg, status=500)

extracted_files = os.listdir(config.EXTRACTION_FOLDER)
extracted_files = os.listdir(extraction_folder)
if len(extracted_files) == 0:
msg = "Extraction Failed. No file was found in the extraction directory ({})"\
.format(config.EXTRACTION_FOLDER)
.format(extraction_folder)
return Response(msg, status=500)

failed_to_extract = ""
@@ -124,32 +144,49 @@ def run_extraction():
msg += "The following pdf files, however, did not get extracted:\n" + failed_to_extract

if s3_usage:
s3c_interim.upload_files_in_dir_to_prefix(config.EXTRACTION_FOLDER,
s3c_interim.upload_files_in_dir_to_prefix(extraction_folder,
project_prefix + '/interim/ml/extraction')
# clear folder
create_directory(config.EXTRACTION_FOLDER)
create_directory(config.ANNOTATION_FOLDER)
create_directory(config.PDF_FOLDER)
create_directory(extraction_folder)
create_directory(annotation_folder)
create_directory(pdf_folder)
time_elapsed = str(timedelta(seconds=t2 - t1))
msg += "\nTime elapsed:{}".format(time_elapsed)
return Response(msg, status=200)


@app.route('/curate/')
def run_curation():
args = json.loads(request.args['payload'])
#args = json.loads(request.args['payload'])

# Local args dictionary
args = {"project_name": 'TEST'}
args.update({"s3_usage": False})
curation_settings = {}
curation_settings.update({"retrieve_paragraph": False})
curation_settings.update({"neg_pos_ratio": 1})
curation_settings.update({"columns_to_read": ["company", "source_file", "source_page", "kpi_id", "year", "answer", "data_type", "relevant_paragraphs"]})
curation_settings.update({"company_to_exclude": []})
curation_settings.update({"create_neg_samples": True})
curation_settings.update({"min_length_neg_sample": 50})
curation_settings.update({"seed": 41})
args.update({"curation": curation_settings})

# Load the curation settings from args
project_name = args["project_name"]
curation_settings = args["curation"]
curation_settings = args['curation']

BASE_DATA_PROJECT_FOLDER = config.DATA_FOLDER / project_name
BASE_INTERIM_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'ml'
config.EXTRACTION_FOLDER = BASE_INTERIM_FOLDER / 'extraction'
extraction_folder = BASE_INTERIM_FOLDER / 'extraction'
config.CURATION_FOLDER = BASE_INTERIM_FOLDER / 'curation'
config.ANNOTATION_FOLDER = BASE_INTERIM_FOLDER / 'annotations'
annotation_folder = BASE_INTERIM_FOLDER / 'annotations'
config.KPI_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'kpi_mapping'
create_directory(config.EXTRACTION_FOLDER)
create_directory(config.CURATION_FOLDER)
create_directory(config.ANNOTATION_FOLDER)

os.makedirs(extraction_folder, exist_ok=True)
os.makedirs(config.CURATION_FOLDER, exist_ok=True)
os.makedirs(annotation_folder, exist_ok=True)
os.makedirs(config.KPI_FOLDER, exist_ok=True)

s3_usage = args["s3_usage"]
if s3_usage:
@@ -169,9 +206,9 @@ def run_curation():
s3_bucket=os.getenv(s3_settings['interim_bucket']['s3_bucket_name']),
)
s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/kpi_mapping', config.KPI_FOLDER)
s3c_interim.download_files_in_prefix_to_dir(project_prefix + '/interim/ml/extraction', config.EXTRACTION_FOLDER)
s3c_interim.download_files_in_prefix_to_dir(project_prefix + '/interim/ml/extraction', extraction_folder)
s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/annotations',
config.ANNOTATION_FOLDER)
annotation_folder)

shutil.copyfile(os.path.join(config.KPI_FOLDER, "kpi_mapping.csv"), "/app/code/kpi_mapping.csv")

@@ -183,21 +220,21 @@ def run_curation():
config.TextCurator_kwargs['min_length_neg_sample'] = curation_settings['min_length_neg_sample']
config.SEED = curation_settings['seed']

try:
if len(config.CURATORS) != 0:
cur = Curator(config.CURATORS)
cur.run(config.EXTRACTION_FOLDER, config.ANNOTATION_FOLDER, config.CURATION_FOLDER)
except Exception as e:
msg = "Error during curation\nException:" + str(repr(e)) + traceback.format_exc()
return Response(msg, status=500)
#try:
if len(config.CURATORS) != 0:
cur = Curator(config.CURATORS)
cur.run(extraction_folder, annotation_folder, config.CURATION_FOLDER)
#except Exception as e:
#msg = "Error during curation\nException:" + str(repr(e)) + traceback.format_exc()
#return Response(msg, status=500)

if s3_usage:
s3c_interim.upload_files_in_dir_to_prefix(config.CURATION_FOLDER,
project_prefix + '/interim/ml/curation')
# clear folder
create_directory(config.KPI_FOLDER)
create_directory(config.EXTRACTION_FOLDER)
create_directory(config.ANNOTATION_FOLDER)
create_directory(extraction_folder)
create_directory(annotation_folder)
create_directory(config.CURATION_FOLDER)

return Response("Curation OK", status=200)
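
For reference, the exception guard disabled above, reassembled from the commented-out lines; re-enabling it would restore the HTTP 500 error response instead of letting a curation failure propagate out of the Flask handler:

try:
    if len(config.CURATORS) != 0:
        cur = Curator(config.CURATORS)
        cur.run(extraction_folder, annotation_folder, config.CURATION_FOLDER)
except Exception as e:
    msg = "Error during curation\nException:" + str(repr(e)) + traceback.format_exc()
    return Response(msg, status=500)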
Empty file.