Feature 10956: Implement upgrade of helm-based EEs
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import socket
22 import uuid
23 import os
24
25 from grpclib.client import Channel
26
27 from osm_lcm.frontend_pb2 import PrimitiveRequest
28 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
29 from osm_lcm.frontend_grpc import FrontendExecutorStub
30 from osm_lcm.lcm_utils import LcmBase, get_ee_id_parts
31
32 from osm_lcm.data_utils.database.database import Database
33 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
34
35 from n2vc.n2vc_conn import N2VCConnector
36 from n2vc.k8s_helm_conn import K8sHelmConnector
37 from n2vc.k8s_helm3_conn import K8sHelm3Connector
38 from n2vc.exceptions import (
39 N2VCBadArgumentsException,
40 N2VCException,
41 N2VCExecutionException,
42 )
43
44 from osm_lcm.lcm_utils import deep_get
45
46
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """Decorator that retries an async method on ConnectionRefusedError.

    The decorated coroutine is re-invoked until a total retry budget is
    exhausted, sleeping a fixed delay between attempts.  Budget and delay are
    read from the decorated method's instance (``args[0].__dict__``) using the
    attribute names given in ``max_wait_time_var`` and ``delay_time_var``;
    defaults (300 s budget / 10 s delay) apply when the instance does not
    define them.

    :param str max_wait_time_var: instance attribute holding the max total wait time
    :param str delay_time_var: instance attribute holding the delay between retries
    :raises ConnectionRefusedError: when the retry budget is exhausted
    """

    def wrapper(func):
        retry_exceptions = ConnectionRefusedError

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay_time
            delay_time = 10
            max_wait_time = 300

            # obtain actual values from the instance attributes, if present
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
                    continue
            # Bug fix: when the retry budget is exhausted, raise the error
            # instead of returning the exception *class* (the original
            # "return ConnectionRefusedError" made callers receive the class
            # object as if it were a successful result).
            raise ConnectionRefusedError(
                "Maximum retry time exhausted ({}s)".format(max_wait_time)
            )

        return wrapped

    return wrapper
