1 # Copyright 2021 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
15 from typing
import Any
, Dict
, NoReturn
18 def safe_get_ee_relation(
19 nsr_id
: str, ee_relation
: Dict
[str, Any
], vnf_profile_id
: str = None
23 "vnf-profile-id": ee_relation
.get("vnf-profile-id") or vnf_profile_id
,
24 "vdu-profile-id": ee_relation
.get("vdu-profile-id"),
25 "kdu-resource-profile-id": ee_relation
.get("kdu-resource-profile-id"),
26 "execution-environment-ref": ee_relation
.get("execution-environment-ref"),
27 "endpoint": ee_relation
["endpoint"],
38 def get_level(ee_relation
: dict):
39 """Get the execution environment level"""
42 not ee_relation
["vnf-profile-id"]
43 and not ee_relation
["vdu-profile-id"]
44 and not ee_relation
["kdu-resource-profile-id"]
48 ee_relation
["vnf-profile-id"]
49 and not ee_relation
["vdu-profile-id"]
50 and not ee_relation
["kdu-resource-profile-id"]
54 ee_relation
["vnf-profile-id"]
55 and ee_relation
["vdu-profile-id"]
56 and not ee_relation
["kdu-resource-profile-id"]
60 ee_relation
["vnf-profile-id"]
61 and not ee_relation
["vdu-profile-id"]
62 and ee_relation
["kdu-resource-profile-id"]
66 raise Exception("invalid relation endpoint")
70 class EERelation(dict):
71 """Represents the execution environment of a relation"""
75 relation_ee
: Dict
[str, Any
],
79 relation_ee: Relation Endpoint object in the VNFd or NSd.
84 "kdu-resource-profile-id": <>,
86 "execution-environment-ref": <>,
90 for key
, value
in relation_ee
.items():
91 self
.__setitem
__(key
, value
)
94 def vdu_profile_id(self
):
95 """Returns the vdu-profile id"""
96 return self
["vdu-profile-id"]
99 def kdu_resource_profile_id(self
):
100 """Returns the kdu-resource-profile id"""
101 return self
["kdu-resource-profile-id"]
104 def vnf_profile_id(self
):
105 """Returns the vnf-profile id"""
106 return self
["vnf-profile-id"]
109 def execution_environment_ref(self
):
110 """Returns the reference to the execution environment (id)"""
111 return self
["execution-environment-ref"]
115 """Returns the endpoint of the execution environment"""
116 return self
["endpoint"]
119 def nsr_id(self
) -> str:
120 """Returns the nsr id"""
121 return self
["nsr-id"]
124 class Relation(dict):
125 """Represents a relation"""
126 def __init__(self
, name
, provider
: EERelation
, requirer
: EERelation
) -> NoReturn
:
129 name: Name of the relation.
130 provider: Execution environment that provides the service for the relation.
131 requirer: Execution environment that requires the service from the provider.
133 self
.__setitem
__("name", name
)
134 self
.__setitem
__("provider", provider
)
135 self
.__setitem
__("requirer", requirer
)
138 def name(self
) -> str:
139 """Returns the name of the relation"""
143 def provider(self
) -> EERelation
:
144 """Returns the provider endpoint"""
145 return self
["provider"]
148 def requirer(self
) -> EERelation
:
149 """Returns the requirer endpoint"""
150 return self
["requirer"]
153 class DeployedComponent(dict):
154 """Represents a deployed component (nsr["_admin"]["deployed"]["VCA" | "K8s"])"""
156 def __init__(self
, data
: Dict
[str, Any
]):
159 data: dictionary with the data of the deployed component
161 for key
, value
in data
.items():
162 self
.__setitem
__(key
, value
)
165 def vnf_profile_id(self
):
166 """Returns the vnf-profile id"""
167 return self
["member-vnf-index"]
171 raise NotImplementedError()
174 def config_sw_installed(self
) -> bool:
175 raise NotImplementedError()
178 class DeployedK8sResource(DeployedComponent
):
179 """Represents a deployed component for a kdu resource"""
180 def __init__(self
, data
: Dict
[str, Any
]):
181 super().__init
__(data
)
185 """Returns the execution environment id"""
186 model
= self
["namespace"]
187 application_name
= self
["resource-name"]
188 return f
"{model}.{application_name}.k8s"
191 def config_sw_installed(self
) -> bool:
195 class DeployedVCA(DeployedComponent
):
196 """Represents a VCA deployed component"""
197 def __init__(self
, nsr_id
: str, deployed_vca
: Dict
[str, Any
]) -> NoReturn
:
201 vca_index: Vca index for the deployed VCA
203 super().__init
__(deployed_vca
)
207 def ee_id(self
) -> str:
208 """Returns the execution environment id"""
212 def vdu_profile_id(self
) -> str:
213 """Returns the vdu-profile id"""
214 return self
["vdu_id"]
217 def execution_environment_ref(self
) -> str:
218 """Returns the execution environment id"""
219 return self
["ee_descriptor_id"]
222 def config_sw_installed(self
) -> bool:
223 return self
.get("config_sw_installed", False)