import ast
import inspect
import io
import os
import re
import sys
import threading
import warnings
from collections import namedtuple
from functools import wraps
from pickle import PickleError
from types import FunctionType, LambdaType, MethodType, ModuleType
import requests
from isort.finders import FindersManager
from isort.settings import default
from ebonite.core.objects.requirements import (MODULE_PACKAGE_MAPPING, CustomRequirement, InstallableRequirement,
Requirements)
from ebonite.utils import importing
from ebonite.utils.importing import import_module
from ebonite.utils.log import logger
from ebonite.utils.pickling import EbonitePickler
# Directory of the Python standard library, derived from the location of the
# stdlib `threading` module. NOTE(review): currently only referenced by the
# commented-out site-packages heuristic in `is_installable_module` below.
PYTHON_BASE = os.path.dirname(threading.__file__)
def analyze_module_imports(module_path):
    """
    Imports module at given path and collects `requirements.txt`-style
    representations of all installable, non-private modules it references.

    :param module_path: dotted path of module to analyze
    :return: set of requirement representation strings
    """
    module = importing.import_module(module_path)
    found = set()
    for value in module.__dict__.values():
        candidate = value if isinstance(value, ModuleType) else get_object_base_module(value)
        if is_installable_module(candidate) and not is_private_module(candidate):
            found.add(get_module_repr(candidate))
    return found
def check_pypi_module(module_name, module_version=None, raise_on_error=False, warn_on_error=True):
    """
    Checks that module with given name and (optionally) version exists in PyPi repository.

    :param module_name: name of module to look for in PyPi
    :param module_version: (optional) version of module to look for in PyPi
    :param raise_on_error: raise `ValueError` if module is not found in PyPi instead of returning `False`
    :param warn_on_error: print a warning if module is not found in PyPi
    :return: `True` if module found in PyPi, `False` otherwise
    """
    r = requests.get('https://pypi.org/pypi/{}/json'.format(module_name))
    if r.status_code != 200:
        msg = 'Cant find package {} in PyPi'.format(module_name)
        if raise_on_error:
            raise ValueError(msg)
        elif warn_on_error:
            warnings.warn(msg)
        return False
    if module_version is not None and module_version not in r.json()['releases']:
        msg = 'Cant find package version {}=={} in PyPi'.format(module_name, module_version)
        if raise_on_error:
            # was ImportError, inconsistent with the docstring and the branch above
            raise ValueError(msg)
        elif warn_on_error:
            warnings.warn(msg)
        return False
    return True
def get_object_base_module(obj: object) -> ModuleType:
    """
    Determines base module of module given object comes from.

    >>> import numpy
    >>> get_object_base_module(numpy.random.Generator)
    <module 'numpy' from '...'>

    Essentially this function is a combination of :func:`get_object_module` and :func:`get_base_module`.

    :param obj: object to determine base module for
    :return: Python module object for base module
    """
    return get_base_module(inspect.getmodule(obj))
[docs]def get_base_module(mod: ModuleType):
"""
Determines base module for given module.
>>> import numpy
>>> get_base_module(numpy.random)
<module 'numpy' from '...'>
:param mod: Python module object to determine base module for
:return: Python module object for base module
"""
if mod is None:
mod = inspect.getmodule(type(mod))
if mod is None:
return None
base, _sep, _stem = mod.__name__.partition('.')
return sys.modules[base]
def get_object_module(obj: object) -> ModuleType:
    """
    Determines module given object comes from

    >>> import numpy
    >>> get_object_module(numpy.ndarray)
    <module 'numpy' from '...'>

    :param obj: obj to determine module it comes from
    :return: Python module object for object module
    """
    found = inspect.getmodule(obj)
    return found
def _create_section(section):
    """
    Builds a classmethod-compatible predicate that reports whether a module
    name belongs to the given isort section, caching classification results
    on the singleton instance.
    """
    def is_section(cls: 'ISortModuleFinder', module: str):
        cls.init()
        cache = cls.instance.module2section
        if module not in cache:
            cache[module] = cls.instance.finder.find(module)
        return cache[module] == section
    return is_section
