# Source code for ebonite.core.objects.artifacts

import contextlib
import io
import os
import shutil
import sys
import tempfile
import typing
from abc import abstractmethod
from copy import copy

from pyjackson.core import Unserializable
from pyjackson.decorators import make_string, type_field

from ebonite.core.objects.base import EboniteParams

# Return annotation used by the ``bytestream`` methods in this module: they are
# generator functions decorated with ``contextlib.contextmanager`` that yield a
# single binary file-like object.
StreamContextManager = typing.Iterable[typing.BinaryIO]


@type_field('type')
class Blob(EboniteParams):
    """
    Base class for all blobs.

    A blob is a binary payload. It can be read through the
    :meth:`~Blob.bytestream` context manager, which gives a file-like object,
    or written to the local fs with :meth:`~Blob.materialize`.

    Must be pyjackson-able or marked Unserializable
    """
    # Subclasses set this to the string identifying their concrete type.
    type = None

    @abstractmethod
    def materialize(self, path):
        """
        Write the blob's payload to a file on the local fs.

        :param path: path of the file to write
        """
        pass  # pragma: no cover

    @abstractmethod
    @contextlib.contextmanager
    def bytestream(self) -> StreamContextManager:
        """
        Context manager (implementations use the
        @contextlib.contextmanager decorator) giving a file-like object with
        the blob's payload.

        :yields: file-like object
        """
        pass  # pragma: no cover

    def bytes(self) -> bytes:
        """
        Read the whole payload through :meth:`bytestream`.

        :return: bytes
        """
        with self.bytestream() as stream:
            payload = stream.read()
        return payload
class LocalFileBlob(Blob):
    """
    Blob implementation for local file

    :param path: path to local file
    """
    type = 'local_file'

    def __init__(self, path: str):
        self.path = path

    def materialize(self, path):
        """
        Copies local file to another path, creating the target's parent
        directories when needed.

        :param path: target path
        """
        parent = os.path.dirname(path)
        # A bare file name has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create directories when there are any.
        if parent:
            os.makedirs(parent, exist_ok=True)
        shutil.copy(self.path, path)

    @contextlib.contextmanager
    def bytestream(self) -> StreamContextManager:
        """
        Opens file for reading in binary mode; the file is closed when the
        context exits.

        :yields: file handler
        """
        with open(self.path, 'rb') as f:
            yield f
# noinspection PyAbstractClass
class MaterializeOnlyBlobMixin(Blob):
    """
    Mixin for blobs whose payload is only reachable through
    :meth:`materialize`; it supplies :meth:`bytestream` on top of it.
    """

    def _get_file_name(self):
        # Name of the temporary file the payload is materialized into.
        return 'blob'

    @contextlib.contextmanager
    def bytestream(self) -> StreamContextManager:
        """
        Materializes the blob into a temporary directory and opens the
        resulting file. The directory is removed when the context exits.

        :yields: file handler opened in binary read mode
        """
        with tempfile.TemporaryDirectory() as scratch_dir:
            target = os.path.join(scratch_dir, self._get_file_name())
            self.materialize(target)
            with open(target, 'rb') as handle:
                yield handle
class InMemoryBlob(Blob, Unserializable):
    """
    Blob implementation for in-memory bytes

    :param payload: bytes
    """
    type = 'inmemory'

    def __init__(self, payload: bytes):
        self.payload = payload

    def materialize(self, path):
        """
        Writes payload to path, creating the target's parent directories when
        needed.

        :param path: target path
        """
        parent = os.path.dirname(path)
        # A bare file name has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create directories when there are any.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, 'wb') as f:
            f.write(self.payload)

    @contextlib.contextmanager
    def bytestream(self) -> StreamContextManager:
        """
        Creates BytesIO object from bytes

        :yields: file-like object
        """
        yield io.BytesIO(self.payload)
