This commit is contained in:
Hayden
2020-12-24 16:37:38 -09:00
commit beed8576c2
137 changed files with 40218 additions and 0 deletions

View File

@@ -0,0 +1,124 @@
import json
import shutil
import zipfile
from datetime import datetime
from pathlib import Path
from db.recipe_models import RecipeDocument
from jinja2 import Template
from utils.logger import logger
from services.recipe_services import IMG_DIR
CWD = Path(__file__).parent
BACKUP_DIR = CWD.parent.joinpath("data", "backups")
TEMPLATE_DIR = CWD.parent.joinpath("data", "templates")
TEMP_DIR = CWD.parent.joinpath("data", "temp")
def auto_backup_job():
for backup in BACKUP_DIR.glob("Auto*.zip"):
backup.unlink()
export_db(tag="Auto", template=None)
logger.info("Auto Backup Called")
def import_from_archive(file_name: str) -> list:
successful_imports = []
file_path = BACKUP_DIR.joinpath(file_name)
with zipfile.ZipFile(file_path, "r") as zip_ref:
zip_ref.extractall(TEMP_DIR)
recipe_dir = TEMP_DIR.joinpath("recipes")
for recipe in recipe_dir.glob("*.json"):
with open(recipe, "r") as f:
recipe_dict = json.loads(f.read())
del recipe_dict["_id"]
del recipe_dict["dateAdded"]
recipeDoc = RecipeDocument(**recipe_dict)
try:
recipeDoc.save()
successful_imports.append(recipe.stem)
except:
print("Failed Import:", recipe.stem)
image_dir = TEMP_DIR.joinpath("images")
for image in image_dir.iterdir():
if image.stem in successful_imports:
shutil.copy(image, IMG_DIR)
shutil.rmtree(TEMP_DIR)
return successful_imports
def export_db(tag=None, template=None):
if tag:
export_tag = tag + "_" + datetime.now().strftime("%Y-%b-%d")
else:
export_tag = datetime.now().strftime("%Y-%b-%d")
backup_folder = TEMP_DIR.joinpath(export_tag)
backup_folder.mkdir(parents=True, exist_ok=True)
img_folder = backup_folder.joinpath("images")
img_folder.mkdir(parents=True, exist_ok=True)
recipe_folder = backup_folder.joinpath("recipes")
recipe_folder.mkdir(parents=True, exist_ok=True)
export_images(img_folder)
export_recipes(recipe_folder, template)
zip_path = BACKUP_DIR.joinpath(f"{export_tag}")
shutil.make_archive(zip_path, "zip", backup_folder)
shutil.rmtree(backup_folder)
shutil.rmtree(TEMP_DIR)
def export_images(dest_dir) -> Path:
for file in IMG_DIR.iterdir():
shutil.copy(file, dest_dir.joinpath(file.name))
def export_recipes(dest_dir: Path, template=None) -> Path:
all_recipes = RecipeDocument.objects()
for recipe in all_recipes:
json_recipe = recipe.to_json(indent=4)
if template:
md_dest = dest_dir.parent.joinpath("markdown")
md_dest.mkdir(parents=True, exist_ok=True)
template = TEMPLATE_DIR.joinpath(template)
export_markdown(md_dest, json_recipe, template)
filename = recipe.slug + ".json"
file_path = dest_dir.joinpath(filename)
with open(file_path, "w") as f:
f.write(json_recipe)
def export_markdown(dest_dir: Path, recipe_data: json, template=Path) -> Path:
recipe_data: dict = json.loads(recipe_data)
recipe_template = TEMPLATE_DIR.joinpath("recipes.md")
with open(recipe_template, "r") as f:
template = Template(f.read())
out_file = dest_dir.joinpath(recipe_data["slug"] + ".md")
content = template.render(recipe=recipe_data)
with open(out_file, "w") as f:
f.write(content)
if __name__ == "__main__":
pass

View File

