Feature 10929: LCM saga, Milestone 1.
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import uuid
22 import os
23 import ssl
24
25 from grpclib.client import Channel
26
27 from osm_lcm.data_utils.lcm_config import VcaConfig
28 from osm_lcm.frontend_pb2 import PrimitiveRequest
29 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
30 from osm_lcm.frontend_grpc import FrontendExecutorStub
31 from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts
32
33 from osm_lcm.data_utils.database.database import Database
34 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
35
36 from n2vc.n2vc_conn import N2VCConnector
37 from n2vc.k8s_helm_conn import K8sHelmConnector
38 from n2vc.k8s_helm3_conn import K8sHelm3Connector
39 from n2vc.exceptions import (
40 N2VCBadArgumentsException,
41 N2VCException,
42 N2VCExecutionException,
43 )
44
45 from osm_lcm.lcm_utils import deep_get
46
47
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """Decorator factory that retries an async method on ConnectionRefusedError.

    Wait/delay values are read from the decorated method's *instance*
    attributes named by ``max_wait_time_var`` and ``delay_time_var``
    (looked up in ``self.__dict__``); if the instance does not define
    them, defaults of 300s total / 10s per retry are used.

    :param max_wait_time_var: instance attribute holding the total wait budget
    :param delay_time_var: instance attribute holding the delay between retries
    :raises ConnectionRefusedError: when the wait budget is exhausted
    """

    def wrapper(func):
        # Tuple form so more retryable exceptions can be added later.
        retry_exceptions = (ConnectionRefusedError,)

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay_time
            delay_time = 10
            max_wait_time = 300

            # obtain arguments from instance attributes (args[0] is self)
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
            # BUGFIX: the original `while/else` *returned* the class object
            # ConnectionRefusedError on exhaustion instead of raising, so
            # callers received a class where they expected the func's result.
            raise ConnectionRefusedError(
                "Maximum retry time exceeded ({}s)".format(max_wait_time)
            )

        return wrapped

    return wrapper
