Feature/2023 10 issue 29 #7

Merged · 10 commits · Oct 25, 2023
data_extractor/code/esg_data_pipeline/Dockerfile
25 changes: 7 additions & 18 deletions
@@ -1,22 +1,11 @@
-# Modified mmdetection Dockerfile
-ARG PYTORCH="1.4"
-ARG CUDA="10.1"
-ARG CUDNN="7"
+FROM ubuntu:22.04
+SHELL ["/bin/bash", "-c"]
 
-FROM pytorch/pytorch:${PYTORCH}-cuda${CUDA}-cudnn${CUDNN}-devel
-
-ENV TORCH_CUDA_ARCH_LIST="3.7 6.0 6.1 7.0+PTX"
-ENV TORCH_NVCC_FLAGS="-Xfatbin -compress-all"
-ENV CMAKE_PREFIX_PATH="$(dirname $(which conda))/../"
-
-RUN apt-key del 3bf863cc
-RUN apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/3bf863cc.pub
-
-RUN apt-key del 7fa2af80
-RUN apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64/7fa2af80.pub
+# no prompt during installation:
+ARG DEBIAN_FRONTEND=noninteractive
 
 # Added poppler-utils, default-jre installations
-RUN apt-get update && apt-get install -y git wget vim ninja-build poppler-utils default-jre \
+RUN apt-get update && apt-get install -y apt-utils git wget python3 python3-pip vim ninja-build poppler-utils default-jre \
     && apt-get clean \
     && rm -rf /var/lib/apt/lists/*

@@ -31,9 +20,9 @@ WORKDIR /app/code/esg_data_pipeline
 RUN pip install -e .
 
 RUN mkdir -p /app/server_logs
-RUN chmod -R 777 /app/server_logs
-
 RUN mkdir -p /app/data
+
+RUN chmod -R 777 /app/server_logs
 RUN chmod -R 777 /app/data
 
 CMD ./entry.sh
@@ -1,6 +1,5 @@
-from .components import Extractor, PDFTextExtractor, \
-    TextCurator
-
+from .components import Extractor, PDFTextExtractor, TextCurator
 from .extraction_server import run_extraction
 import logging
 from .config import logging_config, config

@@ -1,4 +1,3 @@
-import re
 from abc import abstractmethod
 import re

@@ -3,7 +3,6 @@
 import json
 import logging
 import os
-import re
 from pathlib import Path
 
 import pandas as pd
@@ -80,25 +79,26 @@ def extract_pdf_by_page(self, pdf_file):
             _logger.warning("{}: Unable to process {}".format(e, pdf_file))
             return {}
 
-        fp = open(pdf_file, 'rb')
+        pdf_content = {}
 
         # Create a PDF resource manager
         rsrcmgr = PDFResourceManager()
-        retstr = io.BytesIO()
-        codec = 'utf-8'
         laparams = LAParams()
-        device = TextConverter(rsrcmgr, retstr, codec=codec, laparams=laparams)
-        interpreter = PDFPageInterpreter(rsrcmgr, device)
-
-        pdf_content = {}
-        for page_number, page in enumerate(PDFPage.get_pages(fp, check_extractable=False)):
-            interpreter.process_page(page)
-            data = retstr.getvalue().decode('utf-8')
-            data_paragraphs = self.process_page(data)
-            if len(data_paragraphs) == 0:
-                continue
-            pdf_content[page_number] = data_paragraphs
-            retstr.truncate(0)
-            retstr.seek(0)
-        fp.close()
+        retstr = io.StringIO()
+
+        with open(pdf_file, 'rb') as fp:
+            # Create a PDF page interpreter
+            device = TextConverter(rsrcmgr, retstr, laparams=laparams)
+            interpreter = PDFPageInterpreter(rsrcmgr, device)
+            for page_number, page in enumerate(PDFPage.get_pages(fp)):
+                interpreter.process_page(page)
+                data = retstr.getvalue()
+                data_paragraphs = self.process_page(data)
+                if len(data_paragraphs) == 0:
+                    continue
+                pdf_content[page_number] = data_paragraphs
+                retstr.truncate(0)
+                retstr.seek(0)
 
         return pdf_content
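
Note: the refactored loop above is the standard pdfminer.six per-page recipe. Below is a minimal standalone sketch of it, assuming pdfminer.six is installed; the 'report.pdf' path and the final print are illustrative, and the repo's own paragraph splitting (self.process_page) is deliberately left out:

import io

from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
from pdfminer.pdfpage import PDFPage

def extract_pdf_by_page(pdf_file):
    """Return {page_number: raw text} for every page of pdf_file."""
    pdf_content = {}
    rsrcmgr = PDFResourceManager()
    retstr = io.StringIO()  # TextConverter writes decoded text here

    with open(pdf_file, 'rb') as fp:
        device = TextConverter(rsrcmgr, retstr, laparams=LAParams())
        interpreter = PDFPageInterpreter(rsrcmgr, device)
        for page_number, page in enumerate(PDFPage.get_pages(fp)):
            interpreter.process_page(page)
            pdf_content[page_number] = retstr.getvalue()
            retstr.truncate(0)  # reset the shared buffer between pages
            retstr.seek(0)

    return pdf_content

if __name__ == '__main__':
    pages = extract_pdf_by_page('report.pdf')  # hypothetical input file
    print(len(pages), 'pages extracted')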

@@ -9,7 +9,7 @@
 
 import pandas as pd
 
-import esg_data_pipeline.utils.kpi_mapping as kpi_mapping
+import data_extractor.code.utils.kpi_mapping as kpi_mapping
 from .base_curator import BaseCurator
 
 logger = logging.getLogger(__name__)
@@ -1,2 +1 @@
 
-
@@ -1,4 +1,3 @@
-import os
 import pathlib
 
 # General config
@@ -9,15 +8,16 @@
 ROOT = CONFIG_FOLDER.parent.parent.parent.parent
 DATA_FOLDER = ROOT / "data"
 
-#Extraction inputs
+
+# Extraction inputs
 PDFTextExtractor_kwargs = {'min_paragraph_length': 30,
-                           #Set to ANNOTATION_FOLDER if you want to extract just pdfs mentioned in the annotations
-                           #Set to None to extract all pdfs in pdf folder (for production stage)
+                           # Set to ANNOTATION_FOLDER if you want to extract just pdfs mentioned in the annotations
+                           # Set to None to extract all pdfs in pdf folder (for production stage)
                            'annotation_folder': None,
                            'skip_extracted_files': False
                            }
 
-#Curation inputs
+# Curation inputs
 TextCurator_kwargs = {
     'retrieve_paragraph': False,
     'neg_pos_ratio': 1,
@@ -7,11 +7,12 @@
 from flask import Flask, Response, request
 import shutil
 import traceback
-from s3_communication import S3Communication
+from data_extractor.code.utils.s3_communication import S3Communication
 
-from .components import Extractor
-from .config import config
-from .components import Curator
+
+from esg_data_pipeline.components import Extractor
+from esg_data_pipeline.config import config
+from esg_data_pipeline.components import Curator
 
 app = Flask(__name__)
 
@@ -33,21 +34,35 @@ def liveness():
 
 @app.route('/extract/')
 def run_extraction():
-    args = json.loads(request.args['payload'])
+    # args = json.loads(request.args['payload'])
+
+    # Local args dictionary
+    args = {"project_name": 'TEST'}
+    args.update({"s3_usage": False})
+    extraction_settings = {}
+    extraction_settings.update({"use_extractions": False})
+    extraction_settings.update({"seed": 42})
+    extraction_settings.update({"min_paragraph_length": 20})
+    extraction_settings.update({"annotation_folder": None})
+    extraction_settings.update({"skip_extracted_files": False})
+    extraction_settings.update({"store_extractions": True})
+    args.update({"extraction": extraction_settings})
+
+    # Load the extraction settings from args
     project_name = args["project_name"]
-
     extraction_settings = args['extraction']
 
-    BASE_DATA_PROJECT_FOLDER = config.DATA_FOLDER / project_name
-    config.PDF_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'pdfs'
-    BASE_INTERIM_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'ml'
-    config.EXTRACTION_FOLDER = BASE_INTERIM_FOLDER / 'extraction'
-    config.ANNOTATION_FOLDER = BASE_INTERIM_FOLDER / 'annotations'
-    config.STAGE = 'extract'
-
-    create_directory(config.EXTRACTION_FOLDER)
-    create_directory(config.ANNOTATION_FOLDER)
-    create_directory(config.PDF_FOLDER)
+    # Create path strings for the different folders
+    base_data_project_folder = config.DATA_FOLDER / project_name
+    pdf_folder = base_data_project_folder / 'interim' / 'pdfs'
+    base_interim_folder = base_data_project_folder / 'interim' / 'ml'
+    extraction_folder = base_interim_folder / 'extraction'
+    annotation_folder = base_interim_folder / 'annotations'
+
+    # Create the folders if they do not already exist
+    os.makedirs(extraction_folder, exist_ok=True)
+    os.makedirs(annotation_folder, exist_ok=True)
+    os.makedirs(pdf_folder, exist_ok=True)
 
     s3_usage = args["s3_usage"]
     if s3_usage:
@@ -68,48 +83,53 @@ def run_extraction():
         )
         if extraction_settings['use_extractions']:
             s3c_main.download_files_in_prefix_to_dir(project_prefix + '/output/TEXT_EXTRACTION',
-                                                     config.EXTRACTION_FOLDER)
+                                                     extraction_folder)
             s3c_interim.download_files_in_prefix_to_dir(project_prefix + '/interim/ml/annotations',
-                                                        config.ANNOTATION_FOLDER)
+                                                        annotation_folder)
         if args['mode'] == 'train':
             s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/pdfs/training',
-                                                     config.PDF_FOLDER)
+                                                     pdf_folder)
         else:
             s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/pdfs/inference',
-                                                     config.PDF_FOLDER)
+                                                     pdf_folder)
 
-    pdfs = glob.glob(os.path.join(config.PDF_FOLDER, "*.pdf"))
+    pdfs = glob.glob(os.path.join(pdf_folder, "*.pdf"))
     if len(pdfs) == 0:
-        msg = "No pdf files found in the pdf directory ({})".format(config.PDF_FOLDER)
+        msg = "No pdf files found in the pdf directory ({})".format(pdf_folder)
         return Response(msg, status=500)
 
-    annotation_files = glob.glob(os.path.join(config.ANNOTATION_FOLDER, "*.csv"))
-
+    """
+    # TODO Why do we need annotation at all? Actually extraction does not need that!
+    annotation_files = glob.glob(os.path.join(annotation_folder, "*.csv"))
     if len(annotation_files) == 0:
         msg = "No annotations.csv file found on S3."
         return Response(msg, status=500)
     elif len(annotation_files) > 2:
         msg = "Multiple annotations.csv files found on S3."
         return Response(msg, status=500)
+    """
 
+    # Update the settings in config to the user settings
+    config.STAGE = 'extract'
     config.SEED = extraction_settings["seed"]
     config.PDFTextExtractor_kwargs['min_paragraph_length'] = extraction_settings["min_paragraph_length"]
    config.PDFTextExtractor_kwargs['annotation_folder'] = extraction_settings["annotation_folder"]
     config.PDFTextExtractor_kwargs['skip_extracted_files'] = extraction_settings["skip_extracted_files"]
 
+    # Create an extractor class element with the newly updated extraction settings
     ext = Extractor(config.EXTRACTORS)
 
     try:
         t1 = time.time()
-        ext.run_folder(config.PDF_FOLDER, config.EXTRACTION_FOLDER)
+        ext.run_folder(pdf_folder, extraction_folder)
         t2 = time.time()
     except Exception as e:
         msg = "Error during extraction\nException:" + str(e)
         return Response(msg, status=500)
 
-    extracted_files = os.listdir(config.EXTRACTION_FOLDER)
+    extracted_files = os.listdir(extraction_folder)
     if len(extracted_files) == 0:
         msg = "Extraction Failed. No file was found in the extraction directory ({})"\
-            .format(config.EXTRACTION_FOLDER)
+            .format(extraction_folder)
         return Response(msg, status=500)
 
     failed_to_extract = ""
@@ -124,32 +144,49 @@ def run_curation():
         msg += "The following pdf files, however, did not get extracted:\n" + failed_to_extract
 
     if s3_usage:
-        s3c_interim.upload_files_in_dir_to_prefix(config.EXTRACTION_FOLDER,
+        s3c_interim.upload_files_in_dir_to_prefix(extraction_folder,
                                                   project_prefix + '/interim/ml/extraction')
         # clear folder
-        create_directory(config.EXTRACTION_FOLDER)
-        create_directory(config.ANNOTATION_FOLDER)
-        create_directory(config.PDF_FOLDER)
+        create_directory(extraction_folder)
+        create_directory(annotation_folder)
+        create_directory(pdf_folder)
     time_elapsed = str(timedelta(seconds=t2 - t1))
     msg += "\nTime elapsed:{}".format(time_elapsed)
     return Response(msg, status=200)
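
Note: before this change, /extract/ read its settings from a JSON payload passed as a query parameter (the json.loads(request.args['payload']) line that is now commented out). A hedged client-side sketch of that original contract, mirroring the hardcoded defaults above; the host and port are assumptions, and the requests package is not a dependency this PR shows:

import json
import requests

payload = {
    "project_name": "TEST",
    "s3_usage": False,
    "extraction": {
        "use_extractions": False,
        "seed": 42,
        "min_paragraph_length": 20,
        "annotation_folder": None,
        "skip_extracted_files": False,
        "store_extractions": True,
    },
}

# The endpoint reads request.args['payload'], so the JSON travels
# URL-encoded in the query string rather than in the request body.
resp = requests.get("http://localhost:8000/extract/",  # hypothetical address
                    params={"payload": json.dumps(payload)})
print(resp.status_code, resp.text)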


 @app.route('/curate/')
 def run_curation():
-    args = json.loads(request.args['payload'])
+    #args = json.loads(request.args['payload'])
+
+    # Local args dictionary
+    args = {"project_name": 'TEST'}
+    args.update({"s3_usage": False})
+    curation_settings = {}
+    curation_settings.update({"retrieve_paragraph": False})
+    curation_settings.update({"neg_pos_ratio": 1})
+    curation_settings.update({"columns_to_read": ["company", "source_file", "source_page", "kpi_id", "year", "answer", "data_type", "relevant_paragraphs"]})
+    curation_settings.update({"company_to_exclude": []})
+    curation_settings.update({"create_neg_samples": True})
+    curation_settings.update({"min_length_neg_sample": 50})
+    curation_settings.update({"seed": 41})
+    args.update({"curation": curation_settings})
+
+    # Load the extraction settings from args
     project_name = args["project_name"]
-    curation_settings = args["curation"]
+    curation_settings = args['curation']
 
     BASE_DATA_PROJECT_FOLDER = config.DATA_FOLDER / project_name
     BASE_INTERIM_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'ml'
-    config.EXTRACTION_FOLDER = BASE_INTERIM_FOLDER / 'extraction'
+    extraction_folder = BASE_INTERIM_FOLDER / 'extraction'
     config.CURATION_FOLDER = BASE_INTERIM_FOLDER / 'curation'
-    config.ANNOTATION_FOLDER = BASE_INTERIM_FOLDER / 'annotations'
+    annotation_folder = BASE_INTERIM_FOLDER / 'annotations'
     config.KPI_FOLDER = BASE_DATA_PROJECT_FOLDER / 'interim' / 'kpi_mapping'
-    create_directory(config.EXTRACTION_FOLDER)
-    create_directory(config.CURATION_FOLDER)
-    create_directory(config.ANNOTATION_FOLDER)
+
+    os.makedirs(extraction_folder, exist_ok=True)
+    os.makedirs(config.CURATION_FOLDER, exist_ok=True)
+    os.makedirs(annotation_folder, exist_ok=True)
+    os.makedirs(config.KPI_FOLDER, exist_ok=True)
 
     s3_usage = args["s3_usage"]
     if s3_usage:
@@ -169,9 +206,9 @@ def run_curation():
             s3_bucket=os.getenv(s3_settings['interim_bucket']['s3_bucket_name']),
         )
         s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/kpi_mapping', config.KPI_FOLDER)
-        s3c_interim.download_files_in_prefix_to_dir(project_prefix + '/interim/ml/extraction', config.EXTRACTION_FOLDER)
+        s3c_interim.download_files_in_prefix_to_dir(project_prefix + '/interim/ml/extraction', extraction_folder)
         s3c_main.download_files_in_prefix_to_dir(project_prefix + '/input/annotations',
-                                                 config.ANNOTATION_FOLDER)
+                                                 annotation_folder)
 
     shutil.copyfile(os.path.join(config.KPI_FOLDER, "kpi_mapping.csv"), "/app/code/kpi_mapping.csv")

@@ -183,21 +220,21 @@ def run_curation():
     config.TextCurator_kwargs['min_length_neg_sample'] = curation_settings['min_length_neg_sample']
     config.SEED = curation_settings['seed']
 
-    try:
-        if len(config.CURATORS) != 0:
-            cur = Curator(config.CURATORS)
-            cur.run(config.EXTRACTION_FOLDER, config.ANNOTATION_FOLDER, config.CURATION_FOLDER)
-    except Exception as e:
-        msg = "Error during curation\nException:" + str(repr(e)) + traceback.format_exc()
-        return Response(msg, status=500)
+    #try:
+    if len(config.CURATORS) != 0:
+        cur = Curator(config.CURATORS)
+        cur.run(extraction_folder, annotation_folder, config.CURATION_FOLDER)
+    #except Exception as e:
+        #msg = "Error during curation\nException:" + str(repr(e)) + traceback.format_exc()
+        #return Response(msg, status=500)
 
     if s3_usage:
         s3c_interim.upload_files_in_dir_to_prefix(config.CURATION_FOLDER,
                                                   project_prefix + '/interim/ml/curation')
         # clear folder
         create_directory(config.KPI_FOLDER)
-        create_directory(config.EXTRACTION_FOLDER)
-        create_directory(config.ANNOTATION_FOLDER)
+        create_directory(extraction_folder)
+        create_directory(annotation_folder)
         create_directory(config.CURATION_FOLDER)
 
     return Response("Curation OK", status=200)
Empty file.