@@ -0,0 +1,62 @@
import shutil
from pathlib import Path
import requests
from fastapi.responses import FileResponse
CWD = Path(__file__).parent
IMG_DIR = CWD.parent.joinpath("data", "img")
def read_image(recipe_slug: str) -> FileResponse:
recipe_slug = recipe_slug.split(".")[0]
for file in IMG_DIR.glob(f"{recipe_slug}*"):
return file
def write_image(recipe_slug: str, file_data: bytes, extension: str) -> Path.name:
pass
delete_image(recipe_slug)
image_path = Path(IMG_DIR.joinpath(f"{recipe_slug}.{extension}"))
with open(image_path, "ab") as f:
f.write(file_data)
return image_path
def delete_image(recipe_slug: str) -> str:
recipe_slug = recipe_slug.split(".")[0]
for file in IMG_DIR.glob(f"{recipe_slug}*"):
return file.unlink()
def scrape_image(image_url: str, slug: str) -> Path:
if isinstance(image_url, str): # Handles String Types
image_url = image_url
if isinstance(image_url, list): # Handles List Types
image_url = image_url[0]
if isinstance(image_url, dict): # Handles Dictionary Types
for key in image_url:
if key == "url":
image_url = image_url.get("url")
filename = slug + "." + image_url.split(".")[-1]
filename = IMG_DIR.joinpath(filename)
try:
r = requests.get(image_url, stream=True)
except:
return None
if r.status_code == 200:
r.raw.decode_content = True
with open(filename, "wb") as f:
shutil.copyfileobj(r.raw, f)
return filename
return None

View File

@@ -0,0 +1,153 @@
import json
from datetime import date, timedelta
from pathlib import Path
from typing import List, Optional
from db.meal_models import MealDocument, MealPlanDocument
from pydantic import BaseModel
from services.recipe_services import Recipe
CWD = Path(__file__).parent
THIS_WEEK = CWD.parent.joinpath("data", "meal_plan", "this_week.json")
NEXT_WEEK = CWD.parent.joinpath("data", "meal_plan", "next_week.json")
WEEKDAYS = [
"monday",
"tuesday",
"wednesday",
"thursday",
"friday",
"saturday",
"sunday",
]
class Meal(BaseModel):
slug: str
name: Optional[str]
date: Optional[date]
dateText: str
image: Optional[str]
description: Optional[str]
class MealData(BaseModel):
slug: str
dateText: str
class MealPlan(BaseModel):
uid: Optional[str]
startDate: date
endDate: date
meals: List[Meal]
class Config:
schema_extra = {
"example": {
"startDate": date.today(),
"endDate": date.today(),
"meals": [
{"slug": "Packed Mac and Cheese", "date": date.today()},
{"slug": "Eggs and Toast", "date": date.today()},
],
}
}
def process_meals(self):
meals = []
for x, meal in enumerate(self.meals):
recipe = Recipe.get_by_slug(meal.slug)
meal_data = {
"slug": recipe.slug,
"name": recipe.name,
"date": self.startDate + timedelta(days=x),
"dateText": meal.dateText,
"image": recipe.image,
"description": recipe.description,
}
meals.append(Meal(**meal_data))
self.meals = meals
def save_to_db(self):
meal_docs = []
for meal in self.meals:
meal = meal.dict()
meal_doc = MealDocument(**meal)
meal_docs.append(meal_doc)
self.meals = meal_docs
meal_plan = MealPlanDocument(**self.dict())
meal_plan.save()
@staticmethod
def get_all() -> List:
all_meals = []
for plan in MealPlanDocument.objects.order_by("startDate"):
all_meals.append(MealPlan._unpack_doc(plan))
print(all_meals)
return all_meals
def update(self, uid):
document = MealPlanDocument.objects.get(uid=uid)
meal_docs = []
for meal in self.meals:
meal = meal.dict()
meal_doc = MealDocument(**meal)
meal_docs.append(meal_doc)
self.meals = meal_docs
if document:
document.update(set__meals=self.meals)
document.save()
@staticmethod
def delete(uid):
document = MealPlanDocument.objects.get(uid=uid)
if document:
document.delete()
@staticmethod
def _unpack_doc(document: MealPlanDocument):
meal_plan = json.loads(document.to_json())
del meal_plan["_id"]["$oid"]
print(meal_plan)
meal_plan["uid"] = meal_plan["uid"]["$uuid"]
meal_plan["startDate"] = meal_plan["startDate"]["$date"]
meal_plan["endDate"] = meal_plan["endDate"]["$date"]
meals = []
for meal in meal_plan["meals"]:
meal["date"] = meal["date"]["$date"]
meals.append(Meal(**meal))
meal_plan["meals"] = meals
return MealPlan(**meal_plan)
@staticmethod
def today() -> str:
""" Returns the meal slug for Today """
meal_plan = MealPlanDocument.objects.order_by("startDate").limit(1)
meal_plan = MealPlan._unpack_doc(meal_plan[0])
for meal in meal_plan.meals:
if meal.date == date.today():
return meal.slug
return "No Meal Today"
@staticmethod
def this_week():
meal_plan = MealPlanDocument.objects.order_by("startDate").limit(1)
meal_plan = MealPlan._unpack_doc(meal_plan[0])
return meal_plan