79
80
def create_secure_context(
    trusted: str,
) -> ssl.SSLContext:
    """Build a strict TLS client context for the gRPC (HTTP/2) channel.

    :param trusted: path to the CA bundle used to verify the server
    :return: an ``ssl.SSLContext`` enforcing TLS >= 1.2, hostname checking,
        certificate verification and ALPN protocol "h2"
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    context.check_hostname = True
    context.verify_mode = ssl.CERT_REQUIRED
    # TODO: client TLS
    # context.load_cert_chain(str(client_cert), str(client_key))
    context.load_verify_locations(trusted)
    context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    context.set_alpn_protocols(["h2"])
    try:
        # NPN is absent from modern OpenSSL builds; best effort only.
        context.set_npn_protocols(["h2"])
    except NotImplementedError:
        pass
    return context
98
99
class LCMHelmConn(N2VCConnector, LcmBase):
    # N2VC connector that manages execution environments deployed as helm
    # charts on the OSM system kubernetes cluster and talks to them over
    # gRPC (frontend executor service).
    def __init__(
        self,
        log: object = None,
        loop: object = None,
        vca_config: VcaConfig = None,
        on_update_db=None,
    ):
        """
        Initialize EE helm connector.

        :param log: logger to use (the parent connector provides one if None)
        :param loop: asyncio event loop, forwarded to N2VCConnector
        :param vca_config: VCA configuration (helm/kubectl paths, retry
            times, system cluster/namespace names, gRPC port, ...)
        :param on_update_db: optional callback invoked on database updates
        """

        # Shared singletons for database and filesystem access.
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs

        # parent class constructor
        N2VCConnector.__init__(
            self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
        )

        self.vca_config = vca_config
        self.log.debug("Initialize helm N2VC connector")
        self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))

        # Retry timing consumed by the @retryer-decorated gRPC helpers below
        # (read from the instance __dict__ by the decorator).
        self._retry_delay = self.vca_config.helm_ee_retry_delay

        self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
        self.log.debug("Initial retry time: {}".format(self._initial_retry_time))

        self._max_retry_time = self.vca_config.helm_max_retry_time
        self.log.debug("Retry time: {}".format(self._max_retry_time))

        # initialize helm connector for helmv2 and helmv3
        self._k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helmpath,
            fs=self.fs,
            db=self.db,
            log=self.log,
            on_update_db=None,
        )

        self._k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helm3path,
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        # Cached id of the OSM system cluster, resolved lazily by
        # _get_system_cluster_id().
        self._system_cluster_id = None
        self.log.info("Helm N2VC connector initialized")
153
154 # TODO - ¿reuse_ee_id?
155 async def create_execution_environment(
156 self,
157 namespace: str,
158 db_dict: dict,
159 reuse_ee_id: str = None,
160 progress_timeout: float = None,
161 total_timeout: float = None,
162 config: dict = None,
163 artifact_path: str = None,
164 chart_model: str = None,
165 vca_type: str = None,
166 *kargs,
167 **kwargs,
168 ) -> (str, dict):
169 """
170 Creates a new helm execution environment deploying the helm-chat indicated in the
171 artifact_path
172 :param str namespace: This param is not used, all helm charts are deployed in the osm
173 system namespace
174 :param dict db_dict: where to write to database when the status changes.
175 It contains a dictionary with {collection: str, filter: {}, path: str},
176 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
177 "_admin.deployed.VCA.3"}
178 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
179 :param float progress_timeout:
180 :param float total_timeout:
181 :param dict config: General variables to instantiate KDU
182 :param str artifact_path: path of package content
183 :param str chart_model: helm chart/reference (string), which can be either
184 of these options:
185 - a name of chart available via the repos known by OSM
186 (e.g. stable/openldap, stable/openldap:1.2.4)
187 - a path to a packaged chart (e.g. mychart.tgz)
188 - a path to an unpacked chart directory or a URL (e.g. mychart)
189 :param str vca_type: Type of vca, must be type helm or helm-v3
190 :returns str, dict: id of the new execution environment including namespace.helm_id
191 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
192 """
193
194 self.log.info(
195 "create_execution_environment: namespace: {}, artifact_path: {}, "
196 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
197 namespace, artifact_path, db_dict, chart_model, reuse_ee_id
198 )
199 )
200
201 # Validate artifact-path is provided
202 if artifact_path is None or len(artifact_path) == 0:
203 raise N2VCBadArgumentsException(
204 message="artifact_path is mandatory", bad_args=["artifact_path"]
205 )
206
207 # Validate artifact-path exists and sync path
208 from_path = os.path.split(artifact_path)[0]
209 self.fs.sync(from_path)
210
211 # remove / in charm path
212 while artifact_path.find("//") >= 0:
213 artifact_path = artifact_path.replace("//", "/")
214
215 # check charm path
216 if self.fs.file_exists(artifact_path):
217 helm_chart_path = artifact_path
218 else:
219 msg = "artifact path does not exist: {}".format(artifact_path)
220 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
221
222 if artifact_path.startswith("/"):
223 full_path = self.fs.path + helm_chart_path
224 else:
225 full_path = self.fs.path + "/" + helm_chart_path
226
227 while full_path.find("//") >= 0:
228 full_path = full_path.replace("//", "/")
229
230 # By default, the KDU is expected to be a file
231 kdu_model = full_path
232 # If the chart_model includes a "/", then it is a reference:
233 # e.g. (stable/openldap; stable/openldap:1.2.4)
234 if chart_model.find("/") >= 0:
235 kdu_model = chart_model
236
237 try:
238 # Call helm conn install
239 # Obtain system cluster id from database
240 system_cluster_uuid = await self._get_system_cluster_id()
241 # Add parameter osm if exist to global
242 if config and config.get("osm"):
243 if not config.get("global"):
244 config["global"] = {}
245 config["global"]["osm"] = config.get("osm")
246
247 self.log.debug("install helm chart: {}".format(full_path))
248 if vca_type == "helm":
249 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
250 db_dict=db_dict,
251 kdu_model=kdu_model,
252 )
253 await self._k8sclusterhelm2.install(
254 system_cluster_uuid,
255 kdu_model=kdu_model,
256 kdu_instance=helm_id,
257 namespace=self.vca_config.kubectl_osm_namespace,
258 params=config,
259 db_dict=db_dict,
260 timeout=progress_timeout,
261 )
262 else:
263 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
264 db_dict=db_dict,
265 kdu_model=kdu_model,
266 )
267 await self._k8sclusterhelm3.install(
268 system_cluster_uuid,
269 kdu_model=kdu_model,
270 kdu_instance=helm_id,
271 namespace=self.vca_config.kubectl_osm_namespace,
272 params=config,
273 db_dict=db_dict,
274 timeout=progress_timeout,
275 )
276
277 ee_id = "{}:{}.{}".format(
278 vca_type, self.vca_config.kubectl_osm_namespace, helm_id
279 )
280 return ee_id, None
281 except N2VCException:
282 raise
283 except Exception as e:
284 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
285 raise N2VCException("Error deploying chart ee: {}".format(e))
286
287 async def upgrade_execution_environment(
288 self,
289 namespace: str,
290 db_dict: dict,
291 helm_id: str,
292 progress_timeout: float = None,
293 total_timeout: float = None,
294 config: dict = None,
295 artifact_path: str = None,
296 vca_type: str = None,
297 *kargs,
298 **kwargs,
299 ) -> (str, dict):
300 """
301 Creates a new helm execution environment deploying the helm-chat indicated in the
302 attifact_path
303 :param str namespace: This param is not used, all helm charts are deployed in the osm
304 system namespace
305 :param dict db_dict: where to write to database when the status changes.
306 It contains a dictionary with {collection: str, filter: {}, path: str},
307 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
308 "_admin.deployed.VCA.3"}
309 :param helm_id: unique name of the Helm release to upgrade
310 :param float progress_timeout:
311 :param float total_timeout:
312 :param dict config: General variables to instantiate KDU
313 :param str artifact_path: path of package content
314 :param str vca_type: Type of vca, must be type helm or helm-v3
315 :returns str, dict: id of the new execution environment including namespace.helm_id
316 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
317 """
318
319 self.log.info(
320 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
321 )
322
323 # Validate helm_id is provided
324 if helm_id is None or len(helm_id) == 0:
325 raise N2VCBadArgumentsException(
326 message="helm_id is mandatory", bad_args=["helm_id"]
327 )
328
329 # Validate artifact-path is provided
330 if artifact_path is None or len(artifact_path) == 0:
331 raise N2VCBadArgumentsException(
332 message="artifact_path is mandatory", bad_args=["artifact_path"]
333 )
334
335 # Validate artifact-path exists and sync path
336 from_path = os.path.split(artifact_path)[0]
337 self.fs.sync(from_path)
338
339 # remove / in charm path
340 while artifact_path.find("//") >= 0:
341 artifact_path = artifact_path.replace("//", "/")
342
343 # check charm path
344 if self.fs.file_exists(artifact_path):
345 helm_chart_path = artifact_path
346 else:
347 msg = "artifact path does not exist: {}".format(artifact_path)
348 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
349
350 if artifact_path.startswith("/"):
351 full_path = self.fs.path + helm_chart_path
352 else:
353 full_path = self.fs.path + "/" + helm_chart_path
354
355 while full_path.find("//") >= 0:
356 full_path = full_path.replace("//", "/")
357
358 try:
359 # Call helm conn upgrade
360 # Obtain system cluster id from database
361 system_cluster_uuid = await self._get_system_cluster_id()
362 # Add parameter osm if exist to global
363 if config and config.get("osm"):
364 if not config.get("global"):
365 config["global"] = {}
366 config["global"]["osm"] = config.get("osm")
367
368 self.log.debug("Ugrade helm chart: {}".format(full_path))
369 if vca_type == "helm":
370 await self._k8sclusterhelm2.upgrade(
371 system_cluster_uuid,
372 kdu_model=full_path,
373 kdu_instance=helm_id,
374 namespace=namespace,
375 params=config,
376 db_dict=db_dict,
377 timeout=progress_timeout,
378 force=True,
379 )
380 else:
381 await self._k8sclusterhelm3.upgrade(
382 system_cluster_uuid,
383 kdu_model=full_path,
384 kdu_instance=helm_id,
385 namespace=namespace,
386 params=config,
387 db_dict=db_dict,
388 timeout=progress_timeout,
389 force=True,
390 )
391
392 except N2VCException:
393 raise
394 except Exception as e:
395 self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
396 raise N2VCException("Error upgrading chart ee: {}".format(e))
397
398 async def create_tls_certificate(
399 self,
400 nsr_id: str,
401 secret_name: str,
402 usage: str,
403 dns_prefix: str,
404 namespace: str = None,
405 ):
406 # Obtain system cluster id from database
407 system_cluster_uuid = await self._get_system_cluster_id()
408 # use helm-v3 as certificates don't depend on helm version
409 await self._k8sclusterhelm3.create_certificate(
410 cluster_uuid=system_cluster_uuid,
411 namespace=namespace or self.vca_config.kubectl_osm_namespace,
412 dns_prefix=dns_prefix,
413 name=nsr_id,
414 secret_name=secret_name,
415 usage=usage,
416 )
417
418 async def delete_tls_certificate(
419 self,
420 certificate_name: str = None,
421 namespace: str = None,
422 ):
423 # Obtain system cluster id from database
424 system_cluster_uuid = await self._get_system_cluster_id()
425 await self._k8sclusterhelm3.delete_certificate(
426 cluster_uuid=system_cluster_uuid,
427 namespace=namespace or self.vca_config.kubectl_osm_namespace,
428 certificate_name=certificate_name,
429 )
430
    # The following methods are part of the N2VCConnector interface but are
    # not applicable to helm-based execution environments; they are no-ops.

    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        *kargs,
        **kwargs,
    ) -> str:
        # nothing to do
        pass

    async def install_configuration_sw(self, *args, **kwargs):
        # nothing to do
        pass

    async def add_relation(self, *args, **kwargs):
        # nothing to do
        pass

    async def remove_relation(self):
        # nothing to to
        pass

    async def get_status(self, *args, **kwargs):
        # not used for this connector
        pass
459
460 async def get_ee_ssh_public__key(
461 self,
462 ee_id: str,
463 db_dict: dict,
464 progress_timeout: float = None,
465 total_timeout: float = None,
466 **kwargs,
467 ) -> str:
468 """
469 Obtains ssh-public key from ee executing GetSShKey method from the ee.
470
471 :param str ee_id: the id of the execution environment returned by
472 create_execution_environment or register_execution_environment
473 :param dict db_dict:
474 :param float progress_timeout:
475 :param float total_timeout:
476 :returns: public key of the execution environment
477 """
478
479 self.log.info(
480 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
481 )
482
483 # check arguments
484 if ee_id is None or len(ee_id) == 0:
485 raise N2VCBadArgumentsException(
486 message="ee_id is mandatory", bad_args=["ee_id"]
487 )
488
489 try:
490 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
491 version, namespace, helm_id = get_ee_id_parts(ee_id)
492 ip_addr = "{}.{}.svc".format(helm_id, namespace)
493 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
494 # install libraries and start successfully
495 ssh_key = await self._get_ssh_key(ip_addr)
496 return ssh_key
497 except Exception as e:
498 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
499 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
500
501 async def upgrade_charm(
502 self,
503 ee_id: str = None,
504 path: str = None,
505 charm_id: str = None,
506 charm_type: str = None,
507 timeout: float = None,
508 ) -> str:
509 """This method upgrade charms in VNFs
510
511 This method does not support KDU's deployed with Helm.
512
513 Args:
514 ee_id: Execution environment id
515 path: Local path to the charm
516 charm_id: charm-id
517 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
518 timeout: (Float) Timeout for the ns update operation
519
520 Returns:
521 the output of the update operation if status equals to "completed"
522
523 """
524 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
525
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nslcmops", filter:
            {_id: <nslcmop_id>, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

        self.log.info(
            "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
                ee_id, primitive_name, params_dict, db_dict
            )
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="action_name is mandatory", bad_args=["action_name"]
            )
        if params_dict is None:
            params_dict = dict()

        # Resolve the ee gRPC endpoint: kubernetes DNS resolves
        # <helm_id>.<namespace>.svc to the ee service.
        try:
            version, namespace, helm_id = get_ee_id_parts(ee_id)
            ip_addr = "{}.{}.svc".format(helm_id, namespace)
        except Exception as e:
            self.log.error("Error getting ee ip ee: {}".format(e))
            raise N2VCException("Error getting ee ip ee: {}".format(e))

        if primitive_name == "config":
            # "config" uses a dedicated retry schedule (higher initial wait)
            # because the ee may still be starting up when it is invoked.
            try:
                # Execute config primitive, higher timeout to check the case ee is starting
                status, detailed_message = await self._execute_config_primitive(
                    ip_addr, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
                        ee_id, status, detailed_message
                    )
                )
                if status != "OK":
                    self.log.error(
                        "Error configuring helm ee, status: {}, message: {}".format(
                            status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                # Note: this also re-wraps the N2VCExecutionException raised
                # just above, preserving the original statement order.
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                    primitive_name=primitive_name,
                )
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(
                    ip_addr, primitive_name, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
                        primitive_name, ee_id, status, detailed_message
                    )
                )
                # "PROCESSING" is accepted as a non-final success state.
                if status != "OK" and status != "PROCESSING":
                    self.log.error(
                        "Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return detailed_message
645
    async def deregister_execution_environments(self):
        # Registration is a no-op for helm EEs, so there is nothing to undo.
        # nothing to be done
        pass
649
650 async def delete_execution_environment(
651 self,
652 ee_id: str,
653 db_dict: dict = None,
654 total_timeout: float = None,
655 **kwargs,
656 ):
657 """
658 Delete an execution environment
659 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
660 :param dict db_dict: where to write into database when the status changes.
661 It contains a dict with
662 {collection: <str>, filter: {}, path: <str>},
663 e.g. {collection: "nsrs", filter:
664 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
665 :param float total_timeout:
666 """
667
668 self.log.info("ee_id: {}".format(ee_id))
669
670 # check arguments
671 if ee_id is None:
672 raise N2VCBadArgumentsException(
673 message="ee_id is mandatory", bad_args=["ee_id"]
674 )
675
676 try:
677
678 # Obtain cluster_uuid
679 system_cluster_uuid = await self._get_system_cluster_id()
680
681 # Get helm_id
682 version, namespace, helm_id = get_ee_id_parts(ee_id)
683
684 # Uninstall chart, for backward compatibility we must assume that if there is no
685 # version it is helm-v2
686 if version == "helm-v3":
687 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
688 else:
689 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
690 self.log.info("ee_id: {} deleted".format(ee_id))
691 except N2VCException:
692 raise
693 except Exception as e:
694 self.log.error(
695 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
696 )
697 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
698
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        # method not implemented for this connector, execution environments must be deleted individually
        pass
704
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        *kargs,
        **kwargs,
    ) -> str:
        # Proxy charms are not applicable to helm execution environments;
        # interface method kept as a no-op.
        pass
718
    # Thin retry wrappers around _execute_primitive_internal. The ssh-key and
    # "config" calls use the (larger) initial retry budget because the ee may
    # still be installing/starting; regular primitives use the standard one.

    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _get_ssh_key(self, ip_addr):
        # Retrieve the ee's ssh public key via the "_get_ssh_key" pseudo-primitive.
        return await self._execute_primitive_internal(
            ip_addr,
            "_get_ssh_key",
            None,
        )

    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
        # Execute the "config" primitive with the initial (startup) retry budget.
        return await self._execute_primitive_internal(
            ip_addr, "config", params, db_dict=db_dict
        )

    @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
    async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
        # Execute any other primitive with the standard retry budget.
        return await self._execute_primitive_internal(
            ip_addr, primitive_name, params, db_dict=db_dict
        )
738
    async def _execute_primitive_internal(
        self, ip_addr, primitive_name, params, db_dict=None
    ):
        """
        Run a primitive against the ee gRPC frontend at ip_addr.

        Opens a TLS gRPC channel; if the ee does not speak TLS (handshake
        fails with WRONG_VERSION_NUMBER) and TLS is not enforced by config,
        falls back to an insecure channel.

        :param ip_addr: DNS name / IP of the ee frontend service
        :param primitive_name: primitive to run; "_get_ssh_key" is a special
            case that uses the GetSshKey RPC instead of RunPrimitive
        :param params: primitive parameters (serialized to YAML for the RPC)
        :param db_dict: if provided, intermediate status notifications are
            written to the database as they arrive
        :returns: reply.message for "_get_ssh_key"; otherwise a
            (status, detailed_message) tuple, or ("ERROR", "No result
            received") if the stream produced no reply
        """

        async def execute():
            stub = FrontendExecutorStub(channel)
            if primitive_name == "_get_ssh_key":
                self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
                reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
                return reply.message
            # For any other primitives
            async with stub.RunPrimitive.open() as stream:
                primitive_id = str(uuid.uuid1())
                result = None
                self.log.debug(
                    "Execute primitive internal: id:{}, name:{}, params: {}".format(
                        primitive_id, primitive_name, params
                    )
                )
                await stream.send_message(
                    PrimitiveRequest(
                        id=primitive_id, name=primitive_name, params=yaml.dump(params)
                    ),
                    end=True,
                )
                # Stream replies until the server closes; the last reply wins.
                async for reply in stream:
                    self.log.debug("Received reply: {}".format(reply))
                    result = reply
                    # If db_dict provided write notifs in database
                    if db_dict:
                        self._write_op_detailed_status(
                            db_dict, reply.status, reply.detailed_message
                        )
                if result:
                    return reply.status, reply.detailed_message
                else:
                    return "ERROR", "No result received"

        ssl_context = create_secure_context(self.vca_config.ca_store)
        channel = Channel(
            ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
        )
        try:
            return await execute()
        except ssl.SSLError as ssl_error:  # fallback to insecure gRPC
            if (
                ssl_error.reason == "WRONG_VERSION_NUMBER"
                and not self.vca_config.eegrpc_tls_enforce
            ):
                self.log.debug(
                    "Execution environment doesn't support TLS, falling back to unsecure gRPC"
                )
                # Rebind channel so the finally clause closes the new one.
                channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
                return await execute()
            elif ssl_error.reason == "WRONG_VERSION_NUMBER":
                raise N2VCException(
                    "Execution environment doesn't support TLS, primitives cannot be executed"
                )
            else:
                raise
        finally:
            channel.close()
800
801 def _write_op_detailed_status(self, db_dict, status, detailed_message):
802
803 # write ee_id to database: _admin.deployed.VCA.x
804 try:
805 the_table = db_dict["collection"]
806 the_filter = db_dict["filter"]
807 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
808 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
809 self.db.set_one(
810 table=the_table,
811 q_filter=the_filter,
812 update_dict=update_dict,
813 fail_on_empty=True,
814 )
815 except asyncio.CancelledError:
816 raise
817 except Exception as e:
818 self.log.error("Error writing detailedStatus to database: {}".format(e))
819
820 async def _get_system_cluster_id(self):
821 if not self._system_cluster_id:
822 db_k8cluster = self.db.get_one(
823 "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name}
824 )
825 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
826 if not k8s_hc_id:
827 try:
828 # backward compatibility for existing clusters that have not been initialized for helm v3
829 cluster_id = db_k8cluster.get("_id")
830 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
831 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
832 k8s_credentials, reuse_cluster_uuid=cluster_id
833 )
834 db_k8scluster_update = {
835 "_admin.helm-chart-v3.error_msg": None,
836 "_admin.helm-chart-v3.id": k8s_hc_id,
837 "_admin.helm-chart-v3}.created": uninstall_sw,
838 "_admin.helm-chart-v3.operationalState": "ENABLED",
839 }
840 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
841 except Exception as e:
842 self.log.error(
843 "error initializing helm-v3 cluster: {}".format(str(e))
844 )
845 raise N2VCException(
846 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
847 cluster_id
848 )
849 )
850 self._system_cluster_id = k8s_hc_id
851 return self._system_cluster_id