d7db63991fde8e7de4f43908d1fe94e25aad85e2
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import uuid
22 import os
23 import ssl
24
25 from grpclib.client import Channel
26
27 from osm_lcm.data_utils.lcm_config import VcaConfig
28 from osm_lcm.frontend_pb2 import PrimitiveRequest
29 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
30 from osm_lcm.frontend_grpc import FrontendExecutorStub
31 from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts
32
33 from osm_lcm.data_utils.database.database import Database
34 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
35
36 from n2vc.n2vc_conn import N2VCConnector
37 from n2vc.k8s_helm_conn import K8sHelmConnector
38 from n2vc.k8s_helm3_conn import K8sHelm3Connector
39 from n2vc.exceptions import (
40 N2VCBadArgumentsException,
41 N2VCException,
42 N2VCExecutionException,
43 )
44
45 from osm_lcm.lcm_utils import deep_get
46
47
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Decorator factory that retries an async method on connection errors.

    The wrapped coroutine is retried while it raises ConnectionRefusedError or
    TimeoutError, sleeping ``delay_time`` seconds between attempts, until a
    budget of ``max_wait_time`` seconds has been consumed.

    :param str max_wait_time_var: name of the instance attribute holding the
        total retry budget in seconds (falls back to 300 if unset/falsy)
    :param str delay_time_var: name of the instance attribute holding the
        delay between retries in seconds (falls back to 10 if unset/falsy)
    :raises ConnectionRefusedError: when the retry budget is exhausted
    """

    def wrapper(func):
        # only transient connectivity failures are retried
        retry_exceptions = (ConnectionRefusedError, TimeoutError)

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay time
            delay_time = 10
            max_wait_time = 300

            # obtain actual values from the instance attributes, if present
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
                    continue
            # Bug fix: the original `else:` branch *returned* the
            # ConnectionRefusedError class instead of raising it, so callers
            # silently received an exception type as a "result". Raise so
            # exhaustion of the retry budget propagates as a failure.
            raise ConnectionRefusedError(
                "Maximum retry time exceeded ({} seconds)".format(max_wait_time)
            )

        return wrapped

    return wrapper
79
80
def create_secure_context(
    trusted: str, client_cert_path: str, client_key_path: str
) -> ssl.SSLContext:
    """Build a TLS client context for mutual-TLS gRPC (HTTP/2, TLS >= 1.2).

    :param trusted: path to the CA bundle used to verify the server
    :param client_cert_path: path to the client certificate file
    :param client_key_path: path to the client private key file
    :return: a fully configured ssl.SSLContext
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # require and verify the server certificate, including hostname matching
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    # present our own certificate (mutual TLS) and trust the given CA bundle
    context.load_cert_chain(client_cert_path, client_key_path)
    context.load_verify_locations(trusted)
    # restrict to forward-secret AEAD cipher suites
    context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    # negotiate HTTP/2 via ALPN, as required by gRPC
    context.set_alpn_protocols(["h2"])
    return context
93
94
95 class LCMHelmConn(N2VCConnector, LcmBase):
    def __init__(
        self,
        log: object = None,
        vca_config: VcaConfig = None,
        on_update_db=None,
    ):
        """
        Initialize EE helm connector.

        :param log: logger passed on to the N2VCConnector parent
        :param VcaConfig vca_config: VCA configuration (helm/kubectl binary
            paths, retry times, namespaces, gRPC settings)
        :param on_update_db: optional callback invoked on database updates
        """

        # db and fs come from process-wide singletons initialized elsewhere
        self.db = Database().instance.db
        self.fs = Filesystem().instance.fs

        # parent class constructor
        N2VCConnector.__init__(
            self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs
        )

        self.vca_config = vca_config
        self.log.debug("Initialize helm N2VC connector")
        self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))

        # delay between attempts used by the @retryer-decorated helpers
        self._retry_delay = self.vca_config.helm_ee_retry_delay

        # total retry budget for first contact with an EE (it may still be starting)
        self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
        self.log.debug("Initial retry time: {}".format(self._initial_retry_time))

        # total retry budget for ordinary primitive execution
        self._max_retry_time = self.vca_config.helm_max_retry_time
        self.log.debug("Retry time: {}".format(self._max_retry_time))

        # initialize helm connector for helmv2 and helmv3
        self._k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helmpath,
            fs=self.fs,
            db=self.db,
            log=self.log,
            on_update_db=None,
        )

        self._k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helm3path,
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        # resolved lazily by _get_system_cluster_id()
        self._system_cluster_id = None
        self.log.info("Helm N2VC connector initialized")
