Ubuntu 22.04 and Python 3.10 preparation
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import uuid
22 import os
23 import ssl
24
25 from grpclib.client import Channel
26
27 from osm_lcm.data_utils.lcm_config import VcaConfig
28 from osm_lcm.frontend_pb2 import PrimitiveRequest
29 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
30 from osm_lcm.frontend_grpc import FrontendExecutorStub
31 from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts
32
33 from osm_lcm.data_utils.database.database import Database
34 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
35
36 from n2vc.n2vc_conn import N2VCConnector
37 from n2vc.k8s_helm_conn import K8sHelmConnector
38 from n2vc.k8s_helm3_conn import K8sHelm3Connector
39 from n2vc.exceptions import (
40 N2VCBadArgumentsException,
41 N2VCException,
42 N2VCExecutionException,
43 )
44
45 from osm_lcm.lcm_utils import deep_get
46
47
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Decorator factory that retries an async method on transient connection errors.

    The retry budgets are read from attributes on the wrapped method's instance
    (``args[0]``): *max_wait_time_var* names the total retry budget in seconds
    and *delay_time_var* the delay between attempts. Defaults of 300s / 10s are
    used when the instance does not define them.

    :param str max_wait_time_var: instance attribute holding the total wait time
    :param str delay_time_var: instance attribute holding the delay between retries
    :returns: decorator that wraps an async method with retry behavior
    """

    def wrapper(func):
        retry_exceptions = (ConnectionRefusedError, TimeoutError)

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay_time
            delay_time = 10
            max_wait_time = 300

            # obtain budgets from instance attributes, if present
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            last_exception = None
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions as e:
                    last_exception = e
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
            # Budget exhausted: propagate the failure. The original code
            # *returned* the ConnectionRefusedError class here, handing the
            # exception type to callers as if it were a result.
            if last_exception is not None:
                raise last_exception
            raise ConnectionRefusedError("Maximum retry time exceeded")

        return wrapped

    return wrapper
79
80
def create_secure_context(
    trusted: str,
) -> ssl.SSLContext:
    """Build a strict TLS client context trusting only the CA bundle at *trusted*."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # TODO: client TLS
    # context.load_cert_chain(str(client_cert), str(client_key))
    context.load_verify_locations(trusted)
    context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    context.set_alpn_protocols(["h2"])
    return context
94
95
class LCMHelmConn(N2VCConnector, LcmBase):
    def __init__(
        self,
        log: object = None,
        vca_config: VcaConfig = None,
        on_update_db=None,
    ):
        """
        Initialize EE helm connector.

        :param log: logger to use (forwarded to N2VCConnector)
        :param VcaConfig vca_config: VCA configuration (binary paths, retry
            budgets, kubectl namespace/cluster names)
        :param on_update_db: optional callback invoked on database updates
        """

        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs

        # parent class constructor
        N2VCConnector.__init__(
            self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs
        )

        self.vca_config = vca_config
        self.log.debug("Initialize helm N2VC connector")
        self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))

        # delay between retries of EE gRPC calls (seconds) - see retryer()
        self._retry_delay = self.vca_config.helm_ee_retry_delay

        # retry budget used while the EE is still starting up
        self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
        self.log.debug("Initial retry time: {}".format(self._initial_retry_time))

        # retry budget used for regular primitive execution
        self._max_retry_time = self.vca_config.helm_max_retry_time
        self.log.debug("Retry time: {}".format(self._max_retry_time))

        # initialize helm connector for helmv2 and helmv3
        self._k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helmpath,
            fs=self.fs,
            db=self.db,
            log=self.log,
            on_update_db=None,
        )

        self._k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helm3path,
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        # cached id of the OSM system cluster, resolved lazily by
        # _get_system_cluster_id()
        self._system_cluster_id = None
        self.log.info("Helm N2VC connector initialized")
