"""Test generating the Xibo API."""
from .logging_application import LoggingApplication
from .logging_context import LoggingContext
from .logging_setup_manager import LoggingSetupManager
from .http_response_error import HttpResponseError
from .exceptions import DatasetDiscoveryError, ContainmentLoopError, \
JsonConversionError, MissingEnvVarError
from .meetup2xibo import Meetup2Xibo, XiboSessionProcessor, \
XiboEventCrudProcessor
from .meetup_api import MeetupEventsRetriever
from .place_finder import PlaceFinder
from .location_chooser import LocationChooser
from .conflict_analyzer import ConflictAnalyzer, NullConflictAnalyzer
from .conflict_places import ConflictPlaces, ConflictPlacesLoader
from .event_converter import EventConverter, EventListConverter
from .event_location import EventLocation
from .event_suppressor import EventSuppressor
from .event_updater import EventUpdater
from .phrase_mapper import PhraseMapper
from .xibo_api_url_builder import XiboApiUrlBuilder
from .site_cert_assurer import SiteCertAssurer
from .oauth2_session_starter import Oauth2SessionStarter, \
Oauth2SessionStarterError
from .special_location_monitor import SpecialLocationMonitor
from .time_converter import DateTimeCreator
from .xibo_api import XiboApi
from .xibo_dataset_id_finder import XiboDatasetIdFinder
from .xibo_event import XiboEvent, XiboEventColumnNameManager, \
XiboEventColumnIdManager
from .xibo_event_crud import XiboEventCrud
from .anti_flapper import AntiFlapper
from ahocorasick import Automaton
from requests_toolbelt import user_agent
from pytz import timezone
import certifi
[docs]def inject_logging_application(application_scope):
"""Return a logging application configured by an application scope."""
return LoggingApplication(
inject_logging_context(application_scope),
inject_enter_logging_application_scope(application_scope)
)
[docs]def inject_logging_context(application_scope):
"""Return a logging context configured by an application scope."""
return LoggingContext(
app_name=application_scope.app_name,
description=application_scope.version,
logging_setup_manager=inject_logging_setup_manager(application_scope),
no_trace_exceptions=inject_no_trace_exceptions())
[docs]def inject_logging_setup_manager(application_scope):
"""Return a logging setup manager configured by an application scope."""
return LoggingSetupManager(
log_level=application_scope.log_level,
filename=application_scope.logfile,
verbose=application_scope.verbose,
warnings=application_scope.warnings,
mappings=application_scope.mappings)
[docs]def inject_enter_logging_application_scope(application_scope):
"""Return a function configured by an application scope that provides a
processor configured by an application scope and a notional logging
application scope."""
def enter():
return inject_meetup2xibo(application_scope)
return enter
[docs]def inject_no_trace_exceptions():
"""Return a tuple listing exception classes that need no traceback."""
return (
HttpResponseError,
ContainmentLoopError,
DatasetDiscoveryError,
JsonConversionError,
MissingEnvVarError,
Oauth2SessionStarterError)
[docs]def inject_meetup_events_retriever(application_scope):
"""Return a Meetup events retriever configured by an application scope."""
return MeetupEventsRetriever(
group_url_name=application_scope.meetup_group_url_name,
events_wanted=application_scope.meetup_events_wanted,
cancelled_last_time=inject_cancelled_last_time(application_scope))
[docs]def inject_location_chooser(application_scope):
"""Return a location builder configured by an application scope."""
return LocationChooser(
inject_place_finder(application_scope),
application_scope.special_locations_dict,
inject_default_event_location(application_scope))
[docs]def inject_place_finder(application_scope):
"""Return a place finder configured by an application scope."""
return PlaceFinder(inject_phrase_mappers(application_scope))
[docs]def inject_phrase_mappers(application_scope):
"""Return a list of phrase mappers configured by an application scope."""
return [
inject_places_phrase_mapper(application_scope),
inject_more_places_phrase_mapper(application_scope),
]
[docs]def inject_places_phrase_mapper(application_scope):
"""Return a phrase mapper for place phrases configured by an application
scope."""
return inject_phrase_mapper(
application_scope,
application_scope.place_phrase_tuples)
[docs]def inject_more_places_phrase_mapper(application_scope):
"""Return a phrase mapper for more place phrases configured by an
application scope."""
return inject_phrase_mapper(
application_scope,
application_scope.more_place_phrase_tuples)
[docs]def inject_phrase_mapper(application_scope, phrase_tuples):
"""Return a phrase mapper configured by an application scope and a list of
tuples containing a phrase and its preferred phrase."""
return PhraseMapper(inject_automaton(), phrase_tuples).setup()
[docs]def inject_automaton():
"""Return an Aho-Corasick automaton."""
return Automaton()
[docs]def inject_default_event_location(application_scope):
"""Return an event location for the default location and places configured
by an application scope."""
return EventLocation(
application_scope.default_location,
application_scope.default_places)
[docs]def inject_event_list_converter(application_scope):
"""Return an event list converter configured by an application scope."""
return EventListConverter(
inject_event_converter(application_scope),
inject_event_suppressor(application_scope))
[docs]def inject_event_converter(application_scope):
"""Return an event converter configured by an application scope."""
return EventConverter(
inject_location_chooser(application_scope),
inject_date_time_creator(application_scope))
[docs]def inject_xibo_api_url_builder(application_scope):
"""Return a Xibo API URL builder configured by an application scope."""
return XiboApiUrlBuilder(
application_scope.xibo_host,
application_scope.xibo_port)
[docs]def inject_site_cert_assurer(application_scope):
"""Return a site certificate assurer configured by an application scope."""
return SiteCertAssurer(
sys_ca_path=certifi.where(),
site_ca_path=application_scope.site_ca_path,
site_url=inject_cert_validation_url(application_scope),
user_agent=inject_user_agent(application_scope))
[docs]def inject_cert_validation_url(application_scope):
"""Return the URL for certificate validation configured by an application
scope."""
return inject_xibo_api_url_builder(application_scope) \
.cert_validation_url()
[docs]def inject_xibo_token_url(application_scope):
"""Return the URL for retrieving tokens configured by an application
scope."""
return inject_xibo_api_url_builder(application_scope) \
.access_token_url()
[docs]def inject_user_agent(application_scope):
"""Return the user agent string for web requests configured by an
application scope."""
return user_agent(
application_scope.app_name,
application_scope.version)
[docs]def inject_oauth2_session_starter(application_scope):
"""Return an Oauth2SessionStarter configured by an application scope."""
return Oauth2SessionStarter(
application_scope.xibo_client_id,
application_scope.xibo_client_secret,
inject_xibo_token_url(application_scope),
inject_user_agent(application_scope))
[docs]def inject_event_suppressor(application_scope):
"""Return an event suppressor configured by an application scope."""
return application_scope.event_suppressor(
inject_event_suppressor_provider(application_scope))
[docs]def inject_event_suppressor_provider(application_scope):
"""Return a function that provides an event suppressor configured by an
application session scope."""
def get():
return EventSuppressor(application_scope.suppressed_event_ids)
return get
[docs]def inject_enter_xibo_session_scope(application_scope):
"""Return a function configured by an application scope that provides a
Xibo session processor configured by an application scope and a Xibo
session scope."""
def enter(xibo_session_scope):
return inject_xibo_session_processor(
application_scope, xibo_session_scope)
return enter
[docs]def inject_xibo_session_processor(application_scope, xibo_session_scope):
"""Return a Xibo session processor configured by an application scope and a
Xibo session scope."""
return XiboSessionProcessor(
application_scope.event_dataset_code,
inject_xibo_dataset_id_finder(application_scope, xibo_session_scope),
inject_column_name_manager(application_scope),
inject_xibo_api(application_scope, xibo_session_scope),
inject_enter_xibo_event_crud_scope(
application_scope, xibo_session_scope)
)
[docs]def inject_xibo_api(application_scope, xibo_session_scope):
"""Return a Xibo API manager configured by an application scope and a Xibo
session scope."""
return XiboApi(
xibo_session_scope.xibo_session,
inject_xibo_api_url_builder(application_scope),
application_scope.xibo_page_length)
[docs]def inject_xibo_dataset_id_finder(application_scope, xibo_session_scope):
"""Return a Xibo dataset ID finder configured by an application scope and a
Xibo session scope."""
return XiboDatasetIdFinder(
inject_xibo_api(application_scope, xibo_session_scope))
[docs]def inject_column_name_manager(application_scope):
"""Return a Xibo column name manager configured by an application scope."""
return XiboEventColumnNameManager(
inject_xibo_column_names(application_scope))
[docs]def inject_xibo_column_names(application_scope):
"""Return Xibo event column names configured by an application scope."""
return XiboEvent(
xibo_id=application_scope.xibo_id_column_name,
meetup_id=application_scope.meetup_id_column_name,
name=application_scope.name_column_name,
location=application_scope.location_column_name,
start_time=application_scope.start_time_column_name,
end_time=application_scope.end_time_column_name
)
[docs]def inject_enter_xibo_event_crud_scope(application_scope, xibo_session_scope):
"""Return a function configured by an application scope that provides a
Xibo event CRUD processor configured by an application scope and a Xibo
session scope."""
def enter(xibo_event_crud_scope):
return inject_xibo_event_crud_processor(
application_scope, xibo_session_scope, xibo_event_crud_scope)
return enter
[docs]def inject_xibo_event_crud_processor(
application_scope, xibo_session_scope, xibo_event_crud_scope):
"""Return Xibo event CRUD processor configured by an application scope, a
Xibo session scope and a Xibo event CRUD scope."""
return XiboEventCrudProcessor(
inject_xibo_event_crud(
application_scope, xibo_session_scope, xibo_event_crud_scope),
inject_event_updater_provider(application_scope, xibo_session_scope)
)
[docs]def inject_xibo_event_crud(
application_scope, xibo_session_scope, xibo_event_crud_scope):
"""Return a Xibo event CRUD manager configured by an application scope, a
Xibo session scope and a Xibo event CRUD scope."""
return XiboEventCrud(
inject_xibo_api(application_scope, xibo_session_scope),
xibo_event_crud_scope.event_dataset_id,
inject_column_name_manager(application_scope),
inject_xibo_event_column_id_manager(xibo_event_crud_scope)
)
[docs]def inject_xibo_event_column_id_manager(xibo_event_crud_scope):
"""Return a Xibo column ID manager configured by a Xibo event CRUD
scope."""
return XiboEventColumnIdManager(xibo_event_crud_scope.event_column_ids)
[docs]def inject_event_updater_provider(application_scope, xibo_session_scope):
"""Return a function that provides an event updater configured by a Xibo
session scope."""
def get(xibo_event_crud, xibo_events):
return EventUpdater(
xibo_session_scope.meetup_events,
xibo_session_scope.cancelled_meetup_events,
xibo_events,
xibo_event_crud,
inject_anti_flapper(application_scope),
inject_event_suppressor(application_scope),
inject_special_location_monitor(application_scope)
)
return get
[docs]def inject_anti_flapper(application_scope):
"""Return an anti-flapper configured by an application scope."""
return AntiFlapper(
inject_recent_limit(application_scope),
inject_current_limit(application_scope),
inject_future_limit(application_scope)
)
[docs]def inject_special_location_monitor(application_scope):
"""Return a special location monitor configured by an application scope."""
return SpecialLocationMonitor(application_scope.special_locations_dict)
[docs]def inject_tzinfo(application_scope):
"""Return timezone info configured by an application scope."""
return timezone(application_scope.timezone)
[docs]def inject_date_time_creator(application_scope):
"""Return a date/time creator configured by an application scope."""
return DateTimeCreator(inject_tzinfo(application_scope))
[docs]def inject_selected_conflict_analyzer(application_scope):
"""Return the conflict analyzer selected by a command-line argument and
configured by an application scope."""
if application_scope.conflicts:
return inject_conflict_analyzer(application_scope)
else:
return inject_null_conflict_analyzer()
[docs]def inject_conflict_analyzer(application_scope):
"""Return a conflict analyzer configured by an application scope."""
return ConflictAnalyzer(inject_conflict_places(application_scope))
[docs]def inject_conflict_places(application_scope):
"""Return conflict places configured by an application scope."""
return ConflictPlacesLoader(
ConflictPlaces(),
application_scope.conflict_places,
application_scope.containing_places
).load()
[docs]def inject_null_conflict_analyzer():
"""Return a null conflict analyzer."""
return NullConflictAnalyzer()
[docs]def inject_recent_limit(application_scope):
"""Return the recent flapping limit configured by an application scope."""
return inject_date_time_creator(application_scope).xibo_offset_time(
-application_scope.delete_after_end_seconds)
[docs]def inject_current_limit(application_scope):
"""Return the current flapping limit configured by an application scope."""
return inject_date_time_creator(application_scope).xibo_offset_time(
application_scope.delete_before_start_seconds)
[docs]def inject_future_limit(application_scope):
"""Return the future flapping limit configured by an application scope."""
return inject_date_time_creator(application_scope).xibo_offset_time(
application_scope.delete_until_future_seconds)
[docs]def inject_cancelled_last_time(application_scope):
"""Return the last time allowed for cancelled Meetup events configured by
an application scope."""
return inject_date_time_creator(application_scope).meetup_offset_time(
application_scope.ignore_cancelled_after_seconds)
[docs]def inject_meetup2xibo(application_scope):
"""Return a Meetup to Xibo converter configured by an application scope."""
return Meetup2Xibo(
inject_meetup_events_retriever(application_scope),
inject_selected_conflict_analyzer(application_scope),
inject_event_list_converter(application_scope),
inject_site_cert_assurer(application_scope),
inject_oauth2_session_starter(application_scope),
inject_event_suppressor(application_scope),
inject_enter_xibo_session_scope(application_scope),
)
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 autoindent