Source code for ebonite.core.objects.requirements

import base64
import glob
import itertools
import json
import os
import zlib
from types import ModuleType
from typing import Dict, List, Type, TypeVar, Union

from pyjackson.decorators import make_string, type_field

from ebonite.core.objects.base import EboniteParams

# TODO i dont know how to do this better
MODULE_PACKAGE_MAPPING = {
    'sklearn': 'scikit-learn',
    'skimage': 'scikit-image'
}
PACKAGE_MODULE_MAPPING = {v: k for k, v in MODULE_PACKAGE_MAPPING.items()}


[docs]def read(path, bin=False): with open(path, 'r' if not bin else 'rb') as f: return f.read()
[docs]@type_field('type') class Requirement(EboniteParams): """ Base class for python requirement """ type = None
[docs]class PythonRequirement(Requirement): module = None
[docs]@make_string(include_name=True) class InstallableRequirement(PythonRequirement): """ This class represents pip-installable python library :param module: name of python module :param version: version of python package :param package_name: Optional. pip package name for this module, if it is different from module name """ type = 'installable' def __init__(self, module: str, version: str = None, package_name: str = None): self.module = module self.version = version self.package_name = package_name @property def package(self): """ Pip package name """ return self.package_name or MODULE_PACKAGE_MAPPING.get(self.module, self.module)
[docs] def to_str(self): """ pip installable representation of this module """ if self.version is not None: return '{}=={}'.format(self.package, self.version) return self.package
[docs] @classmethod def from_module(cls, mod: ModuleType, package_name: str = None) -> 'InstallableRequirement': """ Factory method to create :class:`InstallableRequirement` from module object :param mod: module object :param package_name: PIP package name if it is not equal to module name :return: :class:`InstallableRequirement` """ from ebonite.utils.module import get_module_version return InstallableRequirement(mod.__name__, get_module_version(mod), package_name)
[docs] @classmethod def from_str(cls, name): """ Factory method for creating :class:`InstallableRequirement` from string :param name: string representation :return: :class:`InstallableRequirement` """ for rel in ['==', '>=', '<=']: # TODO for now we interpret everything as exact version if rel in name: package, version = name.split(rel) return InstallableRequirement(package, version) return InstallableRequirement(name) # FIXME for other relations like > < !=
[docs]@make_string(include_name=True) class CustomRequirement(PythonRequirement): """ This class represents local python code that you need as a requirement for your code :param name: filename of this code :param source64zip: zipped and base64-encoded source :param is_package: whether this code should be in %name%/__init__.py """ type = 'custom' def __init__(self, name: str, source64zip: str, is_package: bool): self.source64zip = source64zip self.name = name self.is_package = is_package
[docs] @staticmethod def from_module(mod: ModuleType) -> 'CustomRequirement': """ Factory method to create :class:`CustomRequirement` from module object :param mod: module object :return: :class:`CustomRequirement` """ is_package = mod.__file__.endswith('__init__.py') if is_package: pkg_dir = os.path.dirname(mod.__file__) par = os.path.dirname(pkg_dir) sources = {os.path.relpath(p, par): read(p, bin=True) for p in glob.glob(os.path.join(pkg_dir, '**', '*'), recursive=True) if os.path.isfile(p)} src = CustomRequirement.compress_package(sources) else: src = CustomRequirement.compress(read(mod.__file__)) return CustomRequirement(mod.__name__, src, is_package)
[docs] @staticmethod def compress(s: str) -> str: """ Helper method to compress source code :param s: source code :return: base64 encoded string of zipped source """ zp = zlib.compress(s.encode('utf8')) b64 = base64.standard_b64encode(zp) return b64.decode('utf8')
[docs] @staticmethod def compress_package(s: Dict[str, bytes]) -> str: sources = { path: base64.standard_b64encode(zlib.compress(payload)).decode('utf8') for path, payload in s.items() } return CustomRequirement.compress(json.dumps(sources))
[docs] @staticmethod def decompress(s: str) -> str: """ Helper method to decompress source code :param s: compressed source code :return: decompressed source code """ zp = base64.standard_b64decode(s.encode('utf8')) src = zlib.decompress(zp) return src.decode('utf8')
[docs] @staticmethod def decompress_package(s: str) -> Dict[str, bytes]: sources = json.loads(CustomRequirement.decompress(s)) return { path: zlib.decompress(base64.standard_b64decode(payload.encode('utf8'))) for path, payload in sources.items() }
@property def module(self): """ Module name for this requirement """ return self.name.split('.')[0] @property def source(self) -> str: """ Source code of this requirement """ if not self.is_package: return CustomRequirement.decompress(self.source64zip) raise AttributeError("package requirement does not have source attribute") @property def sources(self) -> Dict[str, bytes]: if self.is_package: return CustomRequirement.decompress_package(self.source64zip) raise AttributeError("non package requirement does not have sources attribute")
[docs] def to_sources_dict(self): """ Mapping path -> source code for this requirement :return: dict path -> source """ if self.is_package: return self.sources else: return {self.name.replace('.', '/') + '.py': self.source}
[docs]class FileRequirement(CustomRequirement): def __init__(self, name: str, source64zip: str): super().__init__(name, source64zip, False)
[docs] def to_sources_dict(self): """ Mapping path -> source code for this requirement :return: dict path -> source """ return {self.name: self.source}
[docs] @classmethod def from_path(cls, path: str): return FileRequirement(path, cls.compress(read(path)))
[docs]@make_string class UnixPackageRequirement(Requirement): def __init__(self, package_name: str): self.package_name = package_name
T = TypeVar('T', bound=Requirement)
[docs]class Requirements(EboniteParams): """ A collection of requirements :param requirements: list of :class:`Requirement` instances """ def __init__(self, requirements: List[Requirement] = None): self.requirements = requirements or [] @property def installable(self) -> List[InstallableRequirement]: """ List of installable requirements """ return self.of_type(InstallableRequirement) @property def custom(self) -> List[CustomRequirement]: """ List of custom requirements """ return self.of_type(CustomRequirement)
[docs] def of_type(self, type_: Type[T]) -> List[T]: """ :param type_: type of requirements :return: List of requirements of type `type_` """ return [r for r in self.requirements if isinstance(r, type_)]
@property def modules(self) -> List[str]: """ List of module names """ return [r.module for r in self.of_type(PythonRequirement)]
[docs] def add(self, requirement: Requirement): """ Adds requirement to this collection :param requirement: :class:`Requirement` instance to add """ if isinstance(requirement, InstallableRequirement): for r in self.installable: if r.package == requirement.package: if r.version == requirement.version: break if r.version is not None and r.version != requirement.version: raise ValueError('Conflicting versions for package {}: {} and {}'.format(r.package, r.version, requirement.version)) else: self.requirements.append(requirement) elif isinstance(requirement, CustomRequirement): if requirement.is_package: for r in self.custom: if r.name.startswith(requirement.name + '.') or r.name == requirement.name: # existing req is subpackage or equals to new req self.requirements.remove(r) if requirement.name.startswith(r.name + '.'): # new req is subpackage of existing break else: self.requirements.append(requirement) else: for r in self.custom: if r.is_package and requirement.name.startswith(r.name + '.'): # new req is from existing package break if not r.is_package and r.name == requirement.name: # new req equals to existing break else: self.requirements.append(requirement) else: # TODO better checks here if requirement not in self.requirements: self.requirements.append(requirement)
[docs] def to_pip(self) -> List[str]: """ :return: list of pip installable packages """ return [r.to_str() for r in self.installable]
def __add__(self, other: 'AnyRequirements'): other = resolve_requirements(other) res = Requirements([]) for r in itertools.chain(self.requirements, other.requirements): res.add(r) return res def __iadd__(self, other: 'AnyRequirements'): return self + other
[docs]def resolve_requirements(other: 'AnyRequirements') -> Requirements: """ Helper method to create :class:`Requirements` from any supported source. Supported formats: :class:`Requirements`, :class:`Requirement`, list of :class:`Requirement`, string representation or list of string representations :param other: requirement in supported format :return: :class:`Requirements` instance """ if not isinstance(other, Requirements): if isinstance(other, list): if isinstance(other[0], str): other = Requirements([InstallableRequirement.from_str(r) for r in other]) elif isinstance(other[0], Requirement): other = Requirements([r for r in other]) elif isinstance(other, Requirement): other = Requirements([other]) elif isinstance(other, str): other = Requirements([InstallableRequirement.from_str(other)]) else: raise TypeError('only other Requirements, Requirement, list of Requirement objects, string ' '(or list of strings) can be added') return other
AnyRequirements = Union[Requirements, Requirement, List[Requirement], str, List[str]]