148
149 # TODO - ¿reuse_ee_id?
150 async def create_execution_environment(
151 self,
152 namespace: str,
153 db_dict: dict,
154 reuse_ee_id: str = None,
155 progress_timeout: float = None,
156 total_timeout: float = None,
157 config: dict = None,
158 artifact_path: str = None,
159 chart_model: str = None,
160 vca_type: str = None,
161 *kargs,
162 **kwargs,
163 ) -> (str, dict):
164 """
165 Creates a new helm execution environment deploying the helm-chat indicated in the
166 artifact_path
167 :param str namespace: This param is not used, all helm charts are deployed in the osm
168 system namespace
169 :param dict db_dict: where to write to database when the status changes.
170 It contains a dictionary with {collection: str, filter: {}, path: str},
171 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
172 "_admin.deployed.VCA.3"}
173 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
174 :param float progress_timeout:
175 :param float total_timeout:
176 :param dict config: General variables to instantiate KDU
177 :param str artifact_path: path of package content
178 :param str chart_model: helm chart/reference (string), which can be either
179 of these options:
180 - a name of chart available via the repos known by OSM
181 (e.g. stable/openldap, stable/openldap:1.2.4)
182 - a path to a packaged chart (e.g. mychart.tgz)
183 - a path to an unpacked chart directory or a URL (e.g. mychart)
184 :param str vca_type: Type of vca, must be type helm or helm-v3
185 :returns str, dict: id of the new execution environment including namespace.helm_id
186 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
187 """
188
189 self.log.info(
190 "create_execution_environment: namespace: {}, artifact_path: {}, "
191 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
192 namespace, artifact_path, db_dict, chart_model, reuse_ee_id
193 )
194 )
195
196 # Validate artifact-path is provided
197 if artifact_path is None or len(artifact_path) == 0:
198 raise N2VCBadArgumentsException(
199 message="artifact_path is mandatory", bad_args=["artifact_path"]
200 )
201
202 # Validate artifact-path exists and sync path
203 from_path = os.path.split(artifact_path)[0]
204 self.fs.sync(from_path)
205
206 # remove / in charm path
207 while artifact_path.find("//") >= 0:
208 artifact_path = artifact_path.replace("//", "/")
209
210 # check charm path
211 if self.fs.file_exists(artifact_path):
212 helm_chart_path = artifact_path
213 else:
214 msg = "artifact path does not exist: {}".format(artifact_path)
215 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
216
217 if artifact_path.startswith("/"):
218 full_path = self.fs.path + helm_chart_path
219 else:
220 full_path = self.fs.path + "/" + helm_chart_path
221
222 while full_path.find("//") >= 0:
223 full_path = full_path.replace("//", "/")
224
225 # By default, the KDU is expected to be a file
226 kdu_model = full_path
227 # If the chart_model includes a "/", then it is a reference:
228 # e.g. (stable/openldap; stable/openldap:1.2.4)
229 if chart_model.find("/") >= 0:
230 kdu_model = chart_model
231
232 try:
233 # Call helm conn install
234 # Obtain system cluster id from database
235 system_cluster_uuid = await self._get_system_cluster_id()
236 # Add parameter osm if exist to global
237 if config and config.get("osm"):
238 if not config.get("global"):
239 config["global"] = {}
240 config["global"]["osm"] = config.get("osm")
241
242 self.log.debug("install helm chart: {}".format(full_path))
243 if vca_type == "helm":
244 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
245 db_dict=db_dict,
246 kdu_model=kdu_model,
247 )
248 await self._k8sclusterhelm2.install(
249 system_cluster_uuid,
250 kdu_model=kdu_model,
251 kdu_instance=helm_id,
252 namespace=self.vca_config.kubectl_osm_namespace,
253 params=config,
254 db_dict=db_dict,
255 timeout=progress_timeout,
256 )
257 else:
258 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
259 db_dict=db_dict,
260 kdu_model=kdu_model,
261 )
262 await self._k8sclusterhelm3.install(
263 system_cluster_uuid,
264 kdu_model=kdu_model,
265 kdu_instance=helm_id,
266 namespace=self.vca_config.kubectl_osm_namespace,
267 params=config,
268 db_dict=db_dict,
269 timeout=progress_timeout,
270 )
271
272 ee_id = "{}:{}.{}".format(
273 vca_type, self.vca_config.kubectl_osm_namespace, helm_id
274 )
275 return ee_id, None
276 except N2VCException:
277 raise
278 except Exception as e:
279 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
280 raise N2VCException("Error deploying chart ee: {}".format(e))
281
282 async def upgrade_execution_environment(
283 self,
284 namespace: str,
285 db_dict: dict,
286 helm_id: str,
287 progress_timeout: float = None,
288 total_timeout: float = None,
289 config: dict = None,
290 artifact_path: str = None,
291 vca_type: str = None,
292 *kargs,
293 **kwargs,
294 ) -> (str, dict):
295 """
296 Creates a new helm execution environment deploying the helm-chat indicated in the
297 attifact_path
298 :param str namespace: This param is not used, all helm charts are deployed in the osm
299 system namespace
300 :param dict db_dict: where to write to database when the status changes.
301 It contains a dictionary with {collection: str, filter: {}, path: str},
302 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
303 "_admin.deployed.VCA.3"}
304 :param helm_id: unique name of the Helm release to upgrade
305 :param float progress_timeout:
306 :param float total_timeout:
307 :param dict config: General variables to instantiate KDU
308 :param str artifact_path: path of package content
309 :param str vca_type: Type of vca, must be type helm or helm-v3
310 :returns str, dict: id of the new execution environment including namespace.helm_id
311 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
312 """
313
314 self.log.info(
315 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
316 )
317
318 # Validate helm_id is provided
319 if helm_id is None or len(helm_id) == 0:
320 raise N2VCBadArgumentsException(
321 message="helm_id is mandatory", bad_args=["helm_id"]
322 )
323
324 # Validate artifact-path is provided
325 if artifact_path is None or len(artifact_path) == 0:
326 raise N2VCBadArgumentsException(
327 message="artifact_path is mandatory", bad_args=["artifact_path"]
328 )
329
330 # Validate artifact-path exists and sync path
331 from_path = os.path.split(artifact_path)[0]
332 self.fs.sync(from_path)
333
334 # remove / in charm path
335 while artifact_path.find("//") >= 0:
336 artifact_path = artifact_path.replace("//", "/")
337
338 # check charm path
339 if self.fs.file_exists(artifact_path):
340 helm_chart_path = artifact_path
341 else:
342 msg = "artifact path does not exist: {}".format(artifact_path)
343 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
344
345 if artifact_path.startswith("/"):
346 full_path = self.fs.path + helm_chart_path
347 else:
348 full_path = self.fs.path + "/" + helm_chart_path
349
350 while full_path.find("//") >= 0:
351 full_path = full_path.replace("//", "/")
352
353 try:
354 # Call helm conn upgrade
355 # Obtain system cluster id from database
356 system_cluster_uuid = await self._get_system_cluster_id()
357 # Add parameter osm if exist to global
358 if config and config.get("osm"):
359 if not config.get("global"):
360 config["global"] = {}
361 config["global"]["osm"] = config.get("osm")
362
363 self.log.debug("Ugrade helm chart: {}".format(full_path))
364 if vca_type == "helm":
365 await self._k8sclusterhelm2.upgrade(
366 system_cluster_uuid,
367 kdu_model=full_path,
368 kdu_instance=helm_id,
369 namespace=namespace,
370 params=config,
371 db_dict=db_dict,
372 timeout=progress_timeout,
373 force=True,
374 )
375 else:
376 await self._k8sclusterhelm3.upgrade(
377 system_cluster_uuid,
378 kdu_model=full_path,
379 kdu_instance=helm_id,
380 namespace=namespace,
381 params=config,
382 db_dict=db_dict,
383 timeout=progress_timeout,
384 force=True,
385 )
386
387 except N2VCException:
388 raise
389 except Exception as e:
390 self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
391 raise N2VCException("Error upgrading chart ee: {}".format(e))
392
393 async def create_tls_certificate(
394 self,
395 nsr_id: str,
396 secret_name: str,
397 usage: str,
398 dns_prefix: str,
399 namespace: str = None,
400 ):
401 # Obtain system cluster id from database
402 system_cluster_uuid = await self._get_system_cluster_id()
403 # use helm-v3 as certificates don't depend on helm version
404 await self._k8sclusterhelm3.create_certificate(
405 cluster_uuid=system_cluster_uuid,
406 namespace=namespace or self.vca_config.kubectl_osm_namespace,
407 dns_prefix=dns_prefix,
408 name=nsr_id,
409 secret_name=secret_name,
410 usage=usage,
411 )
412
413 async def delete_tls_certificate(
414 self,
415 certificate_name: str = None,
416 namespace: str = None,
417 ):
418 # Obtain system cluster id from database
419 system_cluster_uuid = await self._get_system_cluster_id()
420 await self._k8sclusterhelm3.delete_certificate(
421 cluster_uuid=system_cluster_uuid,
422 namespace=namespace or self.vca_config.kubectl_osm_namespace,
423 certificate_name=certificate_name,
424 )
425
426 async def register_execution_environment(
427 self,
428 namespace: str,
429 credentials: dict,
430 db_dict: dict,
431 progress_timeout: float = None,
432 total_timeout: float = None,
433 *kargs,
434 **kwargs,
435 ) -> str:
436 # nothing to do
437 pass
438
439 async def install_configuration_sw(self, *args, **kwargs):
440 # nothing to do
441 pass
442
443 async def add_relation(self, *args, **kwargs):
444 # nothing to do
445 pass
446
447 async def remove_relation(self):
448 # nothing to to
449 pass
450
451 async def get_status(self, *args, **kwargs):
452 # not used for this connector
453 pass
454
455 async def get_ee_ssh_public__key(
456 self,
457 ee_id: str,
458 db_dict: dict,
459 progress_timeout: float = None,
460 total_timeout: float = None,
461 **kwargs,
462 ) -> str:
463 """
464 Obtains ssh-public key from ee executing GetSShKey method from the ee.
465
466 :param str ee_id: the id of the execution environment returned by
467 create_execution_environment or register_execution_environment
468 :param dict db_dict:
469 :param float progress_timeout:
470 :param float total_timeout:
471 :returns: public key of the execution environment
472 """
473
474 self.log.info(
475 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
476 )
477
478 # check arguments
479 if ee_id is None or len(ee_id) == 0:
480 raise N2VCBadArgumentsException(
481 message="ee_id is mandatory", bad_args=["ee_id"]
482 )
483
484 try:
485 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
486 version, namespace, helm_id = get_ee_id_parts(ee_id)
487 ip_addr = "{}.{}.svc".format(helm_id, namespace)
488 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
489 # install libraries and start successfully
490 ssh_key = await self._get_ssh_key(ip_addr)
491 return ssh_key
492 except Exception as e:
493 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
494 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
495
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrade charms in VNFs

        This method does not support KDU's deployed with Helm.

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Returns:
            the output of the update operation if status equals to "completed"

        Raises:
            N2VCException: always, as this connector cannot upgrade charms
        """
        raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
520
521 async def exec_primitive(
522 self,
523 ee_id: str,
524 primitive_name: str,
525 params_dict: dict,
526 db_dict: dict = None,
527 progress_timeout: float = None,
528 total_timeout: float = None,
529 **kwargs,
530 ) -> str:
531 """
532 Execute a primitive in the execution environment
533
534 :param str ee_id: the one returned by create_execution_environment or
535 register_execution_environment with the format namespace.helm_id
536 :param str primitive_name: must be one defined in the software. There is one
537 called 'config', where, for the proxy case, the 'credentials' of VM are
538 provided
539 :param dict params_dict: parameters of the action
540 :param dict db_dict: where to write into database when the status changes.
541 It contains a dict with
542 {collection: <str>, filter: {}, path: <str>},
543 e.g. {collection: "nslcmops", filter:
544 {_id: <nslcmop_id>, path: "_admin.VCA"}
545 It will be used to store information about intermediate notifications
546 :param float progress_timeout:
547 :param float total_timeout:
548 :returns str: primitive result, if ok. It raises exceptions in case of fail
549 """
550
551 self.log.info(
552 "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
553 ee_id, primitive_name, params_dict, db_dict
554 )
555 )
556
557 # check arguments
558 if ee_id is None or len(ee_id) == 0:
559 raise N2VCBadArgumentsException(
560 message="ee_id is mandatory", bad_args=["ee_id"]
561 )
562 if primitive_name is None or len(primitive_name) == 0:
563 raise N2VCBadArgumentsException(
564 message="action_name is mandatory", bad_args=["action_name"]
565 )
566 if params_dict is None:
567 params_dict = dict()
568
569 try:
570 version, namespace, helm_id = get_ee_id_parts(ee_id)
571 ip_addr = "{}.{}.svc".format(helm_id, namespace)
572 except Exception as e:
573 self.log.error("Error getting ee ip ee: {}".format(e))
574 raise N2VCException("Error getting ee ip ee: {}".format(e))
575
576 if primitive_name == "config":
577 try:
578 # Execute config primitive, higher timeout to check the case ee is starting
579 status, detailed_message = await self._execute_config_primitive(
580 ip_addr, params_dict, db_dict=db_dict
581 )
582 self.log.debug(
583 "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
584 ee_id, status, detailed_message
585 )
586 )
587 if status != "OK":
588 self.log.error(
589 "Error configuring helm ee, status: {}, message: {}".format(
590 status, detailed_message
591 )
592 )
593 raise N2VCExecutionException(
594 message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
595 ee_id, status, detailed_message
596 ),
597 primitive_name=primitive_name,
598 )
599 except Exception as e:
600 self.log.error("Error configuring helm ee: {}".format(e))
601 raise N2VCExecutionException(
602 message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
603 primitive_name=primitive_name,
604 )
605 return "CONFIG OK"
606 else:
607 try:
608 # Execute primitive
609 status, detailed_message = await self._execute_primitive(
610 ip_addr, primitive_name, params_dict, db_dict=db_dict
611 )
612 self.log.debug(
613 "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
614 primitive_name, ee_id, status, detailed_message
615 )
616 )
617 if status != "OK" and status != "PROCESSING":
618 self.log.error(
619 "Execute primitive {} returned not ok status: {}, message: {}".format(
620 primitive_name, status, detailed_message
621 )
622 )
623 raise N2VCExecutionException(
624 message="Execute primitive {} returned not ok status: {}, message: {}".format(
625 primitive_name, status, detailed_message
626 ),
627 primitive_name=primitive_name,
628 )
629 except Exception as e:
630 self.log.error(
631 "Error executing primitive {}: {}".format(primitive_name, e)
632 )
633 raise N2VCExecutionException(
634 message="Error executing primitive {} into ee={} : {}".format(
635 primitive_name, ee_id, e
636 ),
637 primitive_name=primitive_name,
638 )
639 return detailed_message
640
    async def deregister_execution_environments(self):
        # nothing to be done: helm EEs are not registered anywhere
        pass
644
645 async def delete_execution_environment(
646 self,
647 ee_id: str,
648 db_dict: dict = None,
649 total_timeout: float = None,
650 **kwargs,
651 ):
652 """
653 Delete an execution environment
654 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
655 :param dict db_dict: where to write into database when the status changes.
656 It contains a dict with
657 {collection: <str>, filter: {}, path: <str>},
658 e.g. {collection: "nsrs", filter:
659 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
660 :param float total_timeout:
661 """
662
663 self.log.info("ee_id: {}".format(ee_id))
664
665 # check arguments
666 if ee_id is None:
667 raise N2VCBadArgumentsException(
668 message="ee_id is mandatory", bad_args=["ee_id"]
669 )
670
671 try:
672 # Obtain cluster_uuid
673 system_cluster_uuid = await self._get_system_cluster_id()
674
675 # Get helm_id
676 version, namespace, helm_id = get_ee_id_parts(ee_id)
677
678 # Uninstall chart, for backward compatibility we must assume that if there is no
679 # version it is helm-v2
680 if version == "helm-v3":
681 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
682 else:
683 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
684 self.log.info("ee_id: {} deleted".format(ee_id))
685 except N2VCException:
686 raise
687 except Exception as e:
688 self.log.error(
689 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
690 )
691 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
692
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        # method not implemented for this connector, execution environments must be deleted individually
        # (see delete_execution_environment)
        pass
698
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        *kargs,
        **kwargs,
    ) -> str:
        # k8s proxy charms are not used by the helm connector: nothing to do
        pass
712
    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _get_ssh_key(self, ip_addr):
        # Retried with the larger initial budget: the EE pod may still be starting
        return await self._execute_primitive_internal(
            ip_addr,
            "_get_ssh_key",
            None,
        )

    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
        # 'config' also uses the initial budget, as it may run right after deploy
        return await self._execute_primitive_internal(
            ip_addr, "config", params, db_dict=db_dict
        )

    @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
    async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
        # regular primitives use the standard retry budget
        return await self._execute_primitive_internal(
            ip_addr, primitive_name, params, db_dict=db_dict
        )
732
733 async def _execute_primitive_internal(
734 self, ip_addr, primitive_name, params, db_dict=None
735 ):
736 async def execute():
737 stub = FrontendExecutorStub(channel)
738 if primitive_name == "_get_ssh_key":
739 self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
740 reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
741 return reply.message
742 # For any other primitives
743 async with stub.RunPrimitive.open() as stream:
744 primitive_id = str(uuid.uuid1())
745 result = None
746 self.log.debug(
747 "Execute primitive internal: id:{}, name:{}, params: {}".format(
748 primitive_id, primitive_name, params
749 )
750 )
751 await stream.send_message(
752 PrimitiveRequest(
753 id=primitive_id, name=primitive_name, params=yaml.dump(params)
754 ),
755 end=True,
756 )
757 async for reply in stream:
758 self.log.debug("Received reply: {}".format(reply))
759 result = reply
760 # If db_dict provided write notifs in database
761 if db_dict:
762 self._write_op_detailed_status(
763 db_dict, reply.status, reply.detailed_message
764 )
765 if result:
766 return reply.status, reply.detailed_message
767 else:
768 return "ERROR", "No result received"
769
770 ssl_context = create_secure_context(self.vca_config.ca_store)
771 channel = Channel(
772 ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
773 )
774 try:
775 return await execute()
776 except ssl.SSLError as ssl_error: # fallback to insecure gRPC
777 if (
778 ssl_error.reason == "WRONG_VERSION_NUMBER"
779 and not self.vca_config.eegrpc_tls_enforce
780 ):
781 self.log.debug(
782 "Execution environment doesn't support TLS, falling back to unsecure gRPC"
783 )
784 channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
785 return await execute()
786 elif ssl_error.reason == "WRONG_VERSION_NUMBER":
787 raise N2VCException(
788 "Execution environment doesn't support TLS, primitives cannot be executed"
789 )
790 else:
791 raise
792 finally:
793 channel.close()
794
795 def _write_op_detailed_status(self, db_dict, status, detailed_message):
796 # write ee_id to database: _admin.deployed.VCA.x
797 try:
798 the_table = db_dict["collection"]
799 the_filter = db_dict["filter"]
800 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
801 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
802 self.db.set_one(
803 table=the_table,
804 q_filter=the_filter,
805 update_dict=update_dict,
806 fail_on_empty=True,
807 )
808 except asyncio.CancelledError:
809 raise
810 except Exception as e:
811 self.log.error("Error writing detailedStatus to database: {}".format(e))
812
813 async def _get_system_cluster_id(self):
814 if not self._system_cluster_id:
815 db_k8cluster = self.db.get_one(
816 "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name}
817 )
818 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
819 if not k8s_hc_id:
820 try:
821 # backward compatibility for existing clusters that have not been initialized for helm v3
822 cluster_id = db_k8cluster.get("_id")
823 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
824 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
825 k8s_credentials, reuse_cluster_uuid=cluster_id
826 )
827 db_k8scluster_update = {
828 "_admin.helm-chart-v3.error_msg": None,
829 "_admin.helm-chart-v3.id": k8s_hc_id,
830 "_admin.helm-chart-v3}.created": uninstall_sw,
831 "_admin.helm-chart-v3.operationalState": "ENABLED",
832 }
833 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
834 except Exception as e:
835 self.log.error(
836 "error initializing helm-v3 cluster: {}".format(str(e))
837 )
838 raise N2VCException(
839 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
840 cluster_id
841 )
842 )
843 self._system_cluster_id = k8s_hc_id
844 return self._system_cluster_id