refactor: rewrite cleaner functions for parsing recipe dicts (#1743)

* rewrite cleaner functions

* unify verbiage

* try importing dep during check

* fix syntax

* allow override defaults

* satisfy mypy
This commit is contained in:
Hayden
2022-11-10 15:16:51 -09:00
committed by GitHub
parent 77316d639b
commit 89d0cae51d
7 changed files with 918 additions and 347 deletions

View File

@@ -1,14 +1,30 @@
import contextlib
import functools
import html
import json
import operator
import re
import typing
from datetime import datetime, timedelta
from typing import Optional
from slugify import slugify
from mealie.core.root_logger import get_logger
# First run of digits, optionally followed by a "." or "," and more digits (e.g. "1,5").
MATCH_DIGITS = re.compile(r"\d+([.,]\d+)?")
""" Allow for commas as decimals (common in Europe) """
# Module-level logger obtained from the imported get_logger() helper.
logger = get_logger()
# Anchored pattern for duration strings shaped like "P...T...". Year and month counts are
# matched by unnamed groups; days, hours, minutes and seconds are captured as named groups,
# and only seconds may carry a fractional part. The "T" separator is mandatory in this
# pattern, so a date-only string such as "P1D" does not match.
MATCH_ISO_STR = re.compile(
r"^P((\d+)Y)?((\d+)M)?((?P<days>\d+)D)?" r"T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+(?:\.\d+)?)S)?$",
)
""" Match Duration Strings """
# "<", then the shortest possible run of non-"<" characters, then ">".
MATCH_HTML_TAGS = re.compile(r"<[^<]+?>")
""" Matches HTML tags `<p>Text</p>` -> `Text` """
# One or more consecutive space characters (a single space matches as well).
MATCH_MULTI_SPACE = re.compile(r" +")
""" Matches multiple spaces `Hello World` -> `Hello World` """
# Two newlines separated only by optional whitespace (which may include further newlines).
MATCH_ERRONEOUS_WHITE_SPACE = re.compile(r"\n\s*\n")
""" Matches multiple new lines and removes erroneous white space """
def clean(recipe_data: dict, url=None) -> dict:
@@ -27,167 +43,167 @@ def clean(recipe_data: dict, url=None) -> dict:
recipe_data["prepTime"] = clean_time(recipe_data.get("prepTime"))
recipe_data["performTime"] = clean_time(recipe_data.get("performTime"))
recipe_data["totalTime"] = clean_time(recipe_data.get("totalTime"))
recipe_data["recipeCategory"] = category(recipe_data.get("recipeCategory", []))
recipe_data["recipeYield"] = yield_amount(recipe_data.get("recipeYield"))
recipe_data["recipeIngredient"] = ingredient(recipe_data.get("recipeIngredient"))
recipe_data["recipeInstructions"] = instructions(recipe_data.get("recipeInstructions"))
recipe_data["image"] = image(recipe_data.get("image"))
recipe_data["slug"] = slugify(recipe_data.get("name")) # type: ignore
recipe_data["recipeCategory"] = clean_categories(recipe_data.get("recipeCategory", []))
recipe_data["recipeYield"] = clean_yield(recipe_data.get("recipeYield"))
recipe_data["recipeIngredient"] = clean_ingredients(recipe_data.get("recipeIngredient", []))
recipe_data["recipeInstructions"] = clean_instructions(recipe_data.get("recipeInstructions", []))
recipe_data["image"] = clean_image(recipe_data.get("image"))
recipe_data["slug"] = slugify(recipe_data.get("name", ""))
recipe_data["orgURL"] = url
return recipe_data
def clean_string(text: str) -> str:
if isinstance(text, list):
text = text[0]
def clean_string(text: str | list | int) -> str:
"""Cleans a string of HTML tags and extra white space"""
if not isinstance(text, str):
if isinstance(text, list):
text = text[0]
if isinstance(text, int):
text = str(text)
if isinstance(text, int):
text = str(text)
if text == "" or text is None:
if not text:
return ""
text = typing.cast(str, text) # at this point we know text is a string
cleaned_text = html.unescape(text)
cleaned_text = re.sub("<[^<]+?>", "", cleaned_text)
cleaned_text = re.sub(" +", " ", cleaned_text)
cleaned_text = re.sub("</p>", "\n", cleaned_text)
cleaned_text = re.sub(r"\n\s*\n", "\n\n", cleaned_text)
cleaned_text = cleaned_text.replace("\xa0", " ").replace("\t", " ").strip()
cleaned_text = MATCH_HTML_TAGS.sub("", cleaned_text)
cleaned_text = MATCH_MULTI_SPACE.sub(" ", cleaned_text)
cleaned_text = MATCH_ERRONEOUS_WHITE_SPACE.sub("\n\n", cleaned_text)
cleaned_text = cleaned_text.replace("</p>", "\n").replace("\xa0", " ").replace("\t", " ").strip()
return cleaned_text
def category(category: str):
if isinstance(category, list) and len(category) > 0 and isinstance(category[0], dict):
# If the category is a list of dicts, it's probably from a migration
# validate that the required fields are present
valid = []
for cat in category:
if "name" in cat and "slug" in cat:
valid.append(cat)
def clean_image(image: str | list | dict | None = None, default="no image") -> str:
"""
clean_image attempts to parse the image field from a recipe and return a string. Currently
return valid
Supported Structures:
- `["https://example.com"]` - A list of strings
- `https://example.com` - A string
- `{ "url": "https://example.com" }` - A dictionary with a `url` key
if isinstance(category, str) and category != "":
return [category]
Raises:
TypeError: If the image field is not a supported type a TypeError is raised.
return []
def clean_nutrition(nutrition: Optional[dict]) -> dict[str, str]:
    """
    clean_nutrition reduces every nutrition value to the first number it contains.

    Assumptions:
        - All units are supplied in grams, except sodium which may be in milligrams.
        - Only string values are parsed; any other value type is skipped.

    A comma used as a decimal separator (common in Europe) is converted to a dot. When
    `sodiumContent` is given in grams (its text contains "g" but no "m") the parsed number
    is multiplied by 1000 so that it is expressed in milligrams.

    Returns:
        dict[str, str]: The keys whose values contained a number, mapped to that number as a
        string. If the argument is not a dictionary an empty dictionary is returned.
    """
    if not isinstance(nutrition, dict):
        return {}

    # Allow for commas as decimals (common in Europe); compiled once, outside the loop
    re_match_digits = re.compile(r"\d+([.,]\d+)?")

    output_nutrition = {}
    for key, val in nutrition.items():
        # Unexpected data structures are skipped instead of being parsed
        if not isinstance(val, str):
            continue

        if matched_digits := re_match_digits.search(val):
            output_nutrition[key] = matched_digits.group(0).replace(",", ".")

    sodium = nutrition.get("sodiumContent")
    if (
        isinstance(sodium, str)
        # A sodium string without any digits never made it into the output, so there is
        # nothing to convert (indexing the output here used to raise a KeyError)
        and "sodiumContent" in output_nutrition
        and "m" not in sodium
        and "g" in sodium
    ):
        # Sodium is in grams. Parse its value, multiply by 1k and return to string.
        output_nutrition["sodiumContent"] = str(float(output_nutrition["sodiumContent"]) * 1000)

    return output_nutrition
def image(image=None) -> str:
Returns:
str: "no image" if any empty string is provided or the url of the image
"""
if not image:
return "no image"
if isinstance(image, list):
return image[0]
elif isinstance(image, dict):
return image["url"]
elif isinstance(image, str):
return image
else:
raise Exception(f"Unrecognised image URL format: {image}")
return default
match image:
case str(image):
return image
case list(image):
return image[0]
case {"url": str(image)}:
return image
case _:
raise TypeError(f"Unexpected type for image: {type(image)}, {image}")
def instructions(instructions) -> list[dict]:
try:
instructions = json.loads(instructions)
except Exception:
pass
def clean_instructions(steps_object: list | dict | str, default: list | None = None) -> list[dict]:
"""
instructions attempts to parse the instructions field from a recipe and return a list of
dictionaries. See match statement for supported types and structures
if not instructions:
return []
Raises:
TypeError: If the instructions field is not a supported type a TypeError is raised.
# Dictionary (Keys: step number strings, Values: the instructions)
if isinstance(instructions, dict):
instructions = list(instructions.values())
Returns:
list[dict]: An ordered list of dictionaries with the keys `text`
"""
if not steps_object:
return default or []
if isinstance(instructions, list) and isinstance(instructions[0], list):
instructions = instructions[0]
# One long string split by (possibly multiple) new lines
if isinstance(instructions, str):
return [{"text": _instruction(line)} for line in instructions.splitlines() if line]
# Plain strings in a list
elif isinstance(instructions, list) and isinstance(instructions[0], str):
return [{"text": _instruction(step)} for step in instructions]
# Dictionaries (let's assume it's a HowToStep) in a list
elif isinstance(instructions, list) and isinstance(instructions[0], dict):
# Try List of Dictionary without "@type" or "type"
if not instructions[0].get("@type", False) and not instructions[0].get("type", False):
return [{"text": _instruction(step["text"])} for step in instructions]
try:
# If HowToStep is under HowToSection
sectionSteps = []
for step in instructions:
if step["@type"] == "HowToSection":
for sectionStep in step["itemListElement"]:
sectionSteps.append(sectionStep)
if len(sectionSteps) > 0:
return [{"text": _instruction(step["text"])} for step in sectionSteps if step["@type"] == "HowToStep"]
return [{"text": _instruction(step["text"])} for step in instructions if step["@type"] == "HowToStep"]
except Exception as e:
logger.error(e)
# Not "@type", try "type"
try:
return [
{"text": _instruction(step["properties"]["text"])}
for step in instructions
if step["type"].find("HowToStep") > -1
]
except Exception:
pass
else:
raise Exception(f"Unrecognised instruction format: {instructions}")
return []
match steps_object:
case [{"text": str()}]: # Base Case
return steps_object
case [{"text": str()}, *_]:
# This is the most common case. Most other operations eventually resolve to this
# match case before being converted to a list of instructions
#
# [
# {"text": "Instruction A"},
# {"text": "Instruction B"},
# ]
#
return [
{"text": _sanitize_instruction_text(instruction["text"])}
for instruction in steps_object
if instruction["text"].strip()
]
case {0: {"text": str()}} | {"0": {"text": str()}} | {1: {"text": str()}} | {"1": {"text": str()}}:
# Some recipes have a dict with a string key representing the index, unsure if these can
# be an int or not so we match against both. Additionally, we match against both 0 and 1 indexed
# list like dicts.
#
# {
# "0": {"text": "Instruction A"},
# "1": {"text": "Instruction B"},
# }
#
steps_object = typing.cast(dict, steps_object)
return clean_instructions([x for x in steps_object.values()])
case str(step_as_str):
# Strings are weird, some sites return a single string with newlines
# others return a JSON string for some reason
#
# "Instruction A\nInstruction B\nInstruction C"
# '{"0": {"text": "Instruction A"}, "1": {"text": "Instruction B"}, "2": {"text": "Instruction C"}}'
#
if step_as_str.startswith("[") or step_as_str.startswith("{"):
try:
return clean_instructions(json.loads(step_as_str))
except json.JSONDecodeError:
pass
return [
{"text": _sanitize_instruction_text(instruction)}
for instruction in step_as_str.splitlines()
if instruction.strip()
]
case [str(), *_]:
# Assume list of strings is a valid list of instructions
#
# [
# "Instruction A",
# "Instruction B",
# ]
#
return [
{"text": _sanitize_instruction_text(instruction)} for instruction in steps_object if instruction.strip()
]
case [{"@type": "HowToSection"}, *_] | [{"type": "HowToSection"}, *_]:
# HowToSections should have the following layout,
# {
# "@type": "HowToSection",
# "itemListElement": [
# {
# "@type": "HowToStep",
# "text": "Instruction A"
# },
# }
#
steps_object = typing.cast(list[dict[str, str]], steps_object)
return clean_instructions(functools.reduce(operator.concat, [x["itemListElement"] for x in steps_object], [])) # type: ignore
case _:
raise TypeError(f"Unexpected type for instructions: {type(steps_object)}, {steps_object}")
def _instruction(line) -> str:
def _sanitize_instruction_text(line: str | dict) -> str:
"""
_sanitize_instruction_text does some basic checking if the value is a string or dictionary
and returns the value of the `text` key if it is a dictionary. The returned string is passed through the
`clean_string` function to remove any html tags and extra whitespace in a loop until the string
is stable.
Calling `clean_string` in a loop is necessary because some sites return a string with erroneously escaped
html tags or markup.
"""
if isinstance(line, dict):
# Some recipes do not adhere to the schema
try:
@@ -195,58 +211,111 @@ def _instruction(line) -> str:
except Exception:
line = ""
if not line:
return ""
line = typing.cast(str, line)
clean_line = clean_string(line.strip())
# Some sites erroneously escape their strings on multiple levels
while not clean_line == (clean_line := clean_string(clean_line)):
pass
return clean_line
def ingredient(ingredients: list | None) -> list[str]:
if ingredients:
return [clean_string(ing) for ing in ingredients]
else:
return []
def clean_ingredients(ingredients: list | str | None, default: list = None) -> list[str]:
"""
ingredient attempts to parse the ingredients field from a recipe and return a list of
Supported Structures:
- `["1 cup flour"]` - A list of strings
- `"1 cup flour"` - A string
- `None` - returns an empty list
Raises:
TypeError: If the ingredients field is not a supported type a TypeError is raised.
"""
match ingredients:
case None:
return default or []
case list(ingredients):
return [clean_string(ingredient) for ingredient in ingredients]
case str(ingredients):
return [clean_string(ingredient) for ingredient in ingredients.splitlines()]
case _:
raise TypeError(f"Unexpected type for ingredients: {type(ingredients)}, {ingredients}")
def yield_amount(yld) -> str:
def clean_yield(yld: str | list[str] | None) -> str:
"""
clean_yield attempts to parse out the yield amount from a recipe.
Supported Structures:
- `"4 servings"` - returns the string unmodified
- `["4 servings", "4 Pies"]` - returns the last value
Returns:
str: The yield amount, if it can be parsed else an empty string
"""
if not yld:
return ""
if isinstance(yld, list):
return yld[-1]
else:
return yld
return yld
def clean_time(time_entry):
if time_entry is None or time_entry == "" or time_entry == " ":
def clean_time(time_entry: str | timedelta | None) -> None | str:
"""_summary_
Supported Structures:
- `None` - returns None
- `"PT1H"` - returns "1 hour"
- `"PT1H30M"` - returns "1 hour 30 minutes"
- `timedelta(hours=1, minutes=30)` - returns "1 hour 30 minutes"
Raises:
TypeError: if the type is not supported a TypeError is raised
Returns:
None | str: None if the time_entry is None, otherwise a string representing the time
"""
if not time_entry:
return None
elif isinstance(time_entry, timedelta):
return pretty_print_timedelta(time_entry)
elif isinstance(time_entry, datetime):
pass
# print(time_entry)
elif isinstance(time_entry, str):
try:
time_delta_object = parse_duration(time_entry)
return pretty_print_timedelta(time_delta_object)
except ValueError:
logger.error(f"Could not parse time_entry `{time_entry}`")
match time_entry:
case str(time_entry):
if not time_entry.strip():
return None
try:
time_delta_instructionsect = parse_duration(time_entry)
return pretty_print_timedelta(time_delta_instructionsect)
except ValueError:
return str(time_entry)
case timedelta():
return pretty_print_timedelta(time_entry)
case datetime():
# TODO: Not sure what to do here
return str(time_entry)
else:
return str(time_entry)
case _:
raise TypeError(f"Unexpected type for time: {type(time_entry)}, {time_entry}")
def parse_duration(iso_duration):
"""Parses an ISO 8601 duration string into a datetime.timedelta instance.
def parse_duration(iso_duration: str) -> timedelta:
"""
Parses an ISO 8601 duration string into a datetime.timedelta instance.
Args:
iso_duration: an ISO 8601 duration string.
Returns:
a datetime.timedelta instance
Raises:
ValueError: if the input string is not a valid ISO 8601 duration string.
"""
m = re.match(
r"^P((\d+)Y)?((\d+)M)?((?P<days>\d+)D)?"
r"T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+(?:\.\d+)?)S)?$",
iso_duration,
)
m = MATCH_ISO_STR.match(iso_duration)
if m is None:
raise ValueError("invalid ISO 8601 duration string")
@@ -257,7 +326,7 @@ def parse_duration(iso_duration):
# convert parsed years and months to specific number of days.
times = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0}
for unit, _ in times.items():
for unit in times.keys():
if m.group(unit):
times[unit] = int(float(m.group(unit)))
@@ -299,30 +368,73 @@ def pretty_print_timedelta(t: timedelta, max_components=None, max_decimal_places
return " ".join(out_list)
def clean_categories(category: str | list) -> list[str]:
if not category:
return []
match category:
case str(category):
if not category.strip():
return []
return [category]
case [str(), *_]:
return [cat.strip().title() for cat in category if cat.strip()]
case [{"name": str(), "slug": str()}, *_]:
# Special case for when we use the cleaner to cleanup a migration.
#
# [
# { "name": "Dessert", "slug": "dessert"}
# ]
#
return [cat["name"] for cat in category if "name" in cat]
case _:
raise TypeError(f"Unexpected type for category: {type(category)}, {category}")
def clean_tags(data: str | list[str]) -> list[str]:
"""
Gets keywords as a list or natural language list and returns them into a list of strings of individual tags
Gets keywords as a list or natural language list and returns
them into a list of strings of individual tags
"""
if data is None:
if not data:
return []
if isinstance(data, list):
all_str = True
i = 0
while all_str and i < len(data):
all_str = isinstance(data[i], str)
i = i + 1
match data:
case [str(), *_]:
return [tag.strip().title() for tag in data if tag.strip()]
case str(data):
return clean_tags([t for t in data.split(",")])
case _:
return []
# should probably raise exception
# raise TypeError(f"Unexpected type for tags: {type(data)}, {data}")
if all_str:
return data
return []
if isinstance(data, str):
tag_list = data.split(",")
def clean_nutrition(nutrition: dict | None) -> dict[str, str]:
"""
clean_nutrition takes a dictionary of nutrition information and cleans it up
to be stored in the database. Only keys whose value contains a number are kept,
and each kept value is reduced to that number
for i in range(len(tag_list)):
tag_list[i] = tag_list[i].strip().capitalize()
Assumptions:
- All units are supplied in grams, except sodium which may be in milligrams
return tag_list
Returns:
dict[str, str]: If the argument is None, or not a dictionary, an empty dictionary is returned
"""
if not isinstance(nutrition, dict):
return {}
return []
output_nutrition = {}
for key, val in nutrition.items():
with contextlib.suppress(AttributeError, TypeError):
if matched_digits := MATCH_DIGITS.search(val):
output_nutrition[key] = matched_digits.group(0).replace(",", ".")
if sodium := nutrition.get("sodiumContent", None):
if isinstance(sodium, str) and "m" not in sodium and "g" in sodium:
with contextlib.suppress(AttributeError, TypeError):
output_nutrition["sodiumContent"] = str(float(output_nutrition["sodiumContent"]) * 1000)
return output_nutrition

View File

@@ -123,7 +123,7 @@ class RecipeScraperPackage(ABCScraperStrategy):
self.logger.debug(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")
instruction_as_text = cleaner.instructions(instruction_as_text)
instruction_as_text = cleaner.clean_instructions(instruction_as_text)
self.logger.debug(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")
@@ -147,7 +147,9 @@ class RecipeScraperPackage(ABCScraperStrategy):
description=try_get_default(None, "description", "", cleaner.clean_string),
nutrition=try_get_default(None, "nutrition", None, cleaner.clean_nutrition),
recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
recipe_ingredient=try_get_default(scraped_data.ingredients, "recipeIngredient", [""], cleaner.ingredient),
recipe_ingredient=try_get_default(
scraped_data.ingredients, "recipeIngredient", [""], cleaner.clean_ingredients
),
recipe_instructions=get_instructions(),
total_time=try_get_default(None, "totalTime", None, cleaner.clean_time),
prep_time=try_get_default(None, "prepTime", None, cleaner.clean_time),