78
79
80 class LCMHelmConn(N2VCConnector, LcmBase):
81 _KUBECTL_OSM_NAMESPACE = "osm"
82 _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
83 _EE_SERVICE_PORT = 50050
84
85 # Initial max retry time
86 _MAX_INITIAL_RETRY_TIME = 600
87 # Max retry time for normal operations
88 _MAX_RETRY_TIME = 30
89 # Time beetween retries, retry time after a connection error is raised
90 _EE_RETRY_DELAY = 10
91
92 def __init__(
93 self,
94 log: object = None,
95 loop: object = None,
96 vca_config: dict = None,
97 on_update_db=None,
98 ):
99 """
100 Initialize EE helm connector.
101 """
102
103 self.db = Database().instance.db
104 self.fs = Filesystem().instance.fs
105
106 # parent class constructor
107 N2VCConnector.__init__(
108 self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
109 )
110
111 self.vca_config = vca_config
112 self.log.debug("Initialize helm N2VC connector")
113 self.log.debug("initial vca_config: {}".format(vca_config))
114
115 # TODO - Obtain data from configuration
116 self._ee_service_port = self._EE_SERVICE_PORT
117
118 self._retry_delay = self._EE_RETRY_DELAY
119
120 if self.vca_config and self.vca_config.get("eegrpcinittimeout"):
121 self._initial_retry_time = self.vca_config.get("eegrpcinittimeout")
122 self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
123 else:
124 self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME
125 self.log.debug(
126 "Applied default retry time: {}".format(self._initial_retry_time)
127 )
128
129 if self.vca_config and self.vca_config.get("eegrpctimeout"):
130 self._max_retry_time = self.vca_config.get("eegrpctimeout")
131 self.log.debug("Retry time: {}".format(self._max_retry_time))
132 else:
133 self._max_retry_time = self._MAX_RETRY_TIME
134 self.log.debug(
135 "Applied default retry time: {}".format(self._max_retry_time)
136 )
137
138 # initialize helm connector for helmv2 and helmv3
139 self._k8sclusterhelm2 = K8sHelmConnector(
140 kubectl_command=self.vca_config.get("kubectlpath"),
141 helm_command=self.vca_config.get("helmpath"),
142 fs=self.fs,
143 db=self.db,
144 log=self.log,
145 on_update_db=None,
146 )
147
148 self._k8sclusterhelm3 = K8sHelm3Connector(
149 kubectl_command=self.vca_config.get("kubectlpath"),
150 helm_command=self.vca_config.get("helm3path"),
151 fs=self.fs,
152 log=self.log,
153 db=self.db,
154 on_update_db=None,
155 )
156
157 self._system_cluster_id = None
158 self.log.info("Helm N2VC connector initialized")
159
160 # TODO - ¿reuse_ee_id?
161 async def create_execution_environment(
162 self,
163 namespace: str,
164 db_dict: dict,
165 reuse_ee_id: str = None,
166 progress_timeout: float = None,
167 total_timeout: float = None,
168 config: dict = None,
169 artifact_path: str = None,
170 chart_model: str = None,
171 vca_type: str = None,
172 *kargs,
173 **kwargs,
174 ) -> (str, dict):
175 """
176 Creates a new helm execution environment deploying the helm-chat indicated in the
177 artifact_path
178 :param str namespace: This param is not used, all helm charts are deployed in the osm
179 system namespace
180 :param dict db_dict: where to write to database when the status changes.
181 It contains a dictionary with {collection: str, filter: {}, path: str},
182 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
183 "_admin.deployed.VCA.3"}
184 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
185 :param float progress_timeout:
186 :param float total_timeout:
187 :param dict config: General variables to instantiate KDU
188 :param str artifact_path: path of package content
189 :param str chart_model: helm chart/reference (string), which can be either
190 of these options:
191 - a name of chart available via the repos known by OSM
192 (e.g. stable/openldap, stable/openldap:1.2.4)
193 - a path to a packaged chart (e.g. mychart.tgz)
194 - a path to an unpacked chart directory or a URL (e.g. mychart)
195 :param str vca_type: Type of vca, must be type helm or helm-v3
196 :returns str, dict: id of the new execution environment including namespace.helm_id
197 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
198 """
199
200 self.log.info(
201 "create_execution_environment: namespace: {}, artifact_path: {}, "
202 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
203 namespace, artifact_path, db_dict, chart_model, reuse_ee_id
204 )
205 )
206
207 # Validate artifact-path is provided
208 if artifact_path is None or len(artifact_path) == 0:
209 raise N2VCBadArgumentsException(
210 message="artifact_path is mandatory", bad_args=["artifact_path"]
211 )
212
213 # Validate artifact-path exists and sync path
214 from_path = os.path.split(artifact_path)[0]
215 self.fs.sync(from_path)
216
217 # remove / in charm path
218 while artifact_path.find("//") >= 0:
219 artifact_path = artifact_path.replace("//", "/")
220
221 # check charm path
222 if self.fs.file_exists(artifact_path):
223 helm_chart_path = artifact_path
224 else:
225 msg = "artifact path does not exist: {}".format(artifact_path)
226 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
227
228 if artifact_path.startswith("/"):
229 full_path = self.fs.path + helm_chart_path
230 else:
231 full_path = self.fs.path + "/" + helm_chart_path
232
233 while full_path.find("//") >= 0:
234 full_path = full_path.replace("//", "/")
235
236 # By default, the KDU is expected to be a file
237 kdu_model = full_path
238 # If the chart_model includes a "/", then it is a reference:
239 # e.g. (stable/openldap; stable/openldap:1.2.4)
240 if chart_model.find("/") >= 0:
241 kdu_model = chart_model
242
243 try:
244 # Call helm conn install
245 # Obtain system cluster id from database
246 system_cluster_uuid = await self._get_system_cluster_id()
247 # Add parameter osm if exist to global
248 if config and config.get("osm"):
249 if not config.get("global"):
250 config["global"] = {}
251 config["global"]["osm"] = config.get("osm")
252
253 self.log.debug("install helm chart: {}".format(full_path))
254 if vca_type == "helm":
255 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
256 db_dict=db_dict,
257 kdu_model=kdu_model,
258 )
259 await self._k8sclusterhelm2.install(
260 system_cluster_uuid,
261 kdu_model=kdu_model,
262 kdu_instance=helm_id,
263 namespace=self._KUBECTL_OSM_NAMESPACE,
264 params=config,
265 db_dict=db_dict,
266 timeout=progress_timeout,
267 )
268 else:
269 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
270 db_dict=db_dict,
271 kdu_model=kdu_model,
272 )
273 await self._k8sclusterhelm3.install(
274 system_cluster_uuid,
275 kdu_model=kdu_model,
276 kdu_instance=helm_id,
277 namespace=self._KUBECTL_OSM_NAMESPACE,
278 params=config,
279 db_dict=db_dict,
280 timeout=progress_timeout,
281 )
282
283 ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
284 return ee_id, None
285 except N2VCException:
286 raise
287 except Exception as e:
288 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
289 raise N2VCException("Error deploying chart ee: {}".format(e))
290
291 async def upgrade_execution_environment(
292 self,
293 namespace: str,
294 db_dict: dict,
295 helm_id: str,
296 progress_timeout: float = None,
297 total_timeout: float = None,
298 config: dict = None,
299 artifact_path: str = None,
300 vca_type: str = None,
301 *kargs,
302 **kwargs,
303 ) -> (str, dict):
304 """
305 Creates a new helm execution environment deploying the helm-chat indicated in the
306 attifact_path
307 :param str namespace: This param is not used, all helm charts are deployed in the osm
308 system namespace
309 :param dict db_dict: where to write to database when the status changes.
310 It contains a dictionary with {collection: str, filter: {}, path: str},
311 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
312 "_admin.deployed.VCA.3"}
313 :param helm_id: unique name of the Helm release to upgrade
314 :param float progress_timeout:
315 :param float total_timeout:
316 :param dict config: General variables to instantiate KDU
317 :param str artifact_path: path of package content
318 :param str vca_type: Type of vca, must be type helm or helm-v3
319 :returns str, dict: id of the new execution environment including namespace.helm_id
320 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
321 """
322
323 self.log.info(
324 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
325 )
326
327 # Validate helm_id is provided
328 if helm_id is None or len(helm_id) == 0:
329 raise N2VCBadArgumentsException(
330 message="helm_id is mandatory", bad_args=["helm_id"]
331 )
332
333 # Validate artifact-path is provided
334 if artifact_path is None or len(artifact_path) == 0:
335 raise N2VCBadArgumentsException(
336 message="artifact_path is mandatory", bad_args=["artifact_path"]
337 )
338
339 # Validate artifact-path exists and sync path
340 from_path = os.path.split(artifact_path)[0]
341 self.fs.sync(from_path)
342
343 # remove / in charm path
344 while artifact_path.find("//") >= 0:
345 artifact_path = artifact_path.replace("//", "/")
346
347 # check charm path
348 if self.fs.file_exists(artifact_path):
349 helm_chart_path = artifact_path
350 else:
351 msg = "artifact path does not exist: {}".format(artifact_path)
352 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
353
354 if artifact_path.startswith("/"):
355 full_path = self.fs.path + helm_chart_path
356 else:
357 full_path = self.fs.path + "/" + helm_chart_path
358
359 while full_path.find("//") >= 0:
360 full_path = full_path.replace("//", "/")
361
362 try:
363 # Call helm conn upgrade
364 # Obtain system cluster id from database
365 system_cluster_uuid = await self._get_system_cluster_id()
366 # Add parameter osm if exist to global
367 if config and config.get("osm"):
368 if not config.get("global"):
369 config["global"] = {}
370 config["global"]["osm"] = config.get("osm")
371
372 self.log.debug("Ugrade helm chart: {}".format(full_path))
373 if vca_type == "helm":
374 await self._k8sclusterhelm2.upgrade(
375 system_cluster_uuid,
376 kdu_model=full_path,
377 kdu_instance=helm_id,
378 namespace=namespace,
379 params=config,
380 db_dict=db_dict,
381 timeout=progress_timeout,
382 force=True,
383 )
384 else:
385 await self._k8sclusterhelm3.upgrade(
386 system_cluster_uuid,
387 kdu_model=full_path,
388 kdu_instance=helm_id,
389 namespace=namespace,
390 params=config,
391 db_dict=db_dict,
392 timeout=progress_timeout,
393 force=True,
394 )
395
396 except N2VCException:
397 raise
398 except Exception as e:
399 self.log.error("Error upgrading chart ee: {}".format(e), exc_info=True)
400 raise N2VCException("Error upgrading chart ee: {}".format(e))
401
402 async def register_execution_environment(
403 self,
404 namespace: str,
405 credentials: dict,
406 db_dict: dict,
407 progress_timeout: float = None,
408 total_timeout: float = None,
409 *kargs,
410 **kwargs,
411 ) -> str:
412 # nothing to do
413 pass
414
415 async def install_configuration_sw(self, *args, **kwargs):
416 # nothing to do
417 pass
418
419 async def add_relation(self, *args, **kwargs):
420 # nothing to do
421 pass
422
423 async def remove_relation(self):
424 # nothing to to
425 pass
426
427 async def get_status(self, *args, **kwargs):
428 # not used for this connector
429 pass
430
431 async def get_ee_ssh_public__key(
432 self,
433 ee_id: str,
434 db_dict: dict,
435 progress_timeout: float = None,
436 total_timeout: float = None,
437 **kwargs,
438 ) -> str:
439 """
440 Obtains ssh-public key from ee executing GetSShKey method from the ee.
441
442 :param str ee_id: the id of the execution environment returned by
443 create_execution_environment or register_execution_environment
444 :param dict db_dict:
445 :param float progress_timeout:
446 :param float total_timeout:
447 :returns: public key of the execution environment
448 """
449
450 self.log.info(
451 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
452 )
453
454 # check arguments
455 if ee_id is None or len(ee_id) == 0:
456 raise N2VCBadArgumentsException(
457 message="ee_id is mandatory", bad_args=["ee_id"]
458 )
459
460 try:
461 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
462 version, namespace, helm_id = get_ee_id_parts(ee_id)
463 ip_addr = socket.gethostbyname(helm_id)
464
465 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
466 # install libraries and start successfully
467 ssh_key = await self._get_ssh_key(ip_addr)
468 return ssh_key
469 except Exception as e:
470 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
471 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
472
473 async def upgrade_charm(
474 self,
475 ee_id: str = None,
476 path: str = None,
477 charm_id: str = None,
478 charm_type: str = None,
479 timeout: float = None,
480 ) -> str:
481 """This method upgrade charms in VNFs
482
483 This method does not support KDU's deployed with Helm.
484
485 Args:
486 ee_id: Execution environment id
487 path: Local path to the charm
488 charm_id: charm-id
489 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
490 timeout: (Float) Timeout for the ns update operation
491
492 Returns:
493 the output of the update operation if status equals to "completed"
494
495 """
496 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
497
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
                {collection: <str>, filter: {}, path: <str>},
                e.g. {collection: "nslcmops", filter:
                {_id: <nslcmop_id>, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

        self.log.info(
            "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
                ee_id, primitive_name, params_dict, db_dict
            )
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="action_name is mandatory", bad_args=["action_name"]
            )
        if params_dict is None:
            params_dict = dict()

        # Resolve the ee service address from the helm release name
        # (same DNS-based resolution used by get_ee_ssh_public__key)
        try:
            version, namespace, helm_id = get_ee_id_parts(ee_id)
            ip_addr = socket.gethostbyname(helm_id)
        except Exception as e:
            self.log.error("Error getting ee ip ee: {}".format(e))
            raise N2VCException("Error getting ee ip ee: {}".format(e))

        if primitive_name == "config":
            try:
                # Execute config primitive, higher timeout to check the case ee is starting
                status, detailed_message = await self._execute_config_primitive(
                    ip_addr, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
                        ee_id, status, detailed_message
                    )
                )
                if status != "OK":
                    self.log.error(
                        "Error configuring helm ee, status: {}, message: {}".format(
                            status, detailed_message
                        )
                    )
                    # NOTE(review): this raise is caught by the "except
                    # Exception" below and re-wrapped in a second
                    # N2VCExecutionException — confirm double-wrapping is
                    # intended.
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                    primitive_name=primitive_name,
                )
            # config has no meaningful payload to return; a fixed marker is used
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(
                    ip_addr, primitive_name, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
                        primitive_name, ee_id, status, detailed_message
                    )
                )
                # "PROCESSING" is also accepted as a non-error status here
                if status != "OK" and status != "PROCESSING":
                    self.log.error(
                        "Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        )
                    )
                    # NOTE(review): as in the config branch, this raise is
                    # re-wrapped by the except clause below.
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            # only reachable on success, where detailed_message is bound
            return detailed_message