class LazyBlob(Blob, Unserializable):
    """Represents a lazy blob, which is computed only when needed

    :param source: function with no arguments, that must return str, bytes or file-like object
    :param encoding: encoding for payload if source returns str or io.StringIO
    """

    def __init__(self, source: typing.Callable[[], typing.Union[str, bytes, typing.IO]],
                 encoding: str = 'utf8'):
        self.encoding = encoding
        self.source = source

    def _read_payload(self) -> bytes:
        """
        Calls the source and converts whatever it returns to bytes.

        :return: payload as bytes
        """
        source = self.source()
        if not isinstance(source, (str, bytes)):
            # File-like object: rewind so the whole content is read.
            source.seek(0)
            source = source.read()
        if isinstance(source, str):
            source = source.encode(self.encoding)
        return source

    def materialize(self, path):
        """
        Writes payload to path, creating the target's parent directories when
        needed.

        :param path: target path
        """
        # Compute the payload first so a failing source does not leave an
        # empty file behind.
        payload = self._read_payload()
        parent = os.path.dirname(path)
        # A bare file name has an empty dirname, and os.makedirs('') raises
        # FileNotFoundError, so only create directories when there are any.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, 'wb') as f:
            f.write(payload)

    @contextlib.contextmanager
    def bytestream(self) -> StreamContextManager:
        """
        Calls the source and exposes its result as a binary stream.

        str and bytes results are wrapped in BytesIO (str is encoded with
        ``self.encoding``). Text streams are read and encoded the same way.
        Any other file-like object is rewound and yielded as is.

        :yields: file-like object
        """
        source = self.source()
        if isinstance(source, str):
            source = source.encode(self.encoding)
        if isinstance(source, bytes):
            yield io.BytesIO(source)
            return
        source.seek(0)
        if isinstance(source, io.TextIOBase):
            # Text streams return str from read(); encode them so callers
            # always get bytes, matching what materialize writes.
            yield io.BytesIO(source.read().encode(self.encoding))
        else:
            yield source
@type_field('type')
class ArtifactCollection(EboniteParams):
    """
    Base class for artifact collections.

    An artifact collection is a set of named artifacts, each represented by a
    Blob.

    Must be pyjackson-able
    """
    # Subclasses set this to the string identifying their concrete type.
    type = None

    @abstractmethod
    def materialize(self, path):
        """
        Write the artifacts' payload to the local fs.

        :param path: dir to write blobs
        """
        pass  # pragma: no cover

    @abstractmethod
    def bytes_dict(self) -> typing.Dict[str, bytes]:
        """
        Return a mapping of artifact name -> artifact payload.

        :returns: dict of artifact names -> artifact payloads
        """
        pass  # pragma: no cover

    @abstractmethod
    def blob_dict(self) -> typing.ContextManager[typing.Dict[str, Blob]]:
        """
        Context manager (implementations use the
        `@contextlib.contextmanager` decorator) that yields a mapping of
        artifact name -> blob.

        :yields: dict of artifact name -> blob
        """
        pass  # pragma: no cover

    def __add__(self, other):
        """
        Combine self and another ArtifactCollection into a
        :py:class:`CompositeArtifactCollection`.

        :param other: ArtifactCollection to combine with
        :return: CompositeArtifactCollection holding self and other
        :raises ValueError: if other is not an ArtifactCollection
        """
        if isinstance(other, ArtifactCollection):
            return CompositeArtifactCollection([self, other])
        raise ValueError('Cant and {} to ArtifactCollection'.format(other))

    @staticmethod
    def from_blobs(files: typing.Dict[str, 'Blob']):
        """
        Build a :py:class:`Blobs` collection from a name -> blob mapping.

        :param files: dict of name -> blob
        :return: Blobs instance wrapping files
        """
        return Blobs(files)
@make_string
class Blobs(ArtifactCollection):
    """
    Artifact collection backed by a dictionary of blobs

    :param blobs: dict of name -> blob
    """
    type = 'blobs'

    def __init__(self, blobs: typing.Dict[str, Blob]):
        self.blobs = blobs

    def materialize(self, path):
        """
        Creates the target dir and materializes every blob inside it under
        its name.

        :param path: target dir
        """
        os.makedirs(path, exist_ok=True)
        for artifact_name, artifact in self.blobs.items():
            target = os.path.join(path, artifact_name)
            artifact.materialize(target)

    def bytes_dict(self) -> typing.Dict[str, bytes]:
        """
        Reads every blob fully.

        :returns: dict of artifact names -> artifact payloads
        """
        payloads = {}
        for artifact_name, artifact in self.blobs.items():
            payloads[artifact_name] = artifact.bytes()
        return payloads

    @contextlib.contextmanager
    def blob_dict(self) -> typing.ContextManager[typing.Dict[str, Blob]]:
        """
        :yields: self.blobs
        """
        yield self.blobs
def _enter_all_cm(managers):
    """
    Enters every context manager in order and collects what they return.

    If entering one of them fails, the managers entered so far are exited in
    reverse order with the failure's exception info, and the failure is
    raised. If an exit raises as well, that newer exception replaces the
    failure for the remaining exits and for the final raise.

    :param managers: iterable of context managers
    :return: list of values returned by each ``__enter__``
    """
    entered = []
    results = []
    try:
        for manager in managers:
            results.append(manager.__enter__())
            entered.append(manager)
    except Exception as error:
        for manager in reversed(entered):
            try:
                # sys.exc_traceback does not exist in Python 3; the traceback
                # is carried by the exception object itself.
                manager.__exit__(type(error), error, error.__traceback__)
            except Exception as exit_error:
                error = exit_error
        raise error
    return results


