refactor: event bus refactor (#1574)

* refactored event dispatching
added EventDocumentType and EventOperation to Event
added event listeners to bulk recipe changes
overhauled shopping list item events to be more useful
modified shopping list item repo to return more information

* added internal documentation for event types

* renamed message_types.py to event_types.py

* added unique event id and fixed instantiation

* generalized event listeners and publishers
moved apprise publisher to new apprise event listener
fixed duplicate message bug with apprise publisher

* added JWT field for user-specified integration id

* removed obselete test notification route

* tuned up existing notification tests

* added dependency to get integration_id from jwt

* added base crud controller to facilitate events

* simplified event publishing

* temporarily fixed test notification
This commit is contained in:
Michael Genson
2022-08-27 13:52:45 -05:00
committed by GitHub
parent caa9e03050
commit 23c039b42d
17 changed files with 720 additions and 403 deletions

View File

@@ -0,0 +1,80 @@
import json
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
from fastapi.encoders import jsonable_encoder
from pydantic import UUID4
from sqlalchemy.orm.session import Session
from mealie.repos.repository_factory import AllRepositories
from mealie.schema.group.group_events import GroupEventNotifierPrivate
from .event_types import Event
from .publisher import ApprisePublisher, PublisherLike
class EventListenerBase:
def __init__(self, session: Session, group_id: UUID4, publisher: PublisherLike) -> None:
self.session = session
self.group_id = group_id
self.publisher = publisher
def get_subscribers(self, event: Event) -> list:
"""Get a list of all subscribers to this event"""
...
def publish_to_subscribers(self, event: Event, subscribers: list) -> None:
"""Publishes the event to all subscribers"""
...
class AppriseEventListener(EventListenerBase):
def __init__(self, session: Session, group_id: UUID4) -> None:
super().__init__(session, group_id, ApprisePublisher())
def get_subscribers(self, event: Event) -> list[str]:
repos = AllRepositories(self.session)
notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.by_group( # type: ignore
self.group_id
).multi_query({"enabled": True}, override_schema=GroupEventNotifierPrivate)
urls = [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event.event_type.name)]
urls = AppriseEventListener.update_urls_with_event_data(urls, event)
return urls
def publish_to_subscribers(self, event: Event, subscribers: list[str]) -> None:
self.publisher.publish(event, subscribers)
@staticmethod
def update_urls_with_event_data(urls: list[str], event: Event):
params = {
"event_type": event.event_type.name,
"integration_id": event.integration_id,
"document_data": json.dumps(jsonable_encoder(event.document_data)),
"event_id": str(event.event_id),
"timestamp": event.timestamp.isoformat(),
}
return [
# We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":".
AppriseEventListener.merge_query_parameters(url, {f":{k}": v for k, v in params.items()})
# only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints
if AppriseEventListener.is_custom_url(url) else url
for url in urls
]
@staticmethod
def merge_query_parameters(url: str, params: dict):
scheme, netloc, path, query_string, fragment = urlsplit(url)
# merge query params
query_params = parse_qs(query_string)
query_params.update(params)
new_query_string = urlencode(query_params, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
@staticmethod
def is_custom_url(url: str):
return url.split(":", 1)[0].lower() in ["form", "forms", "json", "jsons", "xml", "xmls"]

View File

@@ -1,14 +1,16 @@
from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit
from typing import Optional
from fastapi import BackgroundTasks, Depends
from pydantic import UUID4
from mealie.core.config import get_app_settings
from mealie.db.db_setup import generate_session
from mealie.repos.repository_factory import AllRepositories
from mealie.schema.group.group_events import GroupEventNotifierPrivate
from mealie.services.event_bus_service.event_bus_listeners import AppriseEventListener, EventListenerBase
from .message_types import EventBusMessage, EventTypes
from .publisher import ApprisePublisher, PublisherLike
from .event_types import Event, EventBusMessage, EventDocumentDataBase, EventTypes
settings = get_app_settings()
ALGORITHM = "HS256"
class EventSource:
@@ -35,66 +37,32 @@ class EventSource:
class EventBusService:
def __init__(self, bg: BackgroundTasks, session=Depends(generate_session)) -> None:
self.bg = bg
self._publisher = ApprisePublisher
self.session = session
self.group_id: UUID4 | None = None
@property
def publisher(self) -> PublisherLike:
return self._publisher()
def get_urls(self, event_type: EventTypes) -> list[str]:
repos = AllRepositories(self.session)
notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.by_group( # type: ignore
self.group_id
).multi_query({"enabled": True}, override_schema=GroupEventNotifierPrivate)
return [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event_type.name)]
self.listeners: list[EventListenerBase] = [AppriseEventListener(self.session, self.group_id)]
def dispatch(
self, group_id: UUID4, event_type: EventTypes, msg: str = "", event_source: EventSource = None
self,
integration_id: str,
group_id: UUID4,
event_type: EventTypes,
document_data: Optional[EventDocumentDataBase],
message: str = "",
) -> None:
self.group_id = group_id
def _dispatch(event_source: EventSource = None):
if urls := self.get_urls(event_type):
if event_source:
urls = EventBusService.update_urls_with_event_source(urls, event_source)
self.publisher.publish(EventBusMessage.from_type(event_type, body=msg), urls)
if dispatch_task := _dispatch(event_source=event_source):
self.bg.add_task(dispatch_task)
def test_publisher(self, url: str) -> None:
self.bg.add_task(
self.publisher.publish,
event=EventBusMessage.from_type(EventTypes.test_message, body="This is a test event."),
notification_urls=[url],
event = Event(
message=EventBusMessage.from_type(event_type, body=message),
event_type=event_type,
integration_id=integration_id,
document_data=document_data,
)
@staticmethod
def update_urls_with_event_source(urls: list[str], event_source: EventSource):
return [
# We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":".
EventBusService.merge_query_parameters(url, {f":{k}": v for k, v in event_source.dict().items()})
# only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints
if EventBusService.is_custom_url(url) else url
for url in urls
]
self.bg.add_task(self.publish_event, event=event)
@staticmethod
def merge_query_parameters(url: str, params: dict):
scheme, netloc, path, query_string, fragment = urlsplit(url)
# merge query params
query_params = parse_qs(query_string)
query_params.update(params)
new_query_string = urlencode(query_params, doseq=True)
return urlunsplit((scheme, netloc, path, new_query_string, fragment))
@staticmethod
def is_custom_url(url: str):
return url.split(":", 1)[0].lower() in ["form", "forms", "json", "jsons", "xml", "xmls"]
def publish_event(self, event: Event) -> None:
"""Publishes the event to all listeners"""
for listener in self.listeners:
if subscribers := listener.get_subscribers(event):
listener.publish_to_subscribers(event, subscribers)

View File

@@ -0,0 +1,148 @@
import uuid
from datetime import datetime
from enum import Enum, auto
from pydantic import UUID4
from ...schema._mealie.mealie_model import MealieModel
class EventTypes(Enum):
"""
The event type defines whether or not a subscriber should receive an event.
Each event type is represented by a field on the subscriber repository, therefore any changes
made here must also be reflected in the database (and likely requires a database migration).
If you'd like more granular control over the metadata of the event, e.g. events for sub-records
(like shopping list items), modify the event document type instead (which is not tied to a database entry).
"""
test_message = auto()
recipe_created = auto()
recipe_updated = auto()
recipe_deleted = auto()
user_signup = auto()
data_migrations = auto()
data_export = auto()
data_import = auto()
mealplan_entry_created = auto()
shopping_list_created = auto()
shopping_list_updated = auto()
shopping_list_deleted = auto()
cookbook_created = auto()
cookbook_updated = auto()
cookbook_deleted = auto()
tag_created = auto()
tag_updated = auto()
tag_deleted = auto()
category_created = auto()
category_updated = auto()
category_deleted = auto()
class EventDocumentType(Enum):
generic = "generic"
category = "category"
cookbook = "cookbook"
shopping_list = "shopping_list"
shopping_list_item = "shopping_list_item"
recipe = "recipe"
recipe_bulk_report = "recipe_bulk_report"
tag = "tag"
class EventOperation(Enum):
info = "info"
create = "create"
update = "update"
delete = "delete"
class EventDocumentDataBase(MealieModel):
document_type: EventDocumentType
operation: EventOperation
...
class EventCategoryData(EventDocumentDataBase):
document_type = EventDocumentType.category
category_id: UUID4
class EventCookbookData(EventDocumentDataBase):
document_type = EventDocumentType.cookbook
cookbook_id: UUID4
class EventCookbookBulkData(EventDocumentDataBase):
document_type = EventDocumentType.cookbook
cookbook_ids: list[UUID4]
class EventShoppingListData(EventDocumentDataBase):
document_type = EventDocumentType.shopping_list
shopping_list_id: UUID4
class EventShoppingListItemData(EventDocumentDataBase):
document_type = EventDocumentType.shopping_list_item
shopping_list_id: UUID4
shopping_list_item_id: UUID4
class EventShoppingListItemBulkData(EventDocumentDataBase):
document_type = EventDocumentType.shopping_list_item
shopping_list_id: UUID4
shopping_list_item_ids: list[UUID4]
class EventRecipeData(EventDocumentDataBase):
document_type = EventDocumentType.recipe
recipe_slug: str
class EventRecipeBulkReportData(EventDocumentDataBase):
document_type = EventDocumentType.recipe_bulk_report
report_id: UUID4
class EventTagData(EventDocumentDataBase):
document_type = EventDocumentType.tag
tag_id: UUID4
class EventBusMessage(MealieModel):
title: str
body: str = ""
@classmethod
def from_type(cls, event_type: EventTypes, body: str = "") -> "EventBusMessage":
title = event_type.name.replace("_", " ").title()
return cls(title=title, body=body)
class Event(MealieModel):
message: EventBusMessage
event_type: EventTypes
integration_id: str
document_data: EventDocumentDataBase
# set at instantiation
event_id: UUID4 | None
timestamp: datetime | None
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.event_id = uuid.uuid4()
self.timestamp = datetime.now()

View File

@@ -1,47 +0,0 @@
from enum import Enum, auto
class EventTypes(Enum):
test_message = auto()
recipe_created = auto()
recipe_updated = auto()
recipe_deleted = auto()
user_signup = auto()
data_migrations = auto()
data_export = auto()
data_import = auto()
mealplan_entry_created = auto()
shopping_list_created = auto()
shopping_list_updated = auto()
shopping_list_deleted = auto()
cookbook_created = auto()
cookbook_updated = auto()
cookbook_deleted = auto()
tag_created = auto()
tag_updated = auto()
tag_deleted = auto()
category_created = auto()
category_updated = auto()
category_deleted = auto()
class EventBusMessage:
title: str
body: str = ""
def __init__(self, title, body) -> None:
self.title = title
self.body = body
@classmethod
def from_type(cls, event_type: EventTypes, body: str = "") -> "EventBusMessage":
title = event_type.name.replace("_", " ").title()
return cls(title=title, body=body)

View File

@@ -2,11 +2,11 @@ from typing import Protocol
import apprise
from mealie.services.event_bus_service.event_bus_service import EventBusMessage
from mealie.services.event_bus_service.event_types import Event
class PublisherLike(Protocol):
def publish(self, event: EventBusMessage, notification_urls: list[str]):
def publish(self, event: Event, notification_urls: list[str]):
...
@@ -19,11 +19,18 @@ class ApprisePublisher:
self.apprise = apprise.Apprise(asset=asset)
self.hard_fail = hard_fail
def publish(self, event: EventBusMessage, notification_urls: list[str]):
def publish(self, event: Event, notification_urls: list[str]):
"""Publishses a list of notification URLs"""
tags = []
for dest in notification_urls:
status = self.apprise.add(dest)
# we tag the url so it only sends each notification once
tag = str(event.event_id)
tags.append(tag)
status = self.apprise.add(dest, tag=tag)
if not status and self.hard_fail:
raise Exception("Apprise URL Add Failed")
self.apprise.notify(title=event.title, body=event.body)
self.apprise.notify(title=event.message.title, body=event.message.body, tag=tags)

View File

@@ -70,7 +70,7 @@ class ShoppingListService:
# Set References
new_refs = []
for ref in inner_item.recipe_references:
ref.shopping_list_item_id = base_item.id
ref.shopping_list_item_id = base_item.id # type: ignore
new_refs.append(ref)
base_item.recipe_references.extend(new_refs)
@@ -80,46 +80,64 @@ class ShoppingListService:
return consolidated_list
def consolidate_and_save(self, data: list[ShoppingListItemUpdate]):
def consolidate_and_save(
self, data: list[ShoppingListItemUpdate]
) -> tuple[list[ShoppingListItemOut], list[ShoppingListItemOut]]:
"""
returns:
- updated_shopping_list_items
- deleted_shopping_list_items
"""
# TODO: Convert to update many with single call
all_updates = []
all_deletes = []
keep_ids = []
for item in self.consolidate_list_items(data):
for item in self.consolidate_list_items(data): # type: ignore
updated_data = self.list_items.update(item.id, item)
all_updates.append(updated_data)
keep_ids.append(updated_data.id)
for item in data:
for item in data: # type: ignore
if item.id not in keep_ids:
self.list_items.delete(item.id)
all_deletes.append(item)
return all_updates
return all_updates, all_deletes
# =======================================================================
# Methods
def add_recipe_ingredients_to_list(self, list_id: UUID4, recipe_id: UUID4) -> ShoppingListOut:
def add_recipe_ingredients_to_list(
self, list_id: UUID4, recipe_id: UUID4
) -> tuple[ShoppingListOut, list[ShoppingListItemOut], list[ShoppingListItemOut], list[ShoppingListItemOut]]:
"""
returns:
- updated_shopping_list
- new_shopping_list_items
- updated_shopping_list_items
- deleted_shopping_list_items
"""
recipe = self.repos.recipes.get_one(recipe_id, "id")
to_create = []
for ingredient in recipe.recipe_ingredient:
food_id = None
try:
food_id = ingredient.food.id
food_id = ingredient.food.id # type: ignore
except AttributeError:
pass
label_id = None
try:
label_id = ingredient.food.label.id
label_id = ingredient.food.label.id # type: ignore
except AttributeError:
pass
unit_id = None
try:
unit_id = ingredient.unit.id
unit_id = ingredient.unit.id # type: ignore
except AttributeError:
pass
@@ -142,28 +160,67 @@ class ShoppingListService:
)
)
for item in to_create:
self.repos.group_shopping_list_item.create(item)
new_shopping_list_items = [self.repos.group_shopping_list_item.create(item) for item in to_create]
updated_list = self.shopping_lists.get_one(list_id)
updated_list.list_items = self.consolidate_and_save(updated_list.list_items)
updated_shopping_list = self.shopping_lists.get_one(list_id)
updated_shopping_list_items, deleted_shopping_list_items = self.consolidate_and_save(updated_shopping_list.list_items) # type: ignore
updated_shopping_list.list_items = updated_shopping_list_items
not_found = True
for refs in updated_list.recipe_references:
for refs in updated_shopping_list.recipe_references:
if refs.recipe_id == recipe_id:
refs.recipe_quantity += 1
not_found = False
if not_found:
updated_list.recipe_references.append(ShoppingListItemRecipeRef(recipe_id=recipe_id, recipe_quantity=1))
updated_shopping_list.recipe_references.append(
ShoppingListItemRecipeRef(recipe_id=recipe_id, recipe_quantity=1) # type: ignore
)
updated_list = self.shopping_lists.update(updated_list.id, updated_list)
updated_shopping_list = self.shopping_lists.update(updated_shopping_list.id, updated_shopping_list)
return updated_list
"""
There can be overlap between the list item collections, so we de-duplicate the lists.
def remove_recipe_ingredients_from_list(self, list_id: UUID4, recipe_id: UUID4) -> ShoppingListOut:
First new items are created, then existing items are updated, and finally some items are deleted,
so we can de-duplicate using this logic
"""
new_items_map = {list_item.id: list_item for list_item in new_shopping_list_items}
updated_items_map = {list_item.id: list_item for list_item in updated_shopping_list_items}
deleted_items_map = {list_item.id: list_item for list_item in deleted_shopping_list_items}
# if the item was created and then updated, replace the create with the update and remove the update
for id in list(updated_items_map.keys()):
if id in new_items_map:
new_items_map[id] = updated_items_map[id]
del updated_items_map[id]
# if the item was updated and then deleted, remove the update
updated_shopping_list_items = [
list_item for id, list_item in updated_items_map.items() if id not in deleted_items_map
]
# if the item was created and then deleted, remove it from both lists
new_shopping_list_items = [list_item for id, list_item in new_items_map.items() if id not in deleted_items_map]
deleted_shopping_list_items = [
list_item for id, list_item in deleted_items_map.items() if id not in new_items_map
]
return updated_shopping_list, new_shopping_list_items, updated_shopping_list_items, deleted_shopping_list_items
def remove_recipe_ingredients_from_list(
self, list_id: UUID4, recipe_id: UUID4
) -> tuple[ShoppingListOut, list[ShoppingListItemOut], list[ShoppingListItemOut]]:
"""
returns:
- updated_shopping_list
- updated_shopping_list_items
- deleted_shopping_list_items
"""
shopping_list = self.shopping_lists.get_one(list_id)
updated_shopping_list_items = []
deleted_shopping_list_items = []
for item in shopping_list.list_items:
found = False
@@ -171,7 +228,7 @@ class ShoppingListService:
remove_qty = 0.0
if ref.recipe_id == recipe_id:
self.list_item_refs.delete(ref.id)
self.list_item_refs.delete(ref.id) # type: ignore
item.recipe_references.remove(ref)
found = True
remove_qty = ref.recipe_quantity
@@ -183,20 +240,22 @@ class ShoppingListService:
if item.quantity <= 0:
self.list_items.delete(item.id)
deleted_shopping_list_items.append(item)
else:
self.list_items.update(item.id, item)
updated_shopping_list_items.append(item)
# Decrament the list recipe reference count
for ref in shopping_list.recipe_references:
# Decrement the list recipe reference count
for ref in shopping_list.recipe_references: # type: ignore
if ref.recipe_id == recipe_id:
ref.recipe_quantity -= 1
if ref.recipe_quantity <= 0:
self.list_refs.delete(ref.id)
self.list_refs.delete(ref.id) # type: ignore
else:
self.list_refs.update(ref.id, ref)
self.list_refs.update(ref.id, ref) # type: ignore
break
# Save Changes
return self.shopping_lists.get_one(shopping_list.id)
return self.shopping_lists.get_one(shopping_list.id), updated_shopping_list_items, deleted_shopping_list_items