2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 from grpclib
.client
import Channel
27 from osm_lcm
.data_utils
.lcm_config
import VcaConfig
28 from osm_lcm
.frontend_pb2
import PrimitiveRequest
29 from osm_lcm
.frontend_pb2
import SshKeyRequest
, SshKeyReply
30 from osm_lcm
.frontend_grpc
import FrontendExecutorStub
31 from osm_lcm
.lcm_utils
import LcmBase
, get_ee_id_parts
33 from osm_lcm
.data_utils
.database
.database
import Database
34 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
36 from n2vc
.n2vc_conn
import N2VCConnector
37 from n2vc
.k8s_helm_conn
import K8sHelmConnector
38 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
39 from n2vc
.exceptions
import (
40 N2VCBadArgumentsException
,
42 N2VCExecutionException
,
45 from osm_lcm
.lcm_utils
import deep_get
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Build a decorator that retries an async method while the peer is unreachable.

    The decorated coroutine must be a method: its first positional argument is
    taken as the instance, and the retry settings are read from that instance's
    own attributes (``__dict__``).

    :param str max_wait_time_var: name of the instance attribute holding the total
        retry budget; 300 is used when the attribute is missing or falsy
    :param str delay_time_var: name of the instance attribute holding the pause
        between attempts (passed to asyncio.sleep); 10 is used when the attribute
        is missing or falsy
    :returns: a decorator for ``async def`` methods
    :raises ConnectionRefusedError: (from the decorated call) when every attempt
        failed with ConnectionRefusedError or TimeoutError and the budget ran out
    """

    def wrapper(func):
        retry_exceptions = (ConnectionRefusedError, TimeoutError)

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # obtain the retry settings from the instance, falling back to defaults
            self = args[0]
            instance_vars = self.__dict__
            max_wait_time = instance_vars.get(max_wait_time_var) or 300
            delay_time = instance_vars.get(delay_time_var) or 10

            wait_time = max_wait_time
            last_error = None
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions as error:
                    last_error = error
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)

            # Budget exhausted: raise instead of returning the exception class, so
            # callers never mistake the class object for a real result.
            raise ConnectionRefusedError(
                "{} did not succeed after retrying for {} seconds".format(
                    func.__name__, max_wait_time
                )
            ) from last_error

        return wrapped

    return wrapper
def create_secure_context(
    trusted: str, client_cert_path: str, client_key_path: str
) -> ssl.SSLContext:
    """
    Build the client-side TLS context used for the gRPC channel.

    :param str trusted: CA location handed to load_verify_locations
    :param str client_cert_path: client certificate chain file
    :param str client_key_path: private key file matching the client certificate
    :returns: an ssl.SSLContext that requires a verified peer certificate, checks
        the host name, accepts TLS 1.2 or newer, and advertises "h2" through ALPN
    """
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

    # peer verification policy
    tls_context.verify_mode = ssl.CERT_REQUIRED
    tls_context.check_hostname = True
    tls_context.minimum_version = ssl.TLSVersion.TLSv1_2

    # client identity and trust anchors
    tls_context.load_cert_chain(client_cert_path, client_key_path)
    tls_context.load_verify_locations(trusted)

    # negotiation parameters
    tls_context.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20")
    tls_context.set_alpn_protocols(["h2"])
    return tls_context
class LCMHelmConn(N2VCConnector, LcmBase):
    def __init__(
        self,
        log: object = None,
        vca_config: VcaConfig = None,
        on_update_db=None,
    ):
        """
        Initialize EE helm connector.

        :param log: logger handed to the N2VCConnector constructor
        :param VcaConfig vca_config: VCA settings; the retry timings and the
            kubectl/helm binary paths are read from it
        :param on_update_db: callback handed to the N2VCConnector constructor
        """
        database = Database().instance.db
        filesystem = Filesystem().instance.fs
        self.db = database
        self.fs = filesystem

        # parent class constructor
        N2VCConnector.__init__(
            self, log=log, on_update_db=on_update_db, db=database, fs=filesystem
        )

        self.vca_config = vca_config
        self.log.debug("Initialize helm N2VC connector")
        self.log.debug("initial vca_config: {}".format(vca_config.to_dict()))

        # retry settings looked up by name by the retryer decorator
        self._retry_delay = self.vca_config.helm_ee_retry_delay

        self._initial_retry_time = self.vca_config.helm_max_initial_retry_time
        self.log.debug("Initial retry time: {}".format(self._initial_retry_time))

        self._max_retry_time = self.vca_config.helm_max_retry_time
        self.log.debug("Retry time: {}".format(self._max_retry_time))

        # initialize helm connector for helmv2 and helmv3
        self._k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helmpath,
            fs=self.fs,
            db=self.db,
            log=self.log,
            on_update_db=None,
        )

        self._k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.kubectlpath,
            helm_command=self.vca_config.helm3path,
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        # resolved lazily by _get_system_cluster_id
        self._system_cluster_id = None
        self.log.info("Helm N2VC connector initialized")
148 # TODO - ¿reuse_ee_id?
149 async def create_execution_environment(
153 reuse_ee_id
: str = None,
154 progress_timeout
: float = None,
155 total_timeout
: float = None,
157 artifact_path
: str = None,
158 chart_model
: str = None,
159 vca_type
: str = None,
164 Creates a new helm execution environment deploying the helm-chat indicated in the
166 :param str namespace: This param is not used, all helm charts are deployed in the osm
168 :param dict db_dict: where to write to database when the status changes.
169 It contains a dictionary with {collection: str, filter: {}, path: str},
170 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
171 "_admin.deployed.VCA.3"}
172 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
173 :param float progress_timeout:
174 :param float total_timeout:
175 :param dict config: General variables to instantiate KDU
176 :param str artifact_path: path of package content
177 :param str chart_model: helm chart/reference (string), which can be either
179 - a name of chart available via the repos known by OSM
180 (e.g. stable/openldap, stable/openldap:1.2.4)
181 - a path to a packaged chart (e.g. mychart.tgz)
182 - a path to an unpacked chart directory or a URL (e.g. mychart)
183 :param str vca_type: Type of vca, must be type helm or helm-v3
184 :returns str, dict: id of the new execution environment including namespace.helm_id
185 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
189 namespace
= self
.vca_config
.kubectl_osm_namespace
192 "create_execution_environment: namespace: {}, artifact_path: {}, "
193 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
194 namespace
, artifact_path
, db_dict
, chart_model
, reuse_ee_id
198 # Validate artifact-path is provided
199 if artifact_path
is None or len(artifact_path
) == 0:
200 raise N2VCBadArgumentsException(
201 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
204 # Validate artifact-path exists and sync path
205 from_path
= os
.path
.split(artifact_path
)[0]
206 self
.fs
.sync(from_path
)
208 # remove / in charm path
209 while artifact_path
.find("//") >= 0:
210 artifact_path
= artifact_path
.replace("//", "/")
213 if self
.fs
.file_exists(artifact_path
):
214 helm_chart_path
= artifact_path
216 msg
= "artifact path does not exist: {}".format(artifact_path
)
217 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
219 if artifact_path
.startswith("/"):
220 full_path
= self
.fs
.path
+ helm_chart_path
222 full_path
= self
.fs
.path
+ "/" + helm_chart_path
224 while full_path
.find("//") >= 0:
225 full_path
= full_path
.replace("//", "/")
227 # By default, the KDU is expected to be a file
228 kdu_model
= full_path
229 # If the chart_model includes a "/", then it is a reference:
230 # e.g. (stable/openldap; stable/openldap:1.2.4)
231 if chart_model
.find("/") >= 0:
232 kdu_model
= chart_model
235 # Call helm conn install
236 # Obtain system cluster id from database
237 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
238 # Add parameter osm if exist to global
239 if config
and config
.get("osm"):
240 if not config
.get("global"):
241 config
["global"] = {}
242 config
["global"]["osm"] = config
.get("osm")
244 self
.log
.debug("install helm chart: {}".format(full_path
))
245 if vca_type
== "helm":
246 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
250 await self
._k
8sclusterhelm
2.install(
253 kdu_instance
=helm_id
,
257 timeout
=progress_timeout
,
260 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
264 await self
._k
8sclusterhelm
3.install(
267 kdu_instance
=helm_id
,
271 timeout
=progress_timeout
,
274 ee_id
= "{}:{}.{}".format(vca_type
, namespace
, helm_id
)
276 except N2VCException
:
278 except Exception as e
:
279 self
.log
.error("Error deploying chart ee: {}".format(e
), exc_info
=True)
280 raise N2VCException("Error deploying chart ee: {}".format(e
))
282 async def upgrade_execution_environment(
287 progress_timeout
: float = None,
288 total_timeout
: float = None,
290 artifact_path
: str = None,
291 vca_type
: str = None,
296 Creates a new helm execution environment deploying the helm-chat indicated in the
298 :param str namespace: This param is not used, all helm charts are deployed in the osm
300 :param dict db_dict: where to write to database when the status changes.
301 It contains a dictionary with {collection: str, filter: {}, path: str},
302 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
303 "_admin.deployed.VCA.3"}
304 :param helm_id: unique name of the Helm release to upgrade
305 :param float progress_timeout:
306 :param float total_timeout:
307 :param dict config: General variables to instantiate KDU
308 :param str artifact_path: path of package content
309 :param str vca_type: Type of vca, must be type helm or helm-v3
310 :returns str, dict: id of the new execution environment including namespace.helm_id
311 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
315 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
318 # Validate helm_id is provided
319 if helm_id
is None or len(helm_id
) == 0:
320 raise N2VCBadArgumentsException(
321 message
="helm_id is mandatory", bad_args
=["helm_id"]
324 # Validate artifact-path is provided
325 if artifact_path
is None or len(artifact_path
) == 0:
326 raise N2VCBadArgumentsException(
327 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
330 # Validate artifact-path exists and sync path
331 from_path
= os
.path
.split(artifact_path
)[0]
332 self
.fs
.sync(from_path
)
334 # remove / in charm path
335 while artifact_path
.find("//") >= 0:
336 artifact_path
= artifact_path
.replace("//", "/")
339 if self
.fs
.file_exists(artifact_path
):
340 helm_chart_path
= artifact_path
342 msg
= "artifact path does not exist: {}".format(artifact_path
)
343 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
345 if artifact_path
.startswith("/"):
346 full_path
= self
.fs
.path
+ helm_chart_path
348 full_path
= self
.fs
.path
+ "/" + helm_chart_path
350 while full_path
.find("//") >= 0:
351 full_path
= full_path
.replace("//", "/")
354 # Call helm conn upgrade
355 # Obtain system cluster id from database
356 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
357 # Add parameter osm if exist to global
358 if config
and config
.get("osm"):
359 if not config
.get("global"):
360 config
["global"] = {}
361 config
["global"]["osm"] = config
.get("osm")
363 self
.log
.debug("Ugrade helm chart: {}".format(full_path
))
364 if vca_type
== "helm":
365 await self
._k
8sclusterhelm
2.upgrade(
368 kdu_instance
=helm_id
,
372 timeout
=progress_timeout
,
376 await self
._k
8sclusterhelm
3.upgrade(
379 kdu_instance
=helm_id
,
383 timeout
=progress_timeout
,
387 except N2VCException
:
389 except Exception as e
:
390 self
.log
.error("Error upgrading chart ee: {}".format(e
), exc_info
=True)
391 raise N2VCException("Error upgrading chart ee: {}".format(e
))
393 async def create_tls_certificate(
399 namespace
: str = None,
401 # Obtain system cluster id from database
402 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
403 # use helm-v3 as certificates don't depend on helm version
404 await self
._k
8sclusterhelm
3.create_certificate(
405 cluster_uuid
=system_cluster_uuid
,
406 namespace
=namespace
or self
.vca_config
.kubectl_osm_namespace
,
407 dns_prefix
=dns_prefix
,
409 secret_name
=secret_name
,
413 async def delete_tls_certificate(
415 certificate_name
: str = None,
416 namespace
: str = None,
418 # Obtain system cluster id from database
419 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
420 await self
._k
8sclusterhelm
3.delete_certificate(
421 cluster_uuid
=system_cluster_uuid
,
422 namespace
=namespace
or self
.vca_config
.kubectl_osm_namespace
,
423 certificate_name
=certificate_name
,
426 async def setup_ns_namespace(
430 # Obtain system cluster id from database
431 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
432 await self
._k
8sclusterhelm
3.create_namespace(
434 cluster_uuid
=system_cluster_uuid
,
436 await self
._k
8sclusterhelm
3.setup_default_rbac(
440 resources
=["secrets"],
442 service_account
="default",
443 cluster_uuid
=system_cluster_uuid
,
445 await self
._k
8sclusterhelm
3.copy_secret_data(
448 src_namespace
=self
.vca_config
.kubectl_osm_namespace
,
450 cluster_uuid
=system_cluster_uuid
,
454 async def register_execution_environment(
459 progress_timeout
: float = None,
460 total_timeout
: float = None,
467 async def install_configuration_sw(self
, *args
, **kwargs
):
471 async def add_relation(self
, *args
, **kwargs
):
475 async def remove_relation(self
):
479 async def get_status(self
, *args
, **kwargs
):
480 # not used for this connector
483 async def get_ee_ssh_public__key(
487 progress_timeout
: float = None,
488 total_timeout
: float = None,
492 Obtains ssh-public key from ee executing GetSShKey method from the ee.
494 :param str ee_id: the id of the execution environment returned by
495 create_execution_environment or register_execution_environment
497 :param float progress_timeout:
498 :param float total_timeout:
499 :returns: public key of the execution environment
503 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id
, db_dict
)
507 if ee_id
is None or len(ee_id
) == 0:
508 raise N2VCBadArgumentsException(
509 message
="ee_id is mandatory", bad_args
=["ee_id"]
513 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
514 version
, namespace
, helm_id
= get_ee_id_parts(ee_id
)
515 ip_addr
= "{}.{}.svc".format(helm_id
, namespace
)
516 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
517 # install libraries and start successfully
518 ssh_key
= await self
._get
_ssh
_key
(ip_addr
)
520 except Exception as e
:
521 self
.log
.error("Error obtaining ee ssh_key: {}".format(e
), exc_info
=True)
522 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e
))
524 async def upgrade_charm(
528 charm_id
: str = None,
529 charm_type
: str = None,
530 timeout
: float = None,
532 """This method upgrade charms in VNFs
534 This method does not support KDU's deployed with Helm.
537 ee_id: Execution environment id
538 path: Local path to the charm
540 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
541 timeout: (Float) Timeout for the ns update operation
544 the output of the update operation if status equals to "completed"
547 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
549 async def exec_primitive(
554 db_dict
: dict = None,
555 progress_timeout
: float = None,
556 total_timeout
: float = None,
560 Execute a primitive in the execution environment
562 :param str ee_id: the one returned by create_execution_environment or
563 register_execution_environment with the format namespace.helm_id
564 :param str primitive_name: must be one defined in the software. There is one
565 called 'config', where, for the proxy case, the 'credentials' of VM are
567 :param dict params_dict: parameters of the action
568 :param dict db_dict: where to write into database when the status changes.
569 It contains a dict with
570 {collection: <str>, filter: {}, path: <str>},
571 e.g. {collection: "nslcmops", filter:
572 {_id: <nslcmop_id>, path: "_admin.VCA"}
573 It will be used to store information about intermediate notifications
574 :param float progress_timeout:
575 :param float total_timeout:
576 :returns str: primitive result, if ok. It raises exceptions in case of fail
580 "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
581 ee_id
, primitive_name
, params_dict
, db_dict
586 if ee_id
is None or len(ee_id
) == 0:
587 raise N2VCBadArgumentsException(
588 message
="ee_id is mandatory", bad_args
=["ee_id"]
590 if primitive_name
is None or len(primitive_name
) == 0:
591 raise N2VCBadArgumentsException(
592 message
="action_name is mandatory", bad_args
=["action_name"]
594 if params_dict
is None:
598 version
, namespace
, helm_id
= get_ee_id_parts(ee_id
)
599 ip_addr
= "{}.{}.svc".format(helm_id
, namespace
)
600 except Exception as e
:
601 self
.log
.error("Error getting ee ip ee: {}".format(e
))
602 raise N2VCException("Error getting ee ip ee: {}".format(e
))
604 if primitive_name
== "config":
606 # Execute config primitive, higher timeout to check the case ee is starting
607 status
, detailed_message
= await self
._execute
_config
_primitive
(
608 ip_addr
, params_dict
, db_dict
=db_dict
611 "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
612 ee_id
, status
, detailed_message
617 "Error configuring helm ee, status: {}, message: {}".format(
618 status
, detailed_message
621 raise N2VCExecutionException(
622 message
="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
623 ee_id
, status
, detailed_message
625 primitive_name
=primitive_name
,
627 except Exception as e
:
628 self
.log
.error("Error configuring helm ee: {}".format(e
))
629 raise N2VCExecutionException(
630 message
="Error configuring helm ee_id: {}, {}".format(ee_id
, e
),
631 primitive_name
=primitive_name
,
637 status
, detailed_message
= await self
._execute
_primitive
(
638 ip_addr
, primitive_name
, params_dict
, db_dict
=db_dict
641 "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
642 primitive_name
, ee_id
, status
, detailed_message
645 if status
!= "OK" and status
!= "PROCESSING":
647 "Execute primitive {} returned not ok status: {}, message: {}".format(
648 primitive_name
, status
, detailed_message
651 raise N2VCExecutionException(
652 message
="Execute primitive {} returned not ok status: {}, message: {}".format(
653 primitive_name
, status
, detailed_message
655 primitive_name
=primitive_name
,
657 except Exception as e
:
659 "Error executing primitive {}: {}".format(primitive_name
, e
)
661 raise N2VCExecutionException(
662 message
="Error executing primitive {} into ee={} : {}".format(
663 primitive_name
, ee_id
, e
665 primitive_name
=primitive_name
,
667 return detailed_message
669 async def deregister_execution_environments(self
):
673 async def delete_execution_environment(
676 db_dict
: dict = None,
677 total_timeout
: float = None,
681 Delete an execution environment
682 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
683 :param dict db_dict: where to write into database when the status changes.
684 It contains a dict with
685 {collection: <str>, filter: {}, path: <str>},
686 e.g. {collection: "nsrs", filter:
687 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
688 :param float total_timeout:
691 self
.log
.info("ee_id: {}".format(ee_id
))
695 raise N2VCBadArgumentsException(
696 message
="ee_id is mandatory", bad_args
=["ee_id"]
700 # Obtain cluster_uuid
701 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
704 version
, namespace
, helm_id
= get_ee_id_parts(ee_id
)
706 # Uninstall chart, for backward compatibility we must assume that if there is no
707 # version it is helm-v2
708 if version
== "helm-v3":
709 await self
._k
8sclusterhelm
3.uninstall(system_cluster_uuid
, helm_id
)
711 await self
._k
8sclusterhelm
2.uninstall(system_cluster_uuid
, helm_id
)
712 self
.log
.info("ee_id: {} deleted".format(ee_id
))
713 except N2VCException
:
715 except Exception as e
:
717 "Error deleting ee id: {}: {}".format(ee_id
, e
), exc_info
=True
719 raise N2VCException("Error deleting ee id {}: {}".format(ee_id
, e
))
721 async def delete_namespace(
722 self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None
724 # Obtain system cluster id from database
725 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
726 await self
._k
8sclusterhelm
3.delete_namespace(
728 cluster_uuid
=system_cluster_uuid
,
731 async def install_k8s_proxy_charm(
737 progress_timeout
: float = None,
738 total_timeout
: float = None,
745 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
746 async def _get_ssh_key(self
, ip_addr
):
747 return await self
._execute
_primitive
_internal
(
753 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
754 async def _execute_config_primitive(self
, ip_addr
, params
, db_dict
=None):
755 return await self
._execute
_primitive
_internal
(
756 ip_addr
, "config", params
, db_dict
=db_dict
759 @retryer(max_wait_time_var
="_max_retry_time", delay_time_var
="_retry_delay")
760 async def _execute_primitive(self
, ip_addr
, primitive_name
, params
, db_dict
=None):
761 return await self
._execute
_primitive
_internal
(
762 ip_addr
, primitive_name
, params
, db_dict
=db_dict
765 async def _execute_primitive_internal(
766 self
, ip_addr
, primitive_name
, params
, db_dict
=None
769 stub
= FrontendExecutorStub(channel
)
770 if primitive_name
== "_get_ssh_key":
771 self
.log
.debug("get ssh key, ip_addr: {}".format(ip_addr
))
772 reply
: SshKeyReply
= await stub
.GetSshKey(SshKeyRequest())
774 # For any other primitives
775 async with stub
.RunPrimitive
.open() as stream
:
776 primitive_id
= str(uuid
.uuid1())
779 "Execute primitive internal: id:{}, name:{}, params: {}".format(
780 primitive_id
, primitive_name
, params
783 await stream
.send_message(
785 id=primitive_id
, name
=primitive_name
, params
=yaml
.dump(params
)
789 async for reply
in stream
:
790 self
.log
.debug("Received reply: {}".format(reply
))
792 # If db_dict provided write notifs in database
794 self
._write
_op
_detailed
_status
(
795 db_dict
, reply
.status
, reply
.detailed_message
798 return reply
.status
, reply
.detailed_message
800 return "ERROR", "No result received"
802 ssl_context
= create_secure_context(
803 self
.vca_config
.ca_store
,
804 self
.vca_config
.client_cert_path
,
805 self
.vca_config
.client_key_path
,
808 ip_addr
, self
.vca_config
.helm_ee_service_port
, ssl
=ssl_context
811 return await execute()
812 except ssl
.SSLError
as ssl_error
: # fallback to insecure gRPC
814 ssl_error
.reason
== "WRONG_VERSION_NUMBER"
815 and not self
.vca_config
.eegrpc_tls_enforce
818 "Execution environment doesn't support TLS, falling back to unsecure gRPC"
820 channel
= Channel(ip_addr
, self
.vca_config
.helm_ee_service_port
)
821 return await execute()
822 elif ssl_error
.reason
== "WRONG_VERSION_NUMBER":
824 "Execution environment doesn't support TLS, primitives cannot be executed"
831 def _write_op_detailed_status(self
, db_dict
, status
, detailed_message
):
832 # write ee_id to database: _admin.deployed.VCA.x
834 the_table
= db_dict
["collection"]
835 the_filter
= db_dict
["filter"]
836 update_dict
= {"detailed-status": "{}: {}".format(status
, detailed_message
)}
837 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
841 update_dict
=update_dict
,
844 except asyncio
.CancelledError
:
846 except Exception as e
:
847 self
.log
.error("Error writing detailedStatus to database: {}".format(e
))
849 async def _get_system_cluster_id(self
):
850 if not self
._system
_cluster
_id
:
851 db_k8cluster
= self
.db
.get_one(
852 "k8sclusters", {"name": self
.vca_config
.kubectl_osm_cluster_name
}
854 k8s_hc_id
= deep_get(db_k8cluster
, ("_admin", "helm-chart-v3", "id"))
857 # backward compatibility for existing clusters that have not been initialized for helm v3
858 cluster_id
= db_k8cluster
.get("_id")
859 k8s_credentials
= yaml
.safe_dump(db_k8cluster
.get("credentials"))
860 k8s_hc_id
, uninstall_sw
= await self
._k
8sclusterhelm
3.init_env(
861 k8s_credentials
, reuse_cluster_uuid
=cluster_id
863 db_k8scluster_update
= {
864 "_admin.helm-chart-v3.error_msg": None,
865 "_admin.helm-chart-v3.id": k8s_hc_id
,
866 "_admin.helm-chart-v3}.created": uninstall_sw
,
867 "_admin.helm-chart-v3.operationalState": "ENABLED",
869 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
870 except Exception as e
:
872 "error initializing helm-v3 cluster: {}".format(str(e
))
875 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
879 self
._system
_cluster
_id
= k8s_hc_id
880 return self
._system
_cluster
_id