# *****************************************************************************
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# ******************************************************************************
import abc
import argparse
import json
import os
import six
import sys
from contextlib import contextmanager
from copy import deepcopy
from dlab_core.domain.repositories import BaseRepository, RepositoryException
# Message used when the CLI argument parser rejects the command line.
LC_ERR_WRONG_ARGUMENTS = 'Unrecognized arguments'
# Template; ``{name}`` is filled with the name of the expected type.
LC_ERR_INVALID_DATA_TYPE = 'Invalid context type, should be instance of {name}'
# Template; ``{location}`` is filled with the path that was not found.
LC_ERR_NO_FILE = 'No such file or directory: "{location}".'
# Message used when content cannot be decoded as JSON.
LC_ERR_NOT_JSON_CONTENT = 'No JSON object could be decoded'
# TODO remove condition after Python 2.7 retirement
if six.PY2:
# noinspection PyUnresolvedReferences
from ConfigParser import ConfigParser # pragma: no cover
else:
# noinspection PyUnresolvedReferences
from configparser import ConfigParser
class RepositoryDataTypeException(RepositoryException):
    """Error signalling that a value of an unexpected type was supplied.

    :type name: str
    :param name: Name of the type that was expected.
    """

    def __init__(self, name):
        # Build the message first so the super() call stays on one line.
        message = LC_ERR_INVALID_DATA_TYPE.format(name=name)
        super(RepositoryDataTypeException, self).__init__(message)
class RepositoryFileNotFoundException(RepositoryException):
    """Error signalling that a data source file does not exist.

    :type key: str
    :param key: Location of the file that could not be found.
    """

    def __init__(self, key):
        message = LC_ERR_NO_FILE.format(location=key)
        super(RepositoryFileNotFoundException, self).__init__(message)
class RepositoryJSONContentException(RepositoryException):
    """Error signalling that supplied content could not be decoded as JSON."""

    def __init__(self):
        message = LC_ERR_NOT_JSON_CONTENT
        super(RepositoryJSONContentException, self).__init__(message)
class RepositoryWrongArgumentException(RepositoryException):
    """Raised when command line arguments are rejected by the parser.

    The message is always ``LC_ERR_WRONG_ARGUMENTS``.
    """

    def __init__(self):
        super(RepositoryWrongArgumentException, self).__init__(
            LC_ERR_WRONG_ARGUMENTS
        )
@six.add_metaclass(abc.ABCMeta)
class DictRepository(BaseRepository):
    """Repository that keeps its records in a plain dict.

    Intended as a base class for repositories whose data can be represented
    as a ``key -> value`` mapping.
    """

    def __init__(self):
        # Backing storage; subclasses fill or replace it.
        self._data = {}

    @property
    def data(self):
        """Repository data getter.

        :rtype: dict
        :return: The backing dict itself (not a copy).
        """
        return self._data

    def find_one(self, key):
        """Look up a single record.

        :type key: str
        :param key: Record unique identifier.
        :return: Value stored under ``key`` or ``None`` when it is absent.
        """
        # Go through the ``data`` property so subclass overrides are honoured.
        records = self.data
        return records.get(key)

    def find_all(self):
        """Return every record held by the repository.

        :rtype: dict
        :return: The same dict object exposed by :attr:`data`.
        """
        records = self.data
        return records
@six.add_metaclass(abc.ABCMeta)
class BaseLazyLoadRepository(DictRepository):
    """Dict repository that loads its data on first access."""

    @property
    def data(self):
        """Repository data getter with lazy loading.

        ``_load_data`` is invoked whenever the backing dict is empty, so a
        source that yields no records is loaded again on every access.

        :rtype: dict
        :return: Repository data.
        """
        if self._data:
            return self._data
        self._load_data()
        return self._data

    @abc.abstractmethod
    def _load_data(self):
        """Load data from the data source.

        The implementations in this module fill ``self._data`` in place and
        return nothing.
        """
        raise NotImplementedError
@six.add_metaclass(abc.ABCMeta)
class BaseFileRepository(DictRepository):
    """Dict repository whose data source is a file on disk.

    :type location: str
    :param location: Data source file location.
    :raise RepositoryDataTypeException: If ``location`` is not a ``str``.
    :raise RepositoryFileNotFoundException: If ``location`` is not a file.
    """

    def __init__(self, location):
        super(BaseFileRepository, self).__init__()
        self._location = None
        # Assign through the property so the location gets validated.
        self.location = location

    @classmethod
    def _validate(cls, location):
        """Check that ``location`` is a string naming an existing file.

        :type location: str
        :param location: File location to check.
        :raise RepositoryDataTypeException: If ``location`` is not a ``str``.
        :raise RepositoryFileNotFoundException: If no such file exists.
        """
        if not isinstance(location, str):
            raise RepositoryDataTypeException(str.__name__)

        if os.path.isfile(location):
            return

        raise RepositoryFileNotFoundException(location)

    @property
    def location(self):
        """File location getter.

        :rtype: str
        :return: File location.
        """
        return self._location

    @location.setter
    def location(self, location):
        """File location setter.

        Validates the new location and discards previously held data.

        :type location: str
        :param location: File location.
        :raise RepositoryDataTypeException: If ``location`` is not a ``str``.
        :raise RepositoryFileNotFoundException: If no such file exists.
        """
        self._validate(location)
        # Data belonging to the previous file is no longer valid.
        self._location, self._data = location, {}
