Fixes VCA deletion in NS termination
osm/LCM.git: osm_lcm/lcm_helm_conn.py
##
# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##
import functools
import yaml
import asyncio
import uuid
import os
import ssl

from grpclib.client import Channel

from osm_lcm.data_utils.lcm_config import VcaConfig
from osm_lcm.frontend_pb2 import PrimitiveRequest
from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
from osm_lcm.frontend_grpc import FrontendExecutorStub
from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts

from osm_lcm.data_utils.database.database import Database
from osm_lcm.data_utils.filesystem.filesystem import Filesystem

from n2vc.n2vc_conn import N2VCConnector
from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.exceptions import (
    N2VCBadArgumentsException,
    N2VCException,
    N2VCExecutionException,
)

from osm_lcm.lcm_utils import deep_get


def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    def wrapper(func):
        retry_exceptions = (ConnectionRefusedError, TimeoutError)

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay time
            delay_time = 10
            max_wait_time = 300

            # obtain values from the instance attributes named in the
            # decorator arguments, if they are set
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
                    continue
            # retry budget exhausted: raise instead of returning the exception
            # class, so callers see a proper error
            raise ConnectionRefusedError("maximum retry time exceeded")

        return wrapped

    return wrapper
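
# Illustrative usage sketch (hypothetical class, not part of this module): the
# decorator reads its retry budget and delay from instance attributes, so each
# instance can tune them independently:
#
#     class SomeConn:
#         def __init__(self):
#             self._initial_retry_time = 60  # total retry budget, seconds
#             self._retry_delay = 5  # pause between attempts, seconds
#
#         @retryer(max_wait_time_var="_initial_retry_time",
#                  delay_time_var="_retry_delay")
#         async def connect(self):
#             ...  # retried on ConnectionRefusedError / TimeoutError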


def create_secure_context(
    trusted: str, client_cert_path: str, client_key_path: str
) -> ssl.SSLContext:
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.check_hostname = True
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.load_cert_chain(client_cert_path, client_key_path)
    ctx.load_verify_locations(trusted)
    ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    ctx.set_alpn_protocols(["h2"])
    return ctx
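
# Minimal usage sketch (paths, endpoint and port variable are illustrative
# assumptions): building a mutual-TLS HTTP/2 channel towards an execution
# environment with grpclib:
#
#     ctx = create_secure_context(
#         "/etc/osm/ca.crt", "/etc/osm/client.crt", "/etc/osm/client.key"
#     )
#     channel = Channel("eechart-0123.osm.svc", ee_service_port, ssl=ctx)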


class LCMHelmConn(N2VCConnector, LcmBase):
    def __init__(
        self,
        log: object = None,
        vca_config: VcaConfig = None,
        on_update_db=None,
    ):
        """
        Initialize EE helm connector.
        """

        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs

        # parent class constructor
        N2VCConnector.__init__(
            self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs
        )

        self.vca_config = vca_config
        self.log.debug("Initialize helm N2VC connector")
        self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))

        self._retry_delay = self.vca_config.helm_ee_retry_delay

        self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
        self.log.debug("Initial retry time: {}".format(self._initial_retry_time))

        self._max_retry_time = self.vca_config.helm_max_retry_time
        self.log.debug("Retry time: {}".format(self._max_retry_time))

        # initialize helm connector for helm v3
        self._k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helm3path,
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        self._system_cluster_id = None
        self.log.info("Helm N2VC connector initialized")

    # TODO - reuse_ee_id?
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        artifact_path: str = None,
        chart_model: str = None,
        vca_type: str = None,
        *kargs,
        **kwargs,
    ) -> (str, dict):
        """
        Creates a new helm execution environment deploying the helm chart indicated
        in the artifact_path
        :param str namespace: This param is not used, all helm charts are deployed in the
            osm system namespace
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
        :param float progress_timeout:
        :param float total_timeout:
        :param dict config: General variables to instantiate KDU
        :param str artifact_path: path of package content
        :param str chart_model: helm chart/reference (string), which can be either
            of these options:
            - a name of chart available via the repos known by OSM
              (e.g. stable/openldap, stable/openldap:1.2.4)
            - a path to a packaged chart (e.g. mychart.tgz)
            - a path to an unpacked chart directory or a URL (e.g. mychart)
        :param str vca_type: Type of vca, must be type helm-v3
        :returns str, dict: id of the new execution environment, including
            namespace.helm_id, and a credentials object set to None, as all
            credentials should be the osm kubernetes .kubeconfig
        """

        if not namespace:
            namespace = self.vca_config.kubectl_osm_namespace

        self.log.info(
            "create_execution_environment: namespace: {}, artifact_path: {}, "
            "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
                namespace, artifact_path, chart_model, db_dict, reuse_ee_id
            )
        )

        # Validate artifact-path is provided
        if artifact_path is None or len(artifact_path) == 0:
            raise N2VCBadArgumentsException(
                message="artifact_path is mandatory", bad_args=["artifact_path"]
            )

        # Validate artifact-path exists and sync path
        from_path = os.path.split(artifact_path)[0]
        self.fs.sync(from_path)

        # remove duplicate "/" in charm path
        while artifact_path.find("//") >= 0:
            artifact_path = artifact_path.replace("//", "/")

        # check charm path
        if self.fs.file_exists(artifact_path):
            helm_chart_path = artifact_path
        else:
            msg = "artifact path does not exist: {}".format(artifact_path)
            raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])

        if artifact_path.startswith("/"):
            full_path = self.fs.path + helm_chart_path
        else:
            full_path = self.fs.path + "/" + helm_chart_path

        while full_path.find("//") >= 0:
            full_path = full_path.replace("//", "/")

        # By default, the KDU is expected to be a file
        kdu_model = full_path
        # If the chart_model includes a "/", then it is a reference:
        # e.g. (stable/openldap; stable/openldap:1.2.4)
        if chart_model.find("/") >= 0:
            kdu_model = chart_model

        try:
            # Call helm conn install
            # Obtain system cluster id from database
            system_cluster_uuid = await self._get_system_cluster_id()
            # If an "osm" parameter exists, copy it under "global" as well
            if config and config.get("osm"):
                if not config.get("global"):
                    config["global"] = {}
                config["global"]["osm"] = config.get("osm")

            self.log.debug("install helm chart: {}".format(full_path))
            helm_id = self._k8sclusterhelm3.generate_kdu_instance_name(
                db_dict=db_dict,
                kdu_model=kdu_model,
            )
            await self._k8sclusterhelm3.install(
                system_cluster_uuid,
                kdu_model=kdu_model,
                kdu_instance=helm_id,
                namespace=namespace,
                params=config,
                db_dict=db_dict,
                timeout=progress_timeout,
            )

            ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id)
            return ee_id, None
        except N2VCException:
            raise
        except Exception as e:
            self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
            raise N2VCException("Error deploying chart ee: {}".format(e))
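
    # Illustrative ee_id round-trip (values are made up):
    #
    #     ee_id = "helm-v3:osm.eechart-0123456789ab"
    #     version, namespace, helm_id = get_ee_id_parts(ee_id)
    #     # version == "helm-v3", namespace == "osm",
    #     # helm_id == "eechart-0123456789ab"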

    async def upgrade_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        helm_id: str,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        artifact_path: str = None,
        vca_type: str = None,
        *kargs,
        **kwargs,
    ):
        """
        Upgrades an existing helm execution environment, deploying the helm chart
        indicated in the artifact_path
        :param str namespace: This param is not used, all helm charts are deployed in the
            osm system namespace
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param str helm_id: unique name of the Helm release to upgrade
        :param float progress_timeout:
        :param float total_timeout:
        :param dict config: General variables to instantiate KDU
        :param str artifact_path: path of package content
        :param str vca_type: Type of vca, must be type helm-v3
        """

        self.log.info(
            "upgrade_execution_environment: namespace: {}, artifact_path: {}, "
            "db_dict: {}".format(namespace, artifact_path, db_dict)
        )

        # Validate helm_id is provided
        if helm_id is None or len(helm_id) == 0:
            raise N2VCBadArgumentsException(
                message="helm_id is mandatory", bad_args=["helm_id"]
            )

        # Validate artifact-path is provided
        if artifact_path is None or len(artifact_path) == 0:
            raise N2VCBadArgumentsException(
                message="artifact_path is mandatory", bad_args=["artifact_path"]
            )

        # Validate artifact-path exists and sync path
        from_path = os.path.split(artifact_path)[0]
        self.fs.sync(from_path)

        # remove duplicate "/" in charm path
        while artifact_path.find("//") >= 0:
            artifact_path = artifact_path.replace("//", "/")

        # check charm path
        if self.fs.file_exists(artifact_path):
            helm_chart_path = artifact_path
        else:
            msg = "artifact path does not exist: {}".format(artifact_path)
            raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])

        if artifact_path.startswith("/"):
            full_path = self.fs.path + helm_chart_path
        else:
            full_path = self.fs.path + "/" + helm_chart_path

        while full_path.find("//") >= 0:
            full_path = full_path.replace("//", "/")

        try:
            # Call helm conn upgrade
            # Obtain system cluster id from database
            system_cluster_uuid = await self._get_system_cluster_id()
            # If an "osm" parameter exists, copy it under "global" as well
            if config and config.get("osm"):
                if not config.get("global"):
                    config["global"] = {}
                config["global"]["osm"] = config.get("osm")

            self.log.debug("Upgrade helm chart: {}".format(full_path))
            await self._k8sclusterhelm3.upgrade(
                system_cluster_uuid,
                kdu_model=full_path,
                kdu_instance=helm_id,
                namespace=namespace,
                params=config,
                db_dict=db_dict,
                timeout=progress_timeout,
                force=True,
            )

        except N2VCException:
            raise
        except Exception as e:
            self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
            raise N2VCException("Error upgrading chart ee: {}".format(e))

    async def create_tls_certificate(
        self,
        nsr_id: str,
        secret_name: str,
        usage: str,
        dns_prefix: str,
        namespace: str = None,
    ):
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        # use helm-v3, as certificates don't depend on the helm version
        await self._k8sclusterhelm3.create_certificate(
            cluster_uuid=system_cluster_uuid,
            namespace=namespace or self.vca_config.kubectl_osm_namespace,
            dns_prefix=dns_prefix,
            name=nsr_id,
            secret_name=secret_name,
            usage=usage,
        )

    async def delete_tls_certificate(
        self,
        certificate_name: str = None,
        namespace: str = None,
    ):
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        await self._k8sclusterhelm3.delete_certificate(
            cluster_uuid=system_cluster_uuid,
            namespace=namespace or self.vca_config.kubectl_osm_namespace,
            certificate_name=certificate_name,
        )

    async def setup_ns_namespace(
        self,
        name: str,
    ):
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        await self._k8sclusterhelm3.create_namespace(
            namespace=name,
            cluster_uuid=system_cluster_uuid,
            labels={
                "pod-security.kubernetes.io/enforce": self.vca_config.eegrpc_pod_admission_policy
            },
        )
        await self._k8sclusterhelm3.setup_default_rbac(
            name="ee-role",
            namespace=name,
            api_groups=[""],
            resources=["secrets"],
            verbs=["get"],
            service_account="default",
            cluster_uuid=system_cluster_uuid,
        )
        await self._k8sclusterhelm3.copy_secret_data(
            src_secret="osm-ca",
            dst_secret="osm-ca",
            src_namespace=self.vca_config.kubectl_osm_namespace,
            dst_namespace=name,
            cluster_uuid=system_cluster_uuid,
            data_key="ca.crt",
        )

    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        *kargs,
        **kwargs,
    ) -> str:
        # nothing to do
        pass

    async def install_configuration_sw(self, *args, **kwargs):
        # nothing to do
        pass

    async def add_relation(self, *args, **kwargs):
        # nothing to do
        pass

    async def remove_relation(self):
        # nothing to do
        pass

    async def get_status(self, *args, **kwargs):
        # not used for this connector
        pass

    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Obtains the ssh public key from the ee, executing its GetSshKey method.

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict:
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
        """

        self.log.info(
            "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )

        try:
            # Obtain ip_addr for the ee service; it is resolved by kubernetes dns
            # from the ee name
            version, namespace, helm_id = get_ee_id_parts(ee_id)
            ip_addr = "{}.{}.svc".format(helm_id, namespace)
            # Obtain ssh_key from the ee; this method retries, giving the ee time
            # to install libraries and start successfully
            ssh_key = await self._get_ssh_key(ip_addr)
            return ssh_key
        except Exception as e:
            self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
            raise N2VCException("Error obtaining ee ssh_key: {}".format(e))

    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrades charms in VNFs

        This method does not support KDUs deployed with Helm.

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Returns:
            the output of the update operation if status equals to "completed"

        """
        raise N2VCException("KDUs deployed with Helm do not support charm upgrade")

    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment, with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of the VM
            are provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nslcmops", filter:
            {_id: <nslcmop_id>}, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

        self.log.info(
            "exec primitive for ee_id: {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
                ee_id, primitive_name, params_dict, db_dict
            )
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="primitive_name is mandatory", bad_args=["primitive_name"]
            )
        if params_dict is None:
            params_dict = dict()

        try:
            version, namespace, helm_id = get_ee_id_parts(ee_id)
            ip_addr = "{}.{}.svc".format(helm_id, namespace)
        except Exception as e:
            self.log.error("Error getting ee ip: {}".format(e))
            raise N2VCException("Error getting ee ip: {}".format(e))

        if primitive_name == "config":
            try:
                # Execute config primitive with a higher timeout, to cover the
                # case where the ee is still starting
                status, detailed_message = await self._execute_config_primitive(
                    ip_addr, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed config primitive ee_id: {}, status: {}, message: {}".format(
                        ee_id, status, detailed_message
                    )
                )
                if status != "OK":
                    self.log.error(
                        "Error configuring helm ee, status: {}, message: {}".format(
                            status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                    primitive_name=primitive_name,
                )
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(
                    ip_addr, primitive_name, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed primitive {} ee_id: {}, status: {}, message: {}".format(
                        primitive_name, ee_id, status, detailed_message
                    )
                )
                if status not in ("OK", "PROCESSING"):
                    self.log.error(
                        "Execute primitive {} returned non-OK status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned non-OK status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return detailed_message
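
    # Illustrative call sketch (identifiers and primitive are made up; "config"
    # passes VM credentials for the proxy case, any other name is forwarded to
    # the ee as-is):
    #
    #     result = await conn.exec_primitive(
    #         ee_id="helm-v3:osm.eechart-0123456789ab",
    #         primitive_name="touch",
    #         params_dict={"file-path": "/tmp/osm"},
    #         db_dict={"collection": "nslcmops",
    #                  "filter": {"_id": op_id},
    #                  "path": "_admin.VCA"},
    #     )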

    async def deregister_execution_environments(self):
        # nothing to be done
        pass

    async def delete_execution_environment(
        self,
        ee_id: str,
        db_dict: dict = None,
        total_timeout: float = None,
        **kwargs,
    ):
        """
        Delete an execution environment
        :param str ee_id: id of the execution environment to delete, including namespace.helm_id
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

        self.log.info("ee_id: {}".format(ee_id))

        # check arguments
        if ee_id is None:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )

        try:
            # Obtain cluster_uuid
            system_cluster_uuid = await self._get_system_cluster_id()

            # Get helm_id
            version, namespace, helm_id = get_ee_id_parts(ee_id)

            await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
            self.log.info("ee_id: {} deleted".format(ee_id))
        except N2VCException:
            raise
        except Exception as e:
            self.log.error(
                "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
            )
            raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))

    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        await self._k8sclusterhelm3.delete_namespace(
            namespace=namespace,
            cluster_uuid=system_cluster_uuid,
        )

    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        *kargs,
        **kwargs,
    ) -> str:
        pass

    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _get_ssh_key(self, ip_addr):
        return await self._execute_primitive_internal(
            ip_addr,
            "_get_ssh_key",
            None,
        )

    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
        return await self._execute_primitive_internal(
            ip_addr, "config", params, db_dict=db_dict
        )

    @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
    async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
        return await self._execute_primitive_internal(
            ip_addr, primitive_name, params, db_dict=db_dict
        )
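
    # Note: _get_ssh_key and _execute_config_primitive retry over the start-up
    # budget (_initial_retry_time), while _execute_primitive uses the longer
    # _max_retry_time budget, since by then the ee is expected to be running.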

    async def _execute_primitive_internal(
        self, ip_addr, primitive_name, params, db_dict=None
    ):
        async def execute():
            stub = FrontendExecutorStub(channel)
            if primitive_name == "_get_ssh_key":
                self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
                reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
                return reply.message
            # For any other primitives
            async with stub.RunPrimitive.open() as stream:
                primitive_id = str(uuid.uuid1())
                result = None
                self.log.debug(
                    "Execute primitive internal: id:{}, name:{}, params: {}".format(
                        primitive_id, primitive_name, params
                    )
                )
                await stream.send_message(
                    PrimitiveRequest(
                        id=primitive_id, name=primitive_name, params=yaml.dump(params)
                    ),
                    end=True,
                )
                async for reply in stream:
                    self.log.debug("Received reply: {}".format(reply))
                    result = reply
                    # If db_dict is provided, write notifications to the database
                    if db_dict:
                        self._write_op_detailed_status(
                            db_dict, reply.status, reply.detailed_message
                        )
                if result:
                    return result.status, result.detailed_message
                else:
                    return "ERROR", "No result received"

        ssl_context = create_secure_context(
            self.vca_config.ca_store,
            self.vca_config.client_cert_path,
            self.vca_config.client_key_path,
        )
        channel = Channel(
            ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
        )
        try:
            return await execute()
        except ssl.SSLError as ssl_error:  # fallback to insecure gRPC
            # A plain-HTTP/2 peer makes the TLS handshake fail with reason
            # "WRONG_VERSION_NUMBER"; retry without TLS unless TLS is enforced
            if (
                ssl_error.reason == "WRONG_VERSION_NUMBER"
                and not self.vca_config.eegrpc_tls_enforce
            ):
                self.log.debug(
                    "Execution environment doesn't support TLS, falling back to insecure gRPC"
                )
                channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
                return await execute()
            elif ssl_error.reason == "WRONG_VERSION_NUMBER":
                raise N2VCException(
                    "Execution environment doesn't support TLS, primitives cannot be executed"
                )
            else:
                raise
        finally:
            channel.close()

    def _write_op_detailed_status(self, db_dict, status, detailed_message):
        # write the operation detailed-status to the database
        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.log.error("Error writing detailedStatus to database: {}".format(e))
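
    # Illustrative effect (ids are made up): with db_dict = {"collection":
    # "nslcmops", "filter": {"_id": "1234"}, "path": "_admin.VCA"} and a reply
    # of status "PROCESSING", message "50%", the matching nslcmops document
    # gets "detailed-status" set to "PROCESSING: 50%".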

    async def _get_system_cluster_id(self):
        if not self._system_cluster_id:
            db_k8cluster = self.db.get_one(
                "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name}
            )
            k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
            if not k8s_hc_id:
                try:
                    # backward compatibility for existing clusters that have not
                    # been initialized for helm v3
                    cluster_id = db_k8cluster.get("_id")
                    k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
                    k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
                        k8s_credentials, reuse_cluster_uuid=cluster_id
                    )
                    db_k8scluster_update = {
                        "_admin.helm-chart-v3.error_msg": None,
                        "_admin.helm-chart-v3.id": k8s_hc_id,
                        "_admin.helm-chart-v3.created": uninstall_sw,
                        "_admin.helm-chart-v3.operationalState": "ENABLED",
                    }
                    self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                except Exception as e:
                    self.log.error(
                        "error initializing helm-v3 cluster: {}".format(str(e))
                    )
                    raise N2VCException(
                        "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
                            cluster_id
                        )
                    )
            self._system_cluster_id = k8s_hc_id
        return self._system_cluster_id
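
    # Illustrative shape of the k8sclusters document assumed above (abridged,
    # field values are made up):
    #
    #     {"_id": "<cluster-id>",
    #      "name": "<kubectl_osm_cluster_name>",
    #      "credentials": {...},  # kubeconfig content
    #      "_admin": {"helm-chart-v3": {"id": "<system cluster uuid>",
    #                                   "operationalState": "ENABLED"}}}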