feat: Unit standardization / conversion (#7121)

This commit is contained in:
Michael Genson
2026-03-09 12:13:41 -05:00
committed by GitHub
parent 96597915ff
commit b5c089f58c
30 changed files with 1203 additions and 86 deletions

View File

@@ -28,6 +28,7 @@ from mealie.schema.recipe.recipe_ingredient import (
)
from mealie.schema.response.pagination import OrderDirection, PaginationQuery
from mealie.services.parser_services._base import DataMatcher
from mealie.services.parser_services.parser_utils import UnitConverter, merge_quantity_and_unit
class ShoppingListService:
@@ -41,8 +42,7 @@ class ShoppingListService:
self.list_refs = repos.group_shopping_list_recipe_refs
self.data_matcher = DataMatcher(self.repos, food_fuzzy_match_threshold=self.DEFAULT_FOOD_FUZZY_MATCH_THRESHOLD)
@staticmethod
def can_merge(item1: ShoppingListItemBase, item2: ShoppingListItemBase) -> bool:
def can_merge(self, item1: ShoppingListItemBase, item2: ShoppingListItemBase) -> bool:
"""Check to see if this item can be merged with another item"""
if any(
@@ -50,16 +50,28 @@ class ShoppingListService:
item1.checked,
item2.checked,
item1.food_id != item2.food_id,
item1.unit_id != item2.unit_id,
]
):
return False
# check if units match or if they're compatable
if item1.unit_id != item2.unit_id:
item1_unit = item1.unit or self.data_matcher.units_by_id.get(item1.unit_id)
item2_unit = item2.unit or self.data_matcher.units_by_id.get(item2.unit_id)
if not (item1_unit and item1_unit.standard_unit):
return False
if not (item2_unit and item2_unit.standard_unit):
return False
uc = UnitConverter()
if not uc.can_convert(item1_unit.standard_unit, item2_unit.standard_unit):
return False
# if foods match, we can merge, otherwise compare the notes
return bool(item1.food_id) or item1.note == item2.note
@staticmethod
def merge_items(
self,
from_item: ShoppingListItemCreate | ShoppingListItemUpdateBulk,
to_item: ShoppingListItemCreate | ShoppingListItemUpdateBulk | ShoppingListItemOut,
) -> ShoppingListItemUpdate:
@@ -69,7 +81,20 @@ class ShoppingListService:
Attributes of the `to_item` take priority over the `from_item`, except extras with overlapping keys
"""
to_item.quantity += from_item.quantity
to_item_unit = to_item.unit or self.data_matcher.units_by_id.get(to_item.unit_id)
from_item_unit = from_item.unit or self.data_matcher.units_by_id.get(from_item.unit_id)
if to_item_unit and to_item_unit.standard_unit and from_item_unit and from_item_unit.standard_unit:
merged_qty, merged_unit = merge_quantity_and_unit(
from_item.quantity or 0, from_item_unit, to_item.quantity or 0, to_item_unit
)
to_item.quantity = merged_qty
to_item.unit_id = merged_unit.id
to_item.unit = merged_unit
else:
# No conversion needed, just sum the quantities
to_item.quantity += from_item.quantity
if to_item.note != from_item.note:
to_item.note = " | ".join([note for note in [to_item.note, from_item.note] if note])

View File

@@ -29,18 +29,38 @@ class DataMatcher:
self._food_fuzzy_match_threshold = food_fuzzy_match_threshold
self._unit_fuzzy_match_threshold = unit_fuzzy_match_threshold
self._foods_by_id: dict[UUID4, IngredientFood] | None = None
self._units_by_id: dict[UUID4, IngredientUnit] | None = None
self._foods_by_alias: dict[str, IngredientFood] | None = None
self._units_by_alias: dict[str, IngredientUnit] | None = None
@property
def foods_by_alias(self) -> dict[str, IngredientFood]:
if self._foods_by_alias is None:
def foods_by_id(self) -> dict[UUID4, IngredientFood]:
if self._foods_by_id is None:
foods_repo = self.repos.ingredient_foods
query = PaginationQuery(page=1, per_page=-1)
all_foods = foods_repo.page_all(query).items
self._foods_by_id = {food.id: food for food in all_foods}
return self._foods_by_id
@property
def units_by_id(self) -> dict[UUID4, IngredientUnit]:
if self._units_by_id is None:
units_repo = self.repos.ingredient_units
query = PaginationQuery(page=1, per_page=-1)
all_units = units_repo.page_all(query).items
self._units_by_id = {unit.id: unit for unit in all_units}
return self._units_by_id
@property
def foods_by_alias(self) -> dict[str, IngredientFood]:
if self._foods_by_alias is None:
foods_by_alias: dict[str, IngredientFood] = {}
for food in all_foods:
for food in self.foods_by_id.values():
if food.name:
foods_by_alias[IngredientFoodModel.normalize(food.name)] = food
if food.plural_name:
@@ -57,12 +77,8 @@ class DataMatcher:
@property
def units_by_alias(self) -> dict[str, IngredientUnit]:
if self._units_by_alias is None:
units_repo = self.repos.ingredient_units
query = PaginationQuery(page=1, per_page=-1)
all_units = units_repo.page_all(query).items
units_by_alias: dict[str, IngredientUnit] = {}
for unit in all_units:
for unit in self.units_by_id.values():
if unit.name:
units_by_alias[IngredientUnitModel.normalize(unit.name)] = unit
if unit.plural_name:

View File

@@ -1 +1,2 @@
from .string_utils import *
from .unit_utils import *

View File

@@ -0,0 +1,146 @@
from typing import TYPE_CHECKING, Literal, overload
from pint import Quantity, Unit, UnitRegistry
if TYPE_CHECKING:
from mealie.schema.recipe.recipe_ingredient import CreateIngredientUnit
class UnitNotFound(Exception):
"""Raised when trying to access a unit not found in the unit registry."""
def __init__(self, message: str = "Unit not found in unit registry"):
self.message = message
super().__init__(self.message)
def __str__(self):
return f"{self.message}"
class UnitConverter:
def __init__(self):
self.ureg = UnitRegistry()
def _resolve_ounce(self, unit_1: Unit, unit_2: Unit) -> tuple[Unit, Unit]:
"""
Often times "ounce" is used in place of "fluid ounce" in recipes.
When trying to convert/combine ounces with a volume, we can assume it should have been a fluid ounce.
This function will convert ounces to fluid ounces if the other unit is a volume.
"""
OUNCE = self.ureg("ounce")
FL_OUNCE = self.ureg("fluid_ounce")
VOLUME = "[length] ** 3"
if unit_1 == OUNCE and unit_2.dimensionality == VOLUME:
return FL_OUNCE, unit_2
if unit_2 == OUNCE and unit_1.dimensionality == VOLUME:
return unit_1, FL_OUNCE
return unit_1, unit_2
@overload
def parse(self, unit: str | Unit, strict: Literal[False] = False) -> str | Unit: ...
@overload
def parse(self, unit: str | Unit, strict: Literal[True]) -> Unit: ...
def parse(self, unit: str | Unit, strict: bool = False) -> str | Unit:
"""
Parse a string unit into a pint.Unit.
If strict is False (default), returns a pint.Unit if it exists, otherwise returns the original string.
If strict is True, raises UnitNotFound instead of returning a string.
If the input is already a parsed pint.Unit, returns it as-is.
"""
if isinstance(unit, Unit):
return unit
try:
return self.ureg(unit).units
except Exception as e:
if strict:
raise UnitNotFound(f"Unit '{unit}' not found in unit registry") from e
return unit
def can_convert(self, unit: str | Unit, to_unit: str | Unit) -> bool:
"""Whether or not a given unit can be converted into another unit."""
unit = self.parse(unit)
to_unit = self.parse(to_unit)
if not (isinstance(unit, Unit) and isinstance(to_unit, Unit)):
return False
unit, to_unit = self._resolve_ounce(unit, to_unit)
return unit.is_compatible_with(to_unit)
def convert(self, quantity: float, unit: str | Unit, to_unit: str | Unit) -> tuple[float, Unit]:
"""
Convert a quantity and a unit into another unit.
Returns tuple[quantity, unit]
"""
unit = self.parse(unit, strict=True)
to_unit = self.parse(to_unit, strict=True)
unit, to_unit = self._resolve_ounce(unit, to_unit)
qty = quantity * unit
converted = qty.to(to_unit)
return float(converted.magnitude), converted.units
def merge(self, quantity_1: float, unit_1: str | Unit, quantity_2: float, unit_2: str | Unit) -> tuple[float, Unit]:
"""Merge two quantities together"""
unit_1 = self.parse(unit_1, strict=True)
unit_2 = self.parse(unit_2, strict=True)
unit_1, unit_2 = self._resolve_ounce(unit_1, unit_2)
q1 = quantity_1 * unit_1
q2 = quantity_2 * unit_2
out: Quantity = q1 + q2
return float(out.magnitude), out.units
def merge_quantity_and_unit[T: CreateIngredientUnit](
qty_1: float, unit_1: T, qty_2: float, unit_2: T
) -> tuple[float, T]:
"""
Merge a quantity and unit.
Returns tuple[quantity, unit]
"""
if not (unit_1.standard_quantity and unit_1.standard_unit and unit_2.standard_quantity and unit_2.standard_unit):
raise ValueError("Both units must contain standardized unit data")
PINT_UNIT_1_TXT = "_mealie_unit_1"
PINT_UNIT_2_TXT = "_mealie_unit_2"
uc = UnitConverter()
# pre-process units to account for ounce -> fluid_ounce conversion
unit_1_standard = uc.parse(unit_1.standard_unit, strict=True)
unit_2_standard = uc.parse(unit_2.standard_unit, strict=True)
unit_1_standard, unit_2_standard = uc._resolve_ounce(unit_1_standard, unit_2_standard)
# create custon unit definition so pint can handle them natively
uc.ureg.define(f"{PINT_UNIT_1_TXT} = {unit_1.standard_quantity} * {unit_1_standard}")
uc.ureg.define(f"{PINT_UNIT_2_TXT} = {unit_2.standard_quantity} * {unit_2_standard}")
pint_unit_1 = uc.parse(PINT_UNIT_1_TXT)
pint_unit_2 = uc.parse(PINT_UNIT_2_TXT)
merged_q, merged_u = uc.merge(qty_1, pint_unit_1, qty_2, pint_unit_2)
# Convert to the bigger unit if quantity >= 1, else the smaller unit
merged_q, merged_u = uc.convert(merged_q, merged_u, max(pint_unit_1, pint_unit_2))
if abs(merged_q) < 1:
merged_q, merged_u = uc.convert(merged_q, merged_u, min(pint_unit_1, pint_unit_2))
if str(merged_u) == PINT_UNIT_1_TXT:
return merged_q, unit_1
else:
return merged_q, unit_2