class ArrayRepository(DictRepository):
    """Repository backed by an in-memory dict.

    The supplied dict is stored by reference, not copied, so records added
    through :meth:`append` are also visible through the caller's dict.

    :type data: dict
    :param data: Initial repository data; ``None`` starts with an empty dict.
    :raise RepositoryDataTypeException: If ``data`` is given but not a dict.
    """

    def __init__(self, data=None):
        super(ArrayRepository, self).__init__()
        if data is not None:
            self._validate(data)
            self._data = data

    def append(self, key, value):
        """Add a record, replacing any existing record with the same key.

        :type key: str
        :param key: Record UUID.
        :param value: Record value.
        """
        self._data[key] = value

    @staticmethod
    def _validate(data):
        """Data source validator.

        :param data: Data for validation.
        :raise RepositoryDataTypeException: If ``data`` is not a dict.
        """
        if not isinstance(data, dict):
            # Report the type that is actually required (dict, not str).
            raise RepositoryDataTypeException(dict.__name__)
# FIXME: there can be problems with find_all method for win32 platform
class EnvironRepository(DictRepository):
    """Repository holding a snapshot of ``os.environ``.

    The environment is copied once at construction time.
    """

    def __init__(self):
        super(EnvironRepository, self).__init__()
        self._data.update(os.environ)

    def find_one(self, key):
        """Look up a single environment variable.

        :type key: str
        :param key: Variable name; upper-cased before lookup on win32.
        :return: Value stored under the key or ``None`` when it is absent.
        """
        lookup_key = key.upper() if sys.platform == 'win32' else key
        return super(EnvironRepository, self).find_one(lookup_key)
class JSONContentRepository(DictRepository):
    """Repository whose data is decoded from a JSON string.

    NOTE(review): ``content`` defaults to ``None``, but the setter rejects
    anything that is not a ``str``, so constructing the repository without
    content raises ``RepositoryDataTypeException`` - confirm this is intended.

    :type content: str
    :param content: JSON content for data source.
    :raise RepositoryDataTypeException: If ``content`` is not a ``str``.
    :raise RepositoryJSONContentException: If ``content`` is not valid JSON.
    """

    def __init__(self, content=None):
        super(JSONContentRepository, self).__init__()
        # Assign through the property so the content is validated and parsed.
        self.content = content

    @property
    def content(self):
        """Content getter.

        :rtype: str
        :returns: The raw JSON string last accepted by the setter.
        """
        return self._content

    @content.setter
    def content(self, content):
        """Content setter; decodes the JSON and replaces repository data.

        Nothing is modified when validation or decoding fails.

        :type content: str
        :param content: JSON data content.
        :raise RepositoryDataTypeException: If ``content`` is not a ``str``.
        :raise RepositoryJSONContentException: If decoding fails.
        """
        if not isinstance(content, str):
            raise RepositoryDataTypeException(str.__name__)

        try:
            decoded = json.loads(content)
        except ValueError:
            raise RepositoryJSONContentException()

        # NOTE(review): the decoded value is stored as-is; a JSON document
        # that is not an object would make ``_data`` a non-dict - verify
        # against callers.
        self._data = deepcopy(decoded)
        self._content = content
