import yaml
from n2vc.loggable import Loggable
-from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus
+from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
class N2VCConnector(abc.ABC, Loggable):
# generate private/public key-pair
self.private_key_path = None
self.public_key_path = None
- self.get_public_key()
@abc.abstractmethod
async def get_status(self, namespace: str, yaml_format: bool = True):
else:
self.log.info("Exception writing status to database: {}".format(e))
- def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus:
+ def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
if status not in JujuStatusToOSM[entity_type]:
self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
return N2VCDeploymentStatus.UNKNOWN
return JujuStatusToOSM[entity_type][status]
-# DEPRECATED
-def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
- if statustype == "application" or statustype == "unit":
- if status in ["waiting", "maintenance"]:
- return N2VCDeploymentStatus.RUNNING
- if status in ["error"]:
- return N2VCDeploymentStatus.FAILED
- elif status in ["active"]:
- return N2VCDeploymentStatus.COMPLETED
- elif status in ["blocked"]:
- return N2VCDeploymentStatus.RUNNING
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif statustype == "action":
- if status in ["running"]:
- return N2VCDeploymentStatus.RUNNING
- elif status in ["completed"]:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
- elif statustype == "machine":
- if status in ["pending"]:
- return N2VCDeploymentStatus.PENDING
- elif status in ["started"]:
- return N2VCDeploymentStatus.COMPLETED
- else:
- return N2VCDeploymentStatus.UNKNOWN
-
- return N2VCDeploymentStatus.FAILED
-
-
def obj_to_yaml(obj: object) -> str:
# dump to yaml
dump_text = yaml.dump(obj, default_flow_style=False, indent=2)