Merge pull request #87 from Police-Data-Accessibility-Project/parse-response-59

parse_response refactor
maxachis authored May 30, 2024
2 parents b16630d + e40e472 commit ddbea3b
Showing 2 changed files with 228 additions and 54 deletions.
19 changes: 19 additions & 0 deletions html_tag_collector/DataClassTags.py
@@ -0,0 +1,19 @@
+from dataclasses import dataclass
+
+
+@dataclass
+class Tags:
+    index: int = None
+    url: str = ""
+    url_path: str = ""
+    html_title: str = ""
+    meta_description: str = ""
+    root_page_title: str = ""
+    http_response: int = -1
+    h1: str = ""
+    h2: str = ""
+    h3: str = ""
+    h4: str = ""
+    h5: str = ""
+    h6: str = ""
+    div_text: str = ""
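
Since parse_response now returns asdict(tags), each URL still produces a plain dictionary; the dataclass only standardizes the field names and defaults. A minimal sketch (not part of the diff) of that round trip:

from dataclasses import asdict
from DataClassTags import Tags

tags = Tags(index=0, url="pdap.io")
record = asdict(tags)
print(record["url"])            # pdap.io
print(record["http_response"])  # -1 until a response is verified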
263 changes: 209 additions & 54 deletions html_tag_collector/collector.py
@@ -1,3 +1,17 @@
""" The tag collector is used to collect HTML tags and other relevant data from websites that is useful for training prediction models.
Information being collected includes:
- The URL's path
- HTML title
- Meta description
- The root page's HTML title
- HTTP response code
- Contents of H1-H6 header tags
- Contents of div tags
"""


from dataclasses import asdict
from collections import namedtuple
import json
import ssl
import urllib3
@@ -20,6 +34,8 @@

from RootURLCache import RootURLCache
from common import get_user_agent
+from DataClassTags import Tags


# Define the list of header tags we want to extract
header_tags = ["h1", "h2", "h3", "h4", "h5", "h6"]
@@ -162,27 +178,46 @@ async def get_response(session, url, index):
    except (KeyError, AttributeError):
        pass

-    # If the response size is greater than 10 MB
-    # or the response is an unreadable content type
-    # or the response code from the website is not in the 200s
-    if (
-        response is not None and len(response.content) > 10000000
-        or content_type is not None and any(
-            filtered_type in content_type
-            for filtered_type in ["pdf", "excel", "msword", "image", "rtf", "zip", "octet", "csv", "json"]
-        )
-        or response is not None and not response.ok
-    ):
-        # Discard the response content to prevent out of memory errors
-        if DEBUG:
-            print("Large or unreadable content discarded:", len(response.content), url)
-        new_response = requests.Response()
-        new_response.status_code = response.status_code
-        response = new_response
+    response = response_valid(response, content_type, url)

    return {"index": index, "response": response}


+def response_valid(response, content_type, url):
+    """Checks whether the response content is too large, unreadable, or carries an invalid response code. Invalid responses are discarded.
+    Args:
+        response (Response): Response object to check.
+        content_type (str): The content type returned by the website.
+        url (str): URL that was requested.
+    Returns:
+        Response: The original response, or an empty response (status code preserved) if the content was discarded.
+    """
+    # If the response size is greater than 10 MB
+    # or the response is an unreadable content type
+    # or the response code from the website is not in the 200s
+    if (
+        response is not None
+        and len(response.content) > 10000000
+        or content_type is not None
+        and any(
+            filtered_type in content_type
+            for filtered_type in ["pdf", "excel", "msword", "image", "rtf", "zip", "octet", "csv", "json"]
+        )
+        or response is not None
+        and not response.ok
+    ):
+        # Discard the response content to prevent out of memory errors
+        if DEBUG:
+            print("Large or unreadable content discarded:", len(response.content), url)
+        new_response = requests.Response()
+        new_response.status_code = response.status_code
+        response = new_response
+
+    return response
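
A minimal sketch (not part of the diff) of the discard behavior, constructing a requests.Response by hand and setting its private _content field purely for illustration:

import requests

resp = requests.Response()
resp.status_code = 200
resp._content = b"x" * 10_000_001  # just over the 10 MB threshold

checked = response_valid(resp, "text/html", "https://example.com")
print(checked.status_code)  # 200 -- status code preserved
print(checked is resp)      # False -- oversized body discarded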


async def render_js(urls_responses):
    """Renders JavaScript from a list of urls.
@@ -231,70 +266,182 @@ def parse_response(url_response):
"""Parses relevant HTML tags from a Response object into a dictionary.
Args:
url_response (list[dict]): List of dictionaries containing urls and theeir responses.
url_response (list[dict]): List of dictionaries containing urls and their responses.
Returns:
list[dict]: List of dictionaries containing urls and relevant HTML tags.
dict: Dictionary containing the url and relevant HTML tags.
"""
remove_excess_whitespace = lambda s: " ".join(s.split()).strip()

tags = {}
tags = Tags()
res = url_response["response"]
tags["index"] = url_response["index"]
tags.index = url_response["index"]

# Drop hostname from urls to reduce training bias
tags.url, tags.url_path = get_url(url_response)

tags.root_page_title = remove_excess_whitespace(root_url_cache.get_title(tags.url))

verified, tags.http_response = verify_response(res)
if verified is False:
return asdict(tags)

parser = get_parser(res)
if parser is False:
return asdict(tags)

try:
soup = BeautifulSoup(res.html.html, parser)
except (bs4.builder.ParserRejectedMarkup, AssertionError, AttributeError):
return asdict(tags)

tags.html_title = get_html_title(soup)

tags.meta_description = get_meta_description(soup)

tags = get_header_tags(tags, soup)

tags.div_text = get_div_text(soup)

# Prevents most bs4 memory leaks
if soup.html:
soup.html.decompose()

return asdict(tags)


+def get_url(url_response):
+    """Returns the url and url_path.
+    Args:
+        url_response (list[dict]): List of dictionaries containing urls and their responses.
+    Returns:
+        (str, str): Tuple with the url and url_path.
+    """
    url = url_response["url"][0]
-    tags["url"] = url
-    if not url.startswith("http"):
-        url = "https://" + url
-    tags["url_path"] = urlparse(url).path[1:]
+    new_url = url
+    if not new_url.startswith("http"):
+        new_url = "https://" + new_url

-    tags["html_title"] = ""
-    tags["meta_description"] = ""
-    tags["root_page_title"] = remove_excess_whitespace(root_url_cache.get_title(tags["url"]))
+    # Drop hostname from urls to reduce training bias
+    url_path = urlparse(new_url).path[1:]
+    # Remove trailing slash
+    if url_path and url_path[-1] == "/":
+        url_path = url_path[:-1]
+
+    return url, url_path
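
A quick sketch (not part of the diff) of the path handling, with a made-up URL:

url, url_path = get_url({"url": ["pdap.io/data-sources/"]})
print(url)       # pdap.io/data-sources/ -- returned as received
print(url_path)  # data-sources -- hostname and trailing slash dropped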


+def verify_response(res):
+    """Verifies the webpage response is readable and ok.
+    Args:
+        res (HTMLResponse|Response): Response object to verify.
+    Returns:
+        VerifiedResponse(bool, int): A named tuple containing whether verification succeeded and the HTTP response code.
+    """
+    VerifiedResponse = namedtuple("VerifiedResponse", "verified http_response")
    # The response is None if there was an error during connection, meaning there is no content to read
    if res is None:
-        tags["http_response"] = -1
-        return tags
+        return VerifiedResponse(False, -1)

-    tags["http_response"] = res.status_code
    # If the connection did not return a 200 code, we can assume there is no relevant content to read
+    http_response = res.status_code
    if not res.ok:
-        return tags
+        return VerifiedResponse(False, http_response)
+
+    return VerifiedResponse(True, http_response)
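
Both failure paths collapse into the same named tuple; a small sketch (not part of the diff):

import requests

print(verify_response(None))  # VerifiedResponse(verified=False, http_response=-1)

ok = requests.Response()
ok.status_code = 200
print(verify_response(ok))    # VerifiedResponse(verified=True, http_response=200)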


+def get_parser(res):
+    """Retrieves the parser type to use with BeautifulSoup.
+    Args:
+        res (HTMLResponse|Response): Response object to read the content-type from.
+    Returns:
+        str|bool: A string of the parser to use, or False if not readable.
+    """
    # Attempt to read the content-type, set the parser accordingly to avoid warning messages
    try:
        content_type = res.headers["content-type"]
    except KeyError:
-        return tags
+        return False

    # If content type does not contain "html" or "xml" then we can assume that the content is unreadable
    if "html" in content_type:
        parser = "lxml"
    elif "xml" in content_type:
        parser = "lxml-xml"
    else:
-        return tags
+        return False

-    try:
-        soup = BeautifulSoup(res.html.html, parser)
-    except (bs4.builder.ParserRejectedMarkup, AssertionError, AttributeError):
-        return tags
+    return parser
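
Since get_parser only reads res.headers, a throwaway stub is enough to sketch the mapping (not part of the diff; StubResponse is hypothetical):

class StubResponse:
    def __init__(self, content_type):
        self.headers = {"content-type": content_type}

print(get_parser(StubResponse("text/html; charset=utf-8")))  # lxml
print(get_parser(StubResponse("application/xml")))           # lxml-xml
print(get_parser(StubResponse("application/pdf")))           # False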


+def get_html_title(soup):
+    """Retrieves the HTML title from a BeautifulSoup object.
+    Args:
+        soup (BeautifulSoup): BeautifulSoup object to pull the HTML title from.
+    Returns:
+        str: The HTML title.
+    """
+    html_title = ""
+
    if soup.title is not None and soup.title.string is not None:
-        tags["html_title"] = remove_excess_whitespace(soup.title.string)
-    else:
-        tags["html_title"] = ""
+        html_title = remove_excess_whitespace(soup.title.string)
+
+    return html_title


+def get_meta_description(soup):
+    """Retrieves the meta description from a BeautifulSoup object.
+    Args:
+        soup (BeautifulSoup): BeautifulSoup object to pull the meta description from.
+    Returns:
+        str: The meta description.
+    """
    meta_tag = soup.find("meta", attrs={"name": "description"})
    try:
-        tags["meta_description"] = remove_excess_whitespace(meta_tag["content"]) if meta_tag is not None else ""
+        meta_description = remove_excess_whitespace(meta_tag["content"]) if meta_tag is not None else ""
    except KeyError:
-        tags["meta_description"] = ""
+        return ""
+
+    return meta_description
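
A small sketch (not part of the diff), parsing an inline fragment with the same lxml parser the collector already depends on:

from bs4 import BeautifulSoup

soup = BeautifulSoup('<meta name="description" content="  Public   records ">', "lxml")
print(get_meta_description(soup))  # Public records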


+def get_header_tags(tags, soup):
+    """Updates the Tags DataClass with the header tags.
+    Args:
+        tags (Tags): DataClass for relevant HTML tags.
+        soup (BeautifulSoup): BeautifulSoup object to pull the header tags from.
+    Returns:
+        Tags: DataClass with updated header tags.
+    """
    for header_tag in header_tags:
        headers = soup.find_all(header_tag)
-        # Retreives and drops headers containing links to reduce training bias
+        # Retrieves and drops headers containing links to reduce training bias
        header_content = [header.get_text(" ", strip=True) for header in headers if not header.a]
-        tags[header_tag] = json.dumps(header_content, ensure_ascii=False)
+        tag_content = json.dumps(header_content, ensure_ascii=False)
+        setattr(tags, header_tag, tag_content)
+
+    return tags
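
Because Tags is a dataclass rather than a dict, each header list is written back with setattr; sketched here (not part of the diff) on a hypothetical fragment:

from bs4 import BeautifulSoup

soup = BeautifulSoup("<h1>Records</h1><h1><a href='/x'>Linked</a></h1><h2>FAQ</h2>", "lxml")
tags = get_header_tags(Tags(), soup)
print(tags.h1)  # ["Records"] -- the header containing a link was dropped
print(tags.h2)  # ["FAQ"]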


+def get_div_text(soup):
+    """Retrieves the div text from a BeautifulSoup object.
+    Args:
+        soup (BeautifulSoup): BeautifulSoup object to pull the div text from.
+    Returns:
+        str: The div text.
+    """
    # Extract max 500 words of text from HTML <div>'s
    div_text = ""
    MAX_WORDS = 500
@@ -307,14 +454,22 @@ def parse_response(url_response):
        else:
            break  # Stop adding text if word limit is reached

-    # truncate to 5000 characters in case of run-on 'words'
-    tags["div_text"] = div_text[:MAX_WORDS * 10]
+    # Truncate to 5000 characters in case of run-on 'words'
+    div_text = div_text[: MAX_WORDS * 10]

-    # Prevents most bs4 memory leaks
-    if soup.html:
-        soup.html.decompose()
+    return div_text

-    return tags

+def remove_excess_whitespace(s):
+    """Removes leading, trailing, and excess adjacent whitespace.
+    Args:
+        s (str): String to remove whitespace from.
+    Returns:
+        str: Clean string with excess whitespace stripped.
+    """
+    return " ".join(s.split()).strip()


def collector_main(df, render_javascript=False):