View File

@@ -0,0 +1,178 @@
import datetime
import json
from pathlib import Path
from typing import Any, List, Optional
from db.recipe_models import RecipeDocument
from pydantic import BaseModel, validator
from slugify import slugify
from services.image_services import delete_image
CWD = Path(__file__).parent
ALL_RECIPES = CWD.parent.joinpath("data", "all_recipes.json")
IMG_DIR = CWD.parent.joinpath("data", "img")
class RecipeNote(BaseModel):
title: str
text: str
class RecipeStep(BaseModel):
text: str
class Recipe(BaseModel):
# Standard Schema
name: str
description: Optional[str]
image: Optional[Any]
recipeYield: Optional[str]
recipeIngredient: Optional[list]
recipeInstructions: Optional[list]
totalTime: Optional[Any]
# Mealie Specific
slug: Optional[str] = ""
categories: Optional[List[str]]
tags: Optional[List[str]]
dateAdded: Optional[datetime.date]
notes: Optional[List[RecipeNote]]
rating: Optional[int]
rating: Optional[int]
orgURL: Optional[str]
extras: Optional[List[str]]
class Config:
schema_extra = {
"example": {
"name": "Chicken and Rice With Leeks and Salsa Verde",
"description": "This one-skillet dinner gets deep oniony flavor from lots of leeks cooked down to jammy tenderness.",
"image": "chicken-and-rice-with-leeks-and-salsa-verde.jpg",
"recipeYield": "4 Servings",
"recipeIngredient": [
"1 1/2 lb. skinless, boneless chicken thighs (4-8 depending on size)",
"Kosher salt, freshly ground pepper",
"3 Tbsp. unsalted butter, divided",
],
"recipeInstructions": [
{
"text": "Season chicken with salt and pepper.",
},
],
"slug": "chicken-and-rice-with-leeks-and-salsa-verde",
"tags": ["favorite", "yummy!"],
"categories": ["Dinner", "Pasta"],
"notes": [{"title": "Watch Out!", "text": "Prep the day before!"}],
"orgURL": "https://www.bonappetit.com/recipe/chicken-and-rice-with-leeks-and-salsa-verde",
"rating": 3,
}
}
@validator("slug", always=True, pre=True)
def validate_slug(slug: str, values):
name: str = values["name"]
calc_slug: str = slugify(name)
if slug == calc_slug:
return slug
else:
slug = calc_slug
return slug
@classmethod
def _unpack_doc(cls, document):
document = json.loads(document.to_json())
del document["_id"]
document["dateAdded"] = document["dateAdded"]["$date"]
return cls(**document)
@classmethod
def get_by_slug(_cls, slug: str):
""" Returns a recipe dictionary from the slug """
document = RecipeDocument.objects.get(slug=slug)
return Recipe._unpack_doc(document)
def save_to_db(self) -> str:
recipe_dict = self.dict()
extension = Path(recipe_dict["image"]).suffix
recipe_dict["image"] = recipe_dict.get("slug") + extension
try:
total_time = recipe_dict.get("totalTime")
recipe_dict["totalTime"] = str(total_time)
except:
pass
recipeDoc = RecipeDocument(**recipe_dict)
recipeDoc.save()
return recipeDoc.slug
@staticmethod
def delete(recipe_slug: str) -> str:
""" Removes the recipe from the database by slug """
delete_image(recipe_slug)
document = RecipeDocument.objects.get(slug=recipe_slug)
if document:
document.delete()
return "Document Deleted"
@staticmethod
def update(recipe_slug: str, data: dict) -> dict:
""" Updates the recipe from the database by slug """
document = RecipeDocument.objects.get(slug=recipe_slug)
if document:
document.update(set__name=data.get("name"))
document.update(set__description=data.get("description"))
document.update(set__image=data.get("image"))
document.update(set__recipeYield=data.get("recipeYield"))
document.update(set__recipeIngredient=data.get("recipeIngredient"))
document.update(set__recipeInstructions=data.get("recipeInstructions"))
document.update(set__totalTime=data.get("totalTime"))
document.update(set__categories=data.get("categories"))
document.update(set__tags=data.get("tags"))
document.update(set__notes=data.get("notes"))
document.update(set__orgURL=data.get("orgURL"))
document.update(set__rating=data.get("rating"))
document.update(set__extras=data.get("extras"))
document.save()
def read_requested_values(keys: list, max_results: int = 0) -> List[dict]:
"""
Pass in a list of key values to be run against the database. If a match is found
it is then added to a dictionary inside of a list. If a key does not exist the
it will simply not be added to the return data.
Parameters:
keys: list
Returns: returns a list of dicts containing recipe data
"""
recipe_list = []
for recipe in RecipeDocument.objects.order_by("dateAdded").limit(max_results):
recipe_details = {}
for key in keys:
try:
recipe_key = {key: recipe[key]}
except:
continue
recipe_details.update(recipe_key)
recipe_list.append(recipe_details)
return recipe_list

