Bug 1998 fixed: DeployedK8sResource is now obtaining the model name from the namespace
[osm/LCM.git] / osm_lcm / data_utils / vca.py
1 # Copyright 2021 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 from typing import Any, Dict, NoReturn
16
17
18 def safe_get_ee_relation(
19 nsr_id: str, ee_relation: Dict[str, Any], vnf_profile_id: str = None
20 ) -> Dict[str, Any]:
21 return {
22 "nsr-id": nsr_id,
23 "vnf-profile-id": ee_relation.get("vnf-profile-id") or vnf_profile_id,
24 "vdu-profile-id": ee_relation.get("vdu-profile-id"),
25 "kdu-resource-profile-id": ee_relation.get("kdu-resource-profile-id"),
26 "execution-environment-ref": ee_relation.get("execution-environment-ref"),
27 "endpoint": ee_relation["endpoint"],
28 }
29
30
31 class EELevel:
32 VDU = "vdu"
33 VNF = "vnf"
34 KDU = "kdu"
35 NS = "ns"
36
37 @staticmethod
38 def get_level(ee_relation: dict):
39 """Get the execution environment level"""
40 level = None
41 if (
42 not ee_relation["vnf-profile-id"]
43 and not ee_relation["vdu-profile-id"]
44 and not ee_relation["kdu-resource-profile-id"]
45 ):
46 level = EELevel.NS
47 elif (
48 ee_relation["vnf-profile-id"]
49 and not ee_relation["vdu-profile-id"]
50 and not ee_relation["kdu-resource-profile-id"]
51 ):
52 level = EELevel.VNF
53 elif (
54 ee_relation["vnf-profile-id"]
55 and ee_relation["vdu-profile-id"]
56 and not ee_relation["kdu-resource-profile-id"]
57 ):
58 level = EELevel.VDU
59 elif (
60 ee_relation["vnf-profile-id"]
61 and not ee_relation["vdu-profile-id"]
62 and ee_relation["kdu-resource-profile-id"]
63 ):
64 level = EELevel.KDU
65 else:
66 raise Exception("invalid relation endpoint")
67 return level
68
69
70 class EERelation(dict):
71 """Represents the execution environment of a relation"""
72
73 def __init__(
74 self,
75 relation_ee: Dict[str, Any],
76 ) -> NoReturn:
77 """
78 Args:
79 relation_ee: Relation Endpoint object in the VNFd or NSd.
80 Example:
81 {
82 "nsr-id": <>,
83 "vdu-profile-id": <>,
84 "kdu-resource-profile-id": <>,
85 "vnf-profile-id": <>,
86 "execution-environment-ref": <>,
87 "endpoint": <>,
88 }
89 """
90 for key, value in relation_ee.items():
91 self.__setitem__(key, value)
92
93 @property
94 def vdu_profile_id(self):
95 """Returns the vdu-profile id"""
96 return self["vdu-profile-id"]
97
98 @property
99 def kdu_resource_profile_id(self):
100 """Returns the kdu-resource-profile id"""
101 return self["kdu-resource-profile-id"]
102
103 @property
104 def vnf_profile_id(self):
105 """Returns the vnf-profile id"""
106 return self["vnf-profile-id"]
107
108 @property
109 def execution_environment_ref(self):
110 """Returns the reference to the execution environment (id)"""
111 return self["execution-environment-ref"]
112
113 @property
114 def endpoint(self):
115 """Returns the endpoint of the execution environment"""
116 return self["endpoint"]
117
118 @property
119 def nsr_id(self) -> str:
120 """Returns the nsr id"""
121 return self["nsr-id"]
122
123
124 class Relation(dict):
125 """Represents a relation"""
126 def __init__(self, name, provider: EERelation, requirer: EERelation) -> NoReturn:
127 """
128 Args:
129 name: Name of the relation.
130 provider: Execution environment that provides the service for the relation.
131 requirer: Execution environment that requires the service from the provider.
132 """
133 self.__setitem__("name", name)
134 self.__setitem__("provider", provider)
135 self.__setitem__("requirer", requirer)
136
137 @property
138 def name(self) -> str:
139 """Returns the name of the relation"""
140 return self["name"]
141
142 @property
143 def provider(self) -> EERelation:
144 """Returns the provider endpoint"""
145 return self["provider"]
146
147 @property
148 def requirer(self) -> EERelation:
149 """Returns the requirer endpoint"""
150 return self["requirer"]
151
152
153 class DeployedComponent(dict):
154 """Represents a deployed component (nsr["_admin"]["deployed"]["VCA" | "K8s"])"""
155
156 def __init__(self, data: Dict[str, Any]):
157 """
158 Args:
159 data: dictionary with the data of the deployed component
160 """
161 for key, value in data.items():
162 self.__setitem__(key, value)
163
164 @property
165 def vnf_profile_id(self):
166 """Returns the vnf-profile id"""
167 return self["member-vnf-index"]
168
169 @property
170 def ee_id(self):
171 raise NotImplementedError()
172
173 @property
174 def config_sw_installed(self) -> bool:
175 raise NotImplementedError()
176
177
178 class DeployedK8sResource(DeployedComponent):
179 """Represents a deployed component for a kdu resource"""
180 def __init__(self, data: Dict[str, Any]):
181 super().__init__(data)
182
183 @property
184 def ee_id(self):
185 """Returns the execution environment id"""
186 model = self["namespace"]
187 application_name = self["resource-name"]
188 return f"{model}.{application_name}.k8s"
189
190 @property
191 def config_sw_installed(self) -> bool:
192 return True
193
194
195 class DeployedVCA(DeployedComponent):
196 """Represents a VCA deployed component"""
197 def __init__(self, nsr_id: str, deployed_vca: Dict[str, Any]) -> NoReturn:
198 """
199 Args:
200 db_nsr: NS record
201 vca_index: Vca index for the deployed VCA
202 """
203 super().__init__(deployed_vca)
204 self.nsr_id = nsr_id
205
206 @property
207 def ee_id(self) -> str:
208 """Returns the execution environment id"""
209 return self["ee_id"]
210
211 @property
212 def vdu_profile_id(self) -> str:
213 """Returns the vdu-profile id"""
214 return self["vdu_id"]
215
216 @property
217 def execution_environment_ref(self) -> str:
218 """Returns the execution environment id"""
219 return self["ee_descriptor_id"]
220
221 @property
222 def config_sw_installed(self) -> bool:
223 return self.get("config_sw_installed", False)