class ArgumentsRepository(BaseLazyLoadRepository):
    """Repository based on CLI arguments as data source.

    Arguments are parsed lazily, on first access to the repository data.

    :type arg_parse: argparse.ArgumentParser
    :param arg_parse: Argument parser; a fresh ``ArgumentParser`` is created
        when omitted.
    :raise RepositoryDataTypeException: If ``arg_parse`` is not a parser.
    """

    def __init__(self, arg_parse=None):
        super(ArgumentsRepository, self).__init__()
        self.arg_parse = arg_parse or argparse.ArgumentParser()

    @property
    def arg_parse(self):
        """Argument parser getter.

        :rtype: argparse.ArgumentParser
        :returns: Argument Parser.
        """
        return self._arg_parse

    @arg_parse.setter
    def arg_parse(self, arg_parse):
        """Argument parser setter.

        :type arg_parse: argparse.ArgumentParser
        :param arg_parse: Argument Parser.
        :raise RepositoryDataTypeException: If ``arg_parse`` is not an
            ``argparse.ArgumentParser`` instance.
        """
        if not isinstance(arg_parse, argparse.ArgumentParser):
            # Name the type that is actually expected in the error message.
            raise RepositoryDataTypeException(
                argparse.ArgumentParser.__name__
            )
        self._arg_parse = arg_parse

    @staticmethod
    @contextmanager
    def _silence_stderr():
        """Temporarily redirect ``sys.stderr`` to ``os.devnull``.

        The previous ``sys.stderr`` is restored on exit, even when the body
        raises.
        """
        previous = sys.stderr
        # Close the handle opened here rather than whatever ``sys.stderr``
        # happens to be bound to when the block exits.
        with open(os.devnull, "w") as devnull:
            sys.stderr = devnull
            try:
                yield devnull
            finally:
                sys.stderr = previous

    def _load_data(self):
        """Parse the command line and store the result in the repository.

        :raise RepositoryWrongArgumentException: If the parser exits, which
            argparse does on invalid arguments.
        """
        try:
            # argparse reports errors on stderr before exiting; keep it quiet
            # because the failure is surfaced as an exception instead.
            with self._silence_stderr():
                args = self._arg_parse.parse_args()
        except SystemExit:
            raise RepositoryWrongArgumentException()

        self._data.update(vars(args))

    def add_argument(self, *args, **kwargs):
        """Register an argument on the underlying parser.

        Accepts the same parameters as ``ArgumentParser.add_argument``.
        Cached data is dropped so the next access parses the command line
        again with the new argument.
        """
        self._arg_parse.add_argument(*args, **kwargs)
        self._data = {}
class ConfigRepository(BaseFileRepository, BaseLazyLoadRepository):
    """Lazily loaded repository backed by an INI-style configuration file.

    Every option is stored under the key ``<section>_<option>``.

    :type location: str
    :param location: Data source file location.
    """

    # Pattern used to build a record key from section and option names.
    VARIABLE_TEMPLATE = "{0}_{1}"

    def __init__(self, location):
        super(ConfigRepository, self).__init__(location)

    def _load_data(self):
        """Read the configuration file into the repository.

        Keys already present in the repository are left untouched, so when
        two section/option pairs map to the same key the first one wins.
        """
        parser = ConfigParser()
        parser.read(self.location)

        for section in parser.sections():
            for option in parser.options(section):
                name = self.VARIABLE_TEMPLATE.format(section, option)
                value = parser.get(section, option)
                self._data.setdefault(name, value)
class ChainOfRepositories(DictRepository):
    """Ordered list of repositories queried one by one until data is found.

    Repositories registered earlier take precedence over later ones.

    :type repos: list or tuple of DictRepository
    :param repos: Repositories to query, in priority order.
    :raise RepositoryDataTypeException: If ``repos`` is not a list or tuple,
        or if any element is not a ``DictRepository``.
    """

    def __init__(self, repos=()):
        super(ChainOfRepositories, self).__init__()

        if not isinstance(repos, (list, tuple)):
            raise RepositoryDataTypeException("{} or {}".format(
                list.__name__,
                tuple.__name__)
            )

        self._repos = []
        for repo in repos:
            self.register(repo)

    def register(self, repo):
        """Append a repository to the end of the chain.

        :type repo: DictRepository
        :param repo: Repository.
        :raise RepositoryDataTypeException: If ``repo`` is not a
            ``DictRepository`` instance.
        """
        if not isinstance(repo, DictRepository):
            raise RepositoryDataTypeException(DictRepository.__name__)

        self._repos.append(repo)

    def find_one(self, key):
        """Return the first non-``None`` value found for ``key``.

        :type key: str
        :param key: Record unique identifier.
        :return: Value from the first repository that has one, else ``None``.
        """
        for repo in self._repos:
            value = repo.find_one(key)
            if value is not None:
                return value

        return None

    def find_all(self):
        """Merge the records of every repository in the chain.

        For each key the result holds the value that :meth:`find_one` would
        return for it.

        :rtype: dict
        :return: New dict with the merged records.
        """
        merged = {}

        # Walk from lowest to highest priority so earlier repositories
        # overwrite later ones, matching the lookup order of find_one().
        for repo in reversed(self._repos):
            # find_all() rather than .data, so nested chains contribute too.
            for key, value in repo.find_all().items():
                # find_one() skips None, so a None must not hide a real
                # value provided by a lower-priority repository.
                if value is not None or key not in merged:
                    merged[key] = value

        return merged