Files
mealie/mealie/services/migrations/tandoor.py
2024-06-07 15:35:36 +00:00

156 lines
5.7 KiB
Python

import json
import os
import tempfile
import zipfile
from pathlib import Path
from typing import Any
from mealie.schema.recipe.recipe_ingredient import RecipeIngredientBase
from mealie.schema.reports.reports import ReportEntryCreate
from ._migration_base import BaseMigrator
from .utils.migration_alias import MigrationAlias
from .utils.migration_helpers import import_image
def _build_ingredient_from_ingredient_data(ingredient_data: dict[str, Any], title: str | None = None) -> dict[str, Any]:
quantity = ingredient_data.get("amount", "1")
if unit_data := ingredient_data.get("unit"):
unit = unit_data.get("plural_name") or unit_data.get("name")
else:
unit = None
if food_data := ingredient_data.get("food"):
food = food_data.get("plural_name") or food_data.get("name")
else:
food = None
base_ingredient = RecipeIngredientBase(quantity=quantity, unit=unit, food=food)
return {"title": title, "note": base_ingredient.display}
def extract_instructions_and_ingredients(steps: list[dict[str, Any]]) -> tuple[list[str], list[dict[str, Any]]]:
"""Returns a list of instructions and ingredients for a recipe"""
instructions: list[str] = []
ingredients: list[dict[str, Any]] = []
for step in steps:
if instruction_text := step.get("instruction"):
instructions.append(instruction_text)
if ingredients_data := step.get("ingredients"):
for i, ingredient in enumerate(ingredients_data):
if not i and (title := step.get("name")):
ingredients.append(_build_ingredient_from_ingredient_data(ingredient, title))
else:
ingredients.append(_build_ingredient_from_ingredient_data(ingredient))
return instructions, ingredients
def _format_time(minutes: int) -> str:
# TODO: make this translatable
hour_label = "hour"
hours_label = "hours"
minute_label = "minute"
minutes_label = "minutes"
hours, minutes = divmod(minutes, 60)
parts: list[str] = []
if hours:
parts.append(f"{int(hours)} {hour_label if hours == 1 else hours_label}")
if minutes:
parts.append(f"{minutes} {minute_label if minutes == 1 else minutes_label}")
return " ".join(parts)
def parse_times(working_time: int, waiting_time: int) -> tuple[str, str]:
"""Returns the performTime and totalTime"""
total_time = working_time + waiting_time
return _format_time(working_time), _format_time(total_time)
class TandoorMigrator(BaseMigrator):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.name = "tandoor"
self.key_aliases = [
MigrationAlias(key="tags", alias="keywords", func=lambda kws: [kw["name"] for kw in kws if kw.get("name")]),
MigrationAlias(key="orgURL", alias="source_url", func=None),
]
def _process_recipe_document(self, source_dir: Path, recipe_data: dict) -> dict:
steps_data = recipe_data.pop("steps", [])
recipe_data["recipeInstructions"], recipe_data["recipeIngredient"] = extract_instructions_and_ingredients(
steps_data
)
recipe_data["performTime"], recipe_data["totalTime"] = parse_times(
recipe_data.pop("working_time", 0), recipe_data.pop("waiting_time", 0)
)
serving_size = recipe_data.pop("servings", 0)
serving_text = recipe_data.pop("servings_text", "")
if serving_size and serving_text:
recipe_data["recipeYield"] = f"{serving_size} {serving_text}"
try:
recipe_image_path = next(source_dir.glob("image.*"))
recipe_data["image"] = str(recipe_image_path)
except StopIteration:
pass
return recipe_data
def _migrate(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
with zipfile.ZipFile(self.archive) as zip_file:
zip_file.extractall(tmpdir)
source_dir = self.get_zip_base_path(Path(tmpdir))
recipes_as_dicts: list[dict] = []
for i, recipe_zip_file in enumerate(source_dir.glob("*.zip")):
try:
recipe_dir = str(source_dir.joinpath(f"recipe_{i+1}"))
os.makedirs(recipe_dir)
with zipfile.ZipFile(recipe_zip_file) as recipe_zip:
recipe_zip.extractall(recipe_dir)
recipe_source_dir = Path(recipe_dir)
try:
recipe_json_path = next(recipe_source_dir.glob("*.json"))
except StopIteration as e:
raise Exception("recipe.json not found") from e
with open(recipe_json_path) as f:
recipes_as_dicts.append(self._process_recipe_document(recipe_source_dir, json.load(f)))
except Exception as e:
self.report_entries.append(
ReportEntryCreate(
report_id=self.report_id,
success=False,
message="Failed to parse recipe",
exception=f"{type(e).__name__}: {e}",
)
)
recipes = [self.clean_recipe_dictionary(x) for x in recipes_as_dicts]
results = self.import_recipes_to_database(recipes)
recipe_lookup = {r.slug: r for r in recipes}
for slug, recipe_id, status in results:
if status:
try:
r = recipe_lookup.get(slug)
if not r or not r.image:
continue
except StopIteration:
continue
import_image(r.image, recipe_id)