Source code for meetup2xibo.updater.event_updater

"""Updates the events in the Xibo database to conform to events from
Meetup.com."""

from .anti_flapper import EventFlappingStatus
import logging


[docs]class EventUpdater: """Updates events in a Xibo dataset by inserting, updating, or deleting events to make the Xibo events conform to Meetup.com events.""" logger = logging.getLogger("EventUpdater") def __init__( self, meetup_events, cancelled_meetup_events, xibo_events, xibo_event_crud, anti_flapper, event_suppressor, special_location_monitor): """Initialize with lists (or iterables) of Meetup and Xibo events, a Xibo event CRUD manager, an anti-flapper, an event suppressor, and a special location monitor.""" self.meetup_events = self.event_list_to_dict(meetup_events) self.cancelled_meetup_events = self.event_list_to_dict( cancelled_meetup_events) self.xibo_events = self.event_list_to_dict(xibo_events) self.xibo_event_crud = xibo_event_crud self.anti_flapper = anti_flapper self.event_suppressor = event_suppressor self.special_location_monitor = special_location_monitor
[docs] def update_xibo(self): """Update the Xibo database by inserting, updating, or deleting events to make the Xibo events conform to Meetup.com events.""" ids_from_meetup = set(self.meetup_events.keys()) ids_cancelled_from_meetup = set(self.cancelled_meetup_events.keys()) ids_from_xibo = set(self.xibo_events.keys()) self.update_known_events(ids_from_meetup & ids_from_xibo) self.update_cancelled_events(ids_cancelled_from_meetup & ids_from_xibo) self.insert_new_events(ids_from_meetup - ids_from_xibo) self.delete_unknown_events( ids_from_xibo - ids_from_meetup - ids_cancelled_from_meetup)
[docs] def update_known_events(self, event_ids): """Update changed events given a set of event IDs.""" for event_id in event_ids: xibo_event = self.xibo_events[event_id] meetup_event = self.meetup_events[event_id] self.update_changed_event(xibo_event, meetup_event)
[docs] def update_cancelled_events(self, event_ids): """Update changed events given a set of cancelled event IDs.""" for event_id in event_ids: xibo_event = self.xibo_events[event_id] meetup_event = self.cancelled_meetup_events[event_id] self.update_changed_event(xibo_event, meetup_event)
[docs] def update_changed_event(self, xibo_event, meetup_event): """Update the Xibo event with data from the Meetup event if they differ.""" if xibo_event.name == meetup_event.name \ and xibo_event.location == meetup_event.location \ and xibo_event.start_time == meetup_event.start_time \ and xibo_event.end_time == meetup_event.end_time: self.logger.debug("Unchanged %s", xibo_event) return self.xibo_event_crud.update_xibo_event(xibo_event, meetup_event)
[docs] def insert_new_events(self, event_ids): """Insert new events given a set of event IDs.""" for event_id in event_ids: meetup_event = self.meetup_events[event_id] self.xibo_event_crud.insert_meetup_event(meetup_event)
[docs] def delete_unknown_events(self, event_ids): """Delete unknown (to Meetup) events given a set of event IDs.""" for event_id in event_ids: xibo_event = self.xibo_events[event_id] if self.event_suppressor.should_suppress(event_id): self.delete_xibo_event(xibo_event, "Suppressed") else: self.check_flapping_event(xibo_event)
[docs] def check_flapping_event(self, xibo_event): """Retire or delete events not flapping.""" action = self.anti_flapper.categorize(xibo_event) if action is not EventFlappingStatus.keep: self.delete_xibo_event(xibo_event, action.action)
[docs] def delete_xibo_event(self, xibo_event, action): """Delete an event from Xibo, logging an action such as "Retired".""" self.xibo_event_crud.delete_xibo_event(xibo_event, action) self.special_location_monitor.deleted_event(xibo_event)
[docs] @staticmethod def event_list_to_dict(events): return {event.meetup_id: event for event in events}
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 autoindent