1 |
|
## |
2 |
|
# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U. |
3 |
|
# |
4 |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
# you may not use this file except in compliance with the License. |
6 |
|
# You may obtain a copy of the License at |
7 |
|
# |
8 |
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
# |
10 |
|
# Unless required by applicable law or agreed to in writing, software |
11 |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
13 |
|
# implied. |
14 |
|
# See the License for the specific language governing permissions and |
15 |
|
# limitations under the License. |
16 |
|
# |
17 |
|
## |
18 |
1 |
import functools |
19 |
1 |
import yaml |
20 |
1 |
import asyncio |
21 |
1 |
import uuid |
22 |
1 |
import os |
23 |
1 |
import ssl |
24 |
|
|
25 |
1 |
from grpclib.client import Channel |
26 |
|
|
27 |
1 |
from osm_lcm.data_utils.lcm_config import VcaConfig |
28 |
1 |
from osm_lcm.frontend_pb2 import PrimitiveRequest |
29 |
1 |
from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply |
30 |
1 |
from osm_lcm.frontend_grpc import FrontendExecutorStub |
31 |
1 |
from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts |
32 |
|
|
33 |
1 |
from osm_lcm.data_utils.database.database import Database |
34 |
1 |
from osm_lcm.data_utils.filesystem.filesystem import Filesystem |
35 |
|
|
36 |
1 |
from n2vc.n2vc_conn import N2VCConnector |
37 |
1 |
from n2vc.k8s_helm_conn import K8sHelmConnector |
38 |
1 |
from n2vc.k8s_helm3_conn import K8sHelm3Connector |
39 |
1 |
from n2vc.exceptions import ( |
40 |
|
N2VCBadArgumentsException, |
41 |
|
N2VCException, |
42 |
|
N2VCExecutionException, |
43 |
|
) |
44 |
|
|
45 |
1 |
from osm_lcm.lcm_utils import deep_get |
46 |
|
|
47 |
|
|
48 |
1 |
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Build a decorator that retries an async method while the connection is refused.

    The decorated coroutine must be a method: its first positional argument is
    treated as ``self`` and the retry settings are read from that instance's
    ``__dict__``.

    :param str max_wait_time_var: name of the instance attribute holding the total
        retry budget; 300 is used when the attribute is missing or falsy
    :param str delay_time_var: name of the instance attribute holding the pause
        between attempts (passed to ``asyncio.sleep``); 10 is used when the
        attribute is missing or falsy
    :returns: a decorator for ``async def`` methods
    :raises ConnectionRefusedError: when every attempt was refused and the retry
        budget is exhausted
    """

    def wrapper(func):
        retry_exceptions = ConnectionRefusedError

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay_time
            delay_time = 10
            max_wait_time = 300

            # obtain arguments from variable names
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            last_error = None
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions as e:
                    last_error = e
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)

            # Budget exhausted: propagate the failure instead of handing the
            # exception class back to the caller as if it were a result.
            if last_error is None:
                raise ConnectionRefusedError(
                    "retry budget exhausted before any attempt was made"
                )
            raise last_error

        return wrapped

    return wrapper
79 |
|
|
80 |
|
|
81 |
1 |
def create_secure_context(
    trusted: str,
) -> ssl.SSLContext:
    """
    Build the client-side TLS context used for gRPC channels.

    The context requires a verified server certificate with hostname checking,
    refuses anything below TLS 1.2, restricts the cipher list and advertises
    "h2" through ALPN (and NPN where the ssl module still implements it).

    :param str trusted: path handed to ``load_verify_locations`` as the set of
        trusted CA certificates
    :returns ssl.SSLContext: the configured context
    """
    cipher_suites = "ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20"
    http2_protocols = ["h2"]

    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # TODO: client TLS
    # context.load_cert_chain(str(client_cert), str(client_key))
    context.load_verify_locations(trusted)
    context.set_ciphers(cipher_suites)
    context.set_alpn_protocols(http2_protocols)
    try:
        context.set_npn_protocols(http2_protocols)
    except NotImplementedError:
        # NPN is optional; ALPN above is enough
        pass
    return context
98 |
|
|
99 |
|
|
100 |
1 |
class LCMHelmConn(N2VCConnector, LcmBase): |
101 |
1 |
def __init__(
    self,
    log: object = None,
    loop: object = None,
    vca_config: VcaConfig = None,
    on_update_db=None,
):
    """
    Initialize EE helm connector.

    :param log: logger forwarded to N2VCConnector and to both helm connectors
    :param loop: event loop forwarded to N2VCConnector
    :param VcaConfig vca_config: VCA configuration; retry settings, helm and
        kubectl paths are read from it
    :param on_update_db: callback forwarded to N2VCConnector
    """
    database = Database().instance.db
    filesystem = Filesystem().instance.fs
    self.db = database
    self.fs = filesystem

    # parent class constructor
    N2VCConnector.__init__(
        self,
        log=log,
        loop=loop,
        on_update_db=on_update_db,
        db=database,
        fs=filesystem,
    )

    self.vca_config = vca_config
    self.log.debug("Initialize helm N2VC connector")
    self.log.debug(f"initial vca_config: {vca_config.to_dict()}")

    # retry settings looked up by name from the retryer decorator
    self._retry_delay = self.vca_config.helm_ee_retry_delay

    self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
    self.log.debug(f"Initial retry time: {self._initial_retry_time}")

    self._max_retry_time = self.vca_config.helm_max_retry_time
    self.log.debug(f"Retry time: {self._max_retry_time}")

    # initialize helm connector for helmv2 and helmv3
    self._k8sclusterhelm2 = K8sHelmConnector(
        kubectl_command=self.vca_config.kubectlpath,
        helm_command=self.vca_config.helmpath,
        fs=self.fs,
        db=self.db,
        log=self.log,
        on_update_db=None,
    )

    self._k8sclusterhelm3 = K8sHelm3Connector(
        kubectl_command=self.vca_config.kubectlpath,
        helm_command=self.vca_config.helm3path,
        fs=self.fs,
        log=self.log,
        db=self.db,
        on_update_db=None,
    )

    # resolved lazily by _get_system_cluster_id
    self._system_cluster_id = None
    self.log.info("Helm N2VC connector initialized")
153 |
|
|
154 |
|
# TODO - ¿reuse_ee_id? |
155 |
1 |
async def create_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    reuse_ee_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    artifact_path: str = None,
    chart_model: str = None,
    vca_type: str = None,
    *kargs,
    **kwargs,
) -> (str, dict):
    """
    Creates a new helm execution environment deploying the helm-chart indicated in the
    artifact_path
    :param str namespace: This param is only logged, the chart is installed in the
        namespace configured as vca_config.kubectl_osm_namespace
    :param dict db_dict: where to write to database when the status changes.
        It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
            "_admin.deployed.VCA.3"}
    :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
    :param float progress_timeout: forwarded to the helm connector as install timeout
    :param float total_timeout: not used
    :param dict config: General variables to instantiate KDU. If it has an "osm" entry
        it is also copied under config["global"]["osm"] (the dict is modified in place)
    :param str artifact_path: path of package content (mandatory)
    :param str chart_model: helm chart/reference (string), which can be either
        of these options:
        - a name of chart available via the repos known by OSM
          (e.g. stable/openldap, stable/openldap:1.2.4)
        - a path to a packaged chart (e.g. mychart.tgz)
        - a path to an unpacked chart directory or a URL (e.g. mychart)
        Only a value containing "/" replaces the chart found at artifact_path;
        None or a value without "/" means the artifact path is installed
    :param str vca_type: Type of vca, "helm" selects the helm v2 connector, any other
        value the helm v3 connector
    :returns str, dict: id of the new execution environment with the format
        vca_type:namespace.helm_id, and credentials object set to None as all
        credentials should be osm kubernetes .kubeconfig
    :raises N2VCBadArgumentsException: artifact_path missing or not found in the filesystem
    :raises N2VCException: any failure while installing the chart
    """

    # arguments follow the order of the placeholders in the message
    self.log.info(
        "create_execution_environment: namespace: {}, artifact_path: {}, "
        "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
            namespace, artifact_path, chart_model, db_dict, reuse_ee_id
        )
    )

    # Validate artifact-path is provided
    if artifact_path is None or len(artifact_path) == 0:
        raise N2VCBadArgumentsException(
            message="artifact_path is mandatory", bad_args=["artifact_path"]
        )

    # Validate artifact-path exists and sync path
    from_path = os.path.split(artifact_path)[0]
    self.fs.sync(from_path)

    # collapse repeated "/" in the chart path
    while artifact_path.find("//") >= 0:
        artifact_path = artifact_path.replace("//", "/")

    # check chart path
    if not self.fs.file_exists(artifact_path):
        msg = "artifact path does not exist: {}".format(artifact_path)
        raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
    helm_chart_path = artifact_path

    if artifact_path.startswith("/"):
        full_path = self.fs.path + helm_chart_path
    else:
        full_path = self.fs.path + "/" + helm_chart_path

    while full_path.find("//") >= 0:
        full_path = full_path.replace("//", "/")

    # By default, the KDU is expected to be a file
    kdu_model = full_path
    # If the chart_model includes a "/", then it is a reference:
    #    e.g. (stable/openldap; stable/openldap:1.2.4)
    # chart_model is optional, so None must not be dereferenced
    if chart_model and "/" in chart_model:
        kdu_model = chart_model

    try:
        # Call helm conn install
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        # Add parameter osm if exist to global
        if config and config.get("osm"):
            if not config.get("global"):
                config["global"] = {}
            config["global"]["osm"] = config.get("osm")

        self.log.debug("install helm chart: {}".format(full_path))
        if vca_type == "helm":
            helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
                db_dict=db_dict,
                kdu_model=kdu_model,
            )
            await self._k8sclusterhelm2.install(
                system_cluster_uuid,
                kdu_model=kdu_model,
                kdu_instance=helm_id,
                namespace=self.vca_config.kubectl_osm_namespace,
                params=config,
                db_dict=db_dict,
                timeout=progress_timeout,
            )
        else:
            # NOTE(review): the instance name is generated with the helm v2
            # connector also for helm v3 -- kept as is, confirm it is intended
            helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
                db_dict=db_dict,
                kdu_model=kdu_model,
            )
            await self._k8sclusterhelm3.install(
                system_cluster_uuid,
                kdu_model=kdu_model,
                kdu_instance=helm_id,
                namespace=self.vca_config.kubectl_osm_namespace,
                params=config,
                db_dict=db_dict,
                timeout=progress_timeout,
            )

        ee_id = "{}:{}.{}".format(
            vca_type, self.vca_config.kubectl_osm_namespace, helm_id
        )
        return ee_id, None
    except N2VCException:
        raise
    except Exception as e:
        self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
        raise N2VCException("Error deploying chart ee: {}".format(e))
286 |
|
|
287 |
1 |
async def upgrade_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    helm_id: str,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    artifact_path: str = None,
    vca_type: str = None,
    *kargs,
    **kwargs,
) -> None:
    """
    Upgrades an existing helm execution environment with the helm-chart indicated in the
    artifact_path
    :param str namespace: namespace forwarded to the helm connector upgrade call
    :param dict db_dict: where to write to database when the status changes.
        It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
            "_admin.deployed.VCA.3"}
    :param helm_id: unique name of the Helm release to upgrade (mandatory)
    :param float progress_timeout: forwarded to the helm connector as upgrade timeout
    :param float total_timeout: not used
    :param dict config: General variables to instantiate KDU. If it has an "osm" entry
        it is also copied under config["global"]["osm"] (the dict is modified in place)
    :param str artifact_path: path of package content (mandatory)
    :param str vca_type: Type of vca, "helm" selects the helm v2 connector, any other
        value the helm v3 connector
    :returns: None
    :raises N2VCBadArgumentsException: helm_id or artifact_path missing, or artifact_path
        not found in the filesystem
    :raises N2VCException: any failure while upgrading the chart
    """

    # the message was previously logged with its "{}" placeholders unfilled
    self.log.info(
        "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, ".format(
            namespace, artifact_path, db_dict
        )
    )

    # Validate helm_id is provided
    if helm_id is None or len(helm_id) == 0:
        raise N2VCBadArgumentsException(
            message="helm_id is mandatory", bad_args=["helm_id"]
        )

    # Validate artifact-path is provided
    if artifact_path is None or len(artifact_path) == 0:
        raise N2VCBadArgumentsException(
            message="artifact_path is mandatory", bad_args=["artifact_path"]
        )

    # Validate artifact-path exists and sync path
    from_path = os.path.split(artifact_path)[0]
    self.fs.sync(from_path)

    # collapse repeated "/" in the chart path
    while artifact_path.find("//") >= 0:
        artifact_path = artifact_path.replace("//", "/")

    # check chart path
    if not self.fs.file_exists(artifact_path):
        msg = "artifact path does not exist: {}".format(artifact_path)
        raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
    helm_chart_path = artifact_path

    if artifact_path.startswith("/"):
        full_path = self.fs.path + helm_chart_path
    else:
        full_path = self.fs.path + "/" + helm_chart_path

    while full_path.find("//") >= 0:
        full_path = full_path.replace("//", "/")

    try:
        # Call helm conn upgrade
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        # Add parameter osm if exist to global
        if config and config.get("osm"):
            if not config.get("global"):
                config["global"] = {}
            config["global"]["osm"] = config.get("osm")

        self.log.debug("Upgrade helm chart: {}".format(full_path))
        if vca_type == "helm":
            await self._k8sclusterhelm2.upgrade(
                system_cluster_uuid,
                kdu_model=full_path,
                kdu_instance=helm_id,
                namespace=namespace,
                params=config,
                db_dict=db_dict,
                timeout=progress_timeout,
                force=True,
            )
        else:
            await self._k8sclusterhelm3.upgrade(
                system_cluster_uuid,
                kdu_model=full_path,
                kdu_instance=helm_id,
                namespace=namespace,
                params=config,
                db_dict=db_dict,
                timeout=progress_timeout,
                force=True,
            )

    except N2VCException:
        raise
    except Exception as e:
        self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
        raise N2VCException("Error upgrading chart ee: {}".format(e))
397 |
|
|
398 |
1 |
async def create_tls_certificate(
    self,
    nsr_id: str,
    secret_name: str,
    usage: str,
    dns_prefix: str,
    namespace: str = None,
):
    """
    Ask the helm v3 connector to create a certificate in the system cluster.

    :param str nsr_id: passed to the connector as the certificate name
    :param str secret_name: passed through as secret_name
    :param str usage: passed through as usage
    :param str dns_prefix: passed through as dns_prefix
    :param str namespace: target namespace; when empty or None the namespace
        configured as vca_config.kubectl_osm_namespace is used
    """
    # Obtain system cluster id from database
    cluster_uuid = await self._get_system_cluster_id()
    target_namespace = namespace or self.vca_config.kubectl_osm_namespace
    certificate_request = {
        "cluster_uuid": cluster_uuid,
        "namespace": target_namespace,
        "dns_prefix": dns_prefix,
        "name": nsr_id,
        "secret_name": secret_name,
        "usage": usage,
    }
    # use helm-v3 as certificates don't depend on helm version
    await self._k8sclusterhelm3.create_certificate(**certificate_request)
417 |
|
|
418 |
1 |
async def delete_tls_certificate(
    self,
    certificate_name: str = None,
    namespace: str = None,
):
    """
    Ask the helm v3 connector to delete a certificate from the system cluster.

    :param str certificate_name: passed through as certificate_name
    :param str namespace: namespace of the certificate; when empty or None the
        namespace configured as vca_config.kubectl_osm_namespace is used
    """
    # Obtain system cluster id from database
    cluster_uuid = await self._get_system_cluster_id()
    target_namespace = namespace or self.vca_config.kubectl_osm_namespace
    deletion_request = {
        "cluster_uuid": cluster_uuid,
        "namespace": target_namespace,
        "certificate_name": certificate_name,
    }
    await self._k8sclusterhelm3.delete_certificate(**deletion_request)
430 |
|
|
431 |
1 |
async def register_execution_environment(
    self,
    namespace: str,
    credentials: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    *kargs,
    **kwargs,
) -> str:
    """
    No-op for this connector: every argument is ignored and None is returned.
    """
    # nothing to do
    return None
443 |
|
|
444 |
1 |
async def install_configuration_sw(self, *args, **kwargs):
    """No-op for this connector: arguments are ignored and None is returned."""
    # nothing to do
    return None
447 |
|
|
448 |
1 |
async def add_relation(self, *args, **kwargs):
    """No-op for this connector: arguments are ignored and None is returned."""
    # nothing to do
    return None
451 |
|
|
452 |
1 |
async def remove_relation(self):
    """No-op for this connector: always returns None."""
    # nothing to do
    return None
455 |
|
|
456 |
1 |
async def get_status(self, *args, **kwargs):
    """Not used for this connector: arguments are ignored and None is returned."""
    # not used for this connector
    return None
459 |
|
|
460 |
1 |
async def get_ee_ssh_public__key(
    self,
    ee_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
) -> str:
    """
    Obtains ssh-public key from ee executing GetSShKey method from the ee.

    :param str ee_id: the id of the execution environment returned by
        create_execution_environment or register_execution_environment
    :param dict db_dict: only logged
    :param float progress_timeout: not used
    :param float total_timeout: not used
    :returns: public key of the execution environment
    :raises N2VCBadArgumentsException: ee_id is None or empty
    :raises N2VCException: any failure while obtaining the key
    """

    self.log.info(
        "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
    )

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(
            message="ee_id is mandatory", bad_args=["ee_id"]
        )

    try:
        # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
        version, namespace, helm_id = get_ee_id_parts(ee_id)
        ip_addr = "{}.{}.svc".format(helm_id, namespace)
        # Obtain ssh_key from the ee, this method will implement retries to allow the ee
        # install libraries and start successfully
        ssh_key = await self._get_ssh_key(ip_addr)
        return ssh_key
    except Exception as e:
        self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
        # keep the original failure attached as the cause
        raise N2VCException("Error obtaining ee ssh_key: {}".format(e)) from e
500 |
|
|
501 |
1 |
async def upgrade_charm(
    self,
    ee_id: str = None,
    path: str = None,
    charm_id: str = None,
    charm_type: str = None,
    timeout: float = None,
) -> str:
    """Reject charm upgrades: this connector does not support them.

    The method exists to satisfy the connector interface; it always raises
    and never returns a value.

    Args:
        ee_id:  Execution environment id (ignored)
        path:   Local path to the charm (ignored)
        charm_id: charm-id (ignored)
        charm_type: Charm type (ignored)
        timeout: Timeout for the ns update operation (ignored)

    Raises:
        N2VCException: always

    """
    unsupported = "KDUs deployed with Helm do not support charm upgrade"
    raise N2VCException(unsupported)
525 |
|
|
526 |
1 |
async def exec_primitive(
    self,
    ee_id: str,
    primitive_name: str,
    params_dict: dict,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs,
) -> str:
    """
    Execute a primitive in the execution environment

    :param str ee_id: the one returned by create_execution_environment or
        register_execution_environment with the format namespace.helm_id
    :param str primitive_name: must be one defined in the software. There is one
        called 'config', where, for the proxy case, the 'credentials' of VM are
        provided
    :param dict params_dict: parameters of the action, None is replaced by {}
    :param dict db_dict: where to write into database when the status changes.
        It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nslcmops", filter:
                {_id: <nslcmop_id>, path: "_admin.VCA"}
        It will be used to store information about intermediate notifications
    :param float progress_timeout: not used
    :param float total_timeout: not used
    :returns str: "CONFIG OK" for the 'config' primitive, the detailed message
        reported by the ee for any other primitive
    :raises N2VCBadArgumentsException: ee_id or primitive_name missing or empty
    :raises N2VCException: the ee address cannot be derived from ee_id
    :raises N2VCExecutionException: the primitive fails or reports a not-ok status
    """

    self.log.info(
        "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
            ee_id, primitive_name, params_dict, db_dict
        )
    )

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(
            message="ee_id is mandatory", bad_args=["ee_id"]
        )
    if primitive_name is None or len(primitive_name) == 0:
        # report the real parameter name, there is no "action_name" argument
        raise N2VCBadArgumentsException(
            message="primitive_name is mandatory", bad_args=["primitive_name"]
        )
    if params_dict is None:
        params_dict = dict()

    try:
        version, namespace, helm_id = get_ee_id_parts(ee_id)
        ip_addr = "{}.{}.svc".format(helm_id, namespace)
    except Exception as e:
        self.log.error("Error getting ee ip ee: {}".format(e))
        raise N2VCException("Error getting ee ip ee: {}".format(e))

    if primitive_name == "config":
        try:
            # Execute config primitive, higher timeout to check the case ee is starting
            status, detailed_message = await self._execute_config_primitive(
                ip_addr, params_dict, db_dict=db_dict
            )
            self.log.debug(
                "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
                    ee_id, status, detailed_message
                )
            )
            if status != "OK":
                self.log.error(
                    "Error configuring helm ee, status: {}, message: {}".format(
                        status, detailed_message
                    )
                )
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                        ee_id, status, detailed_message
                    ),
                    primitive_name=primitive_name,
                )
        except N2VCExecutionException:
            # already logged above; do not wrap it a second time
            raise
        except Exception as e:
            self.log.error("Error configuring helm ee: {}".format(e))
            raise N2VCExecutionException(
                message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                primitive_name=primitive_name,
            )
        return "CONFIG OK"
    else:
        try:
            # Execute primitive
            status, detailed_message = await self._execute_primitive(
                ip_addr, primitive_name, params_dict, db_dict=db_dict
            )
            self.log.debug(
                "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
                    primitive_name, ee_id, status, detailed_message
                )
            )
            if status != "OK" and status != "PROCESSING":
                self.log.error(
                    "Execute primitive {} returned not ok status: {}, message: {}".format(
                        primitive_name, status, detailed_message
                    )
                )
                raise N2VCExecutionException(
                    message="Execute primitive {} returned not ok status: {}, message: {}".format(
                        primitive_name, status, detailed_message
                    ),
                    primitive_name=primitive_name,
                )
        except N2VCExecutionException:
            # already logged above; do not wrap it a second time
            raise
        except Exception as e:
            self.log.error(
                "Error executing primitive {}: {}".format(primitive_name, e)
            )
            raise N2VCExecutionException(
                message="Error executing primitive {} into ee={} : {}".format(
                    primitive_name, ee_id, e
                ),
                primitive_name=primitive_name,
            )
        return detailed_message
645 |
|
|
646 |
1 |
async def deregister_execution_environments(self):
    """No-op for this connector: always returns None."""
    # nothing to be done
    return None
649 |
|
|
650 |
1 |
async def delete_execution_environment(
    self,
    ee_id: str,
    db_dict: dict = None,
    total_timeout: float = None,
    **kwargs,
):
    """
    Delete an execution environment
    :param str ee_id: id of the execution environment to delete, included namespace.helm_id
    :param dict db_dict: where to write into database when the status changes.
                    It contains a dict with
                        {collection: <str>, filter: {}, path: <str>},
                        e.g. {collection: "nsrs", filter:
                            {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
                    (not used by this method)
    :param float total_timeout: not used
    :raises N2VCBadArgumentsException: ee_id is None or empty
    :raises N2VCException: any failure while uninstalling the chart
    """

    self.log.info("ee_id: {}".format(ee_id))

    # check arguments, same rule as the other methods: empty is also rejected
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(
            message="ee_id is mandatory", bad_args=["ee_id"]
        )

    try:
        # Obtain cluster_uuid
        system_cluster_uuid = await self._get_system_cluster_id()

        # Get helm_id
        version, namespace, helm_id = get_ee_id_parts(ee_id)

        # Uninstall chart, for backward compatibility we must assume that if there is no
        # version it is helm-v2
        if version == "helm-v3":
            await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
        else:
            await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
        self.log.info("ee_id: {} deleted".format(ee_id))
    except N2VCException:
        raise
    except Exception as e:
        self.log.error(
            "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
        )
        # keep the original failure attached as the cause
        raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e)) from e
697 |
|
|
698 |
1 |
async def delete_namespace(
    self, namespace: str, db_dict: dict = None, total_timeout: float = None
):
    """
    Not implemented for this connector: arguments are ignored and None is returned.
    """
    # method not implemented for this connector, execution environments must be deleted individually
    return None
703 |
|
|
704 |
1 |
async def install_k8s_proxy_charm(
    self,
    charm_name: str,
    namespace: str,
    artifact_path: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    *kargs,
    **kwargs,
) -> str:
    """
    No-op for this connector: every argument is ignored and None is returned.
    """
    return None
717 |
|
|
718 |
1 |
@retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
async def _get_ssh_key(self, ip_addr):
    """
    Request the ssh key from the ee at ip_addr.

    Runs the internal "_get_ssh_key" primitive without parameters; the retryer
    decorator reads its settings from self._initial_retry_time and
    self._retry_delay.
    """
    ssh_key = await self._execute_primitive_internal(ip_addr, "_get_ssh_key", None)
    return ssh_key
725 |
|
|
726 |
1 |
@retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
    """
    Run the "config" primitive on the ee at ip_addr.

    The retryer decorator reads its settings from self._initial_retry_time and
    self._retry_delay.
    """
    outcome = await self._execute_primitive_internal(
        ip_addr, "config", params, db_dict=db_dict
    )
    return outcome
731 |
|
|
732 |
1 |
@retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
    """
    Run primitive_name on the ee at ip_addr.

    The retryer decorator reads its settings from self._max_retry_time and
    self._retry_delay.
    """
    outcome = await self._execute_primitive_internal(
        ip_addr, primitive_name, params, db_dict=db_dict
    )
    return outcome
737 |
|
|
738 |
1 |
async def _execute_primitive_internal(
    self, ip_addr, primitive_name, params, db_dict=None
):
    """
    Send a primitive to the ee frontend over gRPC and collect its answer.

    A TLS channel is tried first. If the handshake fails with
    WRONG_VERSION_NUMBER and vca_config.eegrpc_tls_enforce is falsy, the call is
    repeated over a plain channel.

    :param str ip_addr: host of the ee service
    :param str primitive_name: "_get_ssh_key" calls GetSshKey, any other name is
        sent through the RunPrimitive stream
    :param params: primitive parameters, serialized with yaml.dump
    :param dict db_dict: when truthy, every reply received is written to the
        database with _write_op_detailed_status
    :returns: the key message for "_get_ssh_key"; otherwise a tuple
        (status, detailed_message) taken from the last reply, or
        ("ERROR", "No result received") when the stream gave no reply
    :raises N2VCException: the ee does not support TLS and TLS is enforced
    :raises ssl.SSLError: any other TLS failure
    """

    async def execute(channel):
        # the channel is an explicit argument so that each attempt uses the
        # channel created for it
        stub = FrontendExecutorStub(channel)
        if primitive_name == "_get_ssh_key":
            self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
            reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
            return reply.message
        # For any other primitives
        async with stub.RunPrimitive.open() as stream:
            primitive_id = str(uuid.uuid1())
            result = None
            self.log.debug(
                "Execute primitive internal: id:{}, name:{}, params: {}".format(
                    primitive_id, primitive_name, params
                )
            )
            await stream.send_message(
                PrimitiveRequest(
                    id=primitive_id, name=primitive_name, params=yaml.dump(params)
                ),
                end=True,
            )
            async for reply in stream:
                self.log.debug("Received reply: {}".format(reply))
                result = reply
                # If db_dict provided write notifs in database
                if db_dict:
                    self._write_op_detailed_status(
                        db_dict, reply.status, reply.detailed_message
                    )
            if result:
                return result.status, result.detailed_message
            else:
                return "ERROR", "No result received"

    ssl_context = create_secure_context(self.vca_config.ca_store)
    channel = Channel(
        ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
    )
    try:
        return await execute(channel)
    except ssl.SSLError as ssl_error:  # fallback to insecure gRPC
        if ssl_error.reason != "WRONG_VERSION_NUMBER":
            raise
        if self.vca_config.eegrpc_tls_enforce:
            raise N2VCException(
                "Execution environment doesn't support TLS, primitives cannot be executed"
            )
        self.log.debug(
            "Execution environment doesn't support TLS, falling back to unsecure gRPC"
        )
        # release the TLS channel before replacing it, the finally clause
        # below only closes the channel bound to the name at that moment
        channel.close()
        channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
        return await execute(channel)
    finally:
        channel.close()
799 |
|
|
800 |
1 |
def _write_op_detailed_status(self, db_dict, status, detailed_message): |
801 |
|
# write ee_id to database: _admin.deployed.VCA.x |
802 |
0 |
try: |
803 |
0 |
the_table = db_dict["collection"] |
804 |
0 |
the_filter = db_dict["filter"] |
805 |
0 |
update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)} |
806 |
|
# self.log.debug('Writing ee_id to database: {}'.format(the_path)) |
807 |
0 |
self.db.set_one( |
808 |
|
table=the_table, |
809 |
|
q_filter=the_filter, |
810 |
|
update_dict=update_dict, |
811 |
|
fail_on_empty=True, |
812 |
|
) |
813 |
0 |
except asyncio.CancelledError: |
814 |
0 |
raise |
815 |
0 |
except Exception as e: |
816 |
0 |
self.log.error("Error writing detailedStatus to database: {}".format(e)) |
817 |
|
|
818 |
1 |
async def _get_system_cluster_id(self): |
819 |
1 |
if not self._system_cluster_id: |
820 |
1 |
db_k8cluster = self.db.get_one( |
821 |
|
"k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name} |
822 |
|
) |
823 |
1 |
k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id")) |
824 |
1 |
if not k8s_hc_id: |
825 |
0 |
try: |
826 |
|
# backward compatibility for existing clusters that have not been initialized for helm v3 |
827 |
0 |
cluster_id = db_k8cluster.get("_id") |
828 |
0 |
k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials")) |
829 |
0 |
k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env( |
830 |
|
k8s_credentials, reuse_cluster_uuid=cluster_id |
831 |
|
) |
832 |
0 |
db_k8scluster_update = { |
833 |
|
"_admin.helm-chart-v3.error_msg": None, |
834 |
|
"_admin.helm-chart-v3.id": k8s_hc_id, |
835 |
|
"_admin.helm-chart-v3}.created": uninstall_sw, |
836 |
|
"_admin.helm-chart-v3.operationalState": "ENABLED", |
837 |
|
} |
838 |
0 |
self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update) |
839 |
0 |
except Exception as e: |
840 |
0 |
self.log.error( |
841 |
|
"error initializing helm-v3 cluster: {}".format(str(e)) |
842 |
|
) |
843 |
0 |
raise N2VCException( |
844 |
|
"K8s system cluster '{}' has not been initialized for helm-chart-v3".format( |
845 |
|
cluster_id |
846 |
|
) |
847 |
|
) |
848 |
1 |
self._system_cluster_id = k8s_hc_id |
849 |
1 |
return self._system_cluster_id |