Bug 2000 fixed: the namespace for the Juju Bundle is now updated within the KDU instantiation parameters.
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import socket
22 import uuid
23 import os
24
25 from grpclib.client import Channel
26
27 from osm_lcm.frontend_pb2 import PrimitiveRequest
28 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
29 from osm_lcm.frontend_grpc import FrontendExecutorStub
30 from osm_lcm.lcm_utils import LcmBase
31
32 from osm_lcm.data_utils.database.database import Database
33 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
34
35 from n2vc.n2vc_conn import N2VCConnector
36 from n2vc.k8s_helm_conn import K8sHelmConnector
37 from n2vc.k8s_helm3_conn import K8sHelm3Connector
38 from n2vc.exceptions import (
39 N2VCBadArgumentsException,
40 N2VCException,
41 N2VCExecutionException,
42 )
43
44 from osm_lcm.lcm_utils import deep_get
45
46
def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
    """
    Decorator that retries an async method while it raises ConnectionRefusedError.

    The total retry budget and the delay between attempts are read from instance
    attributes of the decorated method's ``self`` (attribute names given by
    ``max_wait_time_var`` and ``delay_time_var``), defaulting to 300s and 10s.

    :param str max_wait_time_var: attribute name holding the total retry budget (seconds)
    :param str delay_time_var: attribute name holding the delay between retries (seconds)
    :raises ConnectionRefusedError: when the retry budget is exhausted
    """

    def wrapper(func):
        retry_exceptions = ConnectionRefusedError

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            # default values for wait time and delay_time
            delay_time = 10
            max_wait_time = 300

            # obtain budget/delay from the instance attributes, if present
            self = args[0]
            if self.__dict__.get(max_wait_time_var):
                max_wait_time = self.__dict__.get(max_wait_time_var)
            if self.__dict__.get(delay_time_var):
                delay_time = self.__dict__.get(delay_time_var)

            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
                    continue
            # Bug fix: the original returned the exception *class* instead of
            # signalling the failure; raise so callers get a real error.
            raise ConnectionRefusedError(
                "Maximum retry time exhausted ({} seconds)".format(max_wait_time)
            )

        return wrapped

    return wrapper
78
79
class LCMHelmConn(N2VCConnector, LcmBase):
    """
    N2VC connector that manages helm-chart based execution environments (EEs)
    deployed on the OSM system Kubernetes cluster and talks to them over gRPC.
    """

    # Namespace where all EE helm charts are installed
    _KUBECTL_OSM_NAMESPACE = "osm"
    # Name used to look up the OSM system cluster in the "k8sclusters" collection
    _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
    # gRPC port exposed by the EE frontend executor service
    _EE_SERVICE_PORT = 50050

    # Initial max retry time
    _MAX_INITIAL_RETRY_TIME = 600
    # Max retry time for normal operations
    _MAX_RETRY_TIME = 30
    # Time between retries, retry time after a connection error is raised
    _EE_RETRY_DELAY = 10
