Source code for acapi2.resources.task
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Acquia API task queue resource."""
import logging
import time
from datetime import datetime, timedelta
import requests_cache
from acapi2.exceptions import AcquiaCloudTaskFailedException
from acapi2.resources.acquiaresource import AcquiaResource
LOGGER = logging.getLogger("acapi2.resources.task")
[docs]class Task(AcquiaResource):
"""
Task is deprecated, let's not use it anymore.
"""
POLL_INTERVAL = 3
[docs] def mangle_uri(self, uri: str, task_data: dict):
raise NotImplementedError
[docs] def is_pending(self) -> bool:
with requests_cache.disabled():
tasks = self.request().json()["embedded"]["_items"]
self.data = tasks[self.__getitem__("uuid")]
return self.data["status"] != "in-progress"
[docs] def wait(self, timeout: int = 1000) -> "Task":
start = datetime.now()
max_time = start + timedelta(seconds=timeout)
while self.is_pending():
if start >= max_time:
msg = (
f"Time out exceeded while "
f"waiting for {self.data['uuid']}"
)
raise AcquiaCloudTaskFailedException(msg, self.data)
time.sleep(self.POLL_INTERVAL)
task = self.data
if task["status"] == "failed":
msg = f"Task {self.data['uuid']} failed"
raise AcquiaCloudTaskFailedException(msg, task)
end = datetime.now()
delta = end - start
LOGGER.info(
"Waited %.2f seconds for task to complete", delta.total_seconds()
)
return self