import time from abc import ABC, abstractmethod from collections.abc import Callable from typing import Any import extruct from fastapi import HTTPException, status from httpx import AsyncClient from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, scrape_html from slugify import slugify from w3lib.html import get_base_url from mealie.core.root_logger import get_logger from mealie.schema.recipe.recipe import Recipe, RecipeStep from mealie.services.scraper.scraped_extras import ScrapedExtras from . import cleaner _FIREFOX_UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:86.0) Gecko/20100101 Firefox/86.0" SCRAPER_TIMEOUT = 15 class ForceTimeoutException(Exception): pass async def safe_scrape_html(url: str) -> str: """ Scrapes the html from a url but will cancel the request if the request takes longer than 15 seconds. This is used to mitigate DDOS attacks from users providing a url with arbitrary large content. """ async with AsyncClient() as client: html_bytes = b"" async with client.stream("GET", url, timeout=SCRAPER_TIMEOUT, headers={"User-Agent": _FIREFOX_UA}) as resp: start_time = time.time() async for chunk in resp.aiter_bytes(chunk_size=1024): html_bytes += chunk if time.time() - start_time > SCRAPER_TIMEOUT: raise ForceTimeoutException() # ===================================== # Copied from requests text property # Try charset from content-type content = None encoding = resp.encoding if not html_bytes: return "" # Fallback to auto-detected encoding. if encoding is None: encoding = resp.apparent_encoding # Decode unicode from given encoding. try: content = str(html_bytes, encoding, errors="replace") except (LookupError, TypeError): # A LookupError is raised if the encoding was not found which could # indicate a misspelling or similar mistake. # # A TypeError can be raised if encoding is None # # So we try blindly encoding. content = str(html_bytes, errors="replace") return content class ABCScraperStrategy(ABC): """ Abstract class for all recipe parsers. """ url: str def __init__(self, url: str) -> None: self.logger = get_logger() self.url = url @abstractmethod async def get_html(self, url: str) -> str: ... @abstractmethod async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]: """Parse a recipe from a web URL. Args: recipe_url (str): Full URL of the recipe to scrape. Returns: Recipe: Recipe object. """ ... class RecipeScraperPackage(ABCScraperStrategy): async def get_html(self, url: str) -> str: return await safe_scrape_html(url) def clean_scraper(self, scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> tuple[Recipe, ScrapedExtras]: def try_get_default(func_call: Callable | None, get_attr: str, default: Any, clean_func=None): value = default if func_call: try: value = func_call() except Exception: self.logger.error(f"Error parsing recipe func_call for '{get_attr}'") if value == default: try: value = scraped_data.schema.data.get(get_attr) except Exception: self.logger.error(f"Error parsing recipe attribute '{get_attr}'") if clean_func: value = clean_func(value) return value def get_instructions() -> list[RecipeStep]: instruction_as_text = try_get_default( scraped_data.instructions, "recipeInstructions", ["No Instructions Found"] ) self.logger.debug(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}") instruction_as_text = cleaner.clean_instructions(instruction_as_text) self.logger.debug(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}") try: return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text] except TypeError: return [] cook_time = try_get_default(None, "performTime", None, cleaner.clean_time) or try_get_default( None, "cookTime", None, cleaner.clean_time ) extras = ScrapedExtras() extras.set_tags(try_get_default(None, "keywords", "", cleaner.clean_tags)) recipe = Recipe( name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string), slug="", image=try_get_default(None, "image", None), description=try_get_default(None, "description", "", cleaner.clean_string), nutrition=try_get_default(None, "nutrition", None, cleaner.clean_nutrition), recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string), recipe_ingredient=try_get_default( scraped_data.ingredients, "recipeIngredient", [""], cleaner.clean_ingredients ), recipe_instructions=get_instructions(), total_time=try_get_default(None, "totalTime", None, cleaner.clean_time), prep_time=try_get_default(None, "prepTime", None, cleaner.clean_time), perform_time=cook_time, org_url=url, ) return recipe, extras async def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None: recipe_html = await self.get_html(self.url) try: scraped_schema = scrape_html(recipe_html, org_url=self.url) except (NoSchemaFoundInWildMode, AttributeError): self.logger.error("Recipe Scraper was unable to extract a recipe.") return None except ConnectionError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": "CONNECTION_ERROR"}) from e # Check to see if the recipe is valid try: ingredients = scraped_schema.ingredients() except Exception: ingredients = [] try: instruct: list | str = scraped_schema.instructions() except Exception: instruct = [] if instruct or ingredients: return scraped_schema self.logger.debug(f"Recipe Scraper [Package] was unable to extract a recipe from {self.url}") return None async def parse(self): """ Parse a recipe from a given url. """ scraped_data = await self.scrape_url() if scraped_data is None: return None return self.clean_scraper(scraped_data, self.url) class RecipeScraperOpenGraph(ABCScraperStrategy): """ Abstract class for all recipe parsers. """ async def get_html(self, url: str) -> str: return await safe_scrape_html(url) def get_recipe_fields(self, html) -> dict | None: """ Get the recipe fields from the Open Graph data. """ def og_field(properties: dict, field_name: str) -> str: return next((val for name, val in properties if name == field_name), "") def og_fields(properties: list[tuple[str, str]], field_name: str) -> list[str]: return list({val for name, val in properties if name == field_name}) base_url = get_base_url(html, self.url) data = extruct.extract(html, base_url=base_url, errors="log") try: properties = data["opengraph"][0]["properties"] except Exception: return None return { "name": og_field(properties, "og:title"), "description": og_field(properties, "og:description"), "image": og_field(properties, "og:image"), "recipeYield": "", "recipeIngredient": ["Could not detect ingredients"], "recipeInstructions": [{"text": "Could not detect instructions"}], "slug": slugify(og_field(properties, "og:title")), "orgURL": og_field(properties, "og:url"), "categories": [], "tags": og_fields(properties, "og:article:tag"), "dateAdded": None, "notes": [], "extras": [], } async def parse(self): """ Parse a recipe from a given url. """ html = await self.get_html(self.url) og_data = self.get_recipe_fields(html) if og_data is None: return None return Recipe(**og_data), ScrapedExtras()