Feature/improve error message on scrape (#476)

* add better feedback on failed scrape

* fix json download link

* add better recipe parser

* dump deps

* fix force open on mobile

* formatting

* rewrite scraper to use new library

* fix failing tests

* bookmarklet support

* bookmarklet instructions

* recipes changelog

Co-authored-by: hay-kot <hay-kot@pm.me>
This commit is contained in:
Hayden
2021-06-09 13:04:54 -08:00
committed by GitHub
parent 3702331630
commit a78fbea711
22 changed files with 658 additions and 15582 deletions

View File

@@ -13,6 +13,7 @@ from mealie.services.events import create_recipe_event
from mealie.services.image.image import scrape_image, write_image
from mealie.services.recipe.media import check_assets, delete_assets
from mealie.services.scraper.scraper import create_from_url
from scrape_schema_recipe import scrape_url
from slugify import slugify
from sqlalchemy.orm.session import Session
@@ -41,6 +42,11 @@ def create_from_json(
return recipe.slug
@router.post("/test-scrape-url", dependencies=[Depends(get_current_user)])
def test_parse_recipe_url(url: RecipeURLIn):
return scrape_url(url.url)
@router.post("/create-url", status_code=201, response_model=str)
def parse_recipe_url(
background_tasks: BackgroundTasks,

View File

@@ -1,15 +1,14 @@
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, HTTPException, status
from mealie.routes.deps import validate_file_token
from starlette.responses import FileResponse
from fastapi import HTTPException, status
router = APIRouter(prefix="/api/utils", tags=["Utils"], include_in_schema=True)
@router.get("/download/{token}")
@router.get("/download")
async def download_file(file_path: Optional[Path] = Depends(validate_file_token)):
"""Uses a file token obtained by an active user to retrieve a file from the operating
system."""

View File

@@ -42,6 +42,7 @@ def write_image(recipe_slug: str, file_data: bytes, extension: str) -> Path:
def scrape_image(image_url: str, slug: str) -> Path:
logger.info(f"Image URL: {image_url}")
if isinstance(image_url, str): # Handles String Types
image_url = image_url
@@ -64,7 +65,7 @@ def scrape_image(image_url: str, slug: str) -> Path:
if r.status_code == 200:
r.raw.decode_content = True
logger.info(f"File Name Suffix {filename.suffix}")
write_image(slug, r.raw, filename.suffix)
filename.unlink(missing_ok=True)

View File

@@ -39,6 +39,8 @@ def minify_image(image_file: Path, force=False) -> ImageSizes:
min_dest = image_file.parent.joinpath("min-original.webp")
tiny_dest = image_file.parent.joinpath("tiny-original.webp")
cleanup_images = False
if min_dest.exists() and tiny_dest.exists() and org_dest.exists() and not force:
return
try:

View File

@@ -9,7 +9,7 @@ from mealie.db.database import db
from mealie.schema.migration import MigrationImport
from mealie.schema.recipe import Recipe
from mealie.services.image import image
from mealie.services.scraper.cleaner import Cleaner
from mealie.services.scraper import cleaner
from mealie.utils.unzip import unpack_zip
from pydantic import BaseModel
@@ -144,7 +144,7 @@ class MigrationBase(BaseModel):
"""Calls the rewrite_alias function and the Cleaner.clean function on a
dictionary and returns the result unpacked into a Recipe object"""
recipe_dict = self.rewrite_alias(recipe_dict)
recipe_dict = Cleaner.clean(recipe_dict, url=recipe_dict.get("org_url", None))
recipe_dict = cleaner.clean(recipe_dict, url=recipe_dict.get("org_url", None))
return Recipe(**recipe_dict)

View File

@@ -1,4 +1,5 @@
import html
import json
import re
from datetime import datetime, timedelta
from typing import List
@@ -6,157 +7,157 @@ from typing import List
from slugify import slugify
class Cleaner:
"""A Namespace for utility function to clean recipe data extracted
from a url and returns a dictionary that is ready for import into
the database. Cleaner.clean is the main entrypoint
def clean(recipe_data: dict, url=None) -> dict:
"""Main entrypoint to clean a recipe extracted from the web
and format the data into an accectable format for the database
Args:
recipe_data (dict): raw recipe dicitonary
Returns:
dict: cleaned recipe dictionary
"""
recipe_data["description"] = clean_string(recipe_data.get("description", ""))
@staticmethod
def clean(recipe_data: dict, url=None) -> dict:
"""Main entrypoint to clean a recipe extracted from the web
and format the data into an accectable format for the database
# Times
recipe_data["prepTime"] = clean_time(recipe_data.get("prepTime"))
recipe_data["performTime"] = clean_time(recipe_data.get("performTime"))
recipe_data["totalTime"] = clean_time(recipe_data.get("totalTime"))
recipe_data["recipeCategory"] = category(recipe_data.get("recipeCategory", []))
Args:
recipe_data (dict): raw recipe dicitonary
recipe_data["recipeYield"] = yield_amount(recipe_data.get("recipeYield"))
recipe_data["recipeIngredient"] = ingredient(recipe_data.get("recipeIngredient"))
recipe_data["recipeInstructions"] = instructions(recipe_data.get("recipeInstructions"))
recipe_data["image"] = image(recipe_data.get("image"))
recipe_data["slug"] = slugify(recipe_data.get("name"))
recipe_data["orgURL"] = url
Returns:
dict: cleaned recipe dictionary
"""
recipe_data["description"] = Cleaner.html(recipe_data.get("description", ""))
return recipe_data
# Times
recipe_data["prepTime"] = Cleaner.time(recipe_data.get("prepTime"))
recipe_data["performTime"] = Cleaner.time(recipe_data.get("performTime"))
recipe_data["totalTime"] = Cleaner.time(recipe_data.get("totalTime"))
recipe_data["recipeCategory"] = Cleaner.category(recipe_data.get("recipeCategory", []))
recipe_data["recipeYield"] = Cleaner.yield_amount(recipe_data.get("recipeYield"))
recipe_data["recipeIngredient"] = Cleaner.ingredient(recipe_data.get("recipeIngredient"))
recipe_data["recipeInstructions"] = Cleaner.instructions(recipe_data.get("recipeInstructions"))
recipe_data["image"] = Cleaner.image(recipe_data.get("image"))
recipe_data["slug"] = slugify(recipe_data.get("name"))
recipe_data["orgURL"] = url
def clean_string(text: str) -> str:
cleaned_text = html.unescape(text)
cleaned_text = re.sub("<[^<]+?>", "", cleaned_text)
cleaned_text = re.sub(" +", " ", cleaned_text)
cleaned_text = re.sub("</p>", "\n", cleaned_text)
cleaned_text = re.sub(r"\n\s*\n", "\n\n", cleaned_text)
cleaned_text = cleaned_text.replace("\xa0", " ").replace("\t", " ").strip()
return cleaned_text
return recipe_data
@staticmethod
def category(category: str):
if isinstance(category, str) and category != "":
return [category]
else:
return []
def category(category: str):
if isinstance(category, str) and category != "":
return [category]
else:
return []
@staticmethod
def html(raw_html):
cleanr = re.compile("<.*?>")
return re.sub(cleanr, "", raw_html)
@staticmethod
def image(image=None) -> str:
if not image:
return "no image"
if isinstance(image, list):
return image[0]
elif isinstance(image, dict):
return image["url"]
elif isinstance(image, str):
return image
else:
raise Exception(f"Unrecognised image URL format: {image}")
def clean_html(raw_html):
cleanr = re.compile("<.*?>")
return re.sub(cleanr, "", raw_html)
@staticmethod
def instructions(instructions) -> List[dict]:
if not instructions:
return []
if isinstance(instructions[0], list):
instructions = instructions[0]
def image(image=None) -> str:
if not image:
return "no image"
if isinstance(image, list):
return image[0]
elif isinstance(image, dict):
return image["url"]
elif isinstance(image, str):
return image
else:
raise Exception(f"Unrecognised image URL format: {image}")
# One long string split by (possibly multiple) new lines
if isinstance(instructions, str):
return [{"text": Cleaner._instruction(line)} for line in instructions.splitlines() if line]
# Plain strings in a list
elif isinstance(instructions, list) and isinstance(instructions[0], str):
return [{"text": Cleaner._instruction(step)} for step in instructions]
def instructions(instructions) -> List[dict]:
try:
instructions = json.loads(instructions)
except Exception:
pass
# Dictionaries (let's assume it's a HowToStep) in a list
elif isinstance(instructions, list) and isinstance(instructions[0], dict):
# Try List of Dictionary without "@type" or "type"
if not instructions[0].get("@type", False) and not instructions[0].get("type", False):
return [{"text": Cleaner._instruction(step["text"])} for step in instructions]
if not instructions:
return []
if isinstance(instructions, list) and isinstance(instructions[0], list):
instructions = instructions[0]
# One long string split by (possibly multiple) new lines
if isinstance(instructions, str):
return [{"text": _instruction(line)} for line in instructions.splitlines() if line]
# Plain strings in a list
elif isinstance(instructions, list) and isinstance(instructions[0], str):
return [{"text": _instruction(step)} for step in instructions]
# Dictionaries (let's assume it's a HowToStep) in a list
elif isinstance(instructions, list) and isinstance(instructions[0], dict):
# Try List of Dictionary without "@type" or "type"
if not instructions[0].get("@type", False) and not instructions[0].get("type", False):
return [{"text": _instruction(step["text"])} for step in instructions]
try:
# If HowToStep is under HowToSection
sectionSteps = []
for step in instructions:
if step["@type"] == "HowToSection":
[sectionSteps.append(item) for item in step["itemListElement"]]
if len(sectionSteps) > 0:
return [{"text": _instruction(step["text"])} for step in sectionSteps if step["@type"] == "HowToStep"]
return [{"text": _instruction(step["text"])} for step in instructions if step["@type"] == "HowToStep"]
except Exception as e:
print(e)
# Not "@type", try "type"
try:
# If HowToStep is under HowToSection
sectionSteps = []
for step in instructions:
if step["@type"] == "HowToSection":
[sectionSteps.append(item) for item in step["itemListElement"]]
if len(sectionSteps) > 0:
return [
{"text": Cleaner._instruction(step["text"])}
for step in sectionSteps
if step["@type"] == "HowToStep"
]
return [
{"text": Cleaner._instruction(step["text"])}
{"text": _instruction(step["properties"]["text"])}
for step in instructions
if step["@type"] == "HowToStep"
if step["type"].find("HowToStep") > -1
]
except Exception as e:
print(e)
# Not "@type", try "type"
try:
return [
{"text": Cleaner._instruction(step["properties"]["text"])}
for step in instructions
if step["type"].find("HowToStep") > -1
]
except Exception:
pass
except Exception:
pass
else:
raise Exception(f"Unrecognised instruction format: {instructions}")
else:
raise Exception(f"Unrecognised instruction format: {instructions}")
@staticmethod
def _instruction(line) -> str:
clean_line = Cleaner.html(line.strip())
# Some sites erroneously escape their strings on multiple levels
while not clean_line == (clean_line := html.unescape(clean_line)):
pass
return clean_line
@staticmethod
def ingredient(ingredients: list) -> str:
if ingredients:
return [Cleaner.html(html.unescape(ing)) for ing in ingredients]
else:
return []
def _instruction(line) -> str:
clean_line = clean_string(line.strip())
# Some sites erroneously escape their strings on multiple levels
while not clean_line == (clean_line := clean_string(clean_line)):
pass
return clean_line
@staticmethod
def yield_amount(yld) -> str:
if isinstance(yld, list):
return yld[-1]
else:
return yld
@staticmethod
def time(time_entry):
if time_entry is None:
return None
elif isinstance(time_entry, timedelta):
pretty_print_timedelta(time_entry)
elif isinstance(time_entry, datetime):
print(time_entry)
elif isinstance(time_entry, str):
if re.match("PT.*H.*M", time_entry):
time_delta_object = parse_duration(time_entry)
return pretty_print_timedelta(time_delta_object)
else:
return str(time_entry)
def ingredient(ingredients: list) -> str:
if ingredients:
return [clean_string(ing) for ing in ingredients]
else:
return []
def yield_amount(yld) -> str:
if isinstance(yld, list):
return yld[-1]
else:
return yld
def clean_time(time_entry):
if time_entry is None:
return None
elif isinstance(time_entry, timedelta):
pretty_print_timedelta(time_entry)
elif isinstance(time_entry, datetime):
print(time_entry)
elif isinstance(time_entry, str):
if re.match("PT.*H.*M", time_entry):
time_delta_object = parse_duration(time_entry)
return pretty_print_timedelta(time_delta_object)
else:
return str(time_entry)
# ! TODO: Cleanup Code Below

View File

@@ -1,18 +1,20 @@
import json
from typing import List
from enum import Enum
from typing import Any, Callable
import requests
import scrape_schema_recipe
from mealie.core import root_logger
from fastapi import HTTPException, status
from mealie.core.config import app_dirs
from mealie.schema.recipe import Recipe
from mealie.core.root_logger import get_logger
from mealie.schema.recipe import Recipe, RecipeStep
from mealie.services.image.image import scrape_image
from mealie.services.scraper import open_graph
from mealie.services.scraper.cleaner import Cleaner
from mealie.services.scraper import cleaner, open_graph
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, WebsiteNotImplementedError, scrape_me
LAST_JSON = app_dirs.DEBUG_DIR.joinpath("last_recipe.json")
logger = root_logger.get_logger()
logger = get_logger()
def create_from_url(url: str) -> Recipe:
@@ -25,52 +27,130 @@ def create_from_url(url: str) -> Recipe:
Returns:
Recipe: Recipe Object
"""
r = requests.get(url)
new_recipe = extract_recipe_from_html(r.text, url)
new_recipe = Cleaner.clean(new_recipe, url)
new_recipe = download_image_for_recipe(new_recipe)
return Recipe(**new_recipe)
def extract_recipe_from_html(html: str, url: str) -> dict:
try:
scraped_recipes: List[dict] = scrape_schema_recipe.loads(html, python_objects=True)
dump_last_json(scraped_recipes)
if not scraped_recipes:
scraped_recipes: List[dict] = scrape_schema_recipe.scrape_url(url, python_objects=True)
except Exception as e:
print(e)
scraped_recipes: List[dict] = scrape_schema_recipe.loads(html)
dump_last_json(scraped_recipes)
if not scraped_recipes:
scraped_recipes: List[dict] = scrape_schema_recipe.scrape_url(url)
if scraped_recipes:
new_recipe: dict = scraped_recipes[0]
logger.info(f"Recipe Scraped From Web: {new_recipe}")
if not new_recipe:
return "fail" # TODO: Return Better Error Here
new_recipe = Cleaner.clean(new_recipe, url)
else:
new_recipe = open_graph.basic_recipe_from_opengraph(html, url)
logger.info(f"Recipe Scraped from opengraph metadata: {new_recipe}")
new_recipe = scrape_from_url(url)
logger.info(f"Image {new_recipe.image}")
new_recipe.image = download_image_for_recipe(new_recipe.slug, new_recipe.image)
return new_recipe
def download_image_for_recipe(recipe: dict) -> dict:
try:
img_path = scrape_image(recipe.get("image"), recipe.get("slug"))
recipe["image"] = img_path.name
except Exception:
recipe["image"] = "no image"
class ParserErrors(str, Enum):
bad_recipe = "BAD_RECIPE_DATA"
no_recipe_data = "NO_RECIPE_DATA"
connection_error = "CONNECTION_ERROR"
return recipe
def extract_open_graph_values(url) -> Recipe:
r = requests.get(url)
recipe = open_graph.basic_recipe_from_opengraph(r.text, url)
return Recipe(**recipe)
def scrape_from_url(url: str) -> Recipe:
"""Entry function to generating are recipe obejct from a url
This will determine if a url can be parsed and raise an appropriate error keyword
This keyword is used on the frontend to reference a localized string to present on the UI.
Args:
url (str): String Representing the URL
Raises:
HTTPException: 400_BAD_REQUEST - See ParserErrors Class for Key Details
Returns:
Recipe: Recipe Model
"""
try:
scraped_schema = scrape_me(url)
except (WebsiteNotImplementedError, AttributeError):
try:
scraped_schema = scrape_me(url, wild_mode=True)
except (NoSchemaFoundInWildMode, AttributeError):
recipe = extract_open_graph_values(url)
if recipe.name != "":
return recipe
raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": ParserErrors.bad_recipe.value})
except ConnectionError:
raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": ParserErrors.connection_error.value})
try:
instruct = scraped_schema.instructions()
except Exception:
instruct = []
try:
ing = scraped_schema.ingredients()
except Exception:
ing = []
if not instruct and not ing:
raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": ParserErrors.no_recipe_data.value})
else:
return clean_scraper(scraped_schema, url)
def clean_scraper(scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> Recipe:
def try_get_default(func_call: Callable, get_attr: str, default: Any, clean_func=None):
value = default
try:
value = func_call()
except Exception:
logger.error(f"Error parsing recipe func_call for '{get_attr}'")
if value == default:
try:
value = scraped_data.schema.data.get(get_attr)
except Exception:
logger.error(f"Error parsing recipe attribute '{get_attr}'")
if clean_func:
value = clean_func(value)
return value
def get_instructions() -> list[dict]:
instruction_as_text = try_get_default(
scraped_data.instructions, "recipeInstructions", ["No Instructions Found"]
)
logger.info(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")
instruction_as_text = cleaner.instructions(instruction_as_text)
logger.info(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")
try:
return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text]
except TypeError:
return []
return Recipe(
name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string),
slug="",
image=try_get_default(scraped_data.image, "image", None),
description=try_get_default(None, "description", "", cleaner.clean_string),
recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
recipe_ingredient=try_get_default(scraped_data.ingredients, "recipeIngredient", [""], cleaner.ingredient),
recipe_instructions=get_instructions(),
total_time=try_get_default(None, "totalTime", None, cleaner.clean_time),
prep_time=try_get_default(None, "prepTime", None, cleaner.clean_time),
perform_time=try_get_default(None, "performTime", None, cleaner.clean_time),
org_url=url,
)
def download_image_for_recipe(slug, image_url) -> dict:
img_name = None
try:
img_path = scrape_image(image_url, slug)
img_name = img_path.name
except Exception as e:
logger.error(f"Error Scraping Image: {e}")
img_name = None
return img_name or "no image"
def dump_last_json(recipe_data: dict):