Source code for ebonite.repository.metadata.local

import copy
import os
from typing import Dict, List, Optional, Set, Tuple, Union

import pyjackson

from ebonite.core.errors import (EnvironmentWithInstancesError, ExistingEnvironmentError, ExistingImageError,
                                 ExistingInstanceError, ExistingModelError, ExistingPipelineError, ExistingProjectError,
                                 ExistingTaskError, ImageWithInstancesError, NonExistingEnvironmentError,
                                 NonExistingImageError, NonExistingInstanceError, NonExistingModelError,
                                 NonExistingPipelineError, NonExistingProjectError, NonExistingTaskError,
                                 ProjectWithTasksError, TaskWithFKError)
from ebonite.core.objects.core import Image, Model, Pipeline, Project, RuntimeEnvironment, RuntimeInstance, Task
from ebonite.repository.metadata.base import MetadataRepository, ProjectVar, TaskVar, bind_to_self
from ebonite.utils.log import logger

# Type aliases for the per-entity stores kept by _LocalContainer:
# each one maps an integer object id to the stored object of that kind.
_Projects = Dict[int, Project]
_Tasks = Dict[int, Task]
_Models = Dict[int, Model]
_Pipelines = Dict[int, Pipeline]
_Images = Dict[int, Image]
_Environments = Dict[int, RuntimeEnvironment]
_Instances = Dict[int, RuntimeInstance]


