import abc
import asyncio
-from enum import Enum
from http import HTTPStatus
import os
import shlex
import yaml
from n2vc.loggable import Loggable
-
-
-class N2VCDeploymentStatus(Enum):
- PENDING = "pending"
- RUNNING = "running"
- COMPLETED = "completed"
- FAILED = "failed"
- UNKNOWN = "unknown"
+from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus
class N2VCConnector(abc.ABC, Loggable):
:param float total_timeout:
"""
+ @abc.abstractmethod
+ async def install_k8s_proxy_charm(
+ self,
+ charm_name: str,
+ namespace: str,
+ artifact_path: str,
+ db_dict: dict,
+ progress_timeout: float = None,
+ total_timeout: float = None,
+ config: dict = None,
+ ) -> str:
+ """
+ Install a k8s proxy charm
+
+ :param charm_name: Name of the charm being deployed
+ :param namespace: collection of all the uuids related to the charm.
+ :param str artifact_path: where to locate the artifacts (parent folder) using
+ the self.fs
+ the final artifact path will be a combination of this artifact_path and
+ additional string from the config_dict (e.g. charm name)
+ :param dict db_dict: where to write into database when the status changes.
+ It contains a dict with
+ {collection: <str>, filter: {}, path: <str>},
+ e.g. {collection: "nsrs", filter:
+ {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
+ :param float progress_timeout:
+ :param float total_timeout:
+ :param config: Dictionary with additional configuration
+
+ :returns ee_id: execution environment id.
+ """
+
@abc.abstractmethod
async def get_ee_ssh_public__key(
self,
else:
self.log.info("Exception writing status to database: {}".format(e))
+ def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus:
+ if status not in JujuStatusToOSM[entity_type]:
+ self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
+ return N2VCDeploymentStatus.UNKNOWN
+ return JujuStatusToOSM[entity_type][status]
+
+# DEPRECATED
def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
if statustype == "application" or statustype == "unit":
if status in ["waiting", "maintenance"]: