Source code for ebonite.repository.dataset.artifact

import pickle
from abc import abstractmethod
from typing import Tuple

from pyjackson.decorators import type_field

from ebonite.core.errors import ArtifactExistsError, DatasetExistsError, NoSuchArtifactError, NoSuchDataset
from ebonite.core.objects import ArtifactCollection, DatasetType
from ebonite.core.objects.artifacts import Blobs, InMemoryBlob
from ebonite.core.objects.base import EboniteParams
from ebonite.core.objects.dataset_source import Dataset, DatasetSource
from ebonite.repository import ArtifactRepository
from ebonite.repository.dataset.base import DatasetRepository


@type_field('type')
class DatasetReader(EboniteParams):
    """ABC for deserializing a :class:`Dataset` from artifacts (files),
    used together with :class:`ArtifactDatasetSource`"""

    @abstractmethod
    def read(self, artifacts: ArtifactCollection) -> Dataset:
        """Restore a Dataset from the given artifacts

        :param artifacts: artifacts to read
        :return: the restored Dataset
        """
@type_field('type')
class DatasetWriter(EboniteParams):
    """ABC for serializing a :class:`Dataset` to artifacts (files),
    used together with :class:`ArtifactDatasetSource`"""

    @abstractmethod
    def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
        """Serialize the dataset to artifacts

        :param dataset: dataset to write
        :return: tuple of DatasetReader and ArtifactCollection.
            The DatasetReader must reproduce the same dataset when applied
            to the returned artifacts
        """
class OneFileDatasetReader(DatasetReader):
    """Reader for datasets persisted as a single file
    (named ``OneFileDatasetWriter.FILENAME``)

    :param dataset_type: DatasetType of the dataset being read
    """

    def __init__(self, dataset_type: DatasetType):
        self.dataset_type = dataset_type

    def read(self, artifacts: ArtifactCollection) -> Dataset:
        """Read the single-file payload and convert it into a Dataset

        :param artifacts: artifacts to read
        :return: the restored Dataset
        """
        raw = artifacts.bytes_dict()[OneFileDatasetWriter.FILENAME]
        return Dataset(self.convert(raw), self.dataset_type)

    @abstractmethod
    def convert(self, payload: bytes):
        """Turn the raw file bytes into the in-memory data object

        :param payload: raw bytes read from the artifact
        """
class OneFileDatasetWriter(DatasetWriter):
    """Writer for datasets persisted as a single in-memory blob"""

    # name of the single blob the serialized dataset payload is stored under
    FILENAME = 'data'

    # FIX: was a plain method with an empty body that silently returned None;
    # marked abstract for parity with OneFileDatasetReader.convert (inert if the
    # metaclass is not ABCMeta, so existing subclasses are unaffected)
    @abstractmethod
    def convert(self, instance) -> bytes:
        """Serialize the in-memory data object to bytes

        :param instance: data object to serialize
        :return: serialized bytes
        """

    def write(self, dataset: Dataset) -> Tuple[DatasetReader, ArtifactCollection]:
        """Serialize dataset into a single-blob ArtifactCollection

        NOTE(review): this returns the base OneFileDatasetReader, whose
        ``convert`` is abstract — subclass writers (e.g. PickleWriter) appear
        to need a matching reader subclass instead; confirm against callers.

        :param dataset: dataset to write
        :return: tuple of (reader, artifacts holding the serialized bytes)
        """
        reader = OneFileDatasetReader(dataset.dataset_type)
        artifacts = Blobs.from_blobs({self.FILENAME: InMemoryBlob(self.convert(dataset.data))})
        return reader, artifacts
class PrimitiveDatasetReader(OneFileDatasetReader):
    """Reader for primitive values stored as their textual representation"""

    def convert(self, payload: bytes):
        """Parse the stored bytes back into the primitive via the dataset type

        :param payload: raw bytes of the stored value
        """
        return self.dataset_type.to_type(payload)
class PrimitiveDatasetWriter(OneFileDatasetWriter):
    """Writer that stores a primitive value as its UTF-8 string representation"""

    def convert(self, instance) -> bytes:
        """Serialize the primitive as ``str(instance)`` encoded in UTF-8

        :param instance: primitive value to serialize
        :return: UTF-8 encoded bytes
        """
        return str(instance).encode('utf8')
class PickleReader(OneFileDatasetReader):
    """Reader that restores the dataset payload with :mod:`pickle`"""

    def convert(self, payload: bytes):
        """Deserialize the stored bytes with pickle

        NOTE(review): ``pickle.loads`` executes arbitrary code if the artifact
        is tampered with — only safe with trusted artifact storage.

        :param payload: pickled bytes
        """
        return pickle.loads(payload)
class PickleWriter(OneFileDatasetWriter):
    """Writer that serializes the dataset payload with :mod:`pickle`"""

    def convert(self, instance) -> bytes:
        """Serialize the data object with pickle

        :param instance: object to pickle
        :return: pickled bytes
        """
        return pickle.dumps(instance)
class ArtifactDatasetRepository(DatasetRepository):
    """DatasetRepository implementation that saves datasets as artifacts
    to an :class:`ArtifactRepository`

    :param repo: underlying ArtifactRepository
    """

    # namespace under which dataset artifacts are pushed to the repo
    ARTIFACT_TYPE = 'datasets'

    def __init__(self, repo: ArtifactRepository):
        self.repo = repo

    def save(self, dataset_id: str, dataset: Dataset) -> DatasetSource:
        """Persist the dataset under the given id

        :param dataset_id: id to save the dataset under
        :param dataset: dataset to save
        :return: ArtifactDatasetSource that can read the dataset back
        :raises ValueError: if the dataset's type provides no writer
        :raises DatasetExistsError: if an artifact with this id already exists
        """
        writer = dataset.get_writer()
        if writer is None:
            # FIX: corrected typo 'persistance' -> 'persistence' in the error message
            raise ValueError(f'{dataset.dataset_type} does not support artifact persistence')
        reader, artifacts = writer.write(dataset)
        with artifacts.blob_dict() as blobs:
            try:
                pushed = self.repo.push_artifact(self.ARTIFACT_TYPE, dataset_id, blobs)
            except ArtifactExistsError as e:
                raise DatasetExistsError(dataset_id, self, e)
        return ArtifactDatasetSource(reader, pushed, dataset.dataset_type)

    def delete(self, dataset_id: str):
        """Delete the dataset's artifacts by id

        :param dataset_id: id of the dataset to delete
        :raises NoSuchDataset: if no artifact exists under this id
        """
        try:
            self.repo.delete_artifact(self.ARTIFACT_TYPE, dataset_id)
        except NoSuchArtifactError as e:
            raise NoSuchDataset(dataset_id, self, e)
class ArtifactDatasetSource(DatasetSource):
    """DatasetSource that restores datasets stored in an ArtifactDatasetRepository

    :param reader: DatasetReader used to deserialize the files
    :param artifacts: ArtifactCollection containing the actual files
    :param dataset_type: DatasetType of the contained dataset
    """

    def __init__(self, reader: DatasetReader, artifacts: ArtifactCollection, dataset_type: DatasetType):
        super(ArtifactDatasetSource, self).__init__(dataset_type)
        self.reader = reader
        self.artifacts = artifacts

    def read(self) -> Dataset:
        """Read the dataset by delegating to the reader over the stored artifacts"""
        return self.reader.read(self.artifacts)