887c212a8b01cb4f2b95d66c861529b97311cf02
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import uuid
22 import os
23 import ssl
24
25 from grpclib.client import Channel
26
27 from osm_lcm.frontend_pb2 import PrimitiveRequest
28 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
29 from osm_lcm.frontend_grpc import FrontendExecutorStub
30 from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts
31
32 from osm_lcm.data_utils.database.database import Database
33 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
34
35 from n2vc.n2vc_conn import N2VCConnector
36 from n2vc.k8s_helm_conn import K8sHelmConnector
37 from n2vc.k8s_helm3_conn import K8sHelm3Connector
38 from n2vc.exceptions import (
39 N2VCBadArgumentsException,
40 N2VCException,
41 N2VCExecutionException,
42 )
43
44 from osm_lcm.lcm_utils import deep_get
45
46 CA_STORE = "/etc/ssl/certs/osm-ca.crt"
47
48
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Decorator factory that retries an async method while it raises
    ConnectionRefusedError.

    The total retry budget and the delay between retries are read from
    attributes of the decorated method's instance (``args[0]``), whose names
    are given by ``max_wait_time_var`` and ``delay_time_var``.  When the
    attributes are missing or falsy, defaults of 300s / 10s apply.

    :param str max_wait_time_var: attribute name holding the max total wait time
    :param str delay_time_var: attribute name holding the delay between retries
    :raises ConnectionRefusedError: when the retry budget is exhausted
    """

    def wrapper(func):
        retry_exceptions = ConnectionRefusedError

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay_time
            delay_time = 10
            max_wait_time = 300

            # obtain arguments from variable names
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
                    continue
            # Bug fix: the original returned the ConnectionRefusedError *class*
            # as if it were a result; raise instead so callers see the failure.
            raise ConnectionRefusedError(
                "Maximum wait time of {}s exceeded".format(max_wait_time)
            )

        return wrapped

    return wrapper
80
81
def create_secure_context(
    trusted: str,
) -> ssl.SSLContext:
    """
    Build the client-side TLS context used for the gRPC channel to the EE.

    :param str trusted: path to the CA bundle used to verify the EE server
    :returns: SSLContext enforcing certificate + hostname verification,
        TLSv1.2 minimum, a restricted cipher list and h2 (HTTP/2) ALPN
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.check_hostname = True
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # TODO: client TLS
    # ctx.load_cert_chain(str(client_cert), str(client_key))
    ctx.load_verify_locations(trusted)
    ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    # advertise HTTP/2 for gRPC; NPN is the legacy equivalent of ALPN and
    # may be unsupported by the ssl build, hence the NotImplementedError guard
    ctx.set_alpn_protocols(["h2"])
    try:
        ctx.set_npn_protocols(["h2"])
    except NotImplementedError:
        pass
    return ctx
99
100
class LCMHelmConn(N2VCConnector, LcmBase):
    """N2VC connector for execution environments deployed as helm charts
    in the OSM system kubernetes cluster."""

    # Namespace where EE helm charts are installed
    _KUBECTL_OSM_NAMESPACE = "osm"
    # Name of the OSM system cluster entry in the "k8sclusters" collection
    _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
    # gRPC port exposed by the EE frontend service
    _EE_SERVICE_PORT = 50050

    # Initial max retry time
    _MAX_INITIAL_RETRY_TIME = 600
    # Max retry time for normal operations
    _MAX_RETRY_TIME = 30
    # Time between retries, retry time after a connection error is raised
    _EE_RETRY_DELAY = 10
