Source code for acapi2.resources.notification

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Notification object"""


import logging
import time
from datetime import datetime, timedelta

import requests_cache

from acapi2.exceptions import (
    AcquiaCloudNotificationFailedException,
    AcquiaCloudTimeoutError,
)
from acapi2.resources.acquiaresource import AcquiaResource

# Module-level logger; Notification.wait() uses it to report how long
# it waited for a notification to complete.
LOGGER = logging.getLogger("acapi2.resources.notification")


class Notification(AcquiaResource):
    """Notification resource whose task status can be polled until done."""

    # Number of seconds wait() sleeps between two consecutive status polls.
    POLL_INTERVAL = 3

    def __init__(
        self,
        uri: str,
        api_key: str,
        api_secret: str,
        filters: dict = None,
        *args,
        **kwargs,
    ) -> None:
        """Initialise the notification resource.

        :param uri: passed through to the parent constructor
        :param api_key: passed through to the parent constructor
        :param api_secret: passed through to the parent constructor
        :param filters: accepted as part of the signature but NOT forwarded
            to the parent constructor
        :param args: extra positional arguments for the parent constructor
        :param kwargs: extra keyword arguments for the parent constructor
        """
        super().__init__(uri, api_key, api_secret, *args, **kwargs)

    def pending(self) -> bool:
        """Check if a notification is still pending.

        Performs a fresh request with the requests cache disabled and stores
        the decoded JSON response on ``self.data``.

        :return: True when the reported status is ``"in-progress"``
        """
        # Bypass the cache so every poll reflects the current status.
        with requests_cache.disabled():
            notification = self.request().json()
            self.data = notification
        return notification["status"] == "in-progress"

    def wait(self, timeout: int = 1800) -> "Notification":
        """Wait for a notification task to finish executing.

        Polls :meth:`pending` every ``POLL_INTERVAL`` seconds until the
        notification is no longer in progress.

        :param timeout: the max number of seconds to wait for the
            notification to complete
        :return: this Notification object, once the task has finished
        :raises AcquiaCloudTimeoutError: if the notification is still
            pending once ``timeout`` seconds have elapsed
        :raises AcquiaCloudNotificationFailedException: if the final status
            is ``"failed"`` or ``"error"``
        """
        # Use the monotonic clock rather than wall-clock time: the deadline
        # and the reported duration must not be affected by DST transitions
        # or system clock adjustments that happen while waiting.
        start = time.monotonic()
        deadline = start + timeout

        while self.pending():
            # Ensure the timeout hasn't been exceeded.
            if time.monotonic() >= deadline:
                msg = (
                    f"Time out of exceeded while waiting "
                    f"for {self.data['uuid']}"
                )
                raise AcquiaCloudTimeoutError(msg, self.data)

            time.sleep(self.POLL_INTERVAL)

        # pending() stored the most recent response on self.data.
        notification = self.data
        if notification["status"] in ["failed", "error"]:
            raise AcquiaCloudNotificationFailedException(
                f"Notification {notification['uuid']} failed", notification
            )

        LOGGER.info(
            "Waited %.2f seconds for notification to complete",
            time.monotonic() - start,
        )

        return self