Skip to content

Circuit Maintenance Handle Notifications

nautobot_circuit_maintenance.handle_notifications

Handle Notifications

handler

Notifications jobs.

DryRunTransactionSkip

Bases: Exception

Exception to handle dryrun mode.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
class DryRunTransactionSkip(Exception):
    """Exception to handle dryrun mode."""

HandleCircuitMaintenanceNotifications

Bases: Job

Job to handle external circuit maintenance notifications and turn them into Circuit Maintenances.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
class HandleCircuitMaintenanceNotifications(Job):
    """Job to handle external circuit maintenance notifications and turn them into Circuit Maintenances."""

    dryrun = DryRunVar()

    class Meta:
        """Meta object boilerplate for HandleParsedNotifications."""

        name = "Update Circuit Maintenances"
        has_sensitive_variables = False
        description = "Fetch Circuit Maintenance Notifications from Sources and create or update Circuit Maintenances accordingly."

    # pylint: disable=arguments-differ
    def run(self, dryrun=False) -> List[uuid.UUID]:
        """Fetch notifications, process them and update Circuit Maintenance accordingly."""
        self.logger.debug("Starting Handle Notifications job.")

        notification_sources = NotificationSource.objects.all()
        if not notification_sources:
            self.logger.warning("No notification sources configured to retrieve notifications from.")
            return []

        try:
            notifications = get_notifications(
                job=self,
                notification_sources=notification_sources,
                since=get_since_reference(self),
            )
        except Exception:
            self.logger.error(
                f"Unexpected exception when retrieving notifications from sources ({notification_sources})",
                exc_info=True,
            )
            raise

        if not notifications:
            self.logger.info("No notifications received.")
            return []

        raw_notification_ids = []
        for notification in notifications:
            self.logger.info(f"Processing notification `{notification.subject}`.", extra={"object": notification})
            try:
                with transaction.atomic():
                    raw_id = process_raw_notification(self, notification)
                    if raw_id:
                        raw_notification_ids.append(raw_id)
                    if dryrun:
                        raise DryRunTransactionSkip()
            except DryRunTransactionSkip:
                self.logger.info("DRYRUN mode, nothing has been committed.")
            except Exception:
                self.logger.error(
                    "Unexpected exception when parsing notifications",
                    extra={"object": notification},
                    exc_info=True,
                )

        self.logger.info(f"{len(raw_notification_ids)} notifications processed.")

        return raw_notification_ids
Meta

Meta object boilerplate for HandleParsedNotifications.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
class Meta:
    """Meta object boilerplate for HandleParsedNotifications."""

    name = "Update Circuit Maintenances"
    has_sensitive_variables = False
    description = "Fetch Circuit Maintenance Notifications from Sources and create or update Circuit Maintenances accordingly."
run(dryrun=False)

Fetch notifications, process them and update Circuit Maintenance accordingly.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def run(self, dryrun=False) -> List[uuid.UUID]:
    """Fetch notifications, process them and update Circuit Maintenance accordingly."""
    self.logger.debug("Starting Handle Notifications job.")

    notification_sources = NotificationSource.objects.all()
    if not notification_sources:
        self.logger.warning("No notification sources configured to retrieve notifications from.")
        return []

    try:
        notifications = get_notifications(
            job=self,
            notification_sources=notification_sources,
            since=get_since_reference(self),
        )
    except Exception:
        self.logger.error(
            f"Unexpected exception when retrieving notifications from sources ({notification_sources})",
            exc_info=True,
        )
        raise

    if not notifications:
        self.logger.info("No notifications received.")
        return []

    raw_notification_ids = []
    for notification in notifications:
        self.logger.info(f"Processing notification `{notification.subject}`.", extra={"object": notification})
        try:
            with transaction.atomic():
                raw_id = process_raw_notification(self, notification)
                if raw_id:
                    raw_notification_ids.append(raw_id)
                if dryrun:
                    raise DryRunTransactionSkip()
        except DryRunTransactionSkip:
            self.logger.info("DRYRUN mode, nothing has been committed.")
        except Exception:
            self.logger.error(
                "Unexpected exception when parsing notifications",
                extra={"object": notification},
                exc_info=True,
            )

    self.logger.info(f"{len(raw_notification_ids)} notifications processed.")

    return raw_notification_ids

create_circuit_maintenance(job, notification, maintenance_id, parser_maintenance, provider)

Handles the creation of a new circuit maintenance.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def create_circuit_maintenance(
    job: Job,
    notification: MaintenanceNotification,
    maintenance_id: str,
    parser_maintenance: Maintenance,
    provider: Provider,
) -> CircuitMaintenance:
    """Handles the creation of a new circuit maintenance."""
    circuit_maintenance_entry = CircuitMaintenance(
        name=maintenance_id[:MAX_MAINTENANCE_NAME_LENGTH],
        start_time=datetime.datetime.fromtimestamp(parser_maintenance.start, tz=datetime.timezone.utc),
        end_time=datetime.datetime.fromtimestamp(parser_maintenance.end, tz=datetime.timezone.utc),
        description=parser_maintenance.summary,
        status=(
            parser_maintenance.status
            if parser_maintenance.status in CircuitMaintenanceStatusChoices.values()
            else CircuitMaintenanceStatusChoices.UNKNOWN
        ),
    )
    circuit_maintenance_entry.save()
    job.logger.info("Created Circuit Maintenance.", extra={"object": circuit_maintenance_entry})

    for circuit in parser_maintenance.circuits:
        circuit_entry = Circuit.objects.filter(cid__iexact=circuit.circuit_id, provider=provider).last()
        if circuit_entry:
            circuit_impact_entry, created = CircuitImpact.objects.get_or_create(
                maintenance=circuit_maintenance_entry, circuit=circuit_entry, defaults={"impact": circuit.impact}
            )
            if created:
                job.logger.info(
                    f"Circuit ID {circuit.circuit_id} linked to Maintenance {maintenance_id}",
                    extra={"object": circuit_impact_entry},
                )
        else:
            note_entry, created = Note.objects.get_or_create(
                maintenance=circuit_maintenance_entry,
                title=f"Nonexistent circuit ID {circuit.circuit_id}"[:MAX_NOTE_TITLE_LENGTH],
                comment=(
                    f"Circuit ID {circuit.circuit_id} referenced was not found in the database, so omitted from the "
                    "maintenance."
                ),
                level="WARNING",
            )
            if created:
                job.logger.warning(
                    f"Circuit ID {circuit.circuit_id} referenced in {maintenance_id} is not in the Database, adding a note",
                    extra={"object": note_entry},
                )
            notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.UNKNOWN_CIDS)

    if not CircuitImpact.objects.filter(maintenance=circuit_maintenance_entry):
        job.logger.warning(
            "Circuit Maintenance has none Circuit IDs in the DB.", extra={"object": circuit_maintenance_entry}
        )

    return circuit_maintenance_entry

create_or_update_circuit_maintenance(job, notification, raw_entry, parser_maintenance, provider)

Processes a Maintenance, creating or updating the related Circuit Maintenance.

It returns the CircuitMaintenance entry created or updated.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def create_or_update_circuit_maintenance(
    job: Job,
    notification: MaintenanceNotification,
    raw_entry: RawNotification,
    parser_maintenance: Maintenance,
    provider: Provider,
) -> CircuitMaintenance:
    """Processes a Maintenance, creating or updating the related Circuit Maintenance.

    It returns the CircuitMaintenance entry created or updated.
    """
    maintenance_id = f"{raw_entry.provider.name}-{parser_maintenance.maintenance_id}"[:MAX_MAINTENANCE_NAME_LENGTH]
    try:
        circuit_maintenance_entry = CircuitMaintenance.objects.get(name=maintenance_id)
        # Using the RawNotification.stamp as the reference to sort because it's the one that takes into account the
        # source receving time.
        last_parsed_notification = (
            circuit_maintenance_entry.parsednotification_set.order_by("raw_notification__stamp").reverse().last()
        )

        # If the notification is older than the latest one used to update the CircuitMaintenance, we skip updating it
        parser_maintenance_datetime = datetime.datetime.fromtimestamp(
            parser_maintenance.stamp, tz=datetime.timezone.utc
        )
        if last_parsed_notification and last_parsed_notification.raw_notification.stamp > parser_maintenance_datetime:
            job.logger.debug(
                f"Not updating CircuitMaintenance {maintenance_id} because the notification is from "
                f"{parser_maintenance_datetime}, older than the most recent notification from {last_parsed_notification.raw_notification.stamp}.",
                extra={"object": circuit_maintenance_entry},
            )
            notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.OUT_OF_SEQUENCE)
            return circuit_maintenance_entry

        update_circuit_maintenance(job, notification, circuit_maintenance_entry, parser_maintenance, provider)
    except ObjectDoesNotExist:
        circuit_maintenance_entry = create_circuit_maintenance(
            job, notification, maintenance_id, parser_maintenance, provider
        )

    return circuit_maintenance_entry

create_raw_notification(job, notification, provider)

Create a RawNotification.

If it already exists, we return None to signal we are skipping it.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def create_raw_notification(
    job: Job,
    notification: MaintenanceNotification,
    provider: Provider,
) -> RawNotification:
    """Create a RawNotification.

    If it already exists, we return `None` to signal we are skipping it.
    """
    try:
        raw_entry = RawNotification.objects.get(
            subject=notification.subject[:MAX_NOTIFICATION_SUBJECT_LENGTH],
            provider=provider,
            stamp=parser.parse(notification.date),
        )
        # If the RawNotification was already created, we ignore it.
        job.logger.debug(f"Raw notification already existed with ID: {raw_entry.id}", extra={"object": raw_entry})
        return None
    except ObjectDoesNotExist:
        try:
            raw_entry = RawNotification(
                subject=notification.subject[:MAX_NOTIFICATION_SUBJECT_LENGTH],
                provider=provider,
                raw=notification.raw_payload,
                sender=notification.sender[:MAX_NOTIFICATION_SENDER_LENGTH],
                source=NotificationSource.objects.filter(name=notification.source.name).last(),
                stamp=parser.parse(notification.date),
            )

            raw_entry.save()
            job.logger.info("Raw notification created.", extra={"object": raw_entry})
        except Exception:
            job.logger.error(
                f"Raw notification '{notification.subject}' not created",
                extra={"object": notification},
                exc_info=True,
            )
            raise

    return raw_entry

get_maintenances_from_notification(job, notification, provider)

Use the circuit_maintenance_parser library to get Maintenances from the notification.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def get_maintenances_from_notification(job: Job, notification: MaintenanceNotification, provider: Provider):
    """Use the `circuit_maintenance_parser` library to get Maintenances from the notification."""
    provider_type = provider.cf.get("provider_parser_circuit_maintenances", "").lower() or provider.name

    parser_provider = init_provider(provider_type=provider_type)
    if not parser_provider:
        job.logger.warning(
            f"Notification Parser not found for {notification.provider_type}", extra={"object": notification}
        )
        notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.PARSING_FAILED)
        return None

    data_to_process = NotificationData.init_from_email_bytes(notification.raw_payload)
    if not data_to_process:
        job.logger.error(
            "Notification data was not accepted by the parser: {notification.raw_payload}",
            extra={"object": notification},
        )
        notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.PARSING_FAILED)
        raise ValueError("Notification data was not accepted by the parser")

    try:
        result = parser_provider.get_maintenances(data_to_process)
        notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.PARSED)
        if not result:
            job.logger.info(
                f"No maintenance notifications detected in `{notification.subject}`",
                extra={"object": notification},
            )
            notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.IGNORED)
        return result
    except ProviderError:
        job.logger.error(
            "Parsing failed for notification",
            extra={"object": notification},
            exc_info=True,
        )
        notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.PARSING_FAILED)
        raise
    except Exception:
        job.logger.error(
            "Unexpected exception while parsing notification",
            extra={"object": notification},
            exc_info=True,
        )
        notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.PARSING_FAILED)
        raise

get_since_reference(job)

Get the timestamp from the latest processed RawNotification or a reference from config raw_notification_initial_days_since.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def get_since_reference(job: Job) -> int:
    """Get the timestamp from the latest processed RawNotification or a reference from config `raw_notification_initial_days_since`."""
    # Latest retrieved notification will limit the scope of notifications to retrieve
    last_raw_notification = RawNotification.objects.last()
    if last_raw_notification:
        since_reference = last_raw_notification.last_updated.timestamp()
    else:
        since_reference = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(
            days=PLUGIN_SETTINGS.get("raw_notification_initial_days_since")
        )
        since_reference = int(since_reference.timestamp())
    job.logger.info(f"Processing notifications since {since_reference}", extra={"object": last_raw_notification})
    return since_reference

process_raw_notification(job, notification)

Processes a raw notification (maybe containing multiple parsed notifications).

It creates a RawNotification and if it could be parsed, create the corresponding ParsedNotification and the related objects. Finally returns the the UUID of the RawNotification modified.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def process_raw_notification(job: Job, notification: MaintenanceNotification) -> Optional[uuid.UUID]:
    """Processes a raw notification (maybe containing multiple parsed notifications).

    It creates a RawNotification and if it could be parsed, create the corresponding ParsedNotification and the
    related objects. Finally returns the the UUID of the RawNotification modified.
    """
    try:
        provider = Provider.objects.get_by_natural_key(notification.provider_type)
    except ObjectDoesNotExist:
        job.logger.warning(
            "Raw notification not created because is referencing to a provider not existent.",
            extra={"object": notification},
        )
        notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.UNKNOWN_PROVIDER)
        return None

    raw_entry = create_raw_notification(job, notification, provider)
    if not raw_entry:
        return None

    parser_maintenances = get_maintenances_from_notification(job, notification, provider)
    if not parser_maintenances:
        return raw_entry.id

    for parser_maintenance in parser_maintenances:
        try:
            circuit_maintenance_entry = create_or_update_circuit_maintenance(
                job, notification, raw_entry, parser_maintenance, provider
            )
            # Update raw notification as properly parsed
            raw_entry.parsed = True
            raw_entry.save()

            # Insert parsed notification in DB
            parsed_entry = ParsedNotification.objects.create(
                maintenance=circuit_maintenance_entry,
                raw_notification=raw_entry,
                json=parser_maintenance.to_json(),
            )
            parsed_entry.save()
            job.logger.info(
                f"Saved Parsed Notification for {circuit_maintenance_entry.name}.",
                extra={"object": parsed_entry},
            )

        except Exception:
            job.logger.error(
                "Unexpected exception while handling parsed notification",
                extra={"object": notification},
                exc_info=True,
            )
            raise

    return raw_entry.id