112
113 def __init__(
114 self,
115 log: object = None,
116 loop: object = None,
117 vca_config: dict = None,
118 on_update_db=None,
119 ):
120 """
121 Initialize EE helm connector.
122 """
123
124 self.db = Database().instance.db
125 self.fs = Filesystem().instance.fs
126
127 # parent class constructor
128 N2VCConnector.__init__(
129 self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
130 )
131
132 self.vca_config = vca_config
133 self.log.debug("Initialize helm N2VC connector")
134 self.log.debug("initial vca_config: {}".format(vca_config))
135
136 # TODO - Obtain data from configuration
137 self._ee_service_port = self._EE_SERVICE_PORT
138
139 self._retry_delay = self._EE_RETRY_DELAY
140
141 if self.vca_config and self.vca_config.get("eegrpcinittimeout"):
142 self._initial_retry_time = self.vca_config.get("eegrpcinittimeout")
143 self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
144 else:
145 self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME
146 self.log.debug(
147 "Applied default retry time: {}".format(self._initial_retry_time)
148 )
149
150 if self.vca_config and self.vca_config.get("eegrpctimeout"):
151 self._max_retry_time = self.vca_config.get("eegrpctimeout")
152 self.log.debug("Retry time: {}".format(self._max_retry_time))
153 else:
154 self._max_retry_time = self._MAX_RETRY_TIME
155 self.log.debug(
156 "Applied default retry time: {}".format(self._max_retry_time)
157 )
158
159 if self.vca_config and self.vca_config.get("eegrpc_tls_enforce"):
160 self._tls_enforce = str(
161 self.vca_config.get("eegrpc_tls_enforce")
162 ).lower() in ("true", "1", "yes")
163 else:
164 self._tls_enforce = False
165 self.log.debug("TLS enforce enabled: {}".format(self._tls_enforce))
166
167 # initialize helm connector for helmv2 and helmv3
168 self._k8sclusterhelm2 = K8sHelmConnector(
169 kubectl_command=self.vca_config.get("kubectlpath"),
170 helm_command=self.vca_config.get("helmpath"),
171 fs=self.fs,
172 db=self.db,
173 log=self.log,
174 on_update_db=None,
175 )
176
177 self._k8sclusterhelm3 = K8sHelm3Connector(
178 kubectl_command=self.vca_config.get("kubectlpath"),
179 helm_command=self.vca_config.get("helm3path"),
180 fs=self.fs,
181 log=self.log,
182 db=self.db,
183 on_update_db=None,
184 )
185
186 self._system_cluster_id = None
187 self.log.info("Helm N2VC connector initialized")
188
189 # TODO - ¿reuse_ee_id?
190 async def create_execution_environment(
191 self,
192 namespace: str,
193 db_dict: dict,
194 reuse_ee_id: str = None,
195 progress_timeout: float = None,
196 total_timeout: float = None,
197 config: dict = None,
198 artifact_path: str = None,
199 chart_model: str = None,
200 vca_type: str = None,
201 *kargs,
202 **kwargs,
203 ) -> (str, dict):
204 """
205 Creates a new helm execution environment deploying the helm-chat indicated in the
206 artifact_path
207 :param str namespace: This param is not used, all helm charts are deployed in the osm
208 system namespace
209 :param dict db_dict: where to write to database when the status changes.
210 It contains a dictionary with {collection: str, filter: {}, path: str},
211 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
212 "_admin.deployed.VCA.3"}
213 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
214 :param float progress_timeout:
215 :param float total_timeout:
216 :param dict config: General variables to instantiate KDU
217 :param str artifact_path: path of package content
218 :param str chart_model: helm chart/reference (string), which can be either
219 of these options:
220 - a name of chart available via the repos known by OSM
221 (e.g. stable/openldap, stable/openldap:1.2.4)
222 - a path to a packaged chart (e.g. mychart.tgz)
223 - a path to an unpacked chart directory or a URL (e.g. mychart)
224 :param str vca_type: Type of vca, must be type helm or helm-v3
225 :returns str, dict: id of the new execution environment including namespace.helm_id
226 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
227 """
228
229 self.log.info(
230 "create_execution_environment: namespace: {}, artifact_path: {}, "
231 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
232 namespace, artifact_path, db_dict, chart_model, reuse_ee_id
233 )
234 )
235
236 # Validate artifact-path is provided
237 if artifact_path is None or len(artifact_path) == 0:
238 raise N2VCBadArgumentsException(
239 message="artifact_path is mandatory", bad_args=["artifact_path"]
240 )
241
242 # Validate artifact-path exists and sync path
243 from_path = os.path.split(artifact_path)[0]
244 self.fs.sync(from_path)
245
246 # remove / in charm path
247 while artifact_path.find("//") >= 0:
248 artifact_path = artifact_path.replace("//", "/")
249
250 # check charm path
251 if self.fs.file_exists(artifact_path):
252 helm_chart_path = artifact_path
253 else:
254 msg = "artifact path does not exist: {}".format(artifact_path)
255 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
256
257 if artifact_path.startswith("/"):
258 full_path = self.fs.path + helm_chart_path
259 else:
260 full_path = self.fs.path + "/" + helm_chart_path
261
262 while full_path.find("//") >= 0:
263 full_path = full_path.replace("//", "/")
264
265 # By default, the KDU is expected to be a file
266 kdu_model = full_path
267 # If the chart_model includes a "/", then it is a reference:
268 # e.g. (stable/openldap; stable/openldap:1.2.4)
269 if chart_model.find("/") >= 0:
270 kdu_model = chart_model
271
272 try:
273 # Call helm conn install
274 # Obtain system cluster id from database
275 system_cluster_uuid = await self._get_system_cluster_id()
276 # Add parameter osm if exist to global
277 if config and config.get("osm"):
278 if not config.get("global"):
279 config["global"] = {}
280 config["global"]["osm"] = config.get("osm")
281
282 self.log.debug("install helm chart: {}".format(full_path))
283 if vca_type == "helm":
284 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
285 db_dict=db_dict,
286 kdu_model=kdu_model,
287 )
288 await self._k8sclusterhelm2.install(
289 system_cluster_uuid,
290 kdu_model=kdu_model,
291 kdu_instance=helm_id,
292 namespace=self._KUBECTL_OSM_NAMESPACE,
293 params=config,
294 db_dict=db_dict,
295 timeout=progress_timeout,
296 )
297 else:
298 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
299 db_dict=db_dict,
300 kdu_model=kdu_model,
301 )
302 await self._k8sclusterhelm3.install(
303 system_cluster_uuid,
304 kdu_model=kdu_model,
305 kdu_instance=helm_id,
306 namespace=self._KUBECTL_OSM_NAMESPACE,
307 params=config,
308 db_dict=db_dict,
309 timeout=progress_timeout,
310 )
311
312 ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
313 return ee_id, None
314 except N2VCException:
315 raise
316 except Exception as e:
317 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
318 raise N2VCException("Error deploying chart ee: {}".format(e))
319
320 async def upgrade_execution_environment(
321 self,
322 namespace: str,
323 db_dict: dict,
324 helm_id: str,
325 progress_timeout: float = None,
326 total_timeout: float = None,
327 config: dict = None,
328 artifact_path: str = None,
329 vca_type: str = None,
330 *kargs,
331 **kwargs,
332 ) -> (str, dict):
333 """
334 Creates a new helm execution environment deploying the helm-chat indicated in the
335 attifact_path
336 :param str namespace: This param is not used, all helm charts are deployed in the osm
337 system namespace
338 :param dict db_dict: where to write to database when the status changes.
339 It contains a dictionary with {collection: str, filter: {}, path: str},
340 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
341 "_admin.deployed.VCA.3"}
342 :param helm_id: unique name of the Helm release to upgrade
343 :param float progress_timeout:
344 :param float total_timeout:
345 :param dict config: General variables to instantiate KDU
346 :param str artifact_path: path of package content
347 :param str vca_type: Type of vca, must be type helm or helm-v3
348 :returns str, dict: id of the new execution environment including namespace.helm_id
349 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
350 """
351
352 self.log.info(
353 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
354 )
355
356 # Validate helm_id is provided
357 if helm_id is None or len(helm_id) == 0:
358 raise N2VCBadArgumentsException(
359 message="helm_id is mandatory", bad_args=["helm_id"]
360 )
361
362 # Validate artifact-path is provided
363 if artifact_path is None or len(artifact_path) == 0:
364 raise N2VCBadArgumentsException(
365 message="artifact_path is mandatory", bad_args=["artifact_path"]
366 )
367
368 # Validate artifact-path exists and sync path
369 from_path = os.path.split(artifact_path)[0]
370 self.fs.sync(from_path)
371
372 # remove / in charm path
373 while artifact_path.find("//") >= 0:
374 artifact_path = artifact_path.replace("//", "/")
375
376 # check charm path
377 if self.fs.file_exists(artifact_path):
378 helm_chart_path = artifact_path
379 else:
380 msg = "artifact path does not exist: {}".format(artifact_path)
381 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
382
383 if artifact_path.startswith("/"):
384 full_path = self.fs.path + helm_chart_path
385 else:
386 full_path = self.fs.path + "/" + helm_chart_path
387
388 while full_path.find("//") >= 0:
389 full_path = full_path.replace("//", "/")
390
391 try:
392 # Call helm conn upgrade
393 # Obtain system cluster id from database
394 system_cluster_uuid = await self._get_system_cluster_id()
395 # Add parameter osm if exist to global
396 if config and config.get("osm"):
397 if not config.get("global"):
398 config["global"] = {}
399 config["global"]["osm"] = config.get("osm")
400
401 self.log.debug("Ugrade helm chart: {}".format(full_path))
402 if vca_type == "helm":
403 await self._k8sclusterhelm2.upgrade(
404 system_cluster_uuid,
405 kdu_model=full_path,
406 kdu_instance=helm_id,
407 namespace=namespace,
408 params=config,
409 db_dict=db_dict,
410 timeout=progress_timeout,
411 force=True,
412 )
413 else:
414 await self._k8sclusterhelm3.upgrade(
415 system_cluster_uuid,
416 kdu_model=full_path,
417 kdu_instance=helm_id,
418 namespace=namespace,
419 params=config,
420 db_dict=db_dict,
421 timeout=progress_timeout,
422 force=True,
423 )
424
425 except N2VCException:
426 raise
427 except Exception as e:
428 self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
429 raise N2VCException("Error upgrading chart ee: {}".format(e))
430
431 async def create_tls_certificate(
432 self,
433 nsr_id: str,
434 secret_name: str,
435 usage: str,
436 dns_prefix: str,
437 namespace: str = _KUBECTL_OSM_NAMESPACE,
438 ):
439 # Obtain system cluster id from database
440 system_cluster_uuid = await self._get_system_cluster_id()
441 # use helm-v3 as certificates don't depend on helm version
442 await self._k8sclusterhelm3.create_certificate(
443 cluster_uuid=system_cluster_uuid,
444 namespace=namespace,
445 dns_prefix=dns_prefix,
446 name=nsr_id,
447 secret_name=secret_name,
448 usage=usage,
449 )
450
451 async def delete_tls_certificate(
452 self,
453 certificate_name: str = None,
454 namespace: str = _KUBECTL_OSM_NAMESPACE,
455 ):
456 # Obtain system cluster id from database
457 system_cluster_uuid = await self._get_system_cluster_id()
458 await self._k8sclusterhelm3.delete_certificate(
459 cluster_uuid=system_cluster_uuid,
460 namespace=namespace,
461 certificate_name=certificate_name,
462 )
463
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        *kargs,
        **kwargs,
    ) -> str:
        """No-op for this connector: helm EEs need no registration step."""
        # nothing to do
        pass
