Coverity-CWE 22: Improper Limitation of a Pathname to a Restricted Directory ('Path...
[osm/LCM.git] / osm_lcm /
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import uuid
22 import os
23 import ssl
25 from grpclib.client import Channel
27 from osm_lcm.data_utils.lcm_config import VcaConfig
28 from osm_lcm.frontend_pb2 import PrimitiveRequest
29 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
30 from osm_lcm.frontend_grpc import FrontendExecutorStub
31 from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts
33 from osm_lcm.data_utils.database.database import Database
34 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
36 from n2vc.n2vc_conn import N2VCConnector
37 from n2vc.k8s_helm3_conn import K8sHelm3Connector
38 from n2vc.exceptions import (
39 N2VCBadArgumentsException,
40 N2VCException,
41 N2VCExecutionException,
42 )
44 from osm_lcm.lcm_utils import deep_get
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Build a decorator that retries an async method while the peer is unreachable.

    The decorated coroutine is called again every time it raises
    ConnectionRefusedError or TimeoutError, waiting ``delay_time`` seconds between
    attempts, until the accumulated delay reaches ``max_wait_time``.

    :param str max_wait_time_var: name of the instance attribute holding the total
        retry budget in seconds; 300 is used when it is missing or falsy
    :param str delay_time_var: name of the instance attribute holding the delay
        between attempts in seconds; 10 is used when it is missing or falsy
    :returns: decorator for async methods (the first positional argument must be
        the instance, as its ``__dict__`` is read to obtain the settings)
    :raises ConnectionRefusedError, TimeoutError: the last retryable error once the
        retry budget is exhausted
    """
    retry_exceptions = (ConnectionRefusedError, TimeoutError)

    def wrapper(func):
        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # obtain settings from the instance, falling back to the defaults
            self = args[0]
            max_wait_time = self.__dict__.get(max_wait_time_var) or 300
            delay_time = self.__dict__.get(delay_time_var) or 10

            last_error = None
            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions as e:
                    last_error = e
                    wait_time -= delay_time
                    # no point in sleeping when no further attempt will be made
                    if wait_time > 0:
                        await asyncio.sleep(delay_time)

            # Retry budget exhausted: raise instead of returning the exception
            # class, which callers would otherwise treat as a valid result.
            if last_error is not None:
                raise last_error
            raise ConnectionRefusedError(
                "retry budget exhausted before any attempt could be made"
            )

        return wrapped

    return wrapper
def create_secure_context(
    trusted: str, client_cert_path: str, client_key_path: str
) -> ssl.SSLContext:
    """
    Build the client-side TLS context used for the gRPC channel.

    :param str trusted: path of the CA bundle used to verify the server
    :param str client_cert_path: path of the client certificate chain
    :param str client_key_path: path of the client private key
    :returns: SSL context requiring a verified peer, TLS 1.2 or newer and
        advertising HTTP/2 through ALPN
    """
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    # peer verification settings
    tls_context.verify_mode = ssl.CERT_REQUIRED
    tls_context.check_hostname = True
    tls_context.minimum_version = ssl.TLSVersion.TLSv1_2

    # client identity first, then the trust anchors
    tls_context.load_cert_chain(certfile=client_cert_path, keyfile=client_key_path)
    tls_context.load_verify_locations(cafile=trusted)

    tls_context.set_alpn_protocols(["h2"])
    return tls_context
94 class LCMHelmConn(N2VCConnector, LcmBase):
95 def __init__(
96 self,
97 log: object = None,
98 vca_config: VcaConfig = None,
99 on_update_db=None,
100 ):
101 """
102 Initialize EE helm connector.
103 """
105 self.db = Database().instance.db
106 self.fs = Filesystem().instance.fs
108 # parent class constructor
109 N2VCConnector.__init__(
110 self, log=log, on_update_db=on_update_db, db=self.db, fs=self.fs
111 )
113 self.vca_config = vca_config
114 self.log.debug("Initialize helm N2VC connector")
115 self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))
117 self._retry_delay = self.vca_config.helm_ee_retry_delay
119 self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
120 self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
122 self._max_retry_time = self.vca_config.helm_max_retry_time
123 self.log.debug("Retry time: {}".format(self._max_retry_time))
125 # initialize helm connector for helmv3
126 self._k8sclusterhelm3 = K8sHelm3Connector(
127 kubectl_command=self.vca_config.kubectlpath,
128 helm_command=self.vca_config.helm3path,
129 fs=self.fs,
130 log=self.log,
131 db=self.db,
132 on_update_db=None,
133 )
135 self._system_cluster_id = None
136"Helm N2VC connector initialized")
138 # TODO - ¿reuse_ee_id?
139 async def create_execution_environment(
140 self,
141 namespace: str,
142 db_dict: dict,
143 reuse_ee_id: str = None,
144 progress_timeout: float = None,
145 total_timeout: float = None,
146 config: dict = None,
147 artifact_path: str = None,
148 chart_model: str = None,
149 vca_type: str = None,
150 *kargs,
151 **kwargs,
152 ) -> (str, dict):
153 """
154 Creates a new helm execution environment deploying the helm-chat indicated in the
155 artifact_path
156 :param str namespace: This param is not used, all helm charts are deployed in the osm
157 system namespace
158 :param dict db_dict: where to write to database when the status changes.
159 It contains a dictionary with {collection: str, filter: {}, path: str},
160 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
161 "_admin.deployed.VCA.3"}
162 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
163 :param float progress_timeout:
164 :param float total_timeout:
165 :param dict config: General variables to instantiate KDU
166 :param str artifact_path: path of package content
167 :param str chart_model: helm chart/reference (string), which can be either
168 of these options:
169 - a name of chart available via the repos known by OSM
170 (e.g. stable/openldap, stable/openldap:1.2.4)
171 - a path to a packaged chart (e.g. mychart.tgz)
172 - a path to an unpacked chart directory or a URL (e.g. mychart)
173 :param str vca_type: Type of vca, must be type helm-v3
174 :returns str, dict: id of the new execution environment including namespace.helm_id
175 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
176 """
178 if not namespace:
179 namespace = self.vca_config.kubectl_osm_namespace
182 "create_execution_environment: namespace: {}, artifact_path: {}, "
183 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
184 namespace, artifact_path, db_dict, chart_model, reuse_ee_id
185 )
186 )
188 # Validate artifact-path is provided
189 if artifact_path is None or len(artifact_path) == 0:
190 raise N2VCBadArgumentsException(
191 message="artifact_path is mandatory", bad_args=["artifact_path"]
192 )
194 # Validate artifact-path exists and sync path
195 from_path = os.path.split(artifact_path)[0]
196 self.fs.sync(from_path)
198 # remove / in charm path
199 while artifact_path.find("//") >= 0:
200 artifact_path = artifact_path.replace("//", "/")
202 # check charm path
203 if self.fs.file_exists(artifact_path):
204 helm_chart_path = artifact_path
205 else:
206 msg = "artifact path does not exist: {}".format(artifact_path)
207 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
209 if artifact_path.startswith("/"):
210 full_path = self.fs.path + helm_chart_path
211 else:
212 full_path = self.fs.path + "/" + helm_chart_path
214 while full_path.find("//") >= 0:
215 full_path = full_path.replace("//", "/")
217 # By default, the KDU is expected to be a file
218 kdu_model = full_path
219 # If the chart_model includes a "/", then it is a reference:
220 # e.g. (stable/openldap; stable/openldap:1.2.4)
221 if chart_model.find("/") >= 0:
222 kdu_model = chart_model
224 try:
225 # Call helm conn install
226 # Obtain system cluster id from database
227 system_cluster_uuid = await self._get_system_cluster_id()
228 # Add parameter osm if exist to global
229 if config and config.get("osm"):
230 if not config.get("global"):
231 config["global"] = {}
232 config["global"]["osm"] = config.get("osm")
234 self.log.debug("install helm chart: {}".format(full_path))
235 helm_id = self._k8sclusterhelm3.generate_kdu_instance_name(
236 db_dict=db_dict,
237 kdu_model=kdu_model,
238 )
239 await self._k8sclusterhelm3.install(
240 system_cluster_uuid,
241 kdu_model=kdu_model,
242 kdu_instance=helm_id,
243 namespace=namespace,
244 params=config,
245 db_dict=db_dict,
246 timeout=progress_timeout,
247 )
249 ee_id = "{}:{}.{}".format(vca_type, namespace, helm_id)
250 return ee_id, None
251 except N2VCException:
252 raise
253 except Exception as e:
254 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
255 raise N2VCException("Error deploying chart ee: {}".format(e))
257 async def upgrade_execution_environment(
258 self,
259 namespace: str,
260 db_dict: dict,
261 helm_id: str,
262 progress_timeout: float = None,
263 total_timeout: float = None,
264 config: dict = None,
265 artifact_path: str = None,
266 vca_type: str = None,
267 *kargs,
268 **kwargs,
269 ) -> (str, dict):
270 """
271 Creates a new helm execution environment deploying the helm-chat indicated in the
272 attifact_path
273 :param str namespace: This param is not used, all helm charts are deployed in the osm
274 system namespace
275 :param dict db_dict: where to write to database when the status changes.
276 It contains a dictionary with {collection: str, filter: {}, path: str},
277 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
278 "_admin.deployed.VCA.3"}
279 :param helm_id: unique name of the Helm release to upgrade
280 :param float progress_timeout:
281 :param float total_timeout:
282 :param dict config: General variables to instantiate KDU
283 :param str artifact_path: path of package content
284 :param str vca_type: Type of vca, must be type helm-v3
285 :returns str, dict: id of the new execution environment including namespace.helm_id
286 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
287 """
290 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
291 )
293 # Validate helm_id is provided
294 if helm_id is None or len(helm_id) == 0:
295 raise N2VCBadArgumentsException(
296 message="helm_id is mandatory", bad_args=["helm_id"]
297 )
299 # Validate artifact-path is provided
300 if artifact_path is None or len(artifact_path) == 0:
301 raise N2VCBadArgumentsException(
302 message="artifact_path is mandatory", bad_args=["artifact_path"]
303 )
305 # Validate artifact-path exists and sync path
306 from_path = os.path.split(artifact_path)[0]
307 self.fs.sync(from_path)
309 # remove / in charm path
310 while artifact_path.find("//") >= 0:
311 artifact_path = artifact_path.replace("//", "/")
313 # check charm path
314 if self.fs.file_exists(artifact_path):
315 helm_chart_path = artifact_path
316 else:
317 msg = "artifact path does not exist: {}".format(artifact_path)
318 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
320 if artifact_path.startswith("/"):
321 full_path = self.fs.path + helm_chart_path
322 else:
323 full_path = self.fs.path + "/" + helm_chart_path
325 while full_path.find("//") >= 0:
326 full_path = full_path.replace("//", "/")
328 try:
329 # Call helm conn upgrade
330 # Obtain system cluster id from database
331 system_cluster_uuid = await self._get_system_cluster_id()
332 # Add parameter osm if exist to global
333 if config and config.get("osm"):
334 if not config.get("global"):
335 config["global"] = {}
336 config["global"]["osm"] = config.get("osm")
338 self.log.debug("Ugrade helm chart: {}".format(full_path))
339 await self._k8sclusterhelm3.upgrade(
340 system_cluster_uuid,
341 kdu_model=full_path,
342 kdu_instance=helm_id,
343 namespace=namespace,
344 params=config,
345 db_dict=db_dict,
346 timeout=progress_timeout,
347 force=True,
348 )
350 except N2VCException:
351 raise
352 except Exception as e:
353 self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
354 raise N2VCException("Error upgrading chart ee: {}".format(e))
356 async def create_tls_certificate(
357 self,
358 nsr_id: str,
359 secret_name: str,
360 usage: str,
361 dns_prefix: str,
362 namespace: str = None,
363 ):
364 # Obtain system cluster id from database
365 system_cluster_uuid = await self._get_system_cluster_id()
366 # use helm-v3 as certificates don't depend on helm version
367 await self._k8sclusterhelm3.create_certificate(
368 cluster_uuid=system_cluster_uuid,
369 namespace=namespace or self.vca_config.kubectl_osm_namespace,
370 dns_prefix=dns_prefix,
371 name=nsr_id,
372 secret_name=secret_name,
373 usage=usage,
374 )
376 async def delete_tls_certificate(
377 self,
378 certificate_name: str = None,
379 namespace: str = None,
380 ):
381 # Obtain system cluster id from database
382 system_cluster_uuid = await self._get_system_cluster_id()
383 await self._k8sclusterhelm3.delete_certificate(
384 cluster_uuid=system_cluster_uuid,
385 namespace=namespace or self.vca_config.kubectl_osm_namespace,
386 certificate_name=certificate_name,
387 )
389 async def setup_ns_namespace(
390 self,
391 name: str,
392 ):
393 # Obtain system cluster id from database
394 system_cluster_uuid = await self._get_system_cluster_id()
395 await self._k8sclusterhelm3.create_namespace(
396 namespace=name,
397 cluster_uuid=system_cluster_uuid,
398 labels={
399 "": self.vca_config.eegrpc_pod_admission_policy
400 },
401 )
402 await self._k8sclusterhelm3.setup_default_rbac(
403 name="ee-role",
404 namespace=name,
405 api_groups=[""],
406 resources=["secrets"],
407 verbs=["get"],
408 service_account="default",
409 cluster_uuid=system_cluster_uuid,
410 )
411 await self._k8sclusterhelm3.copy_secret_data(
412 src_secret="osm-ca",
413 dst_secret="osm-ca",
414 src_namespace=self.vca_config.kubectl_osm_namespace,
415 dst_namespace=name,
416 cluster_uuid=system_cluster_uuid,
417 data_key="ca.crt",
418 )
420 async def register_execution_environment(
421 self,
422 namespace: str,
423 credentials: dict,
424 db_dict: dict,
425 progress_timeout: float = None,
426 total_timeout: float = None,
427 *kargs,
428 **kwargs,
429 ) -> str:
430 # nothing to do
431 pass
433 async def install_configuration_sw(self, *args, **kwargs):
434 # nothing to do
435 pass
437 async def add_relation(self, *args, **kwargs):
438 # nothing to do
439 pass
441 async def remove_relation(self):
442 # nothing to to
443 pass
445 async def get_status(self, *args, **kwargs):
446 # not used for this connector
447 pass
449 async def get_ee_ssh_public__key(
450 self,
451 ee_id: str,
452 db_dict: dict,
453 progress_timeout: float = None,
454 total_timeout: float = None,
455 **kwargs,
456 ) -> str:
457 """
458 Obtains ssh-public key from ee executing GetSShKey method from the ee.
460 :param str ee_id: the id of the execution environment returned by
461 create_execution_environment or register_execution_environment
462 :param dict db_dict:
463 :param float progress_timeout:
464 :param float total_timeout:
465 :returns: public key of the execution environment
466 """
469 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
470 )
472 # check arguments
473 if ee_id is None or len(ee_id) == 0:
474 raise N2VCBadArgumentsException(
475 message="ee_id is mandatory", bad_args=["ee_id"]
476 )
478 try:
479 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
480 version, namespace, helm_id = get_ee_id_parts(ee_id)
481 ip_addr = "{}.{}.svc".format(helm_id, namespace)
482 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
483 # install libraries and start successfully
484 ssh_key = await self._get_ssh_key(ip_addr)
485 return ssh_key
486 except Exception as e:
487 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
488 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
490 async def upgrade_charm(
491 self,
492 ee_id: str = None,
493 path: str = None,
494 charm_id: str = None,
495 charm_type: str = None,
496 timeout: float = None,
497 ) -> str:
498 """This method upgrade charms in VNFs
500 This method does not support KDU's deployed with Helm.
502 Args:
503 ee_id: Execution environment id
504 path: Local path to the charm
505 charm_id: charm-id
506 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
507 timeout: (Float) Timeout for the ns update operation
509 Returns:
510 the output of the update operation if status equals to "completed"
512 """
513 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
515 async def exec_primitive(
516 self,
517 ee_id: str,
518 primitive_name: str,
519 params_dict: dict,
520 db_dict: dict = None,
521 progress_timeout: float = None,
522 total_timeout: float = None,
523 **kwargs,
524 ) -> str:
525 """
526 Execute a primitive in the execution environment
528 :param str ee_id: the one returned by create_execution_environment or
529 register_execution_environment with the format namespace.helm_id
530 :param str primitive_name: must be one defined in the software. There is one
531 called 'config', where, for the proxy case, the 'credentials' of VM are
532 provided
533 :param dict params_dict: parameters of the action
534 :param dict db_dict: where to write into database when the status changes.
535 It contains a dict with
536 {collection: <str>, filter: {}, path: <str>},
537 e.g. {collection: "nslcmops", filter:
538 {_id: <nslcmop_id>, path: "_admin.VCA"}
539 It will be used to store information about intermediate notifications
540 :param float progress_timeout:
541 :param float total_timeout:
542 :returns str: primitive result, if ok. It raises exceptions in case of fail
543 """
546 "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
547 ee_id, primitive_name, params_dict, db_dict
548 )
549 )
551 # check arguments
552 if ee_id is None or len(ee_id) == 0:
553 raise N2VCBadArgumentsException(
554 message="ee_id is mandatory", bad_args=["ee_id"]
555 )
556 if primitive_name is None or len(primitive_name) == 0:
557 raise N2VCBadArgumentsException(
558 message="action_name is mandatory", bad_args=["action_name"]
559 )
560 if params_dict is None:
561 params_dict = dict()
563 try:
564 version, namespace, helm_id = get_ee_id_parts(ee_id)
565 ip_addr = "{}.{}.svc".format(helm_id, namespace)
566 except Exception as e:
567 self.log.error("Error getting ee ip ee: {}".format(e))
568 raise N2VCException("Error getting ee ip ee: {}".format(e))
570 if primitive_name == "config":
571 try:
572 # Execute config primitive, higher timeout to check the case ee is starting
573 status, detailed_message = await self._execute_config_primitive(
574 ip_addr, params_dict, db_dict=db_dict
575 )
576 self.log.debug(
577 "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
578 ee_id, status, detailed_message
579 )
580 )
581 if status != "OK":
582 self.log.error(
583 "Error configuring helm ee, status: {}, message: {}".format(
584 status, detailed_message
585 )
586 )
587 raise N2VCExecutionException(
588 message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
589 ee_id, status, detailed_message
590 ),
591 primitive_name=primitive_name,
592 )
593 except Exception as e:
594 self.log.error("Error configuring helm ee: {}".format(e))
595 raise N2VCExecutionException(
596 message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
597 primitive_name=primitive_name,
598 )
599 return "CONFIG OK"
600 else:
601 try:
602 # Execute primitive
603 status, detailed_message = await self._execute_primitive(
604 ip_addr, primitive_name, params_dict, db_dict=db_dict
605 )
606 self.log.debug(
607 "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
608 primitive_name, ee_id, status, detailed_message
609 )
610 )
611 if status != "OK" and status != "PROCESSING":
612 self.log.error(
613 "Execute primitive {} returned not ok status: {}, message: {}".format(
614 primitive_name, status, detailed_message
615 )
616 )
617 raise N2VCExecutionException(
618 message="Execute primitive {} returned not ok status: {}, message: {}".format(
619 primitive_name, status, detailed_message
620 ),
621 primitive_name=primitive_name,
622 )
623 except Exception as e:
624 self.log.error(
625 "Error executing primitive {}: {}".format(primitive_name, e)
626 )
627 raise N2VCExecutionException(
628 message="Error executing primitive {} into ee={} : {}".format(
629 primitive_name, ee_id, e
630 ),
631 primitive_name=primitive_name,
632 )
633 return detailed_message
635 async def deregister_execution_environments(self):
636 # nothing to be done
637 pass
639 async def delete_execution_environment(
640 self,
641 ee_id: str,
642 db_dict: dict = None,
643 total_timeout: float = None,
644 **kwargs,
645 ):
646 """
647 Delete an execution environment
648 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
649 :param dict db_dict: where to write into database when the status changes.
650 It contains a dict with
651 {collection: <str>, filter: {}, path: <str>},
652 e.g. {collection: "nsrs", filter:
653 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
654 :param float total_timeout:
655 """
657"ee_id: {}".format(ee_id))
659 # check arguments
660 if ee_id is None:
661 raise N2VCBadArgumentsException(
662 message="ee_id is mandatory", bad_args=["ee_id"]
663 )
665 try:
666 # Obtain cluster_uuid
667 system_cluster_uuid = await self._get_system_cluster_id()
669 # Get helm_id
670 version, namespace, helm_id = get_ee_id_parts(ee_id)
672 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
673"ee_id: {} deleted".format(ee_id))
674 except N2VCException:
675 raise
676 except Exception as e:
677 self.log.error(
678 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
679 )
680 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
682 async def delete_namespace(
683 self, namespace: str, db_dict: dict = None, total_timeout: float = None
684 ):
685 # Obtain system cluster id from database
686 system_cluster_uuid = await self._get_system_cluster_id()
687 await self._k8sclusterhelm3.delete_namespace(
688 namespace=namespace,
689 cluster_uuid=system_cluster_uuid,
690 )
692 async def install_k8s_proxy_charm(
693 self,
694 charm_name: str,
695 namespace: str,
696 artifact_path: str,
697 db_dict: dict,
698 progress_timeout: float = None,
699 total_timeout: float = None,
700 config: dict = None,
701 *kargs,
702 **kwargs,
703 ) -> str:
704 pass
706 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
707 async def _get_ssh_key(self, ip_addr):
708 return await self._execute_primitive_internal(
709 ip_addr,
710 "_get_ssh_key",
711 None,
712 )
714 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
715 async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
716 return await self._execute_primitive_internal(
717 ip_addr, "config", params, db_dict=db_dict
718 )
720 @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
721 async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
722 return await self._execute_primitive_internal(
723 ip_addr, primitive_name, params, db_dict=db_dict
724 )
726 async def _execute_primitive_internal(
727 self, ip_addr, primitive_name, params, db_dict=None
728 ):
729 async def execute():
730 stub = FrontendExecutorStub(channel)
731 if primitive_name == "_get_ssh_key":
732 self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
733 reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
734 return reply.message
735 # For any other primitives
736 async with as stream:
737 primitive_id = str(uuid.uuid1())
738 result = None
739 self.log.debug(
740 "Execute primitive internal: id:{}, name:{}, params: {}".format(
741 primitive_id, primitive_name, params
742 )
743 )
744 await stream.send_message(
745 PrimitiveRequest(
746 id=primitive_id, name=primitive_name, params=yaml.dump(params)
747 ),
748 end=True,
749 )
750 async for reply in stream:
751 self.log.debug("Received reply: {}".format(reply))
752 result = reply
753 # If db_dict provided write notifs in database
754 if db_dict:
755 self._write_op_detailed_status(
756 db_dict, reply.status, reply.detailed_message
757 )
758 if result:
759 return reply.status, reply.detailed_message
760 else:
761 return "ERROR", "No result received"
763 ssl_context = create_secure_context(
764 self.vca_config.ca_store,
765 self.vca_config.client_cert_path,
766 self.vca_config.client_key_path,
767 )
768 channel = Channel(
769 ip_addr, self.vca_config.helm_ee_service_port, ssl=ssl_context
770 )
771 try:
772 return await execute()
773 except ssl.SSLError as ssl_error: # fallback to insecure gRPC
774 if (
775 ssl_error.reason == "WRONG_VERSION_NUMBER"
776 and not self.vca_config.eegrpc_tls_enforce
777 ):
778 self.log.debug(
779 "Execution environment doesn't support TLS, falling back to unsecure gRPC"
780 )
781 channel = Channel(ip_addr, self.vca_config.helm_ee_service_port)
782 return await execute()
783 elif ssl_error.reason == "WRONG_VERSION_NUMBER":
784 raise N2VCException(
785 "Execution environment doesn't support TLS, primitives cannot be executed"
786 )
787 else:
788 raise
789 finally:
790 channel.close()
792 def _write_op_detailed_status(self, db_dict, status, detailed_message):
793 # write ee_id to database: _admin.deployed.VCA.x
794 try:
795 the_table = db_dict["collection"]
796 the_filter = db_dict["filter"]
797 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
798 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
799 self.db.set_one(
800 table=the_table,
801 q_filter=the_filter,
802 update_dict=update_dict,
803 fail_on_empty=True,
804 )
805 except asyncio.CancelledError:
806 raise
807 except Exception as e:
808 self.log.error("Error writing detailedStatus to database: {}".format(e))
810 async def _get_system_cluster_id(self):
811 if not self._system_cluster_id:
812 db_k8cluster = self.db.get_one(
813 "k8sclusters", {"name": self.vca_config.kubectl_osm_cluster_name}
814 )
815 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
816 if not k8s_hc_id:
817 try:
818 # backward compatibility for existing clusters that have not been initialized for helm v3
819 cluster_id = db_k8cluster.get("_id")
820 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
821 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
822 k8s_credentials, reuse_cluster_uuid=cluster_id
823 )
824 db_k8scluster_update = {
825 "_admin.helm-chart-v3.error_msg": None,
826 "": k8s_hc_id,
827 "_admin.helm-chart-v3}.created": uninstall_sw,
828 "_admin.helm-chart-v3.operationalState": "ENABLED",
829 }
830 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
831 except Exception as e:
832 self.log.error(
833 "error initializing helm-v3 cluster: {}".format(str(e))
834 )
835 raise N2VCException(
836 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
837 cluster_id
838 )
839 )
840 self._system_cluster_id = k8s_hc_id
841 return self._system_cluster_id