class _ExitAllCm:
    """
    Context manager that only exits the given, already entered, managers.

    :param managers: context managers to exit, in the order they were entered
    """

    def __init__(self, managers):
        self.managers = managers

    def __enter__(self):
        # The managers are expected to be entered already; nothing to do.
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exits all managers in reverse order.

        An exception raised by one exit is passed on to the remaining exits,
        and the last such exception is raised once all of them have run. The
        exception from the ``with`` body is never suppressed.
        """
        failure = None
        for manager in reversed(self.managers):
            try:
                manager.__exit__(exc_type, exc_val, exc_tb)
            except Exception as error:
                failure = error
                exc_type, exc_val, exc_tb = type(error), error, error.__traceback__
        if failure is not None:
            raise failure
class CompositeArtifactCollection(ArtifactCollection):
    """
    Represents a merger of two or more ArtifactCollections

    :param artifacts: ArtifactCollections to merge
    """
    type = 'composite'

    def __init__(self, artifacts: typing.List[ArtifactCollection]):
        self.artifacts = artifacts

    def materialize(self, path):
        """
        Materializes every ArtifactCollection to path

        :param path: target dir
        """
        for a in self.artifacts:
            a.materialize(path)

    def bytes_dict(self) -> typing.Dict[str, bytes]:
        """
        Merges the bytes dicts of all collections; for a repeated name the
        payload from the later collection wins.

        :returns: dict of artifact names -> artifact payloads
        """
        merged = {}
        for a in self.artifacts:
            merged.update(a.bytes_dict())
        return merged

    @contextlib.contextmanager
    def blob_dict(self) -> typing.ContextManager[typing.Dict[str, Blob]]:
        """
        Enters all ArtifactCollections blob_dict context managers and returns their union

        :yields: name -> blob mapping
        """
        managers: typing.List[typing.ContextManager] = [a.blob_dict() for a in self.artifacts]
        # ExitStack exits the managers entered so far in reverse order, both
        # when a later one fails to enter and when the caller's block ends.
        with contextlib.ExitStack() as stack:
            dicts = [stack.enter_context(m) for m in managers]
            yield {name: blob for d in dicts for name, blob in d.items()}

    def __add__(self, other):
        """
        Creates a new CompositeArtifactCollection with the artifacts of self
        followed by other. A composite operand is flattened into its parts.
        self is left unchanged.

        :param other: ArtifactCollection to add
        :return: new CompositeArtifactCollection
        :raises ValueError: if other is not an ArtifactCollection
        """
        if not isinstance(other, ArtifactCollection):
            raise ValueError('Cant and {} to ArtifactCollection'.format(other))
        arts = copy(self.artifacts)
        if isinstance(other, CompositeArtifactCollection):
            arts += other.artifacts
        else:
            arts.append(other)
        return CompositeArtifactCollection(arts)

    def __iadd__(self, other):
        """
        Extends self.artifacts in place with other. A composite operand is
        flattened into its parts.

        :param other: ArtifactCollection to add
        :return: self
        :raises ValueError: if other is not an ArtifactCollection
        """
        if not isinstance(other, ArtifactCollection):
            # The placeholder was never filled in before; format it like __add__ does.
            raise ValueError('Cant and {} to ArtifactCollection'.format(other))
        if isinstance(other, CompositeArtifactCollection):
            self.artifacts += other.artifacts
        else:
            self.artifacts.append(other)
        return self
class _RelativePathWrapper(ArtifactCollection):
    """
    ArtifactCollection wrapper that places every artifact of the wrapped
    collection under a parent dir

    :param artifact: wrapped ArtifactCollection
    :param path: parent dir
    """
    type = 'relative'

    def __init__(self, artifact: ArtifactCollection, path: str):
        self.artifact = artifact
        self.path = path

    def materialize(self, path):
        """
        Materializes the wrapped collection into ``self.path`` under path.

        :param path: target dir
        :return: whatever the wrapped collection's materialize returns
        """
        nested_dir = os.path.join(path, self.path)
        return self.artifact.materialize(nested_dir)

    def bytes_dict(self) -> typing.Dict[str, bytes]:
        """
        :returns: wrapped collection's bytes dict with ``self.path`` joined
            in front of every name
        """
        prefixed = {}
        for name, payload in self.artifact.bytes_dict().items():
            prefixed[os.path.join(self.path, name)] = payload
        return prefixed

    @contextlib.contextmanager
    def blob_dict(self) -> typing.Iterable[typing.Dict[str, Blob]]:
        """
        :yields: wrapped collection's blob dict with ``self.path`` joined in
            front of every name
        """
        with self.artifact.blob_dict() as inner:
            prefixed = {}
            for name, blob in inner.items():
                prefixed[os.path.join(self.path, name)] = blob
            yield prefixed