91
92 def __init__(
93 self,
94 log: object = None,
95 loop: object = None,
96 vca_config: dict = None,
97 on_update_db=None,
98 ):
99 """
100 Initialize EE helm connector.
101 """
102
103 self.db = Database().instance.db
104 self.fs = Filesystem().instance.fs
105
106 # parent class constructor
107 N2VCConnector.__init__(
108 self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
109 )
110
111 self.vca_config = vca_config
112 self.log.debug("Initialize helm N2VC connector")
113 self.log.debug("initial vca_config: {}".format(vca_config))
114
115 # TODO - Obtain data from configuration
116 self._ee_service_port = self._EE_SERVICE_PORT
117
118 self._retry_delay = self._EE_RETRY_DELAY
119
120 if self.vca_config and self.vca_config.get("eegrpcinittimeout"):
121 self._initial_retry_time = self.vca_config.get("eegrpcinittimeout")
122 self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
123 else:
124 self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME
125 self.log.debug(
126 "Applied default retry time: {}".format(self._initial_retry_time)
127 )
128
129 if self.vca_config and self.vca_config.get("eegrpctimeout"):
130 self._max_retry_time = self.vca_config.get("eegrpctimeout")
131 self.log.debug("Retry time: {}".format(self._max_retry_time))
132 else:
133 self._max_retry_time = self._MAX_RETRY_TIME
134 self.log.debug(
135 "Applied default retry time: {}".format(self._max_retry_time)
136 )
137
138 # initialize helm connector for helmv2 and helmv3
139 self._k8sclusterhelm2 = K8sHelmConnector(
140 kubectl_command=self.vca_config.get("kubectlpath"),
141 helm_command=self.vca_config.get("helmpath"),
142 fs=self.fs,
143 db=self.db,
144 log=self.log,
145 on_update_db=None,
146 )
147
148 self._k8sclusterhelm3 = K8sHelm3Connector(
149 kubectl_command=self.vca_config.get("kubectlpath"),
150 helm_command=self.vca_config.get("helm3path"),
151 fs=self.fs,
152 log=self.log,
153 db=self.db,
154 on_update_db=None,
155 )
156
157 self._system_cluster_id = None
158 self.log.info("Helm N2VC connector initialized")
159
160 # TODO - ¿reuse_ee_id?
161 async def create_execution_environment(
162 self,
163 namespace: str,
164 db_dict: dict,
165 reuse_ee_id: str = None,
166 progress_timeout: float = None,
167 total_timeout: float = None,
168 config: dict = None,
169 artifact_path: str = None,
170 vca_type: str = None,
171 *kargs,
172 **kwargs,
173 ) -> (str, dict):
174 """
175 Creates a new helm execution environment deploying the helm-chat indicated in the
176 attifact_path
177 :param str namespace: This param is not used, all helm charts are deployed in the osm
178 system namespace
179 :param dict db_dict: where to write to database when the status changes.
180 It contains a dictionary with {collection: str, filter: {}, path: str},
181 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
182 "_admin.deployed.VCA.3"}
183 :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used
184 :param float progress_timeout:
185 :param float total_timeout:
186 :param dict config: General variables to instantiate KDU
187 :param str artifact_path: path of package content
188 :param str vca_type: Type of vca, must be type helm or helm-v3
189 :returns str, dict: id of the new execution environment including namespace.helm_id
190 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
191 """
192
193 self.log.info(
194 "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
195 "reuse_ee_id: {}".format(namespace, artifact_path, db_dict, reuse_ee_id)
196 )
197
198 # Validate artifact-path is provided
199 if artifact_path is None or len(artifact_path) == 0:
200 raise N2VCBadArgumentsException(
201 message="artifact_path is mandatory", bad_args=["artifact_path"]
202 )
203
204 # Validate artifact-path exists and sync path
205 from_path = os.path.split(artifact_path)[0]
206 self.fs.sync(from_path)
207
208 # remove / in charm path
209 while artifact_path.find("//") >= 0:
210 artifact_path = artifact_path.replace("//", "/")
211
212 # check charm path
213 if self.fs.file_exists(artifact_path):
214 helm_chart_path = artifact_path
215 else:
216 msg = "artifact path does not exist: {}".format(artifact_path)
217 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
218
219 if artifact_path.startswith("/"):
220 full_path = self.fs.path + helm_chart_path
221 else:
222 full_path = self.fs.path + "/" + helm_chart_path
223
224 while full_path.find("//") >= 0:
225 full_path = full_path.replace("//", "/")
226
227 try:
228 # Call helm conn install
229 # Obtain system cluster id from database
230 system_cluster_uuid = await self._get_system_cluster_id()
231 # Add parameter osm if exist to global
232 if config and config.get("osm"):
233 if not config.get("global"):
234 config["global"] = {}
235 config["global"]["osm"] = config.get("osm")
236
237 self.log.debug("install helm chart: {}".format(full_path))
238 if vca_type == "helm":
239 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
240 db_dict=db_dict,
241 kdu_model=full_path,
242 )
243 await self._k8sclusterhelm2.install(
244 system_cluster_uuid,
245 kdu_model=full_path,
246 kdu_instance=helm_id,
247 namespace=self._KUBECTL_OSM_NAMESPACE,
248 params=config,
249 db_dict=db_dict,
250 timeout=progress_timeout,
251 )
252 else:
253 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
254 db_dict=db_dict,
255 kdu_model=full_path,
256 )
257 await self._k8sclusterhelm3.install(
258 system_cluster_uuid,
259 kdu_model=full_path,
260 kdu_instance=helm_id,
261 namespace=self._KUBECTL_OSM_NAMESPACE,
262 params=config,
263 db_dict=db_dict,
264 timeout=progress_timeout,
265 )
266
267 ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
268 return ee_id, None
269 except N2VCException:
270 raise
271 except Exception as e:
272 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
273 raise N2VCException("Error deploying chart ee: {}".format(e))
274
275 async def register_execution_environment(
276 self,
277 namespace: str,
278 credentials: dict,
279 db_dict: dict,
280 progress_timeout: float = None,
281 total_timeout: float = None,
282 *kargs,
283 **kwargs,
284 ) -> str:
285 # nothing to do
286 pass
287
288 async def install_configuration_sw(self, *args, **kwargs):
289 # nothing to do
290 pass
291
292 async def add_relation(self, *args, **kwargs):
293 # nothing to do
294 pass
295
296 async def remove_relation(self):
297 # nothing to to
298 pass
299
300 async def get_status(self, *args, **kwargs):
301 # not used for this connector
302 pass
303
    async def upgrade_charm(self, *args, **kwargs):
        # not used for this connector
        # NOTE(review): dead code — this definition is shadowed by the second
        # upgrade_charm defined further down in this class, which is the one
        # that takes effect at class-creation time.
        pass