class ISortModuleFinder:
    """
    Determines type of module: standard library (:meth:`ISortModuleFinder.is_stdlib`) or
    third party (:meth:`ISortModuleFinder.is_thirdparty`).

    This class uses `isort` library heuristics with some modifications.
    """
    # lazily-created singleton, see `init`
    instance: 'ISortModuleFinder' = None

    def __init__(self):
        config = default.copy()
        # `default.copy()` is a shallow copy: appending/extending its list values
        # in place would mutate isort's shared default settings and accumulate
        # duplicate entries on repeated instantiation. Rebuild the lists instead.
        config['known_first_party'] = list(config['known_first_party']) + ['ebonite']
        config['known_third_party'] = list(config['known_third_party']) + ['xgboost']
        config['known_standard_library'] = list(config['known_standard_library']) + [
            'opcode', 'nturl2path',  # pytest requirements missed by isort
            'pkg_resources',  # EBNT-112: workaround for imports from setup.py (see build/builder/docker.py)
            'posixpath', 'setuptools',
            'pydevconsole', 'pydevd_tracing', 'pydev_ipython.matplotlibtools', 'pydev_console.protocol',
            'pydevd_file_utils', 'pydevd_plugins.extensions.types.pydevd_plugins_django_form_str', 'pydev_console',
            'pydev_ipython', 'pydevd_plugins.extensions.types.pydevd_plugin_numpy_types',
            'pydevd_plugins.extensions.types.pydevd_helpers', 'pydevd_plugins', 'pydevd_plugins.extensions.types',
            'pydevd_plugins.extensions', 'pydev_ipython.inputhook']  # "built-in" pydev (and pycharm) modules
        section_names = config['sections']
        sections = namedtuple('Sections', section_names)(*section_names)
        self.finder = FindersManager(config, sections)
        # cache: module name -> section name, filled lazily by is_stdlib/is_thirdparty
        self.module2section = {}

    @classmethod
    def init(cls):
        """Creates the singleton instance on first use."""
        if cls.instance is None:
            cls.instance = cls()

    is_stdlib = classmethod(_create_section('STDLIB'))
    is_thirdparty = classmethod(_create_section('THIRDPARTY'))
def is_private_module(mod: ModuleType):
    """
    Determines that given module object represents private module.

    :param mod: module object to use
    :return: boolean flag
    """
    first_char = mod.__name__[:1]
    return first_char == '_'
def is_pseudo_module(mod: ModuleType):
    """
    Determines that given module object represents pseudo (aka Python "magic") module.

    :param mod: module object to use
    :return: boolean flag
    """
    name = mod.__name__
    # dunder-style names, e.g. `__main__`
    return name.startswith('__') and name.endswith('__')
def is_extension_module(mod: ModuleType):
    """
    Determines that given module object represents native code extension module.

    :param mod: module object to use
    :return: boolean flag
    """
    try:
        return mod.__file__.endswith(('.so', '.pyd'))
    except AttributeError:
        # no usable __file__ (e.g. builtin/frozen module): treat as extension
        return True
def is_installable_module(mod: ModuleType):
    """
    Determines that given module object represents PyPi-installable (aka third party) module.

    :param mod: module object to use
    :return: boolean flag
    """
    # classification is delegated to isort heuristics; a previous heuristic
    # compared `mod.__file__` against the site-packages path instead
    return ISortModuleFinder.is_thirdparty(mod.__name__)
def is_builtin_module(mod: ModuleType):
    """
    Determines that given module object represents standard library (aka builtin) module.

    :param mod: module object to use
    :return: boolean flag
    """
    module_name = mod.__name__
    return ISortModuleFinder.is_stdlib(module_name)
def is_ebonite_module(mod: ModuleType):
    """
    Determines that given module object is ebonite module

    :param mod: module object to use
    :return: boolean flag
    """
    # matches `ebonite` itself and any `ebonite.*` submodule, but not e.g. `ebonitex`
    return mod.__name__.partition('.')[0] == 'ebonite'
def is_local_module(mod: ModuleType):
    """
    Determines that given module object represents local module.
    Local module is a module (Python file) which is not from standard library and not installed via pip.

    :param mod: module object to use
    :return: boolean flag
    """
    # predicate order preserved from the original short-circuit chain
    for disqualifier in (is_pseudo_module, is_ebonite_module, is_builtin_module,
                         is_installable_module, is_extension_module):
        if disqualifier(mod):
            return False
    return True
def is_from_installable_module(obj: object):
    """
    Determines that given object comes from PyPi-installable (aka third party) module.

    :param obj: object to check
    :return: boolean flag
    """
    base_mod = get_object_base_module(obj)
    return is_installable_module(base_mod)