617
618 async def deregister_execution_environments(self):
619 # nothing to be done
620 pass
621
622 async def delete_execution_environment(
623 self,
624 ee_id: str,
625 db_dict: dict = None,
626 total_timeout: float = None,
627 **kwargs,
628 ):
629 """
630 Delete an execution environment
631 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
632 :param dict db_dict: where to write into database when the status changes.
633 It contains a dict with
634 {collection: <str>, filter: {}, path: <str>},
635 e.g. {collection: "nsrs", filter:
636 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
637 :param float total_timeout:
638 """
639
640 self.log.info("ee_id: {}".format(ee_id))
641
642 # check arguments
643 if ee_id is None:
644 raise N2VCBadArgumentsException(
645 message="ee_id is mandatory", bad_args=["ee_id"]
646 )
647
648 try:
649
650 # Obtain cluster_uuid
651 system_cluster_uuid = await self._get_system_cluster_id()
652
653 # Get helm_id
654 version, namespace, helm_id = get_ee_id_parts(ee_id)
655
656 # Uninstall chart, for backward compatibility we must assume that if there is no
657 # version it is helm-v2
658 if version == "helm-v3":
659 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
660 else:
661 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
662 self.log.info("ee_id: {} deleted".format(ee_id))
663 except N2VCException:
664 raise
665 except Exception as e:
666 self.log.error(
667 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
668 )
669 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
670
671 async def delete_namespace(
672 self, namespace: str, db_dict: dict = None, total_timeout: float = None
673 ):
674 # method not implemented for this connector, execution environments must be deleted individually
675 pass
676
677 async def install_k8s_proxy_charm(
678 self,
679 charm_name: str,
680 namespace: str,
681 artifact_path: str,
682 db_dict: dict,
683 progress_timeout: float = None,
684 total_timeout: float = None,
685 config: dict = None,
686 *kargs,
687 **kwargs,
688 ) -> str:
689 pass
690
691 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
692 async def _get_ssh_key(self, ip_addr):
693 channel = Channel(ip_addr, self._ee_service_port)
694 try:
695 stub = FrontendExecutorStub(channel)
696 self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
697 reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
698 return reply.message
699 finally:
700 channel.close()
701
702 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
703 async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
704 return await self._execute_primitive_internal(
705 ip_addr, "config", params, db_dict=db_dict
706 )
707
708 @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
709 async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
710 return await self._execute_primitive_internal(
711 ip_addr, primitive_name, params, db_dict=db_dict
712 )
713
    async def _execute_primitive_internal(
        self, ip_addr, primitive_name, params, db_dict=None
    ):
        """Send one primitive to the EE over gRPC and stream back its status.

        Opens a RunPrimitive stream, sends a single PrimitiveRequest (params
        serialized as YAML) and consumes status replies until the stream ends,
        optionally persisting each reply via _write_op_detailed_status.

        :param ip_addr: IP address of the EE gRPC service
        :param primitive_name: name of the primitive to run
        :param params: primitive parameters (must be YAML-serializable)
        :param db_dict: optional DB location for intermediate notifications
        :returns: (status, detailed_message) from the last streamed reply, or
            ("ERROR", "No result received") if no reply arrived
        """

        channel = Channel(ip_addr, self._ee_service_port)
        try:
            stub = FrontendExecutorStub(channel)
            async with stub.RunPrimitive.open() as stream:
                primitive_id = str(uuid.uuid1())
                result = None
                self.log.debug(
                    "Execute primitive internal: id:{}, name:{}, params: {}".format(
                        primitive_id, primitive_name, params
                    )
                )
                # single request message; end=True closes the sending side
                await stream.send_message(
                    PrimitiveRequest(
                        id=primitive_id, name=primitive_name, params=yaml.dump(params)
                    ),
                    end=True,
                )
                async for reply in stream:
                    self.log.debug("Received reply: {}".format(reply))
                    result = reply
                    # If db_dict provided write notifs in database
                    if db_dict:
                        self._write_op_detailed_status(
                            db_dict, reply.status, reply.detailed_message
                        )
                if result:
                    # "reply" still references the last streamed message here
                    # (same object as "result")
                    return reply.status, reply.detailed_message
                else:
                    return "ERROR", "No result received"
        finally:
            # always release the gRPC channel
            channel.close()
