# See the License for the specific language governing permissions and
# limitations under the License.
+import base64
+import re
+import binascii
+import yaml
from enum import Enum
from juju.machine import Machine
from juju.application import Application
from juju.action import Action
from juju.unit import Unit
+from n2vc.exceptions import N2VCInvalidCertificate
+
+
+def base64_to_cacert(b64string):
+ """Convert the base64-encoded string containing the VCA CACERT.
+
+ The input string....
+
+ """
+ try:
+ cacert = base64.b64decode(b64string).decode("utf-8")
+
+ cacert = re.sub(r"\\n", r"\n", cacert,)
+ except binascii.Error as e:
+ raise N2VCInvalidCertificate(message="Invalid CA Certificate: {}".format(e))
+
+ return cacert
class N2VCDeploymentStatus(Enum):
return cls.get_entity(v)
-FinalStatus = Dict(
- {
- EntityType.MACHINE: Dict({"field": "agent_status", "status": ["started"]}),
- EntityType.APPLICATION: Dict(
- {"field": "status", "status": ["active", "blocked"]}
- ),
- EntityType.ACTION: Dict(
- {"field": "status", "status": ["completed", "failed", "cancelled"]}
- ),
- }
-)
-
JujuStatusToOSM = {
- EntityType.MACHINE: {
+ "machine": {
"pending": N2VCDeploymentStatus.PENDING,
"started": N2VCDeploymentStatus.COMPLETED,
},
- EntityType.APPLICATION: {
+ "application": {
"waiting": N2VCDeploymentStatus.RUNNING,
"maintenance": N2VCDeploymentStatus.RUNNING,
"blocked": N2VCDeploymentStatus.RUNNING,
"error": N2VCDeploymentStatus.FAILED,
"active": N2VCDeploymentStatus.COMPLETED,
},
- EntityType.ACTION: {
+ "action": {
"pending": N2VCDeploymentStatus.PENDING,
"running": N2VCDeploymentStatus.RUNNING,
"completed": N2VCDeploymentStatus.COMPLETED,
},
- EntityType.UNIT: {
+ "unit": {
"waiting": N2VCDeploymentStatus.RUNNING,
"maintenance": N2VCDeploymentStatus.RUNNING,
"blocked": N2VCDeploymentStatus.RUNNING,
)
}
)
+
+
+def obj_to_yaml(obj: object) -> str:
+ """
+ Converts object to yaml format
+ :return: yaml data
+ """
+ # dump to yaml
+ dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
+ # split lines
+ lines = dump_text.splitlines()
+ # remove !!python/object tags
+ yaml_text = ""
+ for line in lines:
+ index = line.find("!!python/object")
+ if index >= 0:
+ line = line[:index]
+ yaml_text += line + "\n"
+ return yaml_text
+
+
+def obj_to_dict(obj: object) -> dict:
+ """
+ Converts object to dictionary format
+ :return: dict data
+ """
+ # convert obj to yaml
+ yaml_text = obj_to_yaml(obj)
+ # parse to dict
+ return yaml.load(yaml_text, Loader=yaml.Loader)