def get_module_version(mod: ModuleType):
    """
    Determines version of given module object.

    :param mod: module object to use
    :return: version as `str` or `None` if version could not be determined
    """
    try:
        return mod.__version__
    except AttributeError:
        # fall back to scanning for a `<name>-<version>.dist-info` directory
        # next to the module file
        mod_file = getattr(mod, '__file__', None)
        if mod_file is None:
            # previously modules without __file__ (e.g. builtins) raised a
            # secondary AttributeError from inside this handler
            return None
        pattern = re.compile(re.escape(mod.__name__) + r'-(.+)\.dist-info')
        for name in os.listdir(os.path.dirname(mod_file)):
            m = pattern.match(name)
            if m:
                return m.group(1)
        return None
def get_python_version():
    """
    :return: Current python version in 'major.minor.micro' format
    """
    info = sys.version_info
    return '{}.{}.{}'.format(info.major, info.minor, info.micro)
def get_package_name(mod: ModuleType) -> str:
    """
    Determines PyPi package name for given module object

    :param mod: module object to use
    :return: name as `str`
    """
    if mod is None:
        raise ValueError('mod must not be None')
    module_name = mod.__name__
    # some modules are published on PyPi under a different package name
    return MODULE_PACKAGE_MAPPING.get(module_name, module_name)
def get_module_repr(mod: ModuleType, validate_pypi=False) -> str:
    """
    Builds PyPi `requirements.txt`-compatible representation of given module object

    :param mod: module object to use
    :param validate_pypi: if `True` (default is `False`) perform representation validation in PyPi repository
    :return: representation as `str`
    """
    if mod is None:
        raise ValueError('mod must not be None')
    name = get_package_name(mod)
    version = get_module_version(mod)
    if validate_pypi:
        check_pypi_module(name, version, raise_on_error=True)
    return '{}=={}'.format(name, version)
def get_module_as_requirement(mod: ModuleType, validate_pypi=False) -> InstallableRequirement:
    """
    Builds Ebonite representation of given module object

    :param mod: module object to use
    :param validate_pypi: if `True` (default is `False`) perform representation validation in PyPi repository
    :return: representation as :class:`.InstallableRequirement`
    """
    version = get_module_version(mod)
    if validate_pypi:
        check_pypi_module(get_package_name(mod), version, raise_on_error=True)
    return InstallableRequirement(mod.__name__, version)
def get_local_module_reqs(mod):
    """
    Parses source of given (local) module and returns list of module objects
    it imports at top level.

    :param mod: module object to analyze
    :return: list of imported module objects
    """
    tree = ast.parse(inspect.getsource(mod))
    imports = []
    for statement in tree.body:
        if isinstance(statement, ast.Import):
            imports += [(n.name, None) for n in statement.names]
        elif isinstance(statement, ast.ImportFrom):
            if statement.level == 0:
                imports.append((statement.module, None))
            else:
                # relative import: one leading dot per level; `statement.module`
                # is None for bare `from . import x` (previously crashed with
                # TypeError, and `level >= 2` lost all but one dot)
                name = '.' * statement.level + (statement.module or '')
                imports.append((name, mod.__package__))
    # NOTE(review): assumes `import_module` follows `importlib.import_module`
    # semantics for relative names with a `package` argument -- confirm
    result = [import_module(i, p) for i, p in imports]
    if mod.__file__.endswith('__init__.py'):
        # add loaded subpackages
        prefix = mod.__name__ + '.'
        result += [module for name, module in sys.modules.items() if name.startswith(prefix)]
    return result
def add_closure_inspection(f):
    """
    Decorator for pickler `save`-dispatch handlers of callables: before
    delegating to the wrapped handler `f`, inspects the closure of the object
    being pickled and registers referenced modules/objects as requirements.
    """
    @wraps(f)
    def wrapper(pickler: '_EboniteRequirementAnalyzer', obj):
        # register everything the callable captures via nonlocal or global names:
        # modules directly, other objects by recursing through pickler.save
        closure = inspect.getclosurevars(obj)
        for field in ['nonlocals', 'globals']:
            for o in getattr(closure, field).values():
                if isinstance(o, ModuleType):
                    pickler._add_requirement(o)
                else:
                    pickler.save(o)
        if is_from_installable_module(obj):
            return f(pickler, obj)

        # to add from local imports inside user (non PIP package) code
        tree = ast.parse(inspect.getsource(obj).strip())

        class ImportFromVisitor(ast.NodeVisitor):
            def visit_ImportFrom(self, node: ast.ImportFrom):  # noqa
                warnings.warn(f'Detected local import in {obj.__module__}.{obj.__name__}')
                if node.level == 0:
                    mod = import_module(node.module)
                else:
                    # NOTE(review): a single leading dot is used regardless of
                    # `node.level` -- presumably multi-level relative imports are
                    # not expected inside function bodies; confirm
                    mod = import_module('.' + node.module, get_object_module(obj).__package__)
                pickler._add_requirement(mod)

        ImportFromVisitor().visit(tree)
        return f(pickler, obj)
    return wrapper
