Source code for acapi2.resources.acquiadata

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Acquia Cloud API Data network resource"""
import logging
import time
import uuid
from platform import python_version
from pprint import pformat
from urllib.parse import urlencode

import backoff
import httphmac
import requests

from acapi2.http_request import HttpRequest
from acapi2.version import __version__

_logger = logging.getLogger("acapi2.resources.acquiadata")


[docs]class AcquiaData(object): _agent = "Acquia Cloud API Client/{mver} (Python {pver})" _realm = _agent.format(mver=__version__, pver=python_version()) def __init__( self, uri: str, api_key: str, api_secret: str, data: dict = None ) -> None: self.uri = uri self.api_key = api_key self.api_secret = api_secret self._data = data or {} self.last_response: requests.Response = requests.Response() @property def data(self): return self._data @data.setter def data(self, data: dict): self._data = data
[docs] @staticmethod def generate_url_query(uri: str, params: dict) -> str: # TODO: add params support in httphmac.Request() qry = urlencode(params) url = "{uri}?{qry}".format(uri=uri, qry=qry) return url
[docs] @backoff.on_exception( backoff.expo, requests.exceptions.RequestException, max_time=10 ) def request( self, uri: str = None, method: str = "GET", data: dict = None, params: dict = None, ): if not uri: uri = self.uri if params: params = {k: v for k, v in params.items() if v is not None} uri = self.generate_url_query(uri, params) response: requests.Response request = HttpRequest() auth_headers = { "realm": self._realm, "id": self.api_key, "nonce": uuid.uuid4().hex, "version": "2.0", } request.with_url(uri).with_method(method).with_time() signer = httphmac.V2Signer() signer.sign_direct(request, auth_headers, self.api_secret) if "GET" == method: attempt = 0 while attempt <= 5: response = request.do() if response.status_code not in list(range(500, 505)): break attempt += 1 time.sleep((attempt ** 2.0) / 10) if "POST" == method or "PUT" == method: request.with_json_body(data) signer.sign_direct(request, auth_headers, self.api_secret) response = request.do() logging.debug(response.content) if "DELETE" == method: response = request.do() logging.debug(response.content) self.last_response = response if ( response.status_code != requests.codes.ok and response.status_code != requests.codes.accepted ): try: response.raise_for_status() except requests.exceptions.HTTPError as exp: _logger.warning( "Failed request response headers: \n%s", pformat(exp.response.headers, indent=2), ) raise return response