View File

@@ -0,0 +1,72 @@
import collections
import json
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from utils.logger import logger
from services.backup_services import auto_backup_job
from services.meal_services import MealPlan
from services.recipe_services import Recipe
from services.settings_services import SiteSettings
Cron = collections.namedtuple("Cron", "hours minutes")
def cron_parser(time_str: str) -> Cron:
time = time_str.split(":")
cron = Cron(hours=int(time[0]), minutes=int(time[1]))
return cron
def post_webhooks():
all_settings = SiteSettings.get_site_settings()
if all_settings.webhooks.enabled:
todays_meal = Recipe.get_by_slug(MealPlan.today()).dict()
urls = all_settings.webhooks.webhookURLs
for url in urls:
requests.post(url, json.dumps(todays_meal, default=str))
class Scheduler:
def startup_scheduler(self):
self.scheduler = BackgroundScheduler()
logger.info("----INIT SCHEDULE OBJECT-----")
self.scheduler.start()
self.scheduler.add_job(
auto_backup_job, trigger="cron", hour="3", max_instances=1
)
settings = SiteSettings.get_site_settings()
time = cron_parser(settings.webhooks.webhookTime)
self.webhook = self.scheduler.add_job(
post_webhooks,
trigger="cron",
name="webhooks",
hour=time.hours,
minute=time.minutes,
max_instances=1,
)
logger.info(self.scheduler.print_jobs())
def reschedule_webhooks(self):
"""
Reads the site settings database entry to reschedule the webhooks task
Called after each post to the webhooks endpoint.
"""
settings = SiteSettings.get_site_settings()
time = cron_parser(settings.webhooks.webhookTime)
self.scheduler.reschedule_job(
self.webhook.id,
trigger="cron",
hour=time.hours,
minute=time.minutes,
)
logger.info(self.scheduler.print_jobs())

