# Copyright 2020 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from juju.machine import Machine
from juju.application import Application
from juju.action import Action
from juju.unit import Unit
from n2vc.exceptions import N2VCInvalidCertificate
27 def base64_to_cacert(b64string
):
28 """Convert the base64-encoded string containing the VCA CACERT.
34 cacert
= base64
.b64decode(b64string
).decode("utf-8")
41 except binascii
.Error
as e
:
42 raise N2VCInvalidCertificate(message
="Invalid CA Certificate: {}".format(e
))
47 class N2VCDeploymentStatus(Enum
):
50 COMPLETED
= "completed"
57 Dict class that allows to access the keys like attributes
60 def __getattribute__(self
, name
):
class EntityType(Enum):
    """Enumeration whose member values are the juju entity classes.

    Lookups go through ``_value2member_map_``, i.e. they are keyed by the
    entity class itself (``Application`` -> ``EntityType.APPLICATION``).

    NOTE(review): only the ``APPLICATION`` member line was readable when this
    block was repaired. ``MACHINE``, ``ACTION`` and ``UNIT`` are restored from
    the ``Machine``/``Action``/``Unit`` imports and from the entity names
    listed in ``get_entity_from_delta`` -- confirm names and order.
    NOTE(review): the ``@classmethod`` decorators are restored from the ``cls``
    first parameter and the one-argument ``cls.get_entity(v)`` call -- confirm.
    """

    MACHINE = Machine
    APPLICATION = Application
    ACTION = Action
    UNIT = Unit

    @classmethod
    def has_value(cls, value):
        """Return True when ``value`` is the value of one of the members."""
        return value in cls._value2member_map_  # pylint: disable=E1101

    @classmethod
    def get_entity(cls, value):
        """Return the member whose value is ``value``, or None if there is none."""
        # Single dict lookup instead of a membership test followed by indexing.
        return cls._value2member_map_.get(value)  # pylint: disable=E1101

    @classmethod
    def get_entity_from_delta(cls, delta_entity: str):
        """
        Get Value from delta entity

        :param: delta_entity: Possible values are "machine", "application", "unit", "action"
        :return: the member whose value's lower-cased class name equals
            ``delta_entity``, or None when no member matches
        """
        for v in cls._value2member_map_:  # pylint: disable=E1101
            # Member values are classes; match on their lower-cased __name__.
            if v.__name__.lower() == delta_entity:
                return cls.get_entity(v)
        return None
97 "pending": N2VCDeploymentStatus
.PENDING
,
98 "started": N2VCDeploymentStatus
.COMPLETED
,
101 "waiting": N2VCDeploymentStatus
.RUNNING
,
102 "maintenance": N2VCDeploymentStatus
.RUNNING
,
103 "blocked": N2VCDeploymentStatus
.RUNNING
,
104 "error": N2VCDeploymentStatus
.FAILED
,
105 "active": N2VCDeploymentStatus
.COMPLETED
,
108 "pending": N2VCDeploymentStatus
.PENDING
,
109 "running": N2VCDeploymentStatus
.RUNNING
,
110 "completed": N2VCDeploymentStatus
.COMPLETED
,
113 "waiting": N2VCDeploymentStatus
.RUNNING
,
114 "maintenance": N2VCDeploymentStatus
.RUNNING
,
115 "blocked": N2VCDeploymentStatus
.RUNNING
,
116 "error": N2VCDeploymentStatus
.FAILED
,
117 "active": N2VCDeploymentStatus
.COMPLETED
,
def obj_to_yaml(obj: object) -> str:
    """
    Converts object to yaml format

    :param obj: object to dump with ``yaml.dump`` (block style, indent 2)
    :return: yaml text in which every line is cut at the first
        "!!python/object" tag and every line ends with a newline
    """
    dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
    # remove !!python/object tags
    # NOTE(review): the cut-at-tag step is reconstructed from the original's
    # ``line.find("!!python/object")`` call -- confirm against upstream.
    tag = "!!python/object"
    # partition()[0] is the text before the tag, or the whole line when the
    # tag is absent; join avoids repeated string concatenation.
    return "".join(
        line.partition(tag)[0] + "\n" for line in dump_text.splitlines()
    )
def obj_to_dict(obj: object) -> dict:
    """
    Converts object to dictionary format

    :param obj: object to convert
    :return: the data obtained by parsing the yaml text that ``obj_to_yaml``
        produces for ``obj``
    """
    # convert obj to yaml
    yaml_text = obj_to_yaml(obj)
    # parse the yaml text back into plain Python data
    # NOTE(review): yaml.Loader is the full (unsafe) PyYAML loader. Kept so
    # behaviour is unchanged; only acceptable while ``obj`` is trusted --
    # evaluate whether yaml.SafeLoader can parse the stripped text.
    return yaml.load(yaml_text, Loader=yaml.Loader)