147
148 # TODO - ¿reuse_ee_id?
149 async def create_execution_environment(
150 self,
151 namespace: str,
152 db_dict: dict,
153 reuse_ee_id: str = None,
154 progress_timeout: float = None,
155 total_timeout: float = None,
156 config: dict = None,
157 artifact_path: str = None,
158 chart_model: str = None,
159 vca_type: str = None,
160 *kargs,
161 **kwargs,
162 ) -> (str, dict):
163 """
164 Creates a new helm execution environment deploying the helm-chat indicated in the
165 artifact_path
166 :param str namespace: This param is not used, all helm charts are deployed in the osm
167 system namespace
168 :param dict db_dict: where to write to database when the status changes.
169 It contains a dictionary with {collection: str, filter: {}, path: str},
170 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
171 "_admin.deployed.VCA.3"}
172 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
173 :param float progress_timeout:
174 :param float total_timeout:
175 :param dict config: General variables to instantiate KDU
176 :param str artifact_path: path of package content
177 :param str chart_model: helm chart/reference (string), which can be either
178 of these options:
179 - a name of chart available via the repos known by OSM
180 (e.g. stable/openldap, stable/openldap:1.2.4)
181 - a path to a packaged chart (e.g. mychart.tgz)
182 - a path to an unpacked chart directory or a URL (e.g. mychart)
183 :param str vca_type: Type of vca, must be type helm or helm-v3
184 :returns str, dict: id of the new execution environment including namespace.helm_id
185 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
186 """
187
188 if not namespace:
189 namespace = self.vca_config.kubectl_osm_namespace
190
191 self.log.info(
192 "create_execution_environment: namespace: {}, artifact_path: {}, "
193 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
194 namespace, artifact_path, db_dict, chart_model, reuse_ee_id
195 )
196 )
197
198 # Validate artifact-path is provided
199 if artifact_path is None or len(artifact_path) == 0:
200 raise N2VCBadArgumentsException(
201 message="artifact_path is mandatory", bad_args=["artifact_path"]
202 )
203
204 # Validate artifact-path exists and sync path
205 from_path = os.path.split(artifact_path)[0]
206 self.fs.sync(from_path)
207
208 # remove / in charm path
209 while artifact_path.find("//") >= 0:
210 artifact_path = artifact_path.replace("//", "/")
211
212 # check charm path
213 if self.fs.file_exists(artifact_path):
214 helm_chart_path = artifact_path
215 else:
216 msg = "artifact path does not exist: {}".format(artifact_path)
217 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
218
219 if artifact_path.startswith("/"):
220 full_path = self.fs.path + helm_chart_path
221 else:
222 full_path = self.fs.path + "/" + helm_chart_path
223
224 while full_path.find("//") >= 0:
225 full_path = full_path.replace("//", "/")
226
227 # By default, the KDU is expected to be a file
228 kdu_model = full_path
229 # If the chart_model includes a "/", then it is a reference:
230 # e.g. (stable/openldap; stable/openldap:1.2.4)
231 if chart_model.find("/") >= 0:
232 kdu_model = chart_model
233
234 try:
235 # Call helm conn install
236 # Obtain system cluster id from database
237 system_cluster_uuid = await self._get_system_cluster_id()
238 # Add parameter osm if exist to global
239 if config and config.get("osm"):
240 if not config.get("global"):
241 config["global"] = {}
242 config["global"]["osm"] = config.get("osm")
243
244 self.log.debug("install helm chart: {}".format(full_path))
245 if vca_type == "helm":
246 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
247 db_dict=db_dict,
248 kdu_model=kdu_model,
249 )
250 await self._k8sclusterhelm2.install(
251 system_cluster_uuid,
252 kdu_model=kdu_model,
253 kdu_instance=helm_id,
254 namespace=namespace,
255 params=config,
256 db_dict=db_dict,
257 timeout=progress_timeout,
258 )
259 else:
260 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
261 db_dict=db_dict,
262 kdu_model=kdu_model,
263 )
264 await self._k8sclusterhelm3.install(
265 system_cluster_uuid,
266 kdu_model=kdu_model,
267 kdu_instance=helm_id,
268 namespace=namespace,
269 params=config,
270 db_dict=db_dict,
271 timeout=progress_timeout,
272 )
273
274 ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id)
275 return ee_id, None
276 except N2VCException:
277 raise
278 except Exception as e:
279 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
280 raise N2VCException("Error deploying chart ee: {}".format(e))
281
282 async def upgrade_execution_environment(
283 self,
284 namespace: str,
285 db_dict: dict,
286 helm_id: str,
287 progress_timeout: float = None,
288 total_timeout: float = None,
289 config: dict = None,
290 artifact_path: str = None,
291 vca_type: str = None,
292 *kargs,
293 **kwargs,
294 ) -> (str, dict):
295 """
296 Creates a new helm execution environment deploying the helm-chat indicated in the
297 attifact_path
298 :param str namespace: This param is not used, all helm charts are deployed in the osm
299 system namespace
300 :param dict db_dict: where to write to database when the status changes.
301 It contains a dictionary with {collection: str, filter: {}, path: str},
302 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
303 "_admin.deployed.VCA.3"}
304 :param helm_id: unique name of the Helm release to upgrade
305 :param float progress_timeout:
306 :param float total_timeout:
307 :param dict config: General variables to instantiate KDU
308 :param str artifact_path: path of package content
309 :param str vca_type: Type of vca, must be type helm or helm-v3
310 :returns str, dict: id of the new execution environment including namespace.helm_id
311 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
312 """
313
314 self.log.info(
315 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
316 )
317
318 # Validate helm_id is provided
319 if helm_id is None or len(helm_id) == 0:
320 raise N2VCBadArgumentsException(
321 message="helm_id is mandatory", bad_args=["helm_id"]
322 )
323
324 # Validate artifact-path is provided
325 if artifact_path is None or len(artifact_path) == 0:
326 raise N2VCBadArgumentsException(
327 message="artifact_path is mandatory", bad_args=["artifact_path"]
328 )
329
330 # Validate artifact-path exists and sync path
331 from_path = os.path.split(artifact_path)[0]
332 self.fs.sync(from_path)
333
334 # remove / in charm path
335 while artifact_path.find("//") >= 0:
336 artifact_path = artifact_path.replace("//", "/")
337
338 # check charm path
339 if self.fs.file_exists(artifact_path):
340 helm_chart_path = artifact_path
341 else:
342 msg = "artifact path does not exist: {}".format(artifact_path)
343 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
344
345 if artifact_path.startswith("/"):
346 full_path = self.fs.path + helm_chart_path
347 else:
348 full_path = self.fs.path + "/" + helm_chart_path
349
350 while full_path.find("//") >= 0:
351 full_path = full_path.replace("//", "/")
352
353 try:
354 # Call helm conn upgrade
355 # Obtain system cluster id from database
356 system_cluster_uuid = await self._get_system_cluster_id()
357 # Add parameter osm if exist to global
358 if config and config.get("osm"):
359 if not config.get("global"):
360 config["global"] = {}
361 config["global"]["osm"] = config.get("osm")
362
363 self.log.debug("Ugrade helm chart: {}".format(full_path))
364 if vca_type == "helm":
365 await self._k8sclusterhelm2.upgrade(
366 system_cluster_uuid,
367 kdu_model=full_path,
368 kdu_instance=helm_id,
369 namespace=namespace,
370 params=config,
371 db_dict=db_dict,
372 timeout=progress_timeout,
373 force=True,
374 )
375 else:
376 await self._k8sclusterhelm3.upgrade(
377 system_cluster_uuid,
378 kdu_model=full_path,
379 kdu_instance=helm_id,
380 namespace=namespace,
381 params=config,
382 db_dict=db_dict,
383 timeout=progress_timeout,
384 force=True,
385 )
386
387 except N2VCException:
388 raise
389 except Exception as e:
390 self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
391 raise N2VCException("Error upgrading chart ee: {}".format(e))
392
    async def create_tls_certificate(
        self,
        nsr_id: str,
        secret_name: str,
        usage: str,
        dns_prefix: str,
        namespace: str = None,
    ):
        """
        Create a TLS certificate in the system k8s cluster.

        :param str nsr_id: name given to the certificate resource
        :param str secret_name: name of the k8s secret that will hold the certificate
        :param str usage: certificate usage, forwarded to the k8s connector
        :param str dns_prefix: DNS prefix the certificate is issued for
        :param str namespace: target namespace; defaults to the configured OSM namespace
        """
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        # use helm-v3 as certificates don't depend on helm version
        await self._k8sclusterhelm3.create_certificate(
            cluster_uuid=system_cluster_uuid,
            namespace=namespace or self.vca_config.kubectl_osm_namespace,
            dns_prefix=dns_prefix,
            name=nsr_id,
            secret_name=secret_name,
            usage=usage,
        )