class _LocalContainer:
    """
    In-memory store for all metadata objects together with lookup indexes.

    For every kind of object the container keeps an ``id -> object`` dict, a name index and
    a ``next_*_id`` counter. Instances are additionally indexed by environment and by image.

    Note: every ``remove_*`` method pops the object with a ``None`` default and then reads its
    attributes, so removing an unknown id raises :class:`AttributeError`.
    """

    def __init__(self, next_project_id: int = 0, projects: _Projects = None,
                 next_task_id: int = 0, tasks: _Tasks = None,
                 next_model_id: int = 0, models: _Models = None,
                 next_pipeline_id: int = 0, pipelines: _Pipelines = None,
                 next_image_id: int = 0, images: _Images = None,
                 next_environment_id: int = 0, environments: _Environments = None,
                 next_instance_id: int = 0, instances: _Instances = None):
        """
        Create empty stores and indexes, then register every object passed in.

        :param next_*_id: initial value of the id counter for the given kind of object
        :param projects, tasks, ...: optional ``id -> object`` dicts whose values are registered
            through the corresponding ``add_*`` method (so indexes are rebuilt, not copied)
        """
        # projects: id -> project, name -> id
        self.next_project_id = next_project_id
        self.projects: _Projects = {}
        self.project_name_index: Dict[str, int] = {}

        # tasks: id -> task, (project id, name) -> id
        self.next_task_id = next_task_id
        self.tasks: _Tasks = {}
        self.task_name_index: Dict[Tuple[int, str], int] = {}

        # models: id -> model, (task id, name) -> id
        self.next_model_id = next_model_id
        self.models: _Models = {}
        self.model_name_index: Dict[Tuple[int, str], int] = {}

        # pipelines: id -> pipeline, (task id, name) -> id
        self.next_pipeline_id = next_pipeline_id
        self.pipelines: _Pipelines = {}
        self.pipeline_name_index: Dict[Tuple[int, str], int] = {}

        # images: id -> image, (task id, name) -> id
        self.next_image_id = next_image_id
        self.images: _Images = {}
        self.image_name_index: Dict[Tuple[int, str], int] = {}

        # environments: id -> environment, name -> id
        self.next_environment_id = next_environment_id
        self.environments: _Environments = {}
        self.environment_name_index: Dict[str, int] = {}

        # instances: id -> instance plus several secondary indexes
        self.next_instance_id = next_instance_id
        self.instances: _Instances = {}
        self.instance_name_index: Dict[Tuple[int, int, str], int] = {}
        self.instance_index: Dict[Tuple[int, int], Set[int]] = {}
        self.image_instance: Dict[int, Set[int]] = {}
        self.environment_instance: Dict[int, Set[int]] = {}

        # Parents are registered before children because add_task/add_model/... look up
        # the owning project/task in the stores filled by the earlier entries.
        initial_content = (
            (projects, self.add_project),
            (tasks, self.add_task),
            (models, self.add_model),
            (pipelines, self.add_pipeline),
            (images, self.add_image),
            (environments, self.add_environment),
            (instances, self.add_instance),
        )
        for mapping, register in initial_content:
            if not mapping:
                continue
            for entity in mapping.values():
                register(entity)

    def get_and_increment(self, name):
        """Return the current value of counter attribute `name` and bump the attribute by one."""
        current = getattr(self, name)
        setattr(self, name, current + 1)
        return current

    # ---- projects ----

    def add_project(self, project: Project):
        """Store `project` under its id and index it by name."""
        assert project.id is not None
        project_id = project.id
        self.projects[project_id] = project
        self.project_name_index[project.name] = project_id

    def get_project_by_id(self, project_id):
        """Return the stored project with this id or `None`."""
        return self.projects.get(project_id)

    def get_project_by_name(self, name: str):
        """Return the stored project with this name or `None`."""
        project_id = self.project_name_index.get(name)
        return self.get_project_by_id(project_id)

    def remove_project(self, project_id):
        """Drop the project with this id from the store and the name index and return it."""
        removed = self.projects.pop(project_id, None)
        del self.project_name_index[removed.name]
        return removed

    # ---- tasks ----

    def add_task(self, task: Task):
        """Store `task`, index it by (project id, name) and add it to its project's `_tasks`."""
        assert task.id is not None
        task_id = task.id
        self.tasks[task_id] = task
        self.task_name_index[(task.project_id, task.name)] = task_id
        owner = self.projects[task.project_id]
        owner._tasks.add(task)

    def get_task_by_id(self, task_id):
        """Return the stored task with this id or `None`."""
        return self.tasks.get(task_id)

    def get_task_by_name(self, project_id: int, name: str):
        """Return the stored task with this name inside the given project or `None`."""
        task_id = self.task_name_index.get((project_id, name))
        return self.get_task_by_id(task_id)

    def remove_task(self, task_id):
        """Drop the task from the store, the name index and its project's `_tasks`; return it."""
        removed = self.tasks.pop(task_id, None)
        self.task_name_index.pop((removed.project_id, removed.name), None)
        owner = self.projects[removed.project_id]
        del owner._tasks[removed.id]
        return removed

    # ---- models ----

    def add_model(self, model: Model):
        """Store `model`, index it by (task id, name) and add it to its task's `_models`."""
        assert model.id is not None
        model_id = model.id
        self.models[model_id] = model
        self.model_name_index[(model.task_id, model.name)] = model_id
        owner = self.tasks[model.task_id]
        owner._models.add(model)

    def get_model_by_id(self, model_id):
        """Return the stored model with this id or `None`."""
        return self.models.get(model_id)

    def get_model_by_name(self, task_id: int, name: str):
        """Return the stored model with this name inside the given task or `None`."""
        model_id = self.model_name_index.get((task_id, name))
        return self.get_model_by_id(model_id)

    def remove_model(self, model_id):
        """Drop the model from the store, the name index and its task's `_models`; return it."""
        removed = self.models.pop(model_id, None)
        self.model_name_index.pop((removed.task_id, removed.name), None)
        owner = self.tasks[removed.task_id]
        del owner._models[removed.id]
        return removed

    # ---- pipelines ----

    def add_pipeline(self, pipeline: Pipeline):
        """Store `pipeline`, index it by (task id, name) and add it to its task's `_pipelines`."""
        assert pipeline.id is not None
        pipeline_id = pipeline.id
        self.pipelines[pipeline_id] = pipeline
        self.pipeline_name_index[(pipeline.task_id, pipeline.name)] = pipeline_id
        owner = self.tasks[pipeline.task_id]
        owner._pipelines.add(pipeline)

    def get_pipeline_by_id(self, pipeline_id):
        """Return the stored pipeline with this id or `None`."""
        return self.pipelines.get(pipeline_id)

    def get_pipeline_by_name(self, task_id: int, name: str):
        """Return the stored pipeline with this name inside the given task or `None`."""
        pipeline_id = self.pipeline_name_index.get((task_id, name))
        return self.get_pipeline_by_id(pipeline_id)

    def remove_pipeline(self, pipeline_id):
        """Drop the pipeline from the store, the name index and its task's `_pipelines`; return it."""
        removed = self.pipelines.pop(pipeline_id, None)
        self.pipeline_name_index.pop((removed.task_id, removed.name), None)
        owner = self.tasks[removed.task_id]
        del owner._pipelines[removed.id]
        return removed

    # ---- images ----

    def add_image(self, image: Image):
        """Store `image`, index it by (task id, name) and add it to its task's `_images`."""
        assert image.id is not None
        image_id = image.id
        self.images[image_id] = image
        self.image_name_index[(image.task_id, image.name)] = image_id
        owner = self.tasks[image.task_id]
        owner._images.add(image)

    def get_image_by_id(self, image_id):
        """Return the stored image with this id or `None`."""
        return self.images.get(image_id)

    def get_image_by_name(self, task_id: int, name: str):
        """Return the stored image with this name inside the given task or `None`."""
        image_id = self.image_name_index.get((task_id, name))
        return self.get_image_by_id(image_id)

    def remove_image(self, image_id):
        """Drop the image from the store, the name index and its task's `_images`; return it."""
        removed = self.images.pop(image_id, None)
        self.image_name_index.pop((removed.task_id, removed.name), None)
        owner = self.tasks[removed.task_id]
        del owner._images[removed.id]
        return removed

    # ---- environments ----

    def add_environment(self, environment: RuntimeEnvironment):
        """Store `environment` under its id and index it by name."""
        assert environment.id is not None
        environment_id = environment.id
        self.environments[environment_id] = environment
        self.environment_name_index[environment.name] = environment_id

    def get_environment_by_id(self, environment_id):
        """Return the stored environment with this id or `None`."""
        return self.environments.get(environment_id)

    def get_environment_by_name(self, name: str):
        """Return the stored environment with this name or `None`."""
        environment_id = self.environment_name_index.get(name)
        return self.get_environment_by_id(environment_id)

    def remove_environment(self, environment_id):
        """Drop the environment with this id from the store and the name index and return it."""
        removed = self.environments.pop(environment_id, None)
        del self.environment_name_index[removed.name]
        return removed

    # ---- instances ----

    def add_instance(self, instance: RuntimeInstance):
        """Store `instance` and register its id in the name, pair, environment and image indexes."""
        assert instance.id is not None
        instance_id = instance.id
        env_id = instance.environment_id
        img_id = instance.image_id

        self.instances[instance_id] = instance
        self.instance_name_index[(env_id, img_id, instance.name)] = instance_id
        self.instance_index.setdefault((env_id, img_id), set()).add(instance_id)
        self.environment_instance.setdefault(env_id, set()).add(instance_id)
        self.image_instance.setdefault(img_id, set()).add(instance_id)

    def get_instance_by_id(self, instance_id: int):
        """Return the stored instance with this id or `None`."""
        return self.instances.get(instance_id)

    def get_instance_by_name(self, environment_id: int, image_id: int, name: str):
        """Return the stored instance with this name for the given environment and image or `None`."""
        instance_id = self.instance_name_index.get((environment_id, image_id, name))
        return self.get_instance_by_id(instance_id)

    def get_instances(self, environment_id: int, image_id: int):
        """Return a list of instances registered for the given (environment, image) pair."""
        instance_ids = self.instance_index.get((environment_id, image_id), set())
        return [self.get_instance_by_id(instance_id) for instance_id in instance_ids]

    def get_instances_by_image_id(self, image_id):
        """Return a list of instances registered for the given image."""
        instance_ids = self.image_instance.get(image_id, set())
        return [self.get_instance_by_id(instance_id) for instance_id in instance_ids]

    def get_instances_by_environment_id(self, environment_id):
        """Return a list of instances registered for the given environment."""
        instance_ids = self.environment_instance.get(environment_id, set())
        return [self.get_instance_by_id(instance_id) for instance_id in instance_ids]

    def remove_instance(self, instance_id: int):
        """Drop the instance from the store and from every instance index; return it."""
        removed = self.instances.pop(instance_id, None)
        env_id = removed.environment_id
        img_id = removed.image_id

        self.instance_name_index.pop((env_id, img_id, removed.name), None)
        self.instance_index[(env_id, img_id)].discard(instance_id)
        self.environment_instance[env_id].discard(instance_id)
        self.image_instance[img_id].discard(instance_id)
        return removed