class _EboniteRequirementAnalyzer(EbonitePickler):
    """
    Pickler that writes nothing: it traverses the object graph through pickle
    dispatch and records every module encountered along the way, to be turned
    into a :class:`Requirements` afterwards.
    """

    # module name prefixes never reported as requirements
    ignoring = (
        'dill',
        'ebonite',
        'tests'  # pytest scans all test modules and all their imports are treated as requirements
    )

    dispatch = EbonitePickler.dispatch.copy()
    # wrap the save-handlers for callables so their closures are inspected too
    add_closure_for = [FunctionType, MethodType, staticmethod, classmethod, LambdaType]
    dispatch.update({
        t: add_closure_inspection(EbonitePickler.dispatch[t]) for t in add_closure_for
    })

    def __init__(self, *args, **kwargs):
        super().__init__(io.BytesIO(), *args, **kwargs)  # TODO maybe patch memo and other stuff too
        # suppress all pickle output: only the traversal side effects matter
        self.framer.write = self.skip_write
        self.write = self.skip_write
        self.memoize = self.skip_write
        # ids of objects already visited (guards against cycles)
        self.seen = set()
        # all modules collected during traversal
        self._modules = set()

    @property
    def custom_modules(self):
        # collected modules that are not pip-installable (i.e. local code)
        return set(m for m in self._modules if not is_installable_module(m))

    def to_requirements(self):
        """Converts collected modules into a :class:`Requirements` object."""
        r = Requirements()
        for mod in self._modules:
            if is_installable_module(mod):
                r.add(get_module_as_requirement(get_base_module(mod)))
            elif is_local_module(mod):
                r.add(CustomRequirement.from_module(mod))
        return r

    def _should_ignore(self, mod: ModuleType):
        return any(mod.__name__.startswith(i) for i in self.ignoring) or \
            is_private_module(mod) or is_pseudo_module(mod)

    def _add_requirement(self, obj_or_module):
        # accepts either a module or an arbitrary object whose module is looked up
        if not isinstance(obj_or_module, ModuleType):
            try:
                module = get_object_module(obj_or_module)
            except AttributeError as e:
                # Some internal Tensorflow 2.x object crashes `inspect` module on Python 3.6
                logger.debug('Skipping dependency analysis for %s because of %s: %s', obj_or_module,
                             type(e).__name__, e)
                return
        else:
            module = obj_or_module
        if module is not None and not self._should_ignore(module):
            self._modules.add(module)
            if is_local_module(module):
                # add imports of this module
                for local_req in get_local_module_reqs(module):
                    if local_req in self._modules:
                        continue
                    self._add_requirement(local_req)

    def save(self, obj, save_persistent_id=True):
        if id(obj) in self.seen:
            return
        self.seen.add(id(obj))
        self._add_requirement(obj)
        try:
            # NOTE(review): deliberately skips EbonitePickler.save in the MRO and
            # calls its parent's save -- presumably to bypass EbonitePickler's own
            # save customizations during analysis; confirm
            return super(EbonitePickler, self).save(obj, save_persistent_id)
        except (ValueError, TypeError, PickleError) as e:
            # if object cannot be serialized, it's probably a C object and we don't need to go deeper
            logger.debug('Skipping dependency analysis for %s because of %s: %s', obj, type(e).__name__, e)

    def skip_write(self, *args, **kwargs):
        # no-op stand-in for write/framer.write/memoize: discard everything
        pass
def get_object_requirements(obj) -> Requirements:
    """
    Analyzes packages required for given object to perform its function.

    This function uses `pickle`/`dill` libraries serialization hooks internally.
    Thus result of this function depend on given object being serializable by `pickle`/`dill` libraries:
    all nodes in objects graph which can't be serialized are skipped and their dependencies are lost.

    :param obj: obj to analyze
    :return: :class:`.Requirements` object containing all required packages
    """
    analyzer = _EboniteRequirementAnalyzer(recurse=True)
    analyzer.dump(obj)
    return analyzer.to_requirements()