Source code for ebonite.repository.metadata.base
from abc import abstractmethod
from functools import wraps
from typing import List, Optional, Sequence, TypeVar, Union
from pyjackson.decorators import type_field
from ebonite.client.expose import ExposedMethod
from ebonite.core import errors
from ebonite.core.objects import core
Project = 'core.Project'
Task = 'core.Task'
Model = 'core.Model'
Image = 'core.Image'
Pipeline = 'core.Pipeline'
RuntimeEnvironment = 'core.RuntimeEnvironment'
T = TypeVar('T')
NameOrIdOrObject = Union[int, str, T]
ProjectVar = NameOrIdOrObject[Project]
TaskVar = NameOrIdOrObject[Task]
ModelVar = NameOrIdOrObject[Model]
PipelineVar = NameOrIdOrObject[Pipeline]
EnvironmentVar = NameOrIdOrObject[RuntimeEnvironment]
class ExposedMetadataMethod(ExposedMethod):
def __init__(self, bind=True, name: str = None):
super().__init__(name)
self.bind = bind
self.method = None
def generate_code(self):
fields = self.get_signature().args
declaration = self.get_declaration().replace('core.', '')
result = f'''self.meta_repo.{self.original_name}({', '.join(f.name for f in fields)})'''
if self.bind:
result = f'''self._bind({result})'''
return f''' {declaration}
"""{self.method.__doc__}"""
return {result}'''
def bind_to_self(method):
"""
Decorator for methods which binds method result to metadata repository contained in `self` reference.
:param method: method to decorate
:return: decorated method
"""
@wraps(method)
def inner(self, *args, **kwargs):
res = method(self, *args, **kwargs)
if isinstance(res, core.EboniteObject):
res.bind_meta_repo(self)
elif isinstance(res, Sequence):
for o in res:
o.bind_meta_repo(self)
return res
return inner
[docs]@type_field('type')
class MetadataRepository:
"""
Abstract base class for persistent repositories of metadata (:class:`.core.Project`, :class:`.core.Task`, etc)
"""
type = None
@abstractmethod
@ExposedMetadataMethod()
def get_projects(self) -> List['core.Project']:
"""
Gets all projects in the repository
:return: all projects in the repository
"""
@abstractmethod
@ExposedMetadataMethod(name='get_project')
def get_project_by_name(self, name: str) -> Optional['core.Project']:
"""
Finds project in the repository by name
:param name: name of the project to return
:return: found project if exists or `None`
"""
@abstractmethod
def get_project_by_id(self, id: int) -> Optional['core.Project']:
"""
Finds project in the repository by identifier
:param id: project id
:return: found project if exists or `None`
"""
@abstractmethod
def create_project(self, project: Project) -> Project:
"""
Creates the project and all its tasks.
:param project: project to create
:return: created project
:exception: :exc:`.errors.ExistingProjectError` if given project has the same name as existing one.
"""
@abstractmethod
def update_project(self, project: Project) -> Project:
"""
Updates the project and all its tasks.
:param project: project to update
:return: updated project
:exception: :exc:`.errors.NonExistingProjectError` if given project doesn't exist in the repository
"""
@abstractmethod
def delete_project(self, project: Project):
"""
Deletes the project and all tasks.
:param project: project to delete
:return: nothing
:exception: :exc:`.errors.NonExistingProjectError` if given project doesn't exist in the repository
"""
def save_project(self, project: Project) -> Project:
"""
Saves project into the repository
:param project: project to save
:return: saved project
:exception: :exc:`.errors.ExistingProjectError` if given project has the same name as existing one.
"""
existing_project = self.get_project_by_name(project.name)
if project.id is None and existing_project is None:
return self.create_project(project)
elif existing_project is not None:
if project.id is None or existing_project.id != project.id:
raise errors.ExistingProjectError(existing_project)
return self.update_project(project)
@ExposedMetadataMethod()
def get_or_create_project(self, name: str) -> Project:
"""
Creates a project if not exists or gets existing project otherwise.
:param name: project name
:return: project
"""
project = self.get_project_by_name(name)
if project is None:
project = core.Project(name)
project = self.create_project(project)
return project
@abstractmethod
@ExposedMetadataMethod()
def get_tasks(self, project: ProjectVar) -> List['core.Task']:
"""
Gets a list of tasks for given project
:param project: project to search for tasks in
:return: project tasks
"""
@abstractmethod
@ExposedMetadataMethod(name='get_task')
def get_task_by_name(self, project: ProjectVar, task_name: str) -> Optional['core.Task']:
"""
Finds task with given name in given project
:param project: project to search for task in
:param task_name: expected name of task
:return: task if exists or `None`
"""
@abstractmethod
def get_task_by_id(self, id: int) -> Optional['core.Task']:
"""
Finds task with given id
:param id: id of task to search for
:return: task if exists or `None`
"""
@ExposedMetadataMethod()
def get_or_create_task(self, project: str, task_name: str) -> Task:
"""
Creates a task if not exists or gets existing task otherwise.
:param project: project to search/create task in
:param task_name: expected name of task
:return: created/found task
"""
project = self.get_or_create_project(project)
task = self.get_task_by_name(project, task_name)
if task is None:
task = core.Task(task_name, project_id=project.id)
task = self.create_task(task)
return task
@abstractmethod
def create_task(self, task: Task) -> Task:
"""
Creates task in a repository
:param task: task to create
:return: created task
:exception: :class:`.errors.ExistingTaskError` if given task has the same name and project as existing one
"""
@abstractmethod
def update_task(self, task: Task) -> Task:
"""
Updates task in a repository.
:param task: task to update
:return: updated task
:exception: :exc:`.errors.NonExistingTaskError` if given tasks doesn't exist in the repository
"""
@abstractmethod
def delete_task(self, task: Task):
"""
Deletes the task and all its models.
:param task: task to delete
:return: nothing
:exception: :exc:`.errors.NonExistingTaskError` if given tasks doesn't exist in the repository
"""
def save_task(self, task: Task) -> Task:
"""
Saves task into repository
:param task: task
:return: saved task
:exception: :class:`.errors.ExistingTaskError` if given task has the same name and project as existing one
"""
if task.project_id is None:
raise ValueError("A project is not assigned to the task {}".format(task))
existing_task = self.get_task_by_name(task.project_id, task.name)
if task.id is None and existing_task is None:
return self.create_task(task)
elif existing_task is not None:
if task.id is None or existing_task.id != task.id:
raise errors.ExistingTaskError(existing_task)
return self.update_task(task)
@abstractmethod
@ExposedMetadataMethod()
def get_models(self, task: TaskVar, project: ProjectVar = None) -> List['core.Model']:
"""
Gets a list of models in given project and task
:param task: task to search for models in
:param project: project to search for models in
:return: found models
"""
@abstractmethod
def get_model_by_name(self, model_name: str, task: TaskVar, project: ProjectVar = None) -> Optional['core.Model']:
"""
Finds model by name in given task and project.
:param model_name: expected model name
:param task: task to search for model in
:param project: project to search for model in
:return: found model if exists or `None`
"""
@abstractmethod
def get_model_by_id(self, id: int) -> Optional['core.Model']:
"""
Finds model by identifier.
:param id: expected model id
:return: found model if exists or `None`
"""
@abstractmethod
def create_model(self, model: Model) -> Model:
"""
Creates model in the repository
:param model: model to create
:return: created model
:exception: :exc:`.errors.ExistingModelError` if given model has the same name and task as existing one
"""
@abstractmethod
def update_model(self, model: Model) -> Model:
"""
Updates model in the repository
:param model: model to update
:return: updated model
:exception: :exc:`.errors.NonExistingModelError` if given model doesn't exist in the repository
"""
@abstractmethod
def delete_model(self, model: Model):
"""
Deletes model from the repository
:param model: model to delete
:return: nothing
:exception: :exc:`.errors.NonExistingModelError` if given model doesn't exist in the repository
"""
def save_model(self, model: Model) -> Model:
"""
Saves model in the repository
:param model: model to save
:return: saved model
:exception: :exc:`.errors.ExistingModelError` if given model has the same name and task as existing one
"""
if model.task_id is None:
raise ValueError("A task is not assigned to the model {}".format(model))
existing_model = self.get_model_by_name(model.name, model.task_id)
if model.id is None and existing_model is None:
return self.create_model(model)
elif existing_model is not None:
if model.id is None or existing_model.id != model.id:
raise errors.ExistingModelError(model)
return self.update_model(model)
# ___________________
@abstractmethod
@ExposedMetadataMethod()
def get_pipelines(self, task: TaskVar, project: ProjectVar = None) -> List['core.Pipeline']:
"""
Gets a list of pipelines in given project and task
:param task: task to search for models in
:param project: project to search for models in
:return: found pipelines
"""
@abstractmethod
@ExposedMetadataMethod(name='get_pipeline')
def get_pipeline_by_name(self, pipeline_name: str, task: TaskVar,
project: ProjectVar = None) -> Optional['core.Pipeline']:
"""
Finds model by name in given task and project.
:param pipeline_name: expected pipeline name
:param task: task to search for pipeline in
:param project: project to search for pipeline in
:return: found pipeline if exists or `None`
"""
@abstractmethod
def get_pipeline_by_id(self, id: int) -> Optional['core.Pipeline']:
"""
Finds model by identifier.
:param id: expected model id
:return: found model if exists or `None`
"""
@abstractmethod
def create_pipeline(self, pipeline: Pipeline) -> Pipeline:
"""
Creates model in the repository
:param pipeline: pipeline to create
:return: created pipeline
:exception: :exc:`.errors.ExistingPipelineError` if given model has the same name and task as existing one
"""
@abstractmethod
def update_pipeline(self, pipeline: Pipeline) -> Pipeline:
"""
Updates model in the repository
:param pipeline: pipeline to update
:return: updated model
:exception: :exc:`.errors.NonExistingPipelineError` if given pipeline doesn't exist in the repository
"""
@abstractmethod
def delete_pipeline(self, pipeline: Pipeline):
"""
Deletes model from the repository
:param pipeline: pipeline to delete
:return: nothing
:exception: :exc:`.errors.NonExistingPipelineError` if given pipeline doesn't exist in the repository
"""
def save_pipeline(self, pipeline: Pipeline) -> Pipeline:
"""
Saves pipeline in the repository
:param pipeline: model to save
:return: saved pipeline
:exception: :exc:`.errors.ExistingPipelineError` if given pipeline has the same name and task as existing one
"""
if pipeline.task_id is None:
raise ValueError("A task is not assigned to the pipeline {}".format(pipeline))
existing_pipeline = self.get_pipeline_by_name(pipeline.name, pipeline.task_id)
if pipeline.id is None and existing_pipeline is None:
return self.create_pipeline(pipeline)
elif existing_pipeline is not None:
if pipeline.id is None or existing_pipeline.id != pipeline.id:
raise errors.ExistingPipelineError(pipeline)
return self.update_pipeline(pipeline)
# _______________
@abstractmethod
@ExposedMetadataMethod()
def get_images(self, task: TaskVar, project: ProjectVar = None) -> List['core.Image']:
"""
Gets a list of images in given model, task and project
:param task: task to search for images in
:param project: project to search for images in
:return: found images
"""
@abstractmethod
@ExposedMetadataMethod(name='get_image')
def get_image_by_name(self, image_name: str, task: TaskVar, project: ProjectVar = None) -> Optional['core.Image']:
"""
Finds image by name in given model, task and project.
:param image_name: expected image name
:param task: task to search for image in
:param project: project to search for image in
:return: found image if exists or `None`
"""
@abstractmethod
def get_image_by_id(self, id: int) -> Optional['core.Image']:
"""
Finds image by identifier.
:param id: expected image id
:return: found image if exists or `None`
"""
@abstractmethod
def create_image(self, image: Image) -> Image:
"""
Creates image in the repository
:param image: image to create
:return: created image
:exception: :exc:`.errors.ExistingImageError` if given image has the same name and model as existing one
"""
@abstractmethod
def update_image(self, image: Image) -> Image:
"""
Updates image in the repository
:param image: image to update
:return: updated image
:exception: :exc:`.errors.NonExistingImageError` if given image doesn't exist in the repository
"""
@abstractmethod
def delete_image(self, image: Image):
"""
Deletes image from the repository
:param image: image to delete
:return: nothing
:exception: :exc:`.errors.NonExistingImageError` if given image doesn't exist in the repository
"""
def save_image(self, image: Image) -> Image:
"""
Saves image in the repository
:param image: image to save
:return: saved image
:exception: :exc:`.errors.ExistingImageError` if given image has the same name and model as existing one
"""
self._validate_image(image)
existing_image = self.get_image_by_name(image.name, image.task_id)
if image.id is None and existing_image is None:
return self.create_image(image)
elif existing_image is not None:
if image.id is None or existing_image.id != image.id:
raise errors.ExistingImageError(image)
return self.update_image(image)
@abstractmethod
@ExposedMetadataMethod()
def get_environments(self) -> List[RuntimeEnvironment]:
"""
Gets a list of runtime environments
:return: found runtime environments
"""
@abstractmethod
@ExposedMetadataMethod(name='get_environment')
def get_environment_by_name(self, name: str) -> Optional[RuntimeEnvironment]:
"""
Finds runtime environment by name.
:param name: expected runtime environment name
:return: found runtime environment if exists or `None`
"""
@abstractmethod
def get_environment_by_id(self, id: int) -> Optional[RuntimeEnvironment]:
"""
Finds runtime environment by identifier.
:param id: expected runtime environment id
:return: found runtime environment if exists or `None`
"""
@abstractmethod
@ExposedMetadataMethod(name='push_environment')
def create_environment(self, environment: 'core.RuntimeEnvironment') -> RuntimeEnvironment:
"""
Creates runtime environment in the repository
:param environment: runtime environment to create
:return: created runtime environment
:exception: :exc:`.errors.ExistingEnvironmentError` if given runtime environment has the same name as existing
"""
@abstractmethod
def update_environment(self, environment: 'core.RuntimeEnvironment') -> RuntimeEnvironment:
"""
Updates runtime environment in the repository
:param environment: runtime environment to update
:return: updated runtime environment
:exception: :exc:`.errors.NonExistingEnvironmentError` if given runtime environment doesn't exist in the
repository
"""
@abstractmethod
def delete_environment(self, environment: 'core.RuntimeEnvironment'):
"""
Deletes runtime environment from the repository
:param environment: runtime environment to delete
:return: nothing
:exception: :exc:`.errors.NonExistingEnvironmentError` if given runtime environment doesn't exist in the
repository
"""
def save_environment(self, environment: 'core.RuntimeEnvironment') -> RuntimeEnvironment:
"""
Saves runtime environment in the repository
:param environment: runtime environment to save
:return: saved runtime environment
:exception: :exc:`.errors.ExistingEnvironmentError` if given runtime environment has the same name as existing
"""
self._validate_environment(environment)
existing_environment = self.get_environment_by_name(environment.name)
if environment.id is None and existing_environment is None:
return self.create_environment(environment)
elif existing_environment is not None:
if environment.id is None or existing_environment.id != environment.id:
raise errors.ExistingEnvironmentError(environment)
return self.update_environment(environment)
@abstractmethod
@ExposedMetadataMethod()
def get_instances(self, image: Union[int, 'core.Image'] = None,
environment: Union[int, 'core.RuntimeEnvironment'] = None) -> List['core.RuntimeInstance']:
"""
Gets a list of instances in given image or environment
:param image: image (or id) to search for instances in
:param environment: environment (or id) to search for instances in
:return: found instances
"""
@abstractmethod
@ExposedMetadataMethod(name='get_instance')
def get_instance_by_name(self, instance_name: str, image: Union[int, 'core.Image'],
environment: Union[int, 'core.RuntimeEnvironment']) -> Optional['core.RuntimeInstance']:
"""
Finds instance by name in given image and environment.
:param instance_name: expected instance name
:param image: image (or id) to search for instance in
:param environment: environment (or id) to search for instance in
:return: found instance if exists or `None`
"""
@abstractmethod
def get_instance_by_id(self, id: int) -> Optional['core.RuntimeInstance']:
"""
Finds instance by identifier.
:param id: expected instance id
:return: found instance if exists or `None`
"""
@abstractmethod
def create_instance(self, instance: 'core.RuntimeInstance') -> 'core.RuntimeInstance':
"""
Creates instance in the repository
:param instance: instance to create
:return: created instance
:exception: :exc:`.errors.ExistingInstanceError` if given instance has the same name, image and environment
as existing one
"""
@abstractmethod
def update_instance(self, instance: 'core.RuntimeInstance') -> 'core.RuntimeInstance':
"""
Updates instance in the repository
:param instance: instance to update
:return: updated instance
:exception: :exc:`.errors.NonExistingInstanceError` if given instance doesn't exist in the repository
"""
@abstractmethod
def delete_instance(self, instance: 'core.RuntimeInstance'):
"""
Deletes instance from the repository
:param instance: instance to delete
:return: nothing
:exception: :exc:`.errors.NonExistingInstanceError` if given instance doesn't exist in the repository
"""
def save_instance(self, instance: 'core.RuntimeInstance') -> 'core.RuntimeInstance':
"""
Saves instance in the repository
:param instance: instance to save
:return: saved instance
:exception: :exc:`.errors.ExistingInstanceError` if given image has the same name, image and environment
as existing one
"""
self._validate_instance(instance)
existing_instance = self.get_instance_by_name(instance.name, instance.image_id, instance.environment_id)
if instance.id is None and existing_instance is None:
return self.create_instance(instance)
elif existing_instance is not None:
if instance.id is None or existing_instance.id != instance.id:
raise errors.ExistingInstanceError(instance)
return self.update_instance(instance)
def _resolve_project(self, project: ProjectVar) -> Optional['core.Project']:
if isinstance(project, core.Project):
project = project.id if project.id is not None else project.name
if isinstance(project, int):
return self.get_project_by_id(project)
return self.get_project_by_name(project)
def _resolve_task(self, task: TaskVar, project: ProjectVar = None) -> Optional['core.Task']:
if isinstance(task, core.Task):
task = task.id if task.id is not None else task.name
if isinstance(task, int):
return self.get_task_by_id(task)
if project is None:
raise ValueError('Cannot resolve task without project')
return self.get_task_by_name(project, task)
def _resolve_model(self, model: ModelVar, task: TaskVar = None, project: ProjectVar = None):
if isinstance(model, core.Model):
model = model.id if model.id is not None else model.name
if isinstance(model, int):
return self.get_model_by_id(model)
if task is None:
raise ValueError('Cannot resolve model without task')
return self.get_model_by_name(model, task, project)
def _resolve_environment(self, environment: EnvironmentVar) -> Optional[RuntimeEnvironment]:
if isinstance(environment, core.RuntimeEnvironment):
environment = environment.id if environment.id is not None else environment.name
if isinstance(environment, int):
return self.get_environment_by_id(environment)
return self.get_environment_by_name(environment)
def _validate_project(self, project: Project):
pass
def _validate_task(self, task: Task):
if task.project_id is None:
raise errors.TaskNotInProjectError(task)
def _validate_model(self, model: Model):
if model.task_id is None:
raise errors.ModelNotInTaskError(model)
def _validate_pipeline(self, pipeline: Pipeline):
if pipeline.task_id is None:
raise errors.PipelineNotInTaskError(pipeline)
def _validate_image(self, image: Image):
if image.task_id is None:
raise errors.ImageNotInTaskError(image)
def _validate_environment(self, environment: 'core.RuntimeEnvironment'):
pass
def _validate_instance(self, instance: 'core.RuntimeInstance'):
if instance.image_id is None:
raise errors.InstanceNotInImageError(instance)
if instance.environment_id is None:
raise errors.InstanceNotInEnvironmentError(instance)