2023-08-20 13:38:46 -05:00
|
|
|
from collections.abc import Iterable
|
|
|
|
|
from typing import cast
|
|
|
|
|
from uuid import UUID
|
|
|
|
|
|
2022-03-27 15:12:18 -08:00
|
|
|
from pydantic import UUID4
|
2023-08-20 13:38:46 -05:00
|
|
|
from slugify import slugify
|
2024-08-22 10:14:32 -05:00
|
|
|
from sqlalchemy import select
|
2023-08-20 13:38:46 -05:00
|
|
|
from sqlalchemy.exc import IntegrityError
|
2021-08-28 14:27:56 -08:00
|
|
|
|
2021-08-27 20:27:20 -08:00
|
|
|
from mealie.db.models.group import Group
|
2024-01-30 12:41:37 -06:00
|
|
|
from mealie.schema.user.user import GroupBase, GroupInDB, UpdateGroup
|
2021-08-22 16:08:37 -08:00
|
|
|
|
2021-12-18 20:52:36 -09:00
|
|
|
from .repository_generic import RepositoryGeneric
|
2021-08-22 16:08:37 -08:00
|
|
|
|
|
|
|
|
|
2021-12-18 20:52:36 -09:00
|
|
|
class RepositoryGroup(RepositoryGeneric[GroupInDB, Group]):
    """Group repository that derives each group's ``slug`` from its ``name``."""

    def create(self, data: GroupBase | dict) -> GroupInDB:
        """Create a group, retrying under a numbered name on integrity errors.

        The slug is always recomputed from the current name. If the insert
        raises ``IntegrityError`` (presumably a name/slug uniqueness conflict),
        the session is rolled back and the insert is retried with the name
        ``"<original name> (<attempt>)"``.

        Args:
            data: The group to create, as a ``GroupBase`` or a plain dict
                containing at least a ``"name"`` key. The caller's object is
                never modified.

        Returns:
            The result of the parent repository's ``create``.

        Raises:
            IntegrityError: If the insert still fails after 10 attempts.
        """
        # Always work on a private dict: the retry loop rewrites "name" and
        # "slug", and those edits must not leak back into the caller's input.
        if isinstance(data, GroupBase):
            payload = data.model_dump()
        else:
            payload = dict(data)

        max_attempts = 10
        original_name = cast(str, payload["name"])

        attempts = 0
        while True:
            payload["slug"] = slugify(payload["name"])
            try:
                return super().create(payload)
            except IntegrityError:
                # Reset the session so the next attempt starts clean.
                self.session.rollback()
                attempts += 1
                if attempts >= max_attempts:
                    raise

                payload["name"] = f"{original_name} ({attempts})"

    def create_many(self, data: Iterable[GroupInDB | dict]) -> list[GroupInDB]:
        """Create several groups by calling :meth:`create` once per item."""
        # since create uses special logic for resolving slugs, we don't want to use the standard create_many method
        return [self.create(new_group) for new_group in data]

    def update(self, match_value: str | int | UUID4, new_data: UpdateGroup | dict) -> GroupInDB:
        """Update a group, refreshing its slug from the new name first.

        Note that the slug is written onto ``new_data`` itself (attribute
        assignment for ``GroupBase`` instances, key assignment otherwise)
        before delegating to the parent repository's ``update``.

        Args:
            match_value: Value passed through to the parent ``update`` to
                locate the group.
            new_data: The new group data; must provide a name.

        Returns:
            The result of the parent repository's ``update``.
        """
        if isinstance(new_data, GroupBase):
            new_data.slug = slugify(new_data.name)
        else:
            new_data["slug"] = slugify(new_data["name"])

        return super().update(match_value, new_data)

    def update_many(self, data: Iterable[UpdateGroup | dict]) -> list[GroupInDB]:
        """Update several groups by calling :meth:`update` once per item.

        Each item's ``id`` (dict key or attribute) is used as the match value.
        """
        # since update uses special logic for resolving slugs, we don't want to use the standard update_many method
        return [self.update(group["id"] if isinstance(group, dict) else group.id, group) for group in data]

    def get_by_name(self, name: str) -> GroupInDB | None:
        """Return the group whose name equals ``name``, or ``None`` if there is none."""
        query = select(self.model).filter_by(name=name)
        dbgroup = self.session.execute(query).scalars().one_or_none()
        if dbgroup is None:
            return None
        return self.schema.model_validate(dbgroup)

    def get_by_slug_or_id(self, slug_or_id: str | UUID) -> GroupInDB | None:
        """Look up a group by UUID or, failing that, by slug.

        A string that parses as a UUID is treated as an id; any other string
        is treated as a slug.
        """
        if isinstance(slug_or_id, str):
            try:
                slug_or_id = UUID(slug_or_id)
            except ValueError:
                # Not a UUID string, so fall through and look it up as a slug.
                pass

        if isinstance(slug_or_id, UUID):
            return self.get_one(slug_or_id)
        else:
            return self.get_one(slug_or_id, key="slug")
|