update_circuit_maintenance(job, notification, circuit_maintenance_entry, parser_maintenance, provider)

Handles the update of an existent circuit maintenance.

Source code in nautobot_circuit_maintenance/handle_notifications/handler.py
def update_circuit_maintenance(
    job: Job,
    notification: MaintenanceNotification,
    circuit_maintenance_entry: CircuitMaintenance,
    parser_maintenance: Maintenance,
    provider: Provider,
):  # pylint: disable=too-many-locals
    """Handles the update of an existent circuit maintenance."""
    maintenance_id = circuit_maintenance_entry.name
    circuit_maintenance_entry.description = parser_maintenance.summary
    if parser_maintenance.status != "NO-CHANGE":
        if parser_maintenance.status in CircuitMaintenanceStatusChoices.values():
            circuit_maintenance_entry.status = parser_maintenance.status
        else:
            circuit_maintenance_entry.status = CircuitMaintenanceStatusChoices.UNKNOWN
    circuit_maintenance_entry.start_time = datetime.datetime.fromtimestamp(
        parser_maintenance.start, tz=datetime.timezone.utc
    )
    circuit_maintenance_entry.end_time = datetime.datetime.fromtimestamp(
        parser_maintenance.end, tz=datetime.timezone.utc
    )
    circuit_maintenance_entry.ack = False
    circuit_maintenance_entry.save()

    circuit_entries = CircuitImpact.objects.filter(maintenance=circuit_maintenance_entry)

    new_cids = {parsed_circuit.circuit_id.lower() for parsed_circuit in parser_maintenance.circuits}
    existing_cids = {circuit_entry.circuit.cid.lower() for circuit_entry in circuit_entries}

    cids_to_update = new_cids & existing_cids
    cids_to_create = new_cids - existing_cids
    cids_to_remove = existing_cids - new_cids

    for cid in cids_to_create:
        circuit_entry = Circuit.objects.filter(cid__iexact=cid, provider=provider.pk).last()
        circuit = [
            parsed_circuit
            for parsed_circuit in parser_maintenance.circuits
            if parsed_circuit.circuit_id.lower() == cid.lower()
        ][0]
        if circuit_entry:
            circuit_impact_entry = CircuitImpact.objects.create(
                maintenance=circuit_maintenance_entry,
                circuit=circuit_entry,
                impact=circuit.impact,
            )
            job.logger.info(
                f"Circuit ID {circuit.circuit_id} linked to Maintenance {maintenance_id}",
                extra={"object": circuit_impact_entry},
            )
        else:
            note_entry, created = Note.objects.get_or_create(
                maintenance=circuit_maintenance_entry,
                title=f"Nonexistent circuit ID {circuit.circuit_id}"[:MAX_NOTE_TITLE_LENGTH],
                comment=(
                    f"Circuit ID {circuit.circuit_id} referenced was not found in the database, so omitted from the "
                    "maintenance."
                ),
                level="WARNING",
            )
            if created:
                job.logger.warning(
                    f"Circuit ID {circuit.circuit_id} referenced in {maintenance_id} is not in the Database, adding a note",
                    extra={"object": note_entry},
                )
            notification.source.tag_message(job, notification.msg_id, MessageProcessingStatus.UNKNOWN_CIDS)

    for cid in cids_to_update:
        circuit_entry = Circuit.objects.filter(cid__iexact=cid, provider=provider.pk).last()
        circuitimpact_entry = CircuitImpact.objects.filter(
            circuit=circuit_entry, maintenance=circuit_maintenance_entry
        ).last()
        circuit = [
            parsed_circuit
            for parsed_circuit in parser_maintenance.circuits
            if parsed_circuit.circuit_id.lower() == cid.lower()
        ][0]
        circuitimpact_entry.impact = circuit.impact
        circuitimpact_entry.save()

    for cid in cids_to_remove:
        circuit_entry = Circuit.objects.filter(cid__iexact=cid, provider=provider.pk).last()
        CircuitImpact.objects.filter(circuit=circuit_entry, maintenance=circuit_maintenance_entry).delete()

    job.logger.info(
        f"Updated Circuit Maintenance {maintenance_id}",
        extra={"object": circuit_maintenance_entry},
    )

sources

Notification Source classes.

EmailSource

Bases: Source

Abstract class that shares some methods and attributes accross email based sources.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class EmailSource(Source):  # pylint: disable=abstract-method
    """Abstract class that shares some methods and attributes accross email based sources."""

    account: str
    emails_to_fetch = []
    source_header: str = "From"

    def get_account_id(self) -> str:
        """Method to get an identifier of the related account."""
        return self.account

    def validate_providers(self, job: Job, notification_source: NotificationSource, since_txt: str) -> bool:
        """Method to validate that the NotificationSource has attached Providers.

        Args:
            job (Job): Job to use its logger
            notification_source (NotificationSource): Notification Source to validate providers
            since_txt (str): Date string to be used to log

        Returns:
            bool: True if there are relevant providers attached or False otherwise
        """
        providers_with_email = []
        providers_without_email = []
        if not notification_source.providers.all():
            job.logger.warning(
                f"Skipping source '{notification_source.name}' because no providers were defined.",
                extra={"object": notification_source},
            )
            return False

        for provider in notification_source.providers.all():
            provider_emails = provider.cf.get("emails_circuit_maintenances")
            if provider_emails:
                self.emails_to_fetch.extend([src.strip().lower() for src in provider_emails.split(",")])
                providers_with_email.append(provider.name)
            else:
                providers_without_email.append(provider.name)

        if providers_without_email:
            job.logger.warning(
                f"Skipping {', '.join(providers_without_email)} because these providers have no email configured.",
                extra={"object": notification_source},
            )

        if not providers_with_email:
            job.logger.warning(
                (
                    f"Skipping Notification Source {notification_source.name} because none of the related providers "
                    "have emails defined."
                ),
                extra={"object": notification_source},
            )
            return False
        job.logger.debug(f"Fetching emails from {self.emails_to_fetch}")
        job.logger.info(
            (
                f"Retrieving notifications from {notification_source.name} for "
                f"{', '.join(providers_with_email)} since {since_txt}"
            ),
            extra={"object": notification_source},
        )

        return True

    @staticmethod
    def extract_email_source(email_source: str) -> str:
        """Method to get the sender email address."""
        try:
            email_source = re.search(r"\<([-A-Za-z0-9_@.]+)\>", email_source).group(1)
        except AttributeError:
            try:
                email_source = re.search(r"([-A-Za-z0-9_@.]+)", email_source).group(1)
            except AttributeError:
                return ""
        return email_source.lower()

    @staticmethod
    def get_provider_type_from_email(email_source: str) -> Optional[str]:
        """Return the `Provider` type related to the source."""
        for provider in Provider.objects.all():
            emails_for_provider = provider.cf.get("emails_circuit_maintenances")
            if not emails_for_provider:
                continue
            sources = [src.strip().lower() for src in emails_for_provider.split(",")]
            if email_source in sources:
                return provider.name
        return None

    def process_email(
        self, job: Job, email_message: email.message.EmailMessage, msg_id: Union[str, bytes]
    ) -> Optional[MaintenanceNotification]:
        """Process an EmailMessage to create the MaintenanceNotification."""
        email_source = None
        if email_message[self.source_header]:
            email_source = self.extract_email_source(email_message[self.source_header])

        if not email_source:
            job.logger.error(
                (
                    "Not possible to determine the email sender from "
                    f'"{self.source_header}: {email_message[self.source_header]}"',
                ),
                extra={"object": email_message},
            )
            self.tag_message(job, msg_id, MessageProcessingStatus.UNKNOWN_PROVIDER)
            raise ValueError("Not possible to determine the email sender.")

        provider_type = self.get_provider_type_from_email(email_source)
        if not provider_type:
            job.logger.warning(
                f"Not possible to determine the provider_type for {email_source}",
                extra={"object": email_source},
            )
            self.tag_message(job, msg_id, MessageProcessingStatus.UNKNOWN_PROVIDER)
            return None

        return MaintenanceNotification(
            source=self,
            sender=email_source,
            subject=email_message["Subject"],
            provider_type=provider_type,
            raw_payload=email_message.as_bytes(),
            date=email_message["Date"],
            msg_id=msg_id,
        )