307
308 async def get_ee_ssh_public__key(
309 self,
310 ee_id: str,
311 db_dict: dict,
312 progress_timeout: float = None,
313 total_timeout: float = None,
314 **kwargs,
315 ) -> str:
316 """
317 Obtains ssh-public key from ee executing GetSShKey method from the ee.
318
319 :param str ee_id: the id of the execution environment returned by
320 create_execution_environment or register_execution_environment
321 :param dict db_dict:
322 :param float progress_timeout:
323 :param float total_timeout:
324 :returns: public key of the execution environment
325 """
326
327 self.log.info(
328 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
329 )
330
331 # check arguments
332 if ee_id is None or len(ee_id) == 0:
333 raise N2VCBadArgumentsException(
334 message="ee_id is mandatory", bad_args=["ee_id"]
335 )
336
337 try:
338 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
339 version, namespace, helm_id = self._get_ee_id_parts(ee_id)
340 ip_addr = socket.gethostbyname(helm_id)
341
342 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
343 # install libraries and start successfully
344 ssh_key = await self._get_ssh_key(ip_addr)
345 return ssh_key
346 except Exception as e:
347 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
348 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
349
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrade charms in VNFs

        This method does not support KDU's deployed with Helm.

        Args:
            ee_id: Execution environment id
            path: Local path to the charm
            charm_id: charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float) Timeout for the ns update operation

        Returns:
            the output of the update operation if status equals to "completed"

        Raises:
            N2VCException: always, as charm upgrade is unsupported for helm EEs
        """
        # NOTE(review): this definition shadows the earlier no-op upgrade_charm
        # above; this raising implementation is the effective one.
        raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
374
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
                {collection: <str>, filter: {}, path: <str>},
                e.g. {collection: "nslcmops", filter:
                {_id: <nslcmop_id>, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

        self.log.info(
            "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
                ee_id, primitive_name, params_dict, db_dict
            )
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="action_name is mandatory", bad_args=["action_name"]
            )
        if params_dict is None:
            params_dict = dict()

        try:
            # the EE hostname (helm_id) is resolved to an ip by the cluster dns
            version, namespace, helm_id = self._get_ee_id_parts(ee_id)
            ip_addr = socket.gethostbyname(helm_id)
        except Exception as e:
            self.log.error("Error getting ee ip ee: {}".format(e))
            raise N2VCException("Error getting ee ip ee: {}".format(e))

        if primitive_name == "config":
            try:
                # Execute config primitive, higher timeout to check the case ee is starting
                status, detailed_message = await self._execute_config_primitive(
                    ip_addr, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
                        ee_id, status, detailed_message
                    )
                )
                if status != "OK":
                    self.log.error(
                        "Error configuring helm ee, status: {}, message: {}".format(
                            status, detailed_message
                        )
                    )
                    # NOTE(review): this raise is caught by the except below and
                    # re-wrapped into a second N2VCExecutionException
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
                    primitive_name=primitive_name,
                )
            # config always reports this fixed success marker
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(
                    ip_addr, primitive_name, params_dict, db_dict=db_dict
                )
                self.log.debug(
                    "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
                        primitive_name, ee_id, status, detailed_message
                    )
                )
                # "PROCESSING" is accepted as a non-error status
                if status != "OK" and status != "PROCESSING":
                    self.log.error(
                        "Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        )
                    )
                    # NOTE(review): this raise is caught by the except below and
                    # re-wrapped into a second N2VCExecutionException
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return detailed_message
494
495 async def deregister_execution_environments(self):
496 # nothing to be done
497 pass
498
499 async def delete_execution_environment(
500 self,
501 ee_id: str,
502 db_dict: dict = None,
503 total_timeout: float = None,
504 **kwargs,
505 ):
506 """
507 Delete an execution environment
508 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
509 :param dict db_dict: where to write into database when the status changes.
510 It contains a dict with
511 {collection: <str>, filter: {}, path: <str>},
512 e.g. {collection: "nsrs", filter:
513 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
514 :param float total_timeout:
515 """
516
517 self.log.info("ee_id: {}".format(ee_id))
518
519 # check arguments
520 if ee_id is None:
521 raise N2VCBadArgumentsException(
522 message="ee_id is mandatory", bad_args=["ee_id"]
523 )
524
525 try:
526
527 # Obtain cluster_uuid
528 system_cluster_uuid = await self._get_system_cluster_id()
529
530 # Get helm_id
531 version, namespace, helm_id = self._get_ee_id_parts(ee_id)
532
533 # Uninstall chart, for backward compatibility we must assume that if there is no
534 # version it is helm-v2
535 if version == "helm-v3":
536 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
537 else:
538 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
539 self.log.info("ee_id: {} deleted".format(ee_id))
540 except N2VCException:
541 raise
542 except Exception as e:
543 self.log.error(
544 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
545 )
546 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
547
548 async def delete_namespace(
549 self, namespace: str, db_dict: dict = None, total_timeout: float = None
550 ):
551 # method not implemented for this connector, execution environments must be deleted individually
552 pass
553
554 async def install_k8s_proxy_charm(
555 self,
556 charm_name: str,
557 namespace: str,
558 artifact_path: str,
559 db_dict: dict,
560 progress_timeout: float = None,
561 total_timeout: float = None,
562 config: dict = None,
563 *kargs,
564 **kwargs,
565 ) -> str:
566 pass
567
    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _get_ssh_key(self, ip_addr):
        """Ask the EE frontend over gRPC for its ssh public key.

        Retried (via @retryer) on ConnectionRefusedError with the initial-start
        budget, since the EE may still be coming up.
        """
        channel = Channel(ip_addr, self._ee_service_port)
        try:
            stub = FrontendExecutorStub(channel)
            self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
            reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
            return reply.message
        finally:
            # always release the gRPC channel
            channel.close()
578
    @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
    async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
        """Run the 'config' primitive on the EE.

        Uses the (longer) initial retry budget because the EE may still be starting.
        """
        return await self._execute_primitive_internal(
            ip_addr, "config", params, db_dict=db_dict
        )
584
    @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
    async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
        """Run an arbitrary primitive on the EE, with the normal-operation retry budget."""
        return await self._execute_primitive_internal(
            ip_addr, primitive_name, params, db_dict=db_dict
        )
590
591 async def _execute_primitive_internal(
592 self, ip_addr, primitive_name, params, db_dict=None
593 ):
594
595 channel = Channel(ip_addr, self._ee_service_port)
596 try:
597 stub = FrontendExecutorStub(channel)
598 async with stub.RunPrimitive.open() as stream:
599 primitive_id = str(uuid.uuid1())
600 result = None
601 self.log.debug(
602 "Execute primitive internal: id:{}, name:{}, params: {}".format(
603 primitive_id, primitive_name, params
604 )
605 )
606 await stream.send_message(
607 PrimitiveRequest(
608 id=primitive_id, name=primitive_name, params=yaml.dump(params)
609 ),
610 end=True,
611 )
612 async for reply in stream:
613 self.log.debug("Received reply: {}".format(reply))
614 result = reply
615 # If db_dict provided write notifs in database
616 if db_dict:
617 self._write_op_detailed_status(
618 db_dict, reply.status, reply.detailed_message
619 )
620 if result:
621 return reply.status, reply.detailed_message
622 else:
623 return "ERROR", "No result received"
624 finally:
625 channel.close()
626
627 def _write_op_detailed_status(self, db_dict, status, detailed_message):
628
629 # write ee_id to database: _admin.deployed.VCA.x
630 try:
631 the_table = db_dict["collection"]
632 the_filter = db_dict["filter"]
633 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
634 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
635 self.db.set_one(
636 table=the_table,
637 q_filter=the_filter,
638 update_dict=update_dict,
639 fail_on_empty=True,
640 )
641 except asyncio.CancelledError:
642 raise
643 except Exception as e:
644 self.log.error("Error writing detailedStatus to database: {}".format(e))
645
646 async def _get_system_cluster_id(self):
647 if not self._system_cluster_id:
648 db_k8cluster = self.db.get_one(
649 "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}
650 )
651 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
652 if not k8s_hc_id:
653 try:
654 # backward compatibility for existing clusters that have not been initialized for helm v3
655 cluster_id = db_k8cluster.get("_id")
656 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
657 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
658 k8s_credentials, reuse_cluster_uuid=cluster_id
659 )
660 db_k8scluster_update = {
661 "_admin.helm-chart-v3.error_msg": None,
662 "_admin.helm-chart-v3.id": k8s_hc_id,
663 "_admin.helm-chart-v3}.created": uninstall_sw,
664 "_admin.helm-chart-v3.operationalState": "ENABLED",
665 }
666 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
667 except Exception as e:
668 self.log.error(
669 "error initializing helm-v3 cluster: {}".format(str(e))
670 )
671 raise N2VCException(
672 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
673 cluster_id
674 )
675 )
676 self._system_cluster_id = k8s_hc_id
677 return self._system_cluster_id
678
679 def _get_ee_id_parts(self, ee_id):
680 """
681 Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only
682 namespace.helm_id for backward compatibility
683 If exists helm version can be helm-v3 or helm (helm-v2 old version)
684 """
685 version, _, part_id = ee_id.rpartition(":")
686 namespace, _, helm_id = part_id.rpartition(".")
687 return version, namespace, helm_id