View File

@@ -0,0 +1,40 @@
from scrape_schema_recipe import scrape_url
from slugify import slugify
from services.image_services import scrape_image
from services.recipe_services import Recipe
def create_from_url(url: str) -> dict:
recipe_data = process_recipe_url(url)
recipe = Recipe(**recipe_data)
return recipe.save_to_db()
def process_recipe_url(url: str) -> dict:
new_recipe: dict = scrape_url(url, python_objects=True)[0]
if not new_recipe:
return "fail" # TODO: Return Better Error Here
slug = slugify(new_recipe["name"])
mealie_tags = {
"slug": slug,
"orgURL": url,
"categories": [],
"tags": [],
"dateAdded": None,
"notes": [],
"extras": [],
}
new_recipe.update(mealie_tags)
try:
img_path = scrape_image(new_recipe.get("image"), slug)
new_recipe["image"] = img_path.name
except:
new_recipe["image"] = None
return new_recipe

View File

@@ -0,0 +1,111 @@
import json
from typing import List, Optional
from db.settings_models import (SiteSettingsDocument, SiteThemeDocument,
ThemeColorsDocument, WebhooksDocument)
from pydantic import BaseModel
class Webhooks(BaseModel):
webhookTime: str
webhookURLs: Optional[List[str]]
enabled: bool
@staticmethod
def run():
pass
class SiteSettings(BaseModel):
name: str = "main"
webhooks: Webhooks
@staticmethod
def _unpack_doc(document: SiteSettingsDocument):
document = json.loads(document.to_json())
del document["_id"]
document["webhhooks"] = Webhooks(**document["webhooks"])
return SiteSettings(**document)
@staticmethod
def get_site_settings():
try:
document = SiteSettingsDocument.objects.get(name="main")
except:
webhooks = WebhooksDocument()
document = SiteSettingsDocument(name="main", webhooks=webhooks)
document.save()
return SiteSettings._unpack_doc(document)
def update(self):
document = SiteSettingsDocument.objects.get(name="main")
new_webhooks = WebhooksDocument(**self.webhooks.dict())
document.update(set__webhooks=new_webhooks)
document.save()
class Colors(BaseModel):
primary: str
accent: str
secondary: str
success: str
info: str
warning: str
error: str
class SiteTheme(BaseModel):
name: str
colors: Colors
@staticmethod
def get_by_name(theme_name):
document = SiteThemeDocument.objects.get(name=theme_name)
return SiteTheme._unpack_doc(document)
@staticmethod
def _unpack_doc(document):
document = json.loads(document.to_json())
del document["_id"]
theme_colors = SiteTheme(**document)
return theme_colors
@staticmethod
def get_all():
all_themes = []
for theme in SiteThemeDocument.objects():
all_themes.append(SiteTheme._unpack_doc(theme))
return all_themes
def save_to_db(self):
theme = self.dict()
theme["colors"] = ThemeColorsDocument(**theme["colors"])
theme_document = SiteThemeDocument(**theme)
theme_document.save()
def update_document(self):
theme = self.dict()
theme["colors"] = ThemeColorsDocument(**theme["colors"])
theme_document = SiteThemeDocument.objects.get(name=self.name)
if theme_document:
theme_document.update(set__colors=theme["colors"])
theme_document.save()
@staticmethod
def delete_theme(theme_name: str) -> str:
""" Removes the theme by name """
document = SiteThemeDocument.objects.get(name=theme_name)
if document:
document.delete()
return "Document Deleted"