extract_email_source(email_source) staticmethod

Method to get the sender email address.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
@staticmethod
def extract_email_source(email_source: str) -> str:
    """Method to get the sender email address."""
    try:
        email_source = re.search(r"\<([-A-Za-z0-9_@.]+)\>", email_source).group(1)
    except AttributeError:
        try:
            email_source = re.search(r"([-A-Za-z0-9_@.]+)", email_source).group(1)
        except AttributeError:
            return ""
    return email_source.lower()
get_account_id()

Method to get an identifier of the related account.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def get_account_id(self) -> str:
    """Method to get an identifier of the related account."""
    return self.account
get_provider_type_from_email(email_source) staticmethod

Return the Provider type related to the source.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
@staticmethod
def get_provider_type_from_email(email_source: str) -> Optional[str]:
    """Return the `Provider` type related to the source."""
    for provider in Provider.objects.all():
        emails_for_provider = provider.cf.get("emails_circuit_maintenances")
        if not emails_for_provider:
            continue
        sources = [src.strip().lower() for src in emails_for_provider.split(",")]
        if email_source in sources:
            return provider.name
    return None
process_email(job, email_message, msg_id)

Process an EmailMessage to create the MaintenanceNotification.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def process_email(
    self, job: Job, email_message: email.message.EmailMessage, msg_id: Union[str, bytes]
) -> Optional[MaintenanceNotification]:
    """Process an EmailMessage to create the MaintenanceNotification."""
    email_source = None
    if email_message[self.source_header]:
        email_source = self.extract_email_source(email_message[self.source_header])

    if not email_source:
        job.logger.error(
            (
                "Not possible to determine the email sender from "
                f'"{self.source_header}: {email_message[self.source_header]}"',
            ),
            extra={"object": email_message},
        )
        self.tag_message(job, msg_id, MessageProcessingStatus.UNKNOWN_PROVIDER)
        raise ValueError("Not possible to determine the email sender.")

    provider_type = self.get_provider_type_from_email(email_source)
    if not provider_type:
        job.logger.warning(
            f"Not possible to determine the provider_type for {email_source}",
            extra={"object": email_source},
        )
        self.tag_message(job, msg_id, MessageProcessingStatus.UNKNOWN_PROVIDER)
        return None

    return MaintenanceNotification(
        source=self,
        sender=email_source,
        subject=email_message["Subject"],
        provider_type=provider_type,
        raw_payload=email_message.as_bytes(),
        date=email_message["Date"],
        msg_id=msg_id,
    )
validate_providers(job, notification_source, since_txt)

Method to validate that the NotificationSource has attached Providers.

Parameters:

Name Type Description Default
job Job

Job to use its logger

required
notification_source NotificationSource

Notification Source to validate providers

required
since_txt str

Date string to be used to log

required

Returns:

Name Type Description
bool bool

True if there are relevant providers attached or False otherwise

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def validate_providers(self, job: Job, notification_source: NotificationSource, since_txt: str) -> bool:
    """Method to validate that the NotificationSource has attached Providers.

    Args:
        job (Job): Job to use its logger
        notification_source (NotificationSource): Notification Source to validate providers
        since_txt (str): Date string to be used to log

    Returns:
        bool: True if there are relevant providers attached or False otherwise
    """
    providers_with_email = []
    providers_without_email = []
    if not notification_source.providers.all():
        job.logger.warning(
            f"Skipping source '{notification_source.name}' because no providers were defined.",
            extra={"object": notification_source},
        )
        return False

    for provider in notification_source.providers.all():
        provider_emails = provider.cf.get("emails_circuit_maintenances")
        if provider_emails:
            self.emails_to_fetch.extend([src.strip().lower() for src in provider_emails.split(",")])
            providers_with_email.append(provider.name)
        else:
            providers_without_email.append(provider.name)

    if providers_without_email:
        job.logger.warning(
            f"Skipping {', '.join(providers_without_email)} because these providers have no email configured.",
            extra={"object": notification_source},
        )

    if not providers_with_email:
        job.logger.warning(
            (
                f"Skipping Notification Source {notification_source.name} because none of the related providers "
                "have emails defined."
            ),
            extra={"object": notification_source},
        )
        return False
    job.logger.debug(f"Fetching emails from {self.emails_to_fetch}")
    job.logger.info(
        (
            f"Retrieving notifications from {notification_source.name} for "
            f"{', '.join(providers_with_email)} since {since_txt}"
        ),
        extra={"object": notification_source},
    )

    return True

GmailAPI

Bases: EmailSource

GmailAPI class.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class GmailAPI(EmailSource):
    """GmailAPI class."""

    credentials_file: str
    account: str
    service: Optional[Resource] = None
    credentials: Optional[Union[service_account.Credentials, Credentials]] = None

    # The required scope for baseline functionality (add gmail.modify permission in extra_scopes to enable tagging)
    SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]

    extra_scopes: List[str] = []
    limit_emails_with_not_header_from: List[str] = []
    labels: Dict[str, str] = {}

    class Config:
        """Pydantic BaseModel config."""

        arbitrary_types_allowed = True

    def load_credentials(self, force_refresh=False):
        """Load Credentials for Gmail API."""
        raise NotImplementedError

    def build_service(self):
        """Build API service."""
        self.service = build("gmail", "v1", credentials=self.credentials)

    def close_service(self):
        """Close API Service."""
        if self.service:
            self.service.close()

    def _authentication_logic(self):
        """Inner method to run the custom class validation logic."""
        self.load_credentials(force_refresh=True)

    def extract_raw_payload(self, body: Dict, msg_id: str) -> bytes:
        """Extracts the raw_payload from body or attachement."""
        if "attachmentId" in body:
            attachment = (
                self.service.users()  # pylint: disable=no-member
                .messages()
                .attachments()
                .get(userId=self.account, messageId=msg_id, id=body["attachmentId"])
                .execute()
            )
            return base64.b64decode(attachment["data"])
        if "data" in body:
            return base64.urlsafe_b64decode(body["data"])

        return b""

    def fetch_email(self, job: Job, msg_id: str) -> Optional[MaintenanceNotification]:
        """Fetch an specific email ID.

        See data format:  https://developers.google.com/gmail/api/reference/rest/v1/users.messages#Message
        """
        received_email = (
            self.service.users()  # pylint: disable=no-member
            .messages()
            .get(userId=self.account, id=msg_id, format="raw")
            .execute()
        )

        raw_email_string = base64.urlsafe_b64decode(received_email["raw"].encode("utf8"))
        email_message = email.message_from_bytes(raw_email_string)
        return self.process_email(job, email_message, msg_id)

    def _get_search_criteria(self, since_timestamp: datetime.datetime = None) -> str:
        """Build "search" criteria to filter emails, from date of from sender."""
        search_criteria = ""
        if since_timestamp:
            since_txt = since_timestamp.strftime("%Y/%m/%d")
            search_criteria = f"after:{since_txt}"

        # If source_header is not "From" but some other custom header such as X-Original-Sender,
        # the Gmail API doesn't let us filter by that, but if we provided via config a list of
        # source via `limit_emails_with_not_header_from`, we filter by that.
        if self.emails_to_fetch and self.source_header == "From":
            emails_with_from = [f"from:{email}" for email in self.emails_to_fetch]
            search_criteria += " {" + f'{" ".join(emails_with_from)}' + "}"
        elif self.emails_to_fetch and self.limit_emails_with_not_header_from:
            emails_with_from = [f"from:{email}" for email in self.limit_emails_with_not_header_from]
            search_criteria += " {" + f'{" ".join(emails_with_from)}' + "}"

        return search_criteria

    def tag_message(self, job: Job, msg_id: Union[str, bytes], tag: MessageProcessingStatus):
        """Apply the given Gmail label to the given message."""
        # Do we have a configured label ID corresponding to the given tag?
        if tag.value not in self.labels:
            return

        # Gmail API expects a str msg_id, but MaintenanceNotification coerces it to bytes - change it back if needed
        if isinstance(msg_id, bytes):
            msg_id = str(msg_id.decode())

        try:
            self.service.users().messages().modify(  # pylint: disable=no-member
                userId=self.account, id=msg_id, body={"addLabelIds": [self.labels[tag.value]]}
            ).execute()
        except HttpError:
            job.logger.warning(
                f"Error in applying tag '{tag.value}' ({self.labels[tag.value]}) to message {msg_id}:",
                exc_info=True,
            )

    def receive_notifications(
        self, job: Job, since_timestamp: datetime.datetime = None
    ) -> Iterable[MaintenanceNotification]:
        """Retrieve emails since an specific time, if provided."""
        self.load_credentials()
        self.build_service()

        search_criteria = self._get_search_criteria(since_timestamp)

        # messages.list() returns 100 emails at a time;
        # we need to loop with list_next() until we have all relevant messages
        request = (
            self.service.users().messages().list(userId=self.account, q=search_criteria)  # pylint: disable=no-member
        )
        msg_ids = []
        while request is not None:
            response = request.execute()
            msg_ids.extend(msg["id"] for msg in response.get("messages", []))
            request = self.service.users().messages().list_next(request, response)  # pylint: disable=no-member

        job.logger.debug(
            f"Fetched {len(msg_ids)} emails from {self.name} source using search pattern: {search_criteria}."
        )

        received_notifications = []
        for msg_id in msg_ids:
            raw_notification = self.fetch_email(job, msg_id)
            if raw_notification:
                received_notifications.append(raw_notification)

        job.logger.debug(f"Raw notifications created {len(received_notifications)} from {self.name}.")
        job.logger.debug(f"Raw notifications: {received_notifications}")

        self.close_service()
        return received_notifications
Config

Pydantic BaseModel config.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class Config:
    """Pydantic BaseModel config."""

    arbitrary_types_allowed = True
build_service()

Build API service.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def build_service(self):
    """Build API service."""
    self.service = build("gmail", "v1", credentials=self.credentials)
close_service()

Close API Service.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def close_service(self):
    """Close API Service."""
    if self.service:
        self.service.close()
extract_raw_payload(body, msg_id)

Extracts the raw_payload from body or attachement.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def extract_raw_payload(self, body: Dict, msg_id: str) -> bytes:
    """Extracts the raw_payload from body or attachement."""
    if "attachmentId" in body:
        attachment = (
            self.service.users()  # pylint: disable=no-member
            .messages()
            .attachments()
            .get(userId=self.account, messageId=msg_id, id=body["attachmentId"])
            .execute()
        )
        return base64.b64decode(attachment["data"])
    if "data" in body:
        return base64.urlsafe_b64decode(body["data"])

    return b""
fetch_email(job, msg_id)

Fetch an specific email ID.

See data format: https://developers.google.com/gmail/api/reference/rest/v1/users.messages#Message

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def fetch_email(self, job: Job, msg_id: str) -> Optional[MaintenanceNotification]:
    """Fetch an specific email ID.

    See data format:  https://developers.google.com/gmail/api/reference/rest/v1/users.messages#Message
    """
    received_email = (
        self.service.users()  # pylint: disable=no-member
        .messages()
        .get(userId=self.account, id=msg_id, format="raw")
        .execute()
    )

    raw_email_string = base64.urlsafe_b64decode(received_email["raw"].encode("utf8"))
    email_message = email.message_from_bytes(raw_email_string)
    return self.process_email(job, email_message, msg_id)
load_credentials(force_refresh=False)

Load Credentials for Gmail API.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def load_credentials(self, force_refresh=False):
    """Load Credentials for Gmail API."""
    raise NotImplementedError
receive_notifications(job, since_timestamp=None)

Retrieve emails since an specific time, if provided.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def receive_notifications(
    self, job: Job, since_timestamp: datetime.datetime = None
) -> Iterable[MaintenanceNotification]:
    """Retrieve emails since an specific time, if provided."""
    self.load_credentials()
    self.build_service()

    search_criteria = self._get_search_criteria(since_timestamp)

    # messages.list() returns 100 emails at a time;
    # we need to loop with list_next() until we have all relevant messages
    request = (
        self.service.users().messages().list(userId=self.account, q=search_criteria)  # pylint: disable=no-member
    )
    msg_ids = []
    while request is not None:
        response = request.execute()
        msg_ids.extend(msg["id"] for msg in response.get("messages", []))
        request = self.service.users().messages().list_next(request, response)  # pylint: disable=no-member

    job.logger.debug(
        f"Fetched {len(msg_ids)} emails from {self.name} source using search pattern: {search_criteria}."
    )

    received_notifications = []
    for msg_id in msg_ids:
        raw_notification = self.fetch_email(job, msg_id)
        if raw_notification:
            received_notifications.append(raw_notification)

    job.logger.debug(f"Raw notifications created {len(received_notifications)} from {self.name}.")
    job.logger.debug(f"Raw notifications: {received_notifications}")

    self.close_service()
    return received_notifications
tag_message(job, msg_id, tag)

Apply the given Gmail label to the given message.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def tag_message(self, job: Job, msg_id: Union[str, bytes], tag: MessageProcessingStatus):
    """Apply the given Gmail label to the given message."""
    # Do we have a configured label ID corresponding to the given tag?
    if tag.value not in self.labels:
        return

    # Gmail API expects a str msg_id, but MaintenanceNotification coerces it to bytes - change it back if needed
    if isinstance(msg_id, bytes):
        msg_id = str(msg_id.decode())

    try:
        self.service.users().messages().modify(  # pylint: disable=no-member
            userId=self.account, id=msg_id, body={"addLabelIds": [self.labels[tag.value]]}
        ).execute()
    except HttpError:
        job.logger.warning(
            f"Error in applying tag '{tag.value}' ({self.labels[tag.value]}) to message {msg_id}:",
            exc_info=True,
        )

GmailAPIOauth