749
750 def _write_op_detailed_status(self, db_dict, status, detailed_message):
751
752 # write ee_id to database: _admin.deployed.VCA.x
753 try:
754 the_table = db_dict["collection"]
755 the_filter = db_dict["filter"]
756 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
757 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
758 self.db.set_one(
759 table=the_table,
760 q_filter=the_filter,
761 update_dict=update_dict,
762 fail_on_empty=True,
763 )
764 except asyncio.CancelledError:
765 raise
766 except Exception as e:
767 self.log.error("Error writing detailedStatus to database: {}".format(e))
768
769 async def _get_system_cluster_id(self):
770 if not self._system_cluster_id:
771 db_k8cluster = self.db.get_one(
772 "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}
773 )
774 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
775 if not k8s_hc_id:
776 try:
777 # backward compatibility for existing clusters that have not been initialized for helm v3
778 cluster_id = db_k8cluster.get("_id")
779 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
780 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
781 k8s_credentials, reuse_cluster_uuid=cluster_id
782 )
783 db_k8scluster_update = {
784 "_admin.helm-chart-v3.error_msg": None,
785 "_admin.helm-chart-v3.id": k8s_hc_id,
786 "_admin.helm-chart-v3}.created": uninstall_sw,
787 "_admin.helm-chart-v3.operationalState": "ENABLED",
788 }
789 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
790 except Exception as e:
791 self.log.error(
792 "error initializing helm-v3 cluster: {}".format(str(e))
793 )
794 raise N2VCException(
795 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
796 cluster_id
797 )
798 )
799 self._system_cluster_id = k8s_hc_id
800 return self._system_cluster_id