Source code for dynamodb_encryption_sdk.material_providers.most_recent

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Cryptographic materials provider that uses a provider store to obtain cryptographic materials."""
import logging
import time
from collections import OrderedDict
from enum import Enum
from threading import Lock, RLock

import attr
import six

from dynamodb_encryption_sdk.exceptions import InvalidVersionError, NoKnownVersionError
from dynamodb_encryption_sdk.identifiers import LOGGER_NAME
from dynamodb_encryption_sdk.materials import CryptographicMaterials  # noqa pylint: disable=unused-import
from dynamodb_encryption_sdk.structures import EncryptionContext  # noqa pylint: disable=unused-import

from . import CryptographicMaterialsProvider
from .store import ProviderStore

try:  # Python 3.5.0 and 3.5.1 have incompatible typing modules
    from typing import Any, Text  # noqa pylint: disable=unused-import
except ImportError:  # pragma: no cover
    # We only actually need these imports when running the mypy checks
    pass


__all__ = ("CachingMostRecentProvider",)
_LOGGER = logging.getLogger(LOGGER_NAME)
#: Grace period during which we will return the latest local materials. This allows multiple
#: threads to be using this same provider without risking lock contention or many threads
#: all attempting to create new versions simultaneously.
_GRACE_PERIOD = 0.5
_ENCRYPT_ACTION = "encrypt"
_DECRYPT_ACTION = "decrypt"


class TtlActions(Enum):
    """Actions that can be taken based on the version TTl state."""

    EXPIRED = 0
    GRACE_PERIOD = 1
    LIVE = 2


def _min_capacity_validator(instance, attribute, value):
    # pylint: disable=unused-argument
    """Attrs validator to require that value is at least 1."""
    if value < 1:
        raise ValueError("Cache capacity must be at least 1")


@attr.s(init=False)
class BasicCache(object):
    """Most basic LRU cache."""

    capacity = attr.ib(validator=(attr.validators.instance_of(int), _min_capacity_validator))

    def __init__(self, capacity):  # noqa=D107
        # type: (int) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        self.capacity = capacity
        attr.validate(self)
        self.__attrs_post_init__()

    def __attrs_post_init__(self):
        # type; () -> None
        """Initialize the internal cache."""
        self._cache_lock = RLock()  # attrs confuses pylint: disable=attribute-defined-outside-init
        self.clear()

    def _prune(self):
        # type: () -> None
        """Prunes internal cache until internal cache is within the defined limit."""
        with self._cache_lock:
            while len(self._cache) > self.capacity:
                self._cache.popitem(last=False)

    def put(self, name, value):
        # type: (Any, Any) -> None
        """Add a value to the cache.

        :param name: Hashable object to identify the value in the cache
        :param value: Value to add to cache
        """
        with self._cache_lock:
            self._cache[name] = value
            self._prune()

    def get(self, name):
        # type: (Any) -> Any
        """Get a value from the cache.

        :param name: Object to identify the value in the cache
        :returns: Value from cache
        """
        with self._cache_lock:
            value = self._cache.pop(name)
            # Re-insert the item to bump it to the front of the LRU cache
            self.put(name, value)
            return value

    def clear(self):
        # type: () -> None
        """Clear the cache."""
        _LOGGER.debug("Clearing cache")
        with self._cache_lock:
            self._cache = OrderedDict()  # type: OrderedDict[Any, Any] # pylint: disable=attribute-defined-outside-init

    def evict(self, name):
        # type: (Any) -> None
        """Evict a single entry from the cache."""
        with self._cache_lock:
            try:
                del self._cache[name]
            except KeyError:
                # If the key wasn't in the cache, do nothing
                pass


[docs]@attr.s(init=False) @attr.s(init=False) class CachingMostRecentProvider(CryptographicMaterialsProvider): # pylint: disable=too-many-instance-attributes """Cryptographic materials provider that uses a provider store to obtain cryptography materials. Materials obtained from the store are cached for a user-defined amount of time, then removed from the cache and re-retrieved from the store. When encrypting, the most recent provider that the provider store knows about will always be used. :param ProviderStore provider_store: Provider store to use :param str material_name: Name of materials for which to ask the provider store :param float version_ttl: Max time in seconds to go until checking with provider store for a more recent version :param int cache_size: The maximum number of entries that the cache can hold """ _provider_store = attr.ib(validator=attr.validators.instance_of(ProviderStore)) _material_name = attr.ib(validator=attr.validators.instance_of(six.string_types)) _version_ttl = attr.ib(validator=attr.validators.instance_of(float)) _cache_size = attr.ib(validator=attr.validators.instance_of(int)) def __init__(self, provider_store, material_name, version_ttl, cache_size=1000): # noqa=D107 # type: (ProviderStore, Text, float, int) -> None # Workaround pending resolution of attrs/mypy interaction. # https://github.com/python/mypy/issues/2088 # https://github.com/python-attrs/attrs/issues/215 self._provider_store = provider_store self._material_name = material_name self._version_ttl = version_ttl self._grace_period = _GRACE_PERIOD self._cache_size = cache_size attr.validate(self) self.__attrs_post_init__() def __attrs_post_init__(self): # type: () -> None """Initialize the cache.""" self._version = None # type: int # pylint: disable=attribute-defined-outside-init self._last_updated = None # type: float # pylint: disable=attribute-defined-outside-init self._lock = Lock() # pylint: disable=attribute-defined-outside-init self._cache = BasicCache(self._cache_size) # pylint: disable=attribute-defined-outside-init self.refresh()
[docs] def decryption_materials(self, encryption_context): # type: (EncryptionContext) -> CryptographicMaterials """Return decryption materials. :param EncryptionContext encryption_context: Encryption context for request :raises AttributeError: if no decryption materials are available """ provider = None version = self._provider_store.version_from_material_description(encryption_context.material_description) ttl_action = self._ttl_action(version, _DECRYPT_ACTION) if ttl_action is TtlActions.EXPIRED: self._cache.evict(self._version) _LOGGER.debug('TTL Action "%s" when getting decryption materials', ttl_action.name) if ttl_action is TtlActions.LIVE: try: _LOGGER.debug("Looking in cache for encryption materials provider version %d", version) _, provider = self._cache.get(version) except KeyError: _LOGGER.debug("Decryption materials provider not found in cache") if provider is None: try: provider = self._get_provider_with_grace_period(version, ttl_action) except InvalidVersionError: _LOGGER.exception("Unable to get decryption materials from provider store.") raise AttributeError("No decryption materials available") return provider.decryption_materials(encryption_context)
def _ttl_action(self, version, action): # type: (str) -> TtlActions """Determine the correct action to take based on the local resources and TTL. :param action: The action being taken (encrypt or decrypt) :returns: decision :rtype: TtlActions """ try: if action == _ENCRYPT_ACTION: # On encrypt, always check the class-level variable indicating when we last checked for updates. # The cache timestamps will be updated by calls to decrypt, so we don't want frequent decrypts to # prevent us from re-checking for a newer encryption version if self._last_updated is None: return TtlActions.EXPIRED timestamp = self._last_updated else: timestamp, _ = self._cache.get(version) time_since_updated = time.time() - timestamp if time_since_updated < self._version_ttl: return TtlActions.LIVE if time_since_updated < self._version_ttl + self._grace_period: return TtlActions.GRACE_PERIOD _LOGGER.debug("TTL Expired because known version has expired") return TtlActions.EXPIRED except KeyError: _LOGGER.debug("TTL Expired because the requested version doesn't exist in the cache") return TtlActions.EXPIRED def _get_max_version(self): # type: () -> int """Ask the provider store for the most recent version of this material. :returns: Latest version in the provider store (0 if not found) :rtype: int """ try: return self._provider_store.max_version(self._material_name) except NoKnownVersionError: return 0 def _get_provider(self, version): # type: (int) -> CryptographicMaterialsProvider """Ask the provider for a specific version of this material. :param int version: Version to request :returns: Cryptographic materials provider for the requested version :rtype: CryptographicMaterialsProvider :raises AttributeError: if provider could not locate version """ try: return self._provider_store.get_or_create_provider(self._material_name, version) except InvalidVersionError: _LOGGER.exception("Unable to get encryption materials from provider store.") raise AttributeError("No encryption materials available") def _get_provider_with_grace_period(self, version, ttl_action): # type: (int, bool) -> CryptographicMaterialsProvider """Ask the provider to retrieve a specific version of this material, falling back to the cache if another caller currently holds the lock for retrieval. :param int version: Version to request :param TtlActions ttl_action: The ttl action to take for this version :returns: Cryptographic materials provider for the requested version :rtype: CryptographicMaterialsProvider :raises AttributeError: if provider could not locate version """ blocking_wait = bool(ttl_action is TtlActions.EXPIRED) acquired = self._lock.acquire(blocking_wait) # pylint: disable=consider-using-with if not acquired: # We failed to acquire the lock. # If blocking, we will never reach this point. # If not blocking, we want whatever the latest local version is. _LOGGER.debug("Failed to acquire lock. Returning the last cached version.") _, provider = self._cache.get(version) return provider try: # If the entry was expired then we blocked waiting for the lock, so it's possible some other thread already # queried the provider store and re-populated the cache. If so, we don't want to re-query the provider # store, so check if the entry is back in the cache first if ttl_action is TtlActions.EXPIRED: try: _, provider = self._cache.get(version) return provider except KeyError: pass provider = self._provider_store.provider(self._material_name, version) self._cache.put(version, (time.time(), provider)) return provider finally: self._lock.release() def _get_most_recent_version(self, ttl_action): # type: (bool) -> CryptographicMaterialsProvider """Get the most recent version of the provider. If allowing local and we cannot obtain the lock, just return the most recent local version. Otherwise, wait for the lock and ask the provider store for the most recent version of the provider. :param TtlActions ttl_action: The ttl action to take for this version :returns: version and corresponding cryptographic materials provider :rtype: CryptographicMaterialsProvider """ blocking_wait = bool(ttl_action is TtlActions.EXPIRED) acquired = self._lock.acquire(blocking_wait) # pylint: disable=consider-using-with if not acquired: # We failed to acquire the lock. # If blocking, we will never reach this point. # If not blocking, we want whatever the latest local version is. _LOGGER.debug("Failed to acquire lock. Returning the last cached version.") version = self._version _, provider = self._cache.get(version) return provider try: # If the entry was expired then we blocked waiting for the lock, so it's possible some other thread already # queried the provider store and re-populated the cache. If so, we don't want to re-query the provider # store, so check if the entry is back in the cache first if ttl_action is TtlActions.EXPIRED: try: _, provider = self._cache.get(self._version) return provider except KeyError: pass max_version = self._get_max_version() try: _, provider = self._cache.get(max_version) except KeyError: provider = self._get_provider(max_version) received_version = self._provider_store.version_from_material_description( provider._material_description # pylint: disable=protected-access ) _LOGGER.debug("Caching materials provider version %d", received_version) self._version = received_version # pylint: disable=attribute-defined-outside-init self._last_updated = time.time() # pylint: disable=attribute-defined-outside-init self._cache.put(received_version, (self._last_updated, provider)) finally: self._lock.release() _LOGGER.debug("New latest version is %d", self._version) return provider
[docs] def encryption_materials(self, encryption_context): # type: (EncryptionContext) -> CryptographicMaterials """Return encryption materials. :param EncryptionContext encryption_context: Encryption context for request :raises AttributeError: if no encryption materials are available """ ttl_action = self._ttl_action(self._version, _ENCRYPT_ACTION) _LOGGER.debug('TTL Action "%s" when getting encryption materials', ttl_action.name) provider = None if ttl_action is TtlActions.EXPIRED: self._cache.evict(self._version) if ttl_action is TtlActions.LIVE: try: _LOGGER.debug("Looking in cache for encryption materials provider version %d", self._version) _, provider = self._cache.get(self._version) except KeyError: _LOGGER.debug("Encryption materials provider not found in cache") ttl_action = TtlActions.EXPIRED if provider is None: _LOGGER.debug("Getting most recent materials provider version") provider = self._get_most_recent_version(ttl_action) return provider.encryption_materials(encryption_context)
[docs] def refresh(self): # type: () -> None """Clear all local caches for this provider.""" _LOGGER.debug("Refreshing CachingMostRecentProvider instance.") with self._lock: self._cache.clear() self._version = None # type: int # pylint: disable=attribute-defined-outside-init self._last_updated = None # type: float # pylint: disable=attribute-defined-outside-init