Bases: GmailAPI

GmailAPIOauth class.

See: https://developers.google.com/identity/protocols/oauth2/web-server

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class GmailAPIOauth(GmailAPI):
    """GmailAPIOauth class.

    See: https://developers.google.com/identity/protocols/oauth2/web-server
    """

    def load_credentials(self, force_refresh=False):
        """Load Gmail API OAuth credentials."""
        notification_source = NotificationSource.objects.get(name=self.name)
        try:
            self.credentials = notification_source.token
        except EOFError:
            logger.debug("Google OAuth Token has not been initialized yet.")

        if force_refresh or not self.credentials or not self.credentials.valid:
            if self.credentials and self.credentials.refresh_token and (self.credentials.expired or force_refresh):
                try:
                    self.credentials.refresh(Request())
                except RefreshError:
                    # Bad token, discard it
                    notification_source._token = b""  # pylint: disable=protected-access
                    notification_source.save()
                    raise

                notification_source.token = self.credentials
                notification_source.save()
            else:
                raise RedirectAuthorize(url_name="google_authorize", source_name=self.name)
load_credentials(force_refresh=False)

Load Gmail API OAuth credentials.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def load_credentials(self, force_refresh=False):
    """Load Gmail API OAuth credentials."""
    notification_source = NotificationSource.objects.get(name=self.name)
    try:
        self.credentials = notification_source.token
    except EOFError:
        logger.debug("Google OAuth Token has not been initialized yet.")

    if force_refresh or not self.credentials or not self.credentials.valid:
        if self.credentials and self.credentials.refresh_token and (self.credentials.expired or force_refresh):
            try:
                self.credentials.refresh(Request())
            except RefreshError:
                # Bad token, discard it
                notification_source._token = b""  # pylint: disable=protected-access
                notification_source.save()
                raise

            notification_source.token = self.credentials
            notification_source.save()
        else:
            raise RedirectAuthorize(url_name="google_authorize", source_name=self.name)

GmailAPIServiceAccount

Bases: GmailAPI

GmailAPIServiceAccount class.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class GmailAPIServiceAccount(GmailAPI):
    """GmailAPIServiceAccount class."""

    def load_credentials(self, force_refresh=False):
        """Load Gmail API Service Account credentials."""
        if force_refresh or not self.credentials:
            self.credentials = service_account.Credentials.from_service_account_file(self.credentials_file)
            self.credentials = self.credentials.with_scopes(self.SCOPES + self.extra_scopes)
            self.credentials = self.credentials.with_subject(self.account)
            self.credentials.refresh(Request())
load_credentials(force_refresh=False)

Load Gmail API Service Account credentials.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def load_credentials(self, force_refresh=False):
    """Load Gmail API Service Account credentials."""
    if force_refresh or not self.credentials:
        self.credentials = service_account.Credentials.from_service_account_file(self.credentials_file)
        self.credentials = self.credentials.with_scopes(self.SCOPES + self.extra_scopes)
        self.credentials = self.credentials.with_subject(self.account)
        self.credentials.refresh(Request())

IMAP

Bases: EmailSource

IMAP class, extending Source class.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class IMAP(EmailSource):
    """IMAP class, extending Source class."""

    password: str
    imap_server: str
    imap_port: int = 993

    session: Optional[imaplib.IMAP4_SSL] = None

    class Config:
        """Pydantic BaseModel config."""

        arbitrary_types_allowed = True

    def open_session(self):
        """Open session to IMAP server.

        See states: https://github.com/python/cpython/blob/3.9/Lib/imaplib.py#L58
        """
        if not self.session:
            self.session = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
        if self.session.state == "NONAUTH":
            self.session.login(self.account, self.password)

    def close_session(self):
        """Close session to IMAP server.

        See states: https://github.com/python/cpython/blob/3.9/Lib/imaplib.py#L58
        """
        if self.session:
            if self.session.state == "SELECTED":
                self.session.close()
            if self.session.state == "AUTH":
                self.session.logout()

    def _authentication_logic(self):
        """Inner method to run the custom class validation logic."""
        self.open_session()
        self.close_session()

    def fetch_email(self, job: Job, msg_id: bytes) -> Optional[MaintenanceNotification]:
        """Fetch an specific email ID."""
        _, data = self.session.fetch(msg_id, "(RFC822)")
        email_message = email.message_from_bytes(data[0][1])

        return self.process_email(job, email_message, msg_id)

    def receive_notifications(
        self, job: Job, since_timestamp: datetime.datetime = None
    ) -> Iterable[MaintenanceNotification]:
        """Retrieve emails since an specific time, if provided."""
        self.open_session()

        # Define searching criteria
        self.session.select("Inbox")

        # TODO: find the right way to search messages from several senders
        # Maybe extend filtering options, for instance, to discard some type of notifications
        msg_ids = []

        # TODO: define a similar function to _get_search_criteria
        since_date = ""
        if since_timestamp:
            since_txt = since_timestamp.strftime("%d-%b-%Y")
            since_date = f'SINCE "{since_txt}"'

        if self.emails_to_fetch:
            for sender in self.emails_to_fetch:
                if self.source_header == "From":
                    search_items = (f'FROM "{sender}"', since_date)
                else:
                    search_items = (f'HEADER {self.source_header} "{sender}"', since_date)
                search_text = " ".join(search_items).strip()
                search_criteria = f"({search_text})"
                messages = self.session.search(None, search_criteria)[1][0]
                msg_ids.extend(messages.split())
                job.logger.debug(
                    f"Fetched {len(messages.split())} emails from {self.name}"
                    f" source using search pattern: {search_criteria}."
                )
        else:
            search_criteria = f"({since_date})"
            messages = self.session.search(None, search_criteria)[1][0]
            msg_ids.extend(messages.split())
            job.logger.debug(
                f"Fetched {len(messages.split())} emails from {self.name} "
                f"source using search pattern: {search_criteria}."
            )

        received_notifications = []
        for msg_id in msg_ids:
            raw_notification = self.fetch_email(job, msg_id)
            if raw_notification:
                received_notifications.append(raw_notification)

        job.logger.debug(f"Raw notifications created {len(received_notifications)} from {self.name}.")

        self.close_session()
        return received_notifications
Config

Pydantic BaseModel config.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class Config:
    """Pydantic BaseModel config."""

    arbitrary_types_allowed = True
close_session()

Close session to IMAP server.

See states: https://github.com/python/cpython/blob/3.9/Lib/imaplib.py#L58

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def close_session(self):
    """Close session to IMAP server.

    See states: https://github.com/python/cpython/blob/3.9/Lib/imaplib.py#L58
    """
    if self.session:
        if self.session.state == "SELECTED":
            self.session.close()
        if self.session.state == "AUTH":
            self.session.logout()
fetch_email(job, msg_id)

Fetch an specific email ID.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def fetch_email(self, job: Job, msg_id: bytes) -> Optional[MaintenanceNotification]:
    """Fetch an specific email ID."""
    _, data = self.session.fetch(msg_id, "(RFC822)")
    email_message = email.message_from_bytes(data[0][1])

    return self.process_email(job, email_message, msg_id)
open_session()

Open session to IMAP server.

See states: https://github.com/python/cpython/blob/3.9/Lib/imaplib.py#L58

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def open_session(self):
    """Open session to IMAP server.

    See states: https://github.com/python/cpython/blob/3.9/Lib/imaplib.py#L58
    """
    if not self.session:
        self.session = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
    if self.session.state == "NONAUTH":
        self.session.login(self.account, self.password)