412
    async def delete_tls_certificate(
        self,
        certificate_name: str = None,
        namespace: str = None,
    ):
        """
        Delete a TLS certificate from the system k8s cluster.

        :param str certificate_name: name of the certificate resource to delete
        :param str namespace: namespace of the certificate; defaults to the
            configured OSM namespace
        """
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        await self._k8sclusterhelm3.delete_certificate(
            cluster_uuid=system_cluster_uuid,
            namespace=namespace or self.vca_config.kubectl_osm_namespace,
            certificate_name=certificate_name,
        )
425
    async def setup_ns_namespace(
        self,
        name: str,
    ):
        """
        Prepare a per-NS namespace in the system k8s cluster.

        Creates the namespace with the configured pod-security admission
        policy, grants the default service account read access to secrets,
        and copies the OSM CA certificate into the new namespace.

        :param str name: name of the namespace to create and set up
        """
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        await self._k8sclusterhelm3.create_namespace(
            namespace=name,
            cluster_uuid=system_cluster_uuid,
            labels={
                "pod-security.kubernetes.io/enforce": self.vca_config.eegrpc_pod_admission_policy
            },
        )
        # minimal RBAC: default service account may only "get" secrets
        await self._k8sclusterhelm3.setup_default_rbac(
            name="ee-role",
            namespace=name,
            api_groups=[""],
            resources=["secrets"],
            verbs=["get"],
            service_account="default",
            cluster_uuid=system_cluster_uuid,
        )
        # make the OSM CA available to workloads in the new namespace
        await self._k8sclusterhelm3.copy_secret_data(
            src_secret="osm-ca",
            dst_secret="osm-ca",
            src_namespace=self.vca_config.kubectl_osm_namespace,
            dst_namespace=name,
            cluster_uuid=system_cluster_uuid,
            data_key="ca.crt",
        )
456
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        *kargs,
        **kwargs,
    ) -> str:
        # nothing to do: this N2VCConnector hook is not applicable to helm EEs
        pass
469
    async def install_configuration_sw(self, *args, **kwargs):
        # nothing to do for helm execution environments
        pass
473
    async def add_relation(self, *args, **kwargs):
        # nothing to do: relations are not handled by this connector
        pass
477
    async def remove_relation(self):
        # nothing to do: relations are not handled by this connector
        pass
481
    async def get_status(self, *args, **kwargs):
        # not used for this connector
        pass
485
486 async def get_ee_ssh_public__key(
487 self,
488 ee_id: str,
489 db_dict: dict,
490 progress_timeout: float = None,
491 total_timeout: float = None,
492 **kwargs,
493 ) -> str:
494 """
495 Obtains ssh-public key from ee executing GetSShKey method from the ee.
496
497 :param str ee_id: the id of the execution environment returned by
498 create_execution_environment or register_execution_environment
499 :param dict db_dict:
500 :param float progress_timeout:
501 :param float total_timeout:
502 :returns: public key of the execution environment
503 """
504
505 self.log.info(
506 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
507 )
508
509 # check arguments
510 if ee_id is None or len(ee_id) == 0:
511 raise N2VCBadArgumentsException(
512 message="ee_id is mandatory", bad_args=["ee_id"]
513 )
514
515 try:
516 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
517 version, namespace, helm_id = get_ee_id_parts(ee_id)
518 ip_addr = "{}.{}.svc".format(helm_id, namespace)
519 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
520 # install libraries and start successfully
521 ssh_key = await self._get_ssh_key(ip_addr)
522 return ssh_key
523 except Exception as e:
524 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
525 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
526
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrade charms in VNFs

        This method does not support KDU's deployed with Helm.

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Raises:
            N2VCException: always, since charm upgrade is unsupported for helm KDUs

        Returns:
            the output of the update operation if status equals to "completed"

        """
        # unconditionally reject: helm-deployed KDUs have no charm to upgrade
        raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
551
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nslcmops", filter:
            {_id: <nslcmop_id>, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        :raises N2VCBadArgumentsException: if ee_id or primitive_name is missing
        :raises N2VCExecutionException: if the primitive fails or returns a bad status
        """

        self.log.info(
            "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
                ee_id, primitive_name, params_dict, db_dict
            )
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="action_name is mandatory", bad_args=["action_name"]
            )
        if params_dict is None:
            params_dict = dict()

        try:
            # the ee service ip is resolved by kubernetes DNS from helm_id + namespace
            version, namespace, helm_id = get_ee_id_parts(ee_id)
            ip_addr = "{}.{}.svc".format(helm_id, namespace)
        except Exception as e:
            self.log.error("Error getting ee ip ee: {}".format(e))
            raise N2VCException("Error getting ee ip ee: {}".format(e))

        if primitive_name == "config":
            try:
                # Execute config primitive, higher timeout to check the case ee is starting
                status, detailed_message = await self._execute_config_primitive(
                    ip_addr, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
                        ee_id, status, detailed_message
                    )
                )
                if status != "OK":
                    self.log.error(
                        "Error configuring helm ee, status: {}, message: {}".format(
                            status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                # NOTE: this also re-wraps the N2VCExecutionException raised above
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                    primitive_name=primitive_name,
                )
            # config has no meaningful output; a fixed marker is returned
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(
                    ip_addr, primitive_name, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
                        primitive_name, ee_id, status, detailed_message
                    )
                )
                # "PROCESSING" is accepted as a non-final but non-error status
                if status != "OK" and status != "PROCESSING":
                    self.log.error(
                        "Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                # NOTE: this also re-wraps the N2VCExecutionException raised above
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return detailed_message
671
    async def deregister_execution_environments(self):
        # nothing to be done
        pass
675
676 async def delete_execution_environment(
677 self,
678 ee_id: str,
679 db_dict: dict = None,
680 total_timeout: float = None,
681 **kwargs,
682 ):
683 """
684 Delete an execution environment
685 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
686 :param dict db_dict: where to write into database when the status changes.
687 It contains a dict with
688 {collection: <str>, filter: {}, path: <str>},
689 e.g. {collection: "nsrs", filter:
690 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
691 :param float total_timeout:
692 """
693
694 self.log.info("ee_id: {}".format(ee_id))
695
696 # check arguments
697 if ee_id is None:
698 raise N2VCBadArgumentsException(
699 message="ee_id is mandatory", bad_args=["ee_id"]
700 )
701
702 try:
703 # Obtain cluster_uuid
704 system_cluster_uuid = await self._get_system_cluster_id()
705
706 # Get helm_id
707 version, namespace, helm_id = get_ee_id_parts(ee_id)
708
709 # Uninstall chart, for backward compatibility we must assume that if there is no
710 # version it is helm-v2
711 if version == "helm-v3":
712 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
713 else:
714 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
715 self.log.info("ee_id: {} deleted".format(ee_id))
716 except N2VCException:
717 raise
718 except Exception as e:
719 self.log.error(
720 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
721 )
722 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
723
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete a namespace from the system k8s cluster.

        :param str namespace: name of the namespace to delete
        :param dict db_dict: not used by this implementation
        :param float total_timeout: not used by this implementation
        """
        # Obtain system cluster id from database
        system_cluster_uuid = await self._get_system_cluster_id()
        await self._k8sclusterhelm3.delete_namespace(
            namespace=namespace,
            cluster_uuid=system_cluster_uuid,
        )
733
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        *kargs,
        **kwargs,
    ) -> str:
        # not applicable to this connector: proxy charms are not helm-based
        pass
747
    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _get_ssh_key(self, ip_addr):
        # Retried with the *initial* (longer) budget because the ee may still
        # be starting up when the key is first requested.
        return await self._execute_primitive_internal(
            ip_addr,
            "_get_ssh_key",
            None,
        )
755
    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
        # Retried with the *initial* (longer) budget: config is typically the
        # first primitive executed, while the ee may still be starting.
        return await self._execute_primitive_internal(
            ip_addr, "config", params, db_dict=db_dict
        )
761
    @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
    async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
        # Ordinary primitives use the regular (shorter) retry budget.
        return await self._execute_primitive_internal(
            ip_addr, primitive_name, params, db_dict=db_dict
        )
767
    async def _execute_primitive_internal(
        self, ip_addr, primitive_name, params, db_dict=None
    ):
        """
        Execute a primitive on the ee over gRPC.

        Opens a TLS channel to the ee frontend executor; if the ee does not
        support TLS and enforcement is disabled, falls back to plain gRPC.
        The special primitive name "_get_ssh_key" uses the GetSshKey RPC and
        returns the key string; any other primitive streams status replies and
        returns a (status, detailed_message) tuple.

        :param ip_addr: ee service address (k8s DNS name)
        :param primitive_name: primitive to run, or "_get_ssh_key"
        :param params: primitive parameters, serialized to YAML for the request
        :param db_dict: optional db target to record intermediate notifications
        """
        async def execute():
            stub = FrontendExecutorStub(channel)
            if primitive_name == "_get_ssh_key":
                self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
                reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
                return reply.message
            # For any other primitives
            async with stub.RunPrimitive.open() as stream:
                primitive_id = str(uuid.uuid1())
                result = None
                self.log.debug(
                    "Execute primitive internal: id:{}, name:{}, params: {}".format(
                        primitive_id, primitive_name, params
                    )
                )
                await stream.send_message(
                    PrimitiveRequest(
                        id=primitive_id, name=primitive_name, params=yaml.dump(params)
                    ),
                    end=True,
                )
                # the last streamed reply carries the final status
                async for reply in stream:
                    self.log.debug("Received reply: {}".format(reply))
                    result = reply
                    # If db_dict provided write notifs in database
                    if db_dict:
                        self._write_op_detailed_status(
                            db_dict, reply.status, reply.detailed_message
                        )
                if result:
                    return reply.status, reply.detailed_message
                else:
                    return "ERROR", "No result received"

        # mutual-TLS channel built from the VCA-configured CA store and client keypair
        ssl_context = create_secure_context(
            self.vca_config.ca_store,
            self.vca_config.client_cert_path,
            self.vca_config.client_key_path,
        )
        channel = Channel(
            ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
        )
        try:
            return await execute()
        except ssl.SSLError as ssl_error:  # fallback to insecure gRPC
            # WRONG_VERSION_NUMBER indicates the peer is not speaking TLS at all
            if (
                ssl_error.reason == "WRONG_VERSION_NUMBER"
                and not self.vca_config.eegrpc_tls_enforce
            ):
                self.log.debug(
                    "Execution environment doesn't support TLS, falling back to unsecure gRPC"
                )
                channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
                return await execute()
            elif ssl_error.reason == "WRONG_VERSION_NUMBER":
                raise N2VCException(
                    "Execution environment doesn't support TLS, primitives cannot be executed"
                )
            else:
                raise
        finally:
            # close whichever channel is current (secure or fallback)
            channel.close()
833
    def _write_op_detailed_status(self, db_dict, status, detailed_message):
        """
        Best-effort write of a primitive's intermediate status to the database.

        :param dict db_dict: {collection: <str>, filter: {}, path: <str>} target
        :param status: status reported by the ee
        :param detailed_message: detailed message reported by the ee
        """
        # write ee_id to database: _admin.deployed.VCA.x
        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
            # self.log.debug('Writing ee_id to database: {}'.format(the_path))
            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            # deliberately swallowed: a failed status notification must not
            # abort the primitive execution itself
            self.log.error("Error writing detailedStatus to database: {}".format(e))
851
852 async def _get_system_cluster_id(self):
853 if not self._system_cluster_id:
854 db_k8cluster = self.db.get_one(
855 "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name}
856 )
857 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
858 if not k8s_hc_id:
859 try:
860 # backward compatibility for existing clusters that have not been initialized for helm v3
861 cluster_id = db_k8cluster.get("_id")
862 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
863 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
864 k8s_credentials, reuse_cluster_uuid=cluster_id
865 )
866 db_k8scluster_update = {
867 "_admin.helm-chart-v3.error_msg": None,
868 "_admin.helm-chart-v3.id": k8s_hc_id,
869 "_admin.helm-chart-v3}.created": uninstall_sw,
870 "_admin.helm-chart-v3.operationalState": "ENABLED",
871 }
872 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
873 except Exception as e:
874 self.log.error(
875 "error initializing helm-v3 cluster: {}".format(str(e))
876 )
877 raise N2VCException(
878 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
879 cluster_id
880 )
881 )
882 self._system_cluster_id = k8s_hc_id
883 return self._system_cluster_id