476
    async def install_configuration_sw(self, *args, **kwargs):
        """No-op for this connector: the helm chart already ships its software."""
        # nothing to do
        pass
480
    async def add_relation(self, *args, **kwargs):
        """No-op for this connector: relations are not supported for helm EEs."""
        # nothing to do
        pass
484
    async def remove_relation(self):
        """No-op for this connector: relations are not supported for helm EEs."""
        # nothing to do
        pass
488
    async def get_status(self, *args, **kwargs):
        """Not used for this connector."""
        # not used for this connector
        pass
492
493 async def get_ee_ssh_public__key(
494 self,
495 ee_id: str,
496 db_dict: dict,
497 progress_timeout: float = None,
498 total_timeout: float = None,
499 **kwargs,
500 ) -> str:
501 """
502 Obtains ssh-public key from ee executing GetSShKey method from the ee.
503
504 :param str ee_id: the id of the execution environment returned by
505 create_execution_environment or register_execution_environment
506 :param dict db_dict:
507 :param float progress_timeout:
508 :param float total_timeout:
509 :returns: public key of the execution environment
510 """
511
512 self.log.info(
513 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
514 )
515
516 # check arguments
517 if ee_id is None or len(ee_id) == 0:
518 raise N2VCBadArgumentsException(
519 message="ee_id is mandatory", bad_args=["ee_id"]
520 )
521
522 try:
523 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
524 version, namespace, helm_id = get_ee_id_parts(ee_id)
525 ip_addr = "{}.{}.svc".format(helm_id, namespace)
526 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
527 # install libraries and start successfully
528 ssh_key = await self._get_ssh_key(ip_addr)
529 return ssh_key
530 except Exception as e:
531 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
532 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
533
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrade charms in VNFs

        This method does not support KDU's deployed with Helm.

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Returns:
            the output of the update operation if status equals to "completed"

        """
        # Helm-based execution environments cannot upgrade charms: fail fast.
        raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
558
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nslcmops", filter:
            {_id: <nslcmop_id>, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

        self.log.info(
            "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
                ee_id, primitive_name, params_dict, db_dict
            )
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="action_name is mandatory", bad_args=["action_name"]
            )
        if params_dict is None:
            params_dict = dict()

        try:
            # Resolve the EE service address from the ee_id; kubernetes DNS
            # resolves "<helm_id>.<namespace>.svc" to the EE service
            version, namespace, helm_id = get_ee_id_parts(ee_id)
            ip_addr = "{}.{}.svc".format(helm_id, namespace)
        except Exception as e:
            self.log.error("Error getting ee ip ee: {}".format(e))
            raise N2VCException("Error getting ee ip ee: {}".format(e))

        if primitive_name == "config":
            try:
                # Execute config primitive, higher timeout to check the case ee is starting
                status, detailed_message = await self._execute_config_primitive(
                    ip_addr, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
                        ee_id, status, detailed_message
                    )
                )
                if status != "OK":
                    self.log.error(
                        "Error configuring helm ee, status: {}, message: {}".format(
                            status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                    primitive_name=primitive_name,
                )
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(
                    ip_addr, primitive_name, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
                        primitive_name, ee_id, status, detailed_message
                    )
                )
                # "PROCESSING" is also accepted: the primitive keeps running in
                # the EE and progress is reported through db_dict notifications
                if status != "OK" and status != "PROCESSING":
                    self.log.error(
                        "Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        )
                    )
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return detailed_message
678
    async def deregister_execution_environments(self):
        """No-op for this connector: there is no registration to undo."""
        # nothing to be done
        pass
682
683 async def delete_execution_environment(
684 self,
685 ee_id: str,
686 db_dict: dict = None,
687 total_timeout: float = None,
688 **kwargs,
689 ):
690 """
691 Delete an execution environment
692 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
693 :param dict db_dict: where to write into database when the status changes.
694 It contains a dict with
695 {collection: <str>, filter: {}, path: <str>},
696 e.g. {collection: "nsrs", filter:
697 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
698 :param float total_timeout:
699 """
700
701 self.log.info("ee_id: {}".format(ee_id))
702
703 # check arguments
704 if ee_id is None:
705 raise N2VCBadArgumentsException(
706 message="ee_id is mandatory", bad_args=["ee_id"]
707 )
708
709 try:
710
711 # Obtain cluster_uuid
712 system_cluster_uuid = await self._get_system_cluster_id()
713
714 # Get helm_id
715 version, namespace, helm_id = get_ee_id_parts(ee_id)
716
717 # Uninstall chart, for backward compatibility we must assume that if there is no
718 # version it is helm-v2
719 if version == "helm-v3":
720 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
721 else:
722 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
723 self.log.info("ee_id: {} deleted".format(ee_id))
724 except N2VCException:
725 raise
726 except Exception as e:
727 self.log.error(
728 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
729 )
730 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
731
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """Not implemented: execution environments must be deleted individually."""
        # method not implemented for this connector, execution environments must be deleted individually
        pass
737
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
        *kargs,
        **kwargs,
    ) -> str:
        """No-op for this connector: proxy charms are not installed for helm EEs."""
        pass
751
752 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
753 async def _get_ssh_key(self, ip_addr):
754 return await self._execute_primitive_internal(
755 ip_addr,
756 "_get_ssh_key",
757 None,
758 )
759
760 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
761 async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
762 return await self._execute_primitive_internal(
763 ip_addr, "config", params, db_dict=db_dict
764 )
765
766 @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
767 async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
768 return await self._execute_primitive_internal(
769 ip_addr, primitive_name, params, db_dict=db_dict
770 )
771
772 async def _execute_primitive_internal(
773 self, ip_addr, primitive_name, params, db_dict=None
774 ):
775 async def execute():
776 stub = FrontendExecutorStub(channel)
777 if primitive_name == "_get_ssh_key":
778 self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
779 reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
780 return reply.message
781 # For any other primitives
782 async with stub.RunPrimitive.open() as stream:
783 primitive_id = str(uuid.uuid1())
784 result = None
785 self.log.debug(
786 "Execute primitive internal: id:{}, name:{}, params: {}".format(
787 primitive_id, primitive_name, params
788 )
789 )
790 await stream.send_message(
791 PrimitiveRequest(
792 id=primitive_id, name=primitive_name, params=yaml.dump(params)
793 ),
794 end=True,
795 )
796 async for reply in stream:
797 self.log.debug("Received reply: {}".format(reply))
798 result = reply
799 # If db_dict provided write notifs in database
800 if db_dict:
801 self._write_op_detailed_status(
802 db_dict, reply.status, reply.detailed_message
803 )
804 if result:
805 return reply.status, reply.detailed_message
806 else:
807 return "ERROR", "No result received"
808
809 ssl_context = create_secure_context(CA_STORE)
810 channel = Channel(ip_addr, self._ee_service_port, ssl=ssl_context)
811 try:
812 return await execute()
813 except ssl.SSLError as ssl_error: # fallback to insecure gRPC
814 if ssl_error.reason == "WRONG_VERSION_NUMBER" and not self._tls_enforce:
815 self.log.debug(
816 "Execution environment doesn't support TLS, falling back to unsecure gRPC"
817 )
818 channel = Channel(ip_addr, self._ee_service_port)
819 return await execute()
820 elif ssl_error.reason == "WRONG_VERSION_NUMBER":
821 raise N2VCException(
822 "Execution environment doesn't support TLS, primitives cannot be executed"
823 )
824 else:
825 raise
826 finally:
827 channel.close()
828
829 def _write_op_detailed_status(self, db_dict, status, detailed_message):
830
831 # write ee_id to database: _admin.deployed.VCA.x
832 try:
833 the_table = db_dict["collection"]
834 the_filter = db_dict["filter"]
835 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
836 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
837 self.db.set_one(
838 table=the_table,
839 q_filter=the_filter,
840 update_dict=update_dict,
841 fail_on_empty=True,
842 )
843 except asyncio.CancelledError:
844 raise
845 except Exception as e:
846 self.log.error("Error writing detailedStatus to database: {}".format(e))
847
848 async def _get_system_cluster_id(self):
849 if not self._system_cluster_id:
850 db_k8cluster = self.db.get_one(
851 "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}
852 )
853 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
854 if not k8s_hc_id:
855 try:
856 # backward compatibility for existing clusters that have not been initialized for helm v3
857 cluster_id = db_k8cluster.get("_id")
858 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
859 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
860 k8s_credentials, reuse_cluster_uuid=cluster_id
861 )
862 db_k8scluster_update = {
863 "_admin.helm-chart-v3.error_msg": None,
864 "_admin.helm-chart-v3.id": k8s_hc_id,
865 "_admin.helm-chart-v3}.created": uninstall_sw,
866 "_admin.helm-chart-v3.operationalState": "ENABLED",
867 }
868 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
869 except Exception as e:
870 self.log.error(
871 "error initializing helm-v3 cluster: {}".format(str(e))
872 )
873 raise N2VCException(
874 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
875 cluster_id
876 )
877 )
878 self._system_cluster_id = k8s_hc_id
879 return self._system_cluster_id