receive_notifications(job, since_timestamp=None)

Retrieve emails since an specific time, if provided.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def receive_notifications(
    self, job: Job, since_timestamp: datetime.datetime = None
) -> Iterable[MaintenanceNotification]:
    """Retrieve emails since an specific time, if provided."""
    self.open_session()

    # Define searching criteria
    self.session.select("Inbox")

    # TODO: find the right way to search messages from several senders
    # Maybe extend filtering options, for instance, to discard some type of notifications
    msg_ids = []

    # TODO: define a similar function to _get_search_criteria
    since_date = ""
    if since_timestamp:
        since_txt = since_timestamp.strftime("%d-%b-%Y")
        since_date = f'SINCE "{since_txt}"'

    if self.emails_to_fetch:
        for sender in self.emails_to_fetch:
            if self.source_header == "From":
                search_items = (f'FROM "{sender}"', since_date)
            else:
                search_items = (f'HEADER {self.source_header} "{sender}"', since_date)
            search_text = " ".join(search_items).strip()
            search_criteria = f"({search_text})"
            messages = self.session.search(None, search_criteria)[1][0]
            msg_ids.extend(messages.split())
            job.logger.debug(
                f"Fetched {len(messages.split())} emails from {self.name}"
                f" source using search pattern: {search_criteria}."
            )
    else:
        search_criteria = f"({since_date})"
        messages = self.session.search(None, search_criteria)[1][0]
        msg_ids.extend(messages.split())
        job.logger.debug(
            f"Fetched {len(messages.split())} emails from {self.name} "
            f"source using search pattern: {search_criteria}."
        )

    received_notifications = []
    for msg_id in msg_ids:
        raw_notification = self.fetch_email(job, msg_id)
        if raw_notification:
            received_notifications.append(raw_notification)

    job.logger.debug(f"Raw notifications created {len(received_notifications)} from {self.name}.")

    self.close_session()
    return received_notifications

MaintenanceNotification

Bases: BaseModel

Representation of all the data related to a Maintenance Notification.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class MaintenanceNotification(BaseModel):
    """Representation of all the data related to a Maintenance Notification."""

    msg_id: bytes
    source: Source
    sender: str
    subject: str
    provider_type: str
    raw_payload: bytes
    date: str

RedirectAuthorize

Bases: Exception

Custom class to signal a redirect to trigger OAuth autorization workflow for a specific source_name.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class RedirectAuthorize(Exception):
    """Custom class to signal a redirect to trigger OAuth autorization workflow for a specific source_name."""

    def __init__(self, url_name, source_name):
        """Init for RedirectAuthorize."""
        self.url_name = url_name
        self.source_name = source_name
        super().__init__()
__init__(url_name, source_name)

Init for RedirectAuthorize.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def __init__(self, url_name, source_name):
    """Init for RedirectAuthorize."""
    self.url_name = url_name
    self.source_name = source_name
    super().__init__()

Source

Bases: BaseModel

Base class to retrieve notifications. To be extended for each scheme.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
class Source(BaseModel):
    """Base class to retrieve notifications. To be extended for each scheme."""

    name: str
    url: str

    def get_account_id(self) -> str:
        """Method to get an identifier of the related account."""
        raise NotImplementedError

    def receive_notifications(
        self, job: Job, since_timestamp: datetime.datetime = None
    ) -> Iterable["MaintenanceNotification"]:
        """Function to retrieve notifications since one moment in time.

        The `MaintenanceNotification` attributes will contains these attributes:
        * source: self.name
        * sender: it could be the email 'from' or omitted if not relevant
        * subject: it could be the email 'subject' or some meaningful identifier from notification
        * provider_type: mapping to the Provider that is related to this notification
        * raw: the raw_payload from notification
        """
        # TODO: `senders` is used to limit the scope of emails retrieved, this won't have sense depending on the
        # Notification Source.
        raise NotImplementedError

    def validate_providers(self, job: Job, notification_source: NotificationSource, since_txt: str) -> bool:
        """Method to validate that the NotificationSource has attached Providers.

        Args:
            job (Job): Job to use its logger
            notification_source (NotificationSource): Notification Source to validate providers
            since_txt (str): Date string to be used to log

        Returns:
            bool: True if there are relevant providers attached or False otherwise
        """
        raise NotImplementedError

    def _authentication_logic(self):
        """Inner method to run the custom class validation logic."""
        raise NotImplementedError

    def test_authentication(self) -> Tuple[bool, str]:
        """Method to validate the authentication of the Source.

        Returns:
            Tuple:
                bool: True if authentication was successful, False otherwise
                str: Message from authentication execution
        """
        try:
            self._authentication_logic()
            is_authenticated = True
            message = "Test OK"
        except RedirectAuthorize:
            raise
        except Exception as exc:
            is_authenticated = False
            if isinstance(exc.args[0], bytes):
                message = str(exc.args[0].decode())
            else:
                message = str(exc)

        return is_authenticated, message

    @classmethod
    def init(cls: Type[T], name: str) -> T:
        """Factory Pattern to get the specific Source Class depending on the scheme."""
        for notification_source in settings.PLUGINS_CONFIG.get("nautobot_circuit_maintenance", {}).get(
            "notification_sources", []
        ):
            if notification_source.get("name", "") == name:
                config = notification_source
                break
        else:
            raise ValueError(f"Name {name} not found in PLUGINS_CONFIG.")

        url = config.get("url")
        if not url:
            raise ValueError(f"URL for {name} not found in PLUGINS_CONFIG.")

        url_components = urlparse(url)
        scheme = url_components.scheme.lower()
        if scheme == "imap":
            return IMAP(
                name=name,
                url=url,
                account=config.get("account"),
                password=config.get("secret"),
                imap_server=url_components.netloc.split(":")[0],
                imap_port=url_components.port or 993,
                source_header=config.get("source_header", "From"),
            )
        if scheme == "https" and url_components.netloc.split(":")[0] == "accounts.google.com":
            creds_filename = config.get("credentials_file")
            if not creds_filename:
                raise ValueError(f"Credentials_file for {name} not found in PLUGINS_CONFIG.")

            if not os.path.isfile(creds_filename):
                raise ValueError(f"Credentials_file {creds_filename} for {name} is not available.")

            with open(creds_filename, encoding="utf-8") as credentials_file:
                credentials = json.load(credentials_file)
                if credentials.get("type") == "service_account":
                    gmail_api_class = GmailAPIServiceAccount
                elif "web" in credentials:
                    gmail_api_class = GmailAPIOauth
                else:
                    raise NotImplementedError(f"File {creds_filename} doens't contain any supported credentials.")
                return gmail_api_class(
                    name=name,
                    url=url,
                    account=config.get("account"),
                    credentials_file=creds_filename,
                    source_header=config.get("source_header", "From"),
                    limit_emails_with_not_header_from=config.get("limit_emails_with_not_header_from", []),
                    extra_scopes=config.get("extra_scopes", []),
                    labels=config.get("labels", {}),
                )

        raise ValueError(
            f"Scheme {scheme} not supported as Notification Source (only IMAP or HTTPS to accounts.google.com)."
        )

    def tag_message(self, job: Job, msg_id: Union[str, bytes], tag: MessageProcessingStatus):
        """If supported, apply the given tag to the given message for future reference and categorization.

        The default implementation of this method is a no-op but specific Source subclasses may implement it.
        """
get_account_id()

Method to get an identifier of the related account.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def get_account_id(self) -> str:
    """Method to get an identifier of the related account."""
    raise NotImplementedError
init(name) classmethod

Factory Pattern to get the specific Source Class depending on the scheme.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
@classmethod
def init(cls: Type[T], name: str) -> T:
    """Factory Pattern to get the specific Source Class depending on the scheme."""
    for notification_source in settings.PLUGINS_CONFIG.get("nautobot_circuit_maintenance", {}).get(
        "notification_sources", []
    ):
        if notification_source.get("name", "") == name:
            config = notification_source
            break
    else:
        raise ValueError(f"Name {name} not found in PLUGINS_CONFIG.")

    url = config.get("url")
    if not url:
        raise ValueError(f"URL for {name} not found in PLUGINS_CONFIG.")

    url_components = urlparse(url)
    scheme = url_components.scheme.lower()
    if scheme == "imap":
        return IMAP(
            name=name,
            url=url,
            account=config.get("account"),
            password=config.get("secret"),
            imap_server=url_components.netloc.split(":")[0],
            imap_port=url_components.port or 993,
            source_header=config.get("source_header", "From"),
        )
    if scheme == "https" and url_components.netloc.split(":")[0] == "accounts.google.com":
        creds_filename = config.get("credentials_file")
        if not creds_filename:
            raise ValueError(f"Credentials_file for {name} not found in PLUGINS_CONFIG.")

        if not os.path.isfile(creds_filename):
            raise ValueError(f"Credentials_file {creds_filename} for {name} is not available.")

        with open(creds_filename, encoding="utf-8") as credentials_file:
            credentials = json.load(credentials_file)
            if credentials.get("type") == "service_account":
                gmail_api_class = GmailAPIServiceAccount
            elif "web" in credentials:
                gmail_api_class = GmailAPIOauth
            else:
                raise NotImplementedError(f"File {creds_filename} doens't contain any supported credentials.")
            return gmail_api_class(
                name=name,
                url=url,
                account=config.get("account"),
                credentials_file=creds_filename,
                source_header=config.get("source_header", "From"),
                limit_emails_with_not_header_from=config.get("limit_emails_with_not_header_from", []),
                extra_scopes=config.get("extra_scopes", []),
                labels=config.get("labels", {}),
            )

    raise ValueError(
        f"Scheme {scheme} not supported as Notification Source (only IMAP or HTTPS to accounts.google.com)."
    )
receive_notifications(job, since_timestamp=None)

Function to retrieve notifications since one moment in time.

The MaintenanceNotification attributes will contains these attributes: * source: self.name * sender: it could be the email 'from' or omitted if not relevant * subject: it could be the email 'subject' or some meaningful identifier from notification * provider_type: mapping to the Provider that is related to this notification * raw: the raw_payload from notification

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def receive_notifications(
    self, job: Job, since_timestamp: datetime.datetime = None
) -> Iterable["MaintenanceNotification"]:
    """Function to retrieve notifications since one moment in time.

    The `MaintenanceNotification` attributes will contains these attributes:
    * source: self.name
    * sender: it could be the email 'from' or omitted if not relevant
    * subject: it could be the email 'subject' or some meaningful identifier from notification
    * provider_type: mapping to the Provider that is related to this notification
    * raw: the raw_payload from notification
    """
    # TODO: `senders` is used to limit the scope of emails retrieved, this won't have sense depending on the
    # Notification Source.
    raise NotImplementedError
tag_message(job, msg_id, tag)

If supported, apply the given tag to the given message for future reference and categorization.

The default implementation of this method is a no-op but specific Source subclasses may implement it.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def tag_message(self, job: Job, msg_id: Union[str, bytes], tag: MessageProcessingStatus):
    """If supported, apply the given tag to the given message for future reference and categorization.

    The default implementation of this method is a no-op but specific Source subclasses may implement it.
    """
test_authentication()

Method to validate the authentication of the Source.

Returns:

Name Type Description
Tuple Tuple[bool, str]

bool: True if authentication was successful, False otherwise str: Message from authentication execution

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def test_authentication(self) -> Tuple[bool, str]:
    """Method to validate the authentication of the Source.

    Returns:
        Tuple:
            bool: True if authentication was successful, False otherwise
            str: Message from authentication execution
    """
    try:
        self._authentication_logic()
        is_authenticated = True
        message = "Test OK"
    except RedirectAuthorize:
        raise
    except Exception as exc:
        is_authenticated = False
        if isinstance(exc.args[0], bytes):
            message = str(exc.args[0].decode())
        else:
            message = str(exc)

    return is_authenticated, message
validate_providers(job, notification_source, since_txt)

Method to validate that the NotificationSource has attached Providers.

Parameters:

Name Type Description Default
job Job

Job to use its logger

required
notification_source NotificationSource

Notification Source to validate providers

required
since_txt str

Date string to be used to log

required

Returns:

Name Type Description
bool bool

True if there are relevant providers attached or False otherwise

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def validate_providers(self, job: Job, notification_source: NotificationSource, since_txt: str) -> bool:
    """Method to validate that the NotificationSource has attached Providers.

    Args:
        job (Job): Job to use its logger
        notification_source (NotificationSource): Notification Source to validate providers
        since_txt (str): Date string to be used to log

    Returns:
        bool: True if there are relevant providers attached or False otherwise
    """
    raise NotImplementedError

get_notifications(job, notification_sources, since)

Method to fetch notifications from multiple sources and return MaintenanceNotification objects.

Source code in nautobot_circuit_maintenance/handle_notifications/sources.py
def get_notifications(
    job: Job,
    notification_sources: Iterable[NotificationSource],
    since: int,
) -> Iterable[MaintenanceNotification]:
    """Method to fetch notifications from multiple sources and return MaintenanceNotification objects."""
    received_notifications = []

    for notification_source in notification_sources:
        try:
            since_date = datetime.datetime.fromtimestamp(since)
            since_txt = since_date.strftime("%d-%b-%Y")

            try:
                source = Source.init(name=notification_source.name)
            except ValidationError as validation_error:
                job.logger.warning(
                    (
                        f"Notification Source {notification_source.name} "
                        f"is not matching class expectations: {validation_error}"
                    ),
                    extra={"object": notification_source},
                    exc_info=True,
                )
                continue
            except ValueError:
                job.logger.warning(
                    f"Skipping notification source {notification_source}",
                    extra={"object": notification_source},
                    exc_info=True,
                )
                continue

            if source.validate_providers(job, notification_source, since_txt):
                if since_date:
                    # When using the SINCE filter, we add one extra day to check for notifications received
                    # on the very same day since last notification.
                    since_date -= datetime.timedelta(days=1)

                raw_notifications = source.receive_notifications(job, since_date)
                received_notifications.extend(raw_notifications)

                if not raw_notifications:
                    job.logger.info(
                        (
                            f"No notifications received for "
                            f"{', '.join(notification_source.providers.all().values_list('name', flat=True))} since "
                            f"{since_txt} from {notification_source.name}"
                        ),
                        extra={"object": notification_source},
                    )

        except Exception:
            job.logger.error(
                f"Issue fetching notifications from {notification_source.name}",
                extra={"object": notification_source},
                exc_info=True,
            )
            raise

    return received_notifications