X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FN2VC.git;a=blobdiff_plain;f=n2vc%2Fn2vc_conn.py;h=12704a3f35c622888fd5606126f137ac1343321a;hp=5e11b5f9708aba449b6e867d9bf09fa23d704346;hb=6331b04745fcd6d44b1b0320ca6e3e63cdebd0e8;hpb=0c478257d6bd8126b27d80f76d128c7cc21d0609;ds=sidebyside diff --git a/n2vc/n2vc_conn.py b/n2vc/n2vc_conn.py index 5e11b5f..12704a3 100644 --- a/n2vc/n2vc_conn.py +++ b/n2vc/n2vc_conn.py @@ -34,7 +34,7 @@ from osm_common.dbmongo import DbException import yaml from n2vc.loggable import Loggable -from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus +from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus class N2VCConnector(abc.ABC, Loggable): @@ -59,6 +59,7 @@ class N2VCConnector(abc.ABC, Loggable): username: str, vca_config: dict, on_update_db=None, + **kwargs, ): """Initialize N2VC abstract connector. It defines de API for VCA connectors @@ -117,7 +118,6 @@ class N2VCConnector(abc.ABC, Loggable): # generate private/public key-pair self.private_key_path = None self.public_key_path = None - self.get_public_key() @abc.abstractmethod async def get_status(self, namespace: str, yaml_format: bool = True): @@ -492,44 +492,13 @@ class N2VCConnector(abc.ABC, Loggable): else: self.log.info("Exception writing status to database: {}".format(e)) - def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus: + def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus: if status not in JujuStatusToOSM[entity_type]: self.log.warning("Status {} not found in JujuStatusToOSM.".format(status)) return N2VCDeploymentStatus.UNKNOWN return JujuStatusToOSM[entity_type][status] -# DEPRECATED -def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus: - if statustype == "application" or statustype == "unit": - if status in ["waiting", "maintenance"]: - return N2VCDeploymentStatus.RUNNING - if status in ["error"]: - return N2VCDeploymentStatus.FAILED - elif status in ["active"]: - return N2VCDeploymentStatus.COMPLETED - elif status in ["blocked"]: - return N2VCDeploymentStatus.RUNNING - else: - return N2VCDeploymentStatus.UNKNOWN - elif statustype == "action": - if status in ["running"]: - return N2VCDeploymentStatus.RUNNING - elif status in ["completed"]: - return N2VCDeploymentStatus.COMPLETED - else: - return N2VCDeploymentStatus.UNKNOWN - elif statustype == "machine": - if status in ["pending"]: - return N2VCDeploymentStatus.PENDING - elif status in ["started"]: - return N2VCDeploymentStatus.COMPLETED - else: - return N2VCDeploymentStatus.UNKNOWN - - return N2VCDeploymentStatus.FAILED - - def obj_to_yaml(obj: object) -> str: # dump to yaml dump_text = yaml.dump(obj, default_flow_style=False, indent=2)