mirror of
https://github.com/mealie-recipes/mealie.git
synced 2026-02-04 15:03:10 -05:00
refactor: ♻️ rewrite migrations frontend/backend (#841)
* refactor(frontend): ♻️ rewrite migrations UI * refactor(backend): ♻️ rewrite recipe migrations * remove vue-demi Co-authored-by: hay-kot <hay-kot@pm.me>
This commit is contained in:
37
mealie/services/group_services/migration_service.py
Normal file
37
mealie/services/group_services/migration_service.py
Normal file
@@ -0,0 +1,37 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import cached_property
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic.types import UUID4
|
||||
|
||||
from mealie.core.root_logger import get_logger
|
||||
from mealie.schema.group.group_migration import SupportedMigrations
|
||||
from mealie.schema.reports.reports import ReportOut, ReportSummary
|
||||
from mealie.services._base_http_service.http_services import UserHttpService
|
||||
from mealie.services.events import create_group_event
|
||||
from mealie.services.migrations import ChowdownMigrator, NextcloudMigrator
|
||||
|
||||
logger = get_logger(module=__name__)
|
||||
|
||||
|
||||
class GroupMigrationService(UserHttpService[int, ReportOut]):
|
||||
event_func = create_group_event
|
||||
_restrict_by_group = True
|
||||
_schema = ReportOut
|
||||
|
||||
@cached_property
|
||||
def dal(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def populate_item(self, id: UUID4) -> ReportOut:
|
||||
return None
|
||||
|
||||
def migrate(self, migration: SupportedMigrations, archive: Path) -> ReportSummary:
|
||||
if migration == SupportedMigrations.nextcloud:
|
||||
self.migration_type = NextcloudMigrator(archive, self.db, self.session, self.user.id, self.group_id)
|
||||
|
||||
if migration == SupportedMigrations.chowdown:
|
||||
self.migration_type = ChowdownMigrator(archive, self.db, self.session, self.user.id, self.group_id)
|
||||
|
||||
return self.migration_type.migrate(f"{migration.value.title()} Migration")
|
||||
31
mealie/services/group_services/reports_service.py
Normal file
31
mealie/services/group_services/reports_service.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from functools import cached_property
|
||||
|
||||
from mealie.core.root_logger import get_logger
|
||||
from mealie.schema.reports.reports import ReportCategory, ReportCreate, ReportOut, ReportSummary
|
||||
from mealie.services._base_http_service.crud_http_mixins import CrudHttpMixins
|
||||
from mealie.services._base_http_service.http_services import UserHttpService
|
||||
from mealie.services.events import create_group_event
|
||||
|
||||
logger = get_logger(module=__name__)
|
||||
|
||||
|
||||
class GroupReportService(CrudHttpMixins[ReportOut, ReportCreate, ReportCreate], UserHttpService[int, ReportOut]):
|
||||
event_func = create_group_event
|
||||
_restrict_by_group = True
|
||||
_schema = ReportOut
|
||||
|
||||
@cached_property
|
||||
def dal(self):
|
||||
return self.db.group_reports
|
||||
|
||||
def populate_item(self, id: int) -> ReportOut:
|
||||
self.item = self.dal.get_one(id)
|
||||
return self.item
|
||||
|
||||
def _get_all(self, report_type: ReportCategory = None) -> list[ReportSummary]:
|
||||
return self.dal.multi_query({"group_id": self.group_id, "category": report_type}, limit=9999)
|
||||
|
||||
def delete_one(self, id: int = None) -> ReportOut:
|
||||
return self._delete_one(id)
|
||||
@@ -0,0 +1,2 @@
|
||||
from .chowdown import *
|
||||
from .nextcloud import *
|
||||
|
||||
@@ -1,122 +1,134 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import yaml
|
||||
from pydantic import BaseModel
|
||||
from typing import Tuple
|
||||
|
||||
from mealie.core import root_logger
|
||||
from mealie.db.database import get_database
|
||||
from mealie.schema.admin import MigrationImport
|
||||
from mealie.db.database import Database
|
||||
from mealie.schema.recipe import Recipe
|
||||
from mealie.schema.user.user import PrivateUser
|
||||
from mealie.services.image import image
|
||||
from mealie.schema.reports.reports import (
|
||||
ReportCategory,
|
||||
ReportCreate,
|
||||
ReportEntryCreate,
|
||||
ReportOut,
|
||||
ReportSummary,
|
||||
ReportSummaryStatus,
|
||||
)
|
||||
from mealie.services.scraper import cleaner
|
||||
from mealie.utils.unzip import unpack_zip
|
||||
|
||||
logger = root_logger.get_logger()
|
||||
from .._base_service import BaseService
|
||||
from .utils.migration_alias import MigrationAlias
|
||||
|
||||
|
||||
class MigrationAlias(BaseModel):
|
||||
"""A datatype used by MigrationBase to pre-process a recipe dictionary to rewrite
|
||||
the alias key in the dictionary, if it exists, to the key. If set a `func` attribute
|
||||
will be called on the value before assigning the value to the new key
|
||||
"""
|
||||
class BaseMigrator(BaseService):
|
||||
key_aliases: list[MigrationAlias]
|
||||
|
||||
key: str
|
||||
alias: str
|
||||
func: Optional[Callable] = None
|
||||
report_entries: list[ReportEntryCreate]
|
||||
report_id: int
|
||||
report: ReportOut
|
||||
|
||||
def __init__(self, archive: Path, db: Database, session, user_id: int, group_id: int):
|
||||
self.archive = archive
|
||||
self.db = db
|
||||
self.session = session
|
||||
self.user_id = user_id
|
||||
self.group_id = group_id
|
||||
|
||||
class MigrationBase(BaseModel):
|
||||
migration_report: list[MigrationImport] = []
|
||||
migration_file: Path
|
||||
session: Optional[Any]
|
||||
key_aliases: Optional[list[MigrationAlias]]
|
||||
self.report_entries = []
|
||||
|
||||
user: PrivateUser
|
||||
self.logger = root_logger.get_logger()
|
||||
|
||||
@property
|
||||
def db(self):
|
||||
return get_database(self.session)
|
||||
super().__init__()
|
||||
|
||||
@property
|
||||
def temp_dir(self) -> TemporaryDirectory:
|
||||
"""unpacks the migration_file into a temporary directory
|
||||
that can be used as a context manager.
|
||||
def _migrate(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
Returns:
|
||||
TemporaryDirectory:
|
||||
def _create_report(self, report_name: str) -> None:
|
||||
report_to_save = ReportCreate(
|
||||
name=report_name,
|
||||
category=ReportCategory.migration,
|
||||
status=ReportSummaryStatus.in_progress,
|
||||
group_id=self.group_id,
|
||||
)
|
||||
|
||||
self.report = self.db.group_reports.create(report_to_save)
|
||||
self.report_id = self.report.id
|
||||
|
||||
def _save_all_entries(self) -> None:
|
||||
|
||||
is_success = True
|
||||
is_failure = True
|
||||
|
||||
for entry in self.report_entries:
|
||||
if is_failure and entry.success:
|
||||
is_failure = False
|
||||
|
||||
if is_success and not entry.success:
|
||||
is_success = False
|
||||
|
||||
self.db.group_report_entries.create(entry)
|
||||
|
||||
if is_success:
|
||||
self.report.status = ReportSummaryStatus.success
|
||||
|
||||
if is_failure:
|
||||
self.report.status = ReportSummaryStatus.failure
|
||||
|
||||
if not is_success and not is_failure:
|
||||
self.report.status = ReportSummaryStatus.partial
|
||||
|
||||
self.db.group_reports.update(self.report.id, self.report)
|
||||
|
||||
def migrate(self, report_name: str) -> ReportSummary:
|
||||
self._create_report(report_name)
|
||||
self._migrate()
|
||||
self._save_all_entries()
|
||||
return self.db.group_reports.get(self.report_id)
|
||||
|
||||
def import_recipes_to_database(self, validated_recipes: list[Recipe]) -> list[Tuple[str, bool]]:
|
||||
"""
|
||||
return unpack_zip(self.migration_file)
|
||||
|
||||
@staticmethod
|
||||
def json_reader(json_file: Path) -> dict:
|
||||
with open(json_file, "r") as f:
|
||||
return json.loads(f.read())
|
||||
|
||||
@staticmethod
|
||||
def yaml_reader(yaml_file: Path) -> dict:
|
||||
"""A helper function to read in a yaml file from a Path. This assumes that the
|
||||
first yaml document is the recipe data and the second, if exists, is the description.
|
||||
Used as a single access point to process a list of Recipe objects into the
|
||||
database in a predictable way. If an error occurs the session is rolled back
|
||||
and the process will continue. All import information is appended to the
|
||||
'migration_report' attribute to be returned to the frontend for display.
|
||||
|
||||
Args:
|
||||
yaml_file (Path): Path to yaml file
|
||||
|
||||
Returns:
|
||||
dict: representing the yaml file as a dictionary
|
||||
validated_recipes (list[Recipe]):
|
||||
"""
|
||||
with open(yaml_file, "r") as f:
|
||||
contents = f.read().split("---")
|
||||
recipe_data = {}
|
||||
for _, document in enumerate(contents):
|
||||
|
||||
# Check if None or Empty String
|
||||
if document is None or document == "":
|
||||
continue
|
||||
return_vars = []
|
||||
|
||||
# Check if 'title:' present
|
||||
elif "title:" in document:
|
||||
recipe_data.update(yaml.safe_load(document))
|
||||
for recipe in validated_recipes:
|
||||
|
||||
else:
|
||||
recipe_data["description"] = document
|
||||
recipe.user_id = self.user_id
|
||||
recipe.group_id = self.group_id
|
||||
|
||||
return recipe_data
|
||||
exception = ""
|
||||
status = False
|
||||
try:
|
||||
self.db.recipes.create(recipe)
|
||||
status = True
|
||||
|
||||
@staticmethod
|
||||
def glob_walker(directory: Path, glob_str: str, return_parent=True) -> list[Path]: # TODO:
|
||||
"""A Helper function that will return the glob matches for the temporary directotry
|
||||
that was unpacked and passed in as the `directory` parameter. If `return_parent` is
|
||||
True the return Paths will be the parent directory for the file that was matched. If
|
||||
false the file itself will be returned.
|
||||
except Exception as inst:
|
||||
exception = inst
|
||||
self.logger.exception(inst)
|
||||
self.session.rollback()
|
||||
|
||||
Args:
|
||||
directory (Path): Path to search directory
|
||||
glob_str ([type]): glob style match string
|
||||
return_parent (bool, optional): To return parent directory of match. Defaults to True.
|
||||
|
||||
Returns:
|
||||
list[Path]:
|
||||
"""
|
||||
directory = directory if isinstance(directory, Path) else Path(directory)
|
||||
matches = []
|
||||
for match in directory.glob(glob_str):
|
||||
if return_parent:
|
||||
matches.append(match.parent)
|
||||
if status:
|
||||
message = f"Imported {recipe.name} successfully"
|
||||
else:
|
||||
matches.append(match)
|
||||
message = f"Failed to import {recipe.name}"
|
||||
|
||||
return matches
|
||||
return_vars.append((recipe.slug, status))
|
||||
|
||||
@staticmethod
|
||||
def import_image(src: Path, dest_slug: str):
|
||||
"""Read the successful migrations attribute and for each import the image
|
||||
appropriately into the image directory. Minification is done in mass
|
||||
after the migration occurs.
|
||||
"""
|
||||
image.write_image(dest_slug, src, extension=src.suffix)
|
||||
self.report_entries.append(
|
||||
ReportEntryCreate(
|
||||
report_id=self.report_id,
|
||||
success=status,
|
||||
message=message,
|
||||
exception=str(exception),
|
||||
)
|
||||
)
|
||||
|
||||
return return_vars
|
||||
|
||||
def rewrite_alias(self, recipe_dict: dict) -> dict:
|
||||
"""A helper function to reassign attributes by an alias using a list
|
||||
@@ -137,7 +149,6 @@ class MigrationBase(BaseModel):
|
||||
try:
|
||||
prop_value = recipe_dict.pop(alias.alias)
|
||||
except KeyError:
|
||||
logger.info(f"Key {alias.alias} Not Found. Skipping...")
|
||||
continue
|
||||
|
||||
if alias.func:
|
||||
@@ -147,7 +158,7 @@ class MigrationBase(BaseModel):
|
||||
|
||||
return recipe_dict
|
||||
|
||||
def clean_recipe_dictionary(self, recipe_dict) -> Recipe:
|
||||
def clean_recipe_dictionary(self, recipe_dict: dict) -> Recipe:
|
||||
"""
|
||||
Calls the rewrite_alias function and the Cleaner.clean function on a
|
||||
dictionary and returns the result unpacked into a Recipe object
|
||||
@@ -156,33 +167,3 @@ class MigrationBase(BaseModel):
|
||||
recipe_dict = cleaner.clean(recipe_dict, url=recipe_dict.get("org_url", None))
|
||||
|
||||
return Recipe(**recipe_dict)
|
||||
|
||||
def import_recipes_to_database(self, validated_recipes: list[Recipe]) -> None:
|
||||
"""
|
||||
Used as a single access point to process a list of Recipe objects into the
|
||||
database in a predictable way. If an error occurs the session is rolled back
|
||||
and the process will continue. All import information is appended to the
|
||||
'migration_report' attribute to be returned to the frontend for display.
|
||||
|
||||
Args:
|
||||
validated_recipes (list[Recipe]):
|
||||
"""
|
||||
|
||||
for recipe in validated_recipes:
|
||||
|
||||
recipe.user_id = self.user.id
|
||||
recipe.group_id = self.user.group_id
|
||||
|
||||
exception = ""
|
||||
status = False
|
||||
try:
|
||||
self.db.recipes.create(recipe.dict())
|
||||
status = True
|
||||
|
||||
except Exception as inst:
|
||||
exception = inst
|
||||
logger.exception(inst)
|
||||
self.session.rollback()
|
||||
|
||||
import_status = MigrationImport(slug=recipe.slug, name=recipe.name, status=status, exception=str(exception))
|
||||
self.migration_report.append(import_status)
|
||||
|
||||
@@ -1,50 +1,50 @@
|
||||
import tempfile
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy.orm.session import Session
|
||||
from mealie.db.database import Database
|
||||
|
||||
from mealie.core.config import get_app_dirs
|
||||
|
||||
app_dirs = get_app_dirs()
|
||||
from mealie.schema.admin import MigrationImport
|
||||
from mealie.schema.user.user import PrivateUser
|
||||
from mealie.services.migrations import helpers
|
||||
from mealie.services.migrations._migration_base import MigrationAlias, MigrationBase
|
||||
from ._migration_base import BaseMigrator
|
||||
from .utils.migration_alias import MigrationAlias
|
||||
from .utils.migration_helpers import MigrationReaders, import_image, split_by_comma
|
||||
|
||||
|
||||
class ChowdownMigration(MigrationBase):
|
||||
key_aliases: Optional[list[MigrationAlias]] = [
|
||||
MigrationAlias(key="name", alias="title", func=None),
|
||||
MigrationAlias(key="recipeIngredient", alias="ingredients", func=None),
|
||||
MigrationAlias(key="recipeInstructions", alias="directions", func=None),
|
||||
MigrationAlias(key="tags", alias="tags", func=helpers.split_by_comma),
|
||||
]
|
||||
class ChowdownMigrator(BaseMigrator):
|
||||
def __init__(self, archive: Path, db: Database, session, user_id: int, group_id: int):
|
||||
super().__init__(archive, db, session, user_id, group_id)
|
||||
|
||||
self.key_aliases = [
|
||||
MigrationAlias(key="name", alias="title", func=None),
|
||||
MigrationAlias(key="recipeIngredient", alias="ingredients", func=None),
|
||||
MigrationAlias(key="recipeInstructions", alias="directions", func=None),
|
||||
MigrationAlias(key="tags", alias="tags", func=split_by_comma),
|
||||
]
|
||||
|
||||
def migrate(user: PrivateUser, session: Session, zip_path: Path) -> list[MigrationImport]:
|
||||
cd_migration = ChowdownMigration(user=user, migration_file=zip_path, session=session)
|
||||
def _migrate(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
with zipfile.ZipFile(self.archive) as zip_file:
|
||||
zip_file.extractall(tmpdir)
|
||||
|
||||
with cd_migration.temp_dir as dir:
|
||||
chow_dir = next(Path(dir).iterdir())
|
||||
image_dir = app_dirs.TEMP_DIR.joinpath(chow_dir, "images")
|
||||
recipe_dir = app_dirs.TEMP_DIR.joinpath(chow_dir, "_recipes")
|
||||
temp_path = Path(tmpdir)
|
||||
|
||||
recipes_as_dicts = [y for x in recipe_dir.glob("*.md") if (y := ChowdownMigration.yaml_reader(x)) is not None]
|
||||
chow_dir = next(temp_path.iterdir())
|
||||
image_dir = temp_path.joinpath(chow_dir, "images")
|
||||
recipe_dir = temp_path.joinpath(chow_dir, "_recipes")
|
||||
|
||||
recipes = [cd_migration.clean_recipe_dictionary(x) for x in recipes_as_dicts]
|
||||
recipes_as_dicts = [y for x in recipe_dir.glob("*.md") if (y := MigrationReaders.yaml(x)) is not None]
|
||||
|
||||
cd_migration.import_recipes_to_database(recipes)
|
||||
recipes = [self.clean_recipe_dictionary(x) for x in recipes_as_dicts]
|
||||
|
||||
recipe_lookup = {r.slug: r for r in recipes}
|
||||
results = self.import_recipes_to_database(recipes)
|
||||
|
||||
for report in cd_migration.migration_report:
|
||||
if report.status:
|
||||
try:
|
||||
original_image = recipe_lookup.get(report.slug).image
|
||||
cd_image = image_dir.joinpath(original_image)
|
||||
except StopIteration:
|
||||
continue
|
||||
if cd_image:
|
||||
ChowdownMigration.import_image(cd_image, report.slug)
|
||||
recipe_lookup = {r.slug: r for r in recipes}
|
||||
|
||||
return cd_migration.migration_report
|
||||
for slug, status in results:
|
||||
if status:
|
||||
try:
|
||||
original_image = recipe_lookup.get(slug).image
|
||||
cd_image = image_dir.joinpath(original_image)
|
||||
except StopIteration:
|
||||
continue
|
||||
if cd_image:
|
||||
import_image(cd_image, slug)
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
def split_by_comma(tag_string: str):
|
||||
"""Splits a single string by ',' performs a line strip and then title cases the resulting string
|
||||
|
||||
Args:
|
||||
tag_string (str): [description]
|
||||
|
||||
Returns:
|
||||
[type]: [description]
|
||||
"""
|
||||
if not isinstance(tag_string, str):
|
||||
return None
|
||||
return [x.title().lstrip() for x in tag_string.split(",") if x != ""]
|
||||
@@ -1,50 +0,0 @@
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
from mealie.core import root_logger
|
||||
from mealie.schema.admin import MigrationImport
|
||||
from mealie.services.migrations import chowdown, nextcloud
|
||||
|
||||
logger = root_logger.get_logger()
|
||||
|
||||
|
||||
class Migration(str, Enum):
|
||||
"""The class defining the supported types of migrations for Mealie. Pass the
|
||||
class attribute of the class instead of the string when using.
|
||||
"""
|
||||
|
||||
nextcloud = "nextcloud"
|
||||
chowdown = "chowdown"
|
||||
|
||||
|
||||
def migrate(user, migration_type: str, file_path: Path, session: Session) -> list[MigrationImport]:
|
||||
"""The new entry point for accessing migrations within the 'migrations' service.
|
||||
Using the 'Migrations' enum class as a selector for migration_type to direct which function
|
||||
to call. All migrations will return a MigrationImport object that is built for displaying
|
||||
detailed information on the frontend. This will provide a single point of access
|
||||
|
||||
Args:
|
||||
migration_type (str): a string option representing the migration type. See Migration attributes for options
|
||||
file_path (Path): Path to the zip file containing the data
|
||||
session (Session): a SqlAlchemy Session
|
||||
|
||||
Returns:
|
||||
list[MigrationImport]: [description]
|
||||
"""
|
||||
|
||||
logger.info(f"Starting Migration from {migration_type}")
|
||||
|
||||
if migration_type == Migration.nextcloud.value:
|
||||
migration_imports = nextcloud.migrate(user, session, file_path)
|
||||
|
||||
elif migration_type == Migration.chowdown.value:
|
||||
migration_imports = chowdown.migrate(user, session, file_path)
|
||||
|
||||
else:
|
||||
return []
|
||||
|
||||
logger.info(f"Finishing Migration from {migration_type}")
|
||||
|
||||
return migration_imports
|
||||
@@ -1,14 +1,16 @@
|
||||
import tempfile
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from slugify import slugify
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
from mealie.schema.admin import MigrationImport
|
||||
from mealie.schema.user.user import PrivateUser
|
||||
from mealie.services.migrations import helpers
|
||||
from mealie.services.migrations._migration_base import MigrationAlias, MigrationBase
|
||||
from mealie.db.database import Database
|
||||
|
||||
from ._migration_base import BaseMigrator
|
||||
from .utils.migration_alias import MigrationAlias
|
||||
from .utils.migration_helpers import MigrationReaders, glob_walker, import_image, split_by_comma
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -33,39 +35,38 @@ class NextcloudDir:
|
||||
except StopIteration:
|
||||
image_file = None
|
||||
|
||||
return cls(name=dir.name, recipe=NextcloudMigration.json_reader(json_file), image=image_file)
|
||||
return cls(name=dir.name, recipe=MigrationReaders.json(json_file), image=image_file)
|
||||
|
||||
|
||||
class NextcloudMigration(MigrationBase):
|
||||
key_aliases: Optional[list[MigrationAlias]] = [
|
||||
MigrationAlias(key="tags", alias="keywords", func=helpers.split_by_comma),
|
||||
MigrationAlias(key="org_url", alias="url", func=None),
|
||||
]
|
||||
class NextcloudMigrator(BaseMigrator):
|
||||
def __init__(self, archive: Path, db: Database, session, user_id: int, group_id: int):
|
||||
super().__init__(archive, db, session, user_id, group_id)
|
||||
|
||||
self.key_aliases = [
|
||||
MigrationAlias(key="tags", alias="keywords", func=split_by_comma),
|
||||
MigrationAlias(key="org_url", alias="url", func=None),
|
||||
]
|
||||
|
||||
def migrate(user: PrivateUser, session: Session, zip_path: Path) -> list[MigrationImport]:
|
||||
def _migrate(self) -> None:
|
||||
# Unzip File into temp directory
|
||||
|
||||
nc_migration = NextcloudMigration(user=user, migration_file=zip_path, session=session)
|
||||
# get potential recipe dirs
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
with zipfile.ZipFile(self.archive) as zip_file:
|
||||
zip_file.extractall(tmpdir)
|
||||
|
||||
with nc_migration.temp_dir as dir:
|
||||
potential_recipe_dirs = NextcloudMigration.glob_walker(dir, glob_str="**/[!.]*.json", return_parent=True)
|
||||
potential_recipe_dirs = glob_walker(Path(tmpdir), glob_str="**/[!.]*.json", return_parent=True)
|
||||
nextcloud_dirs = {y.slug: y for x in potential_recipe_dirs if (y := NextcloudDir.from_dir(x))}
|
||||
|
||||
# nextcloud_dirs = [NextcloudDir.from_dir(x) for x in potential_recipe_dirs]
|
||||
nextcloud_dirs = {y.slug: y for x in potential_recipe_dirs if (y := NextcloudDir.from_dir(x))}
|
||||
# nextcloud_dirs = {x.slug: x for x in nextcloud_dirs}
|
||||
all_recipes = []
|
||||
for _, nc_dir in nextcloud_dirs.items():
|
||||
recipe = self.clean_recipe_dictionary(nc_dir.recipe)
|
||||
all_recipes.append(recipe)
|
||||
|
||||
all_recipes = []
|
||||
for _, nc_dir in nextcloud_dirs.items():
|
||||
recipe = nc_migration.clean_recipe_dictionary(nc_dir.recipe)
|
||||
all_recipes.append(recipe)
|
||||
all_statuses = self.import_recipes_to_database(all_recipes)
|
||||
|
||||
nc_migration.import_recipes_to_database(all_recipes)
|
||||
|
||||
for report in nc_migration.migration_report:
|
||||
|
||||
if report.status:
|
||||
nc_dir: NextcloudDir = nextcloud_dirs[report.slug]
|
||||
if nc_dir.image:
|
||||
NextcloudMigration.import_image(nc_dir.image, nc_dir.slug)
|
||||
|
||||
return nc_migration.migration_report
|
||||
for slug, status in all_statuses:
|
||||
if status:
|
||||
nc_dir: NextcloudDir = nextcloud_dirs[slug]
|
||||
if nc_dir.image:
|
||||
import_image(nc_dir.image, nc_dir.slug)
|
||||
|
||||
0
mealie/services/migrations/utils/__init__.py
Normal file
0
mealie/services/migrations/utils/__init__.py
Normal file
14
mealie/services/migrations/utils/migration_alias.py
Normal file
14
mealie/services/migrations/utils/migration_alias.py
Normal file
@@ -0,0 +1,14 @@
|
||||
from typing import Callable, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class MigrationAlias(BaseModel):
|
||||
"""A datatype used by MigrationBase to pre-process a recipe dictionary to rewrite
|
||||
the alias key in the dictionary, if it exists, to the key. If set a `func` attribute
|
||||
will be called on the value before assigning the value to the new key
|
||||
"""
|
||||
|
||||
key: str
|
||||
alias: str
|
||||
func: Optional[Callable] = None
|
||||
89
mealie/services/migrations/utils/migration_helpers.py
Normal file
89
mealie/services/migrations/utils/migration_helpers.py
Normal file
@@ -0,0 +1,89 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
from mealie.services.image import image
|
||||
|
||||
|
||||
class MigrationReaders:
|
||||
@staticmethod
|
||||
def json(json_file: Path) -> dict:
|
||||
with open(json_file, "r") as f:
|
||||
return json.loads(f.read())
|
||||
|
||||
@staticmethod
|
||||
def yaml(yaml_file: Path) -> dict:
|
||||
"""A helper function to read in a yaml file from a Path. This assumes that the
|
||||
first yaml document is the recipe data and the second, if exists, is the description.
|
||||
|
||||
Args:
|
||||
yaml_file (Path): Path to yaml file
|
||||
|
||||
Returns:
|
||||
dict: representing the yaml file as a dictionary
|
||||
"""
|
||||
with open(yaml_file, "r") as f:
|
||||
contents = f.read().split("---")
|
||||
recipe_data = {}
|
||||
for _, document in enumerate(contents):
|
||||
|
||||
# Check if None or Empty String
|
||||
if document is None or document == "":
|
||||
continue
|
||||
|
||||
# Check if 'title:' present
|
||||
elif "title:" in document:
|
||||
recipe_data.update(yaml.safe_load(document))
|
||||
|
||||
else:
|
||||
recipe_data["description"] = document
|
||||
|
||||
return recipe_data
|
||||
|
||||
|
||||
def split_by_comma(tag_string: str):
|
||||
"""Splits a single string by ',' performs a line strip and then title cases the resulting string
|
||||
|
||||
Args:
|
||||
tag_string (str): [description]
|
||||
|
||||
Returns:
|
||||
[type]: [description]
|
||||
"""
|
||||
if not isinstance(tag_string, str):
|
||||
return None
|
||||
return [x.title().lstrip() for x in tag_string.split(",") if x != ""]
|
||||
|
||||
|
||||
def glob_walker(directory: Path, glob_str: str, return_parent=True) -> list[Path]: # TODO:
|
||||
"""A Helper function that will return the glob matches for the temporary directotry
|
||||
that was unpacked and passed in as the `directory` parameter. If `return_parent` is
|
||||
True the return Paths will be the parent directory for the file that was matched. If
|
||||
false the file itself will be returned.
|
||||
|
||||
Args:
|
||||
directory (Path): Path to search directory
|
||||
glob_str ([type]): glob style match string
|
||||
return_parent (bool, optional): To return parent directory of match. Defaults to True.
|
||||
|
||||
Returns:
|
||||
list[Path]:
|
||||
"""
|
||||
directory = directory if isinstance(directory, Path) else Path(directory)
|
||||
matches = []
|
||||
for match in directory.glob(glob_str):
|
||||
if return_parent:
|
||||
matches.append(match.parent)
|
||||
else:
|
||||
matches.append(match)
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
def import_image(src: Path, dest_slug: str):
|
||||
"""Read the successful migrations attribute and for each import the image
|
||||
appropriately into the image directory. Minification is done in mass
|
||||
after the migration occurs.
|
||||
"""
|
||||
image.write_image(dest_slug, src, extension=src.suffix)
|
||||
Reference in New Issue
Block a user