class LocalMetadataRepository(MetadataRepository):
    """
    :class:`.MetadataRepository` implementation which stores metadata in a local filesystem as JSON file.

    Warning: file storage is completely overwritten on each update,
    thus this repository is not suitable for high-performance scenarios.

    Getters return deep copies of the stored objects, so changes made by the caller are
    persisted only through the corresponding ``create_*`` / ``update_*`` call.

    :param path: path to json with the metadata, if `None` metadata is stored in-memory.
    """

    type = 'local'

    def __init__(self, path=None):
        self.path = path
        if self.path is not None:
            # A bare file name (e.g. "meta.json") has an empty dirname and
            # os.makedirs('') raises, so only create the parent when there is one.
            parent_dir = os.path.dirname(self.path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
        self.data: _LocalContainer = _LocalContainer()
        self.load()
        self.save()

    def load(self):
        """
        Replace the in-memory data with the content of the file at ``self.path``.

        When no path is set or the file does not exist, an empty container is used instead.
        """
        if self.path is not None and os.path.exists(self.path):
            with open(self.path, 'r', encoding='utf8') as f:
                logger.debug('Loading metadata from %s', self.path)
                self.data = pyjackson.load(f, _LocalContainer)
        else:
            self.data = _LocalContainer()

    def save(self):
        """Write all metadata to the file at ``self.path``; does nothing for in-memory repositories."""
        if self.path is None:
            return

        with open(self.path, 'w', encoding='utf8') as f:
            logger.debug('Saving metadata to %s', self.path)
            pyjackson.dump(f, self.data)

    # ---- projects ----

    @bind_to_self
    def get_projects(self) -> List[Project]:
        """Return copies of all stored projects."""
        return copy.deepcopy(list(self.data.projects.values()))

    @bind_to_self
    def get_project_by_name(self, name: str) -> Project:
        """Return a copy of the project with the given name, or `None` if there is no such project."""
        return copy.deepcopy(self.data.get_project_by_name(name))

    @bind_to_self
    def get_project_by_id(self, id) -> Project:
        """Return a copy of the project with the given id, or `None` if there is no such project."""
        return copy.deepcopy(self.data.get_project_by_id(id))

    @bind_to_self
    def create_project(self, project: Project) -> Project:
        """
        Assign a new id to `project` and store a copy of it.

        :raises ExistingProjectError: if a project with the same name is already stored
        """
        if self.get_project_by_name(project.name) is not None:
            raise ExistingProjectError(project)

        project._id = self.data.get_and_increment('next_project_id')
        self.data.add_project(copy.deepcopy(project))
        self.save()
        return project

    def update_project(self, project: Project) -> Project:
        """
        Replace the stored project with a copy of `project` and save each of its tasks.

        :raises NonExistingProjectError: if no project with this id is stored
        """
        if self.get_project_by_id(project.id) is None:
            raise NonExistingProjectError(project)

        self.data.remove_project(project.id)
        proj_copy = copy.deepcopy(project)
        self.data.add_project(proj_copy)
        for task in proj_copy.tasks.values():
            self.save_task(task)
        self.save()
        return project

    def delete_project(self, project: Project):
        """
        Remove `project` from the repository.

        :raises ProjectWithTasksError: if the project still has tasks
        :raises NonExistingProjectError: if the project is not stored
        """
        try:
            if self.get_tasks(project):
                raise ProjectWithTasksError(project)

            self.data.remove_project(project.id)
            self.save()
            project.unbind_meta_repo()
        except (KeyError, AttributeError):
            # lookups on an unknown project fail with one of these errors
            raise NonExistingProjectError(project)

    # ---- tasks ----

    @bind_to_self
    def get_tasks(self, project: ProjectVar) -> List[Task]:
        """Return copies of all tasks of the given project."""
        project = self._resolve_project(project)
        return copy.deepcopy(list(project.tasks.values()))

    @bind_to_self
    def get_task_by_name(self, project: ProjectVar, task_name: str) -> Optional[Task]:
        """Return a copy of the named task of the given project, or `None` if project or task is missing."""
        project = self._resolve_project(project)
        if project is None:
            return None
        return copy.deepcopy(self.data.get_task_by_name(project.id, task_name))

    @bind_to_self
    def get_task_by_id(self, id) -> Task:
        """Return a copy of the task with the given id, or `None` if there is no such task."""
        return copy.deepcopy(self.data.get_task_by_id(id))

    @bind_to_self
    def create_task(self, task: Task) -> Task:
        """
        Assign a new id to `task` and store a copy of it.

        :raises NonExistingProjectError: if the task's project is not stored
        :raises ExistingTaskError: if the project already has a task with the same name
        """
        self._validate_task(task)

        existing_project = self.get_project_by_id(task.project_id)
        if existing_project is None:
            raise NonExistingProjectError(task.project_id)

        if self.get_task_by_name(existing_project, task.name) is not None:
            raise ExistingTaskError(task)

        task._id = self.data.get_and_increment('next_task_id')
        self.data.add_task(copy.deepcopy(task))
        self.save()
        return task

    def update_task(self, task: Task) -> Task:
        """
        Replace the stored task with a copy of `task` and save its models, pipelines and images.

        :raises NonExistingTaskError: if the task has no id or is not stored
        :raises NonExistingProjectError: if the task's project is not stored
        """
        if task.id is None or self.get_task_by_id(task.id) is None:
            raise NonExistingTaskError(task)

        self._validate_task(task)

        if self.get_project_by_id(task.project_id) is None:
            raise NonExistingProjectError(task.project_id)

        self.data.remove_task(task.id)
        task_copy = copy.deepcopy(task)
        self.data.add_task(task_copy)
        for model in task_copy.models.values():
            self.save_model(model)
        for pipeline in task_copy.pipelines.values():
            self.save_pipeline(pipeline)
        for image in task_copy.images.values():
            self.save_image(image)
        self.save()
        return task

    def delete_task(self, task: Task):
        """
        Remove `task` from the repository.

        :raises NonExistingTaskError: if the task has no id or is not stored
        :raises TaskWithFKError: if the task still has models, pipelines or images
        """
        # Same existence check as update_task, so an unknown id is reported as a
        # repository error instead of an AttributeError on None.
        if task.id is None or self.data.get_task_by_id(task.id) is None:
            raise NonExistingTaskError(task)
        if self.get_models(task) or self.get_pipelines(task) or self.get_images(task):
            raise TaskWithFKError(task)

        self.data.remove_task(task.id)
        self.save()
        task.unbind_meta_repo()

    # ---- models ----

    @bind_to_self
    def get_models(self, task: TaskVar, project: ProjectVar = None) -> List[Model]:
        """Return copies of all models of the given task."""
        task = self._resolve_task(task, project)
        return copy.deepcopy(list(task.models.values()))

    @bind_to_self
    def get_model_by_name(self, model_name: str, task: TaskVar, project: ProjectVar = None) -> Optional[Model]:
        """Return a copy of the named model of the given task, or `None` if task or model is missing."""
        task = self._resolve_task(task, project)
        if task is None:
            return None
        return copy.deepcopy(self.data.get_model_by_name(task.id, model_name))

    @bind_to_self
    def get_model_by_id(self, id) -> Model:
        """Return a copy of the model with the given id, or `None` if there is no such model."""
        return copy.deepcopy(self.data.get_model_by_id(id))

    @bind_to_self
    def create_model(self, model: Model) -> Model:
        """
        Assign a new id to `model` and store a copy of it.

        :raises NonExistingTaskError: if the model's task is not stored
        :raises ExistingModelError: if the task already has a model with the same name
        """
        self._validate_model(model)

        existing_task = self.get_task_by_id(model.task_id)
        if existing_task is None:
            raise NonExistingTaskError(model.task_id)

        if self.get_model_by_name(model.name, existing_task) is not None:
            raise ExistingModelError(model)

        model._id = self.data.get_and_increment('next_model_id')
        self.data.add_model(copy.deepcopy(model))
        self.save()
        return model

    def update_model(self, model: Model) -> Model:
        """
        Replace the stored model with a copy of `model`.

        :raises NonExistingTaskError: if the model's task is not stored
        :raises NonExistingModelError: if no model with this id is stored
        """
        self._validate_model(model)

        if self.get_task_by_id(model.task_id) is None:
            raise NonExistingTaskError(model.task_id)

        if self.get_model_by_id(model.id) is None:
            raise NonExistingModelError(model)

        self.data.remove_model(model.id)
        self.data.add_model(copy.deepcopy(model))
        self.save()
        return model

    def delete_model(self, model: Model):
        """
        Remove `model` from the repository.

        :raises NonExistingModelError: if the model has no id or is not stored
        """
        if model.id is None or self.data.get_model_by_id(model.id) is None:
            raise NonExistingModelError(model)

        self.data.remove_model(model.id)
        self.save()
        model.unbind_meta_repo()

    # ---- pipelines ----

    @bind_to_self
    def get_pipelines(self, task: TaskVar, project: ProjectVar = None) -> List[Pipeline]:
        """Return copies of all pipelines of the given task."""
        task = self._resolve_task(task, project)
        return copy.deepcopy(list(task.pipelines.values()))

    @bind_to_self
    def get_pipeline_by_name(self, pipeline_name: str, task: TaskVar,
                             project: ProjectVar = None) -> Optional[Pipeline]:
        """Return a copy of the named pipeline of the given task, or `None` if task or pipeline is missing."""
        task = self._resolve_task(task, project)
        if task is None:
            return None
        return copy.deepcopy(self.data.get_pipeline_by_name(task.id, pipeline_name))

    @bind_to_self
    def get_pipeline_by_id(self, id) -> Pipeline:
        """Return a copy of the pipeline with the given id, or `None` if there is no such pipeline."""
        return copy.deepcopy(self.data.get_pipeline_by_id(id))

    @bind_to_self
    def create_pipeline(self, pipeline: Pipeline) -> Pipeline:
        """
        Assign a new id to `pipeline` and store a copy of it.

        :raises NonExistingTaskError: if the pipeline's task is not stored
        :raises ExistingPipelineError: if the task already has a pipeline with the same name
        """
        self._validate_pipeline(pipeline)

        existing_task = self.get_task_by_id(pipeline.task_id)
        if existing_task is None:
            raise NonExistingTaskError(pipeline.task_id)

        if self.get_pipeline_by_name(pipeline.name, existing_task) is not None:
            raise ExistingPipelineError(pipeline)

        pipeline._id = self.data.get_and_increment('next_pipeline_id')
        self.data.add_pipeline(copy.deepcopy(pipeline))
        self.save()
        return pipeline

    def update_pipeline(self, pipeline: Pipeline) -> Pipeline:
        """
        Replace the stored pipeline with a copy of `pipeline`.

        :raises NonExistingTaskError: if the pipeline's task is not stored
        :raises NonExistingPipelineError: if no pipeline with this id is stored
        """
        self._validate_pipeline(pipeline)

        if self.get_task_by_id(pipeline.task_id) is None:
            raise NonExistingTaskError(pipeline.task_id)

        if self.get_pipeline_by_id(pipeline.id) is None:
            raise NonExistingPipelineError(pipeline)

        self.data.remove_pipeline(pipeline.id)
        self.data.add_pipeline(copy.deepcopy(pipeline))
        self.save()
        return pipeline

    def delete_pipeline(self, pipeline: Pipeline):
        """
        Remove `pipeline` from the repository.

        :raises NonExistingPipelineError: if the pipeline has no id or is not stored
        """
        if pipeline.id is None or self.data.get_pipeline_by_id(pipeline.id) is None:
            raise NonExistingPipelineError(pipeline)

        self.data.remove_pipeline(pipeline.id)
        self.save()
        pipeline.unbind_meta_repo()

    # ---- images ----

    @bind_to_self
    def get_images(self, task: TaskVar, project: ProjectVar = None) -> List[Image]:
        """Return copies of all images of the given task."""
        task = self._resolve_task(task, project)
        return copy.deepcopy(list(task.images.values()))

    @bind_to_self
    def get_image_by_name(self, image_name, task: TaskVar, project: ProjectVar = None) -> Optional[Image]:
        """Return a copy of the named image of the given task, or `None` if task or image is missing."""
        task = self._resolve_task(task, project)
        # Same guard as get_model_by_name / get_pipeline_by_name.
        if task is None:
            return None
        return copy.deepcopy(self.data.get_image_by_name(task.id, image_name))

    @bind_to_self
    def get_image_by_id(self, id: int) -> Optional[Image]:
        """Return a copy of the image with the given id, or `None` if there is no such image."""
        return copy.deepcopy(self.data.get_image_by_id(id))

    @bind_to_self
    def create_image(self, image: Image) -> Image:
        """
        Assign a new id to `image` and store a copy of it.

        :raises NonExistingTaskError: if the image's task is not stored
        :raises ExistingImageError: if the task already has an image with the same name
        """
        self._validate_image(image)

        task = self.get_task_by_id(image.task_id)
        if task is None:
            raise NonExistingTaskError(image.task_id)

        if self.get_image_by_name(image.name, task) is not None:
            raise ExistingImageError(image)

        image._id = self.data.get_and_increment('next_image_id')
        self.data.add_image(copy.deepcopy(image))
        self.save()
        return image

    def update_image(self, image: Image) -> Image:
        """
        Replace the stored image with a copy of `image`.

        :raises NonExistingTaskError: if the image's task is not stored
        :raises NonExistingImageError: if no image with this id is stored
        """
        self._validate_image(image)

        if self.get_task_by_id(image.task_id) is None:
            raise NonExistingTaskError(image.task_id)

        if self.get_image_by_id(image.id) is None:
            raise NonExistingImageError(image)

        self.data.remove_image(image.id)
        self.data.add_image(copy.deepcopy(image))
        self.save()
        return image

    def delete_image(self, image: Image):
        """
        Remove `image` from the repository.

        :raises ImageWithInstancesError: if there are instances registered for the image
        :raises NonExistingImageError: if the image has no id or is not stored
        """
        if self.data.get_instances_by_image_id(image.id):
            raise ImageWithInstancesError(image)
        if image.id is None or self.data.get_image_by_id(image.id) is None:
            raise NonExistingImageError(image)

        self.data.remove_image(image.id)
        self.save()
        image.unbind_meta_repo()

    # ---- environments ----

    @bind_to_self
    def get_environments(self) -> List[RuntimeEnvironment]:
        """Return copies of all stored environments."""
        return copy.deepcopy(list(self.data.environments.values()))

    @bind_to_self
    def get_environment_by_name(self, name) -> Optional[RuntimeEnvironment]:
        """Return a copy of the environment with the given name, or `None` if there is none."""
        return copy.deepcopy(self.data.get_environment_by_name(name))

    @bind_to_self
    def get_environment_by_id(self, id: int) -> Optional[RuntimeEnvironment]:
        """Return a copy of the environment with the given id, or `None` if there is none."""
        return copy.deepcopy(self.data.get_environment_by_id(id))

    @bind_to_self
    def create_environment(self, environment: RuntimeEnvironment) -> RuntimeEnvironment:
        """
        Assign a new id to `environment` and store a copy of it.

        :raises ExistingEnvironmentError: if an environment with the same name is already stored
        """
        self._validate_environment(environment)

        if self.get_environment_by_name(environment.name) is not None:
            raise ExistingEnvironmentError(environment)

        environment._id = self.data.get_and_increment('next_environment_id')
        self.data.add_environment(copy.deepcopy(environment))
        self.save()
        return environment

    def update_environment(self, environment: RuntimeEnvironment) -> RuntimeEnvironment:
        """
        Replace the stored environment with a copy of `environment`.

        :raises NonExistingEnvironmentError: if no environment with this id is stored
        """
        self._validate_environment(environment)

        if self.get_environment_by_id(environment.id) is None:
            raise NonExistingEnvironmentError(environment)

        self.data.remove_environment(environment.id)
        self.data.add_environment(copy.deepcopy(environment))
        self.save()
        return environment

    def delete_environment(self, environment: RuntimeEnvironment):
        """
        Remove `environment` from the repository.

        :raises EnvironmentWithInstancesError: if there are instances registered for the environment
        :raises NonExistingEnvironmentError: if the environment is not stored
        """
        if self.data.get_instances_by_environment_id(environment.id):
            raise EnvironmentWithInstancesError(environment)

        try:
            self.data.remove_environment(environment.id)
            self.save()
            environment.unbind_meta_repo()
        except (KeyError, AttributeError):
            # removal of an unknown environment fails with one of these errors
            raise NonExistingEnvironmentError(environment)

    # ---- instances ----

    @bind_to_self
    def get_instances(self, image: Union[int, Image] = None, environment: Union[int, RuntimeEnvironment] = None) \
            -> List[RuntimeInstance]:
        """
        Return copies of instances filtered by image, by environment, or by both.

        :param image: image object or image id to filter by
        :param environment: environment object or environment id to filter by
        :raises ValueError: if neither `image` nor `environment` is given
        """
        if image is None and environment is None:
            raise ValueError('Image and environment were not provided to the function')

        if isinstance(image, Image):
            image = image.id
        if isinstance(environment, RuntimeEnvironment):
            environment = environment.id

        if image is not None and environment is not None:
            found = self.data.get_instances(environment, image)
        elif image is not None:
            found = self.data.get_instances_by_image_id(image)
        else:
            found = self.data.get_instances_by_environment_id(environment)
        # Copy like every other getter so callers cannot mutate the stored objects.
        return copy.deepcopy(found)

    @bind_to_self
    def get_instance_by_name(self, instance_name, image: Union[int, Image],
                             environment: Union[int, RuntimeEnvironment]) -> Optional[RuntimeInstance]:
        """Return a copy of the named instance for the given image and environment, or `None`."""
        image = image.id if isinstance(image, Image) else image
        environment = environment.id if isinstance(environment, RuntimeEnvironment) else environment
        return copy.deepcopy(self.data.get_instance_by_name(environment, image, instance_name))

    @bind_to_self
    def get_instance_by_id(self, id: int) -> Optional[RuntimeInstance]:
        """Return a copy of the instance with the given id, or `None` if there is no such instance."""
        return copy.deepcopy(self.data.get_instance_by_id(id))

    @bind_to_self
    def create_instance(self, instance: RuntimeInstance) -> RuntimeInstance:
        """
        Assign a new id to `instance` and store a copy of it.

        :raises NonExistingImageError: if the instance's image is not stored
        :raises NonExistingEnvironmentError: if the instance's environment is not stored
        :raises ExistingInstanceError: if an instance with the same name exists for this image and environment
        """
        self._validate_instance(instance)

        image = self.get_image_by_id(instance.image_id)
        if image is None:
            raise NonExistingImageError(instance.image_id)

        environment = self.get_environment_by_id(instance.environment_id)
        if environment is None:
            raise NonExistingEnvironmentError(instance.environment_id)

        if self.get_instance_by_name(instance.name, image, environment) is not None:
            raise ExistingInstanceError(instance)

        instance._id = self.data.get_and_increment('next_instance_id')
        self.data.add_instance(copy.deepcopy(instance))
        self.save()
        return instance

    def update_instance(self, instance: RuntimeInstance) -> RuntimeInstance:
        """
        Replace the stored instance with a copy of `instance`.

        :raises NonExistingInstanceError: if no instance with this id is stored
        """
        self._validate_instance(instance)

        if self.get_instance_by_id(instance.id) is None:
            raise NonExistingInstanceError(instance)

        self.data.remove_instance(instance.id)
        self.data.add_instance(copy.deepcopy(instance))
        self.save()
        return instance

    def delete_instance(self, instance: RuntimeInstance):
        """
        Remove `instance` from the repository.

        :raises NonExistingInstanceError: if the instance is not stored
        """
        try:
            self.data.remove_instance(instance.id)
            self.save()
            instance.unbind_meta_repo()
        except (KeyError, AttributeError):
            # removal of an unknown instance fails with one of these errors
            raise NonExistingInstanceError(instance)