Update create_execution_environment to pass the chart_model
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
1 ##
2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 ##
18 import functools
19 import yaml
20 import asyncio
21 import socket
22 import uuid
23 import os
24
25 from grpclib.client import Channel
26
27 from osm_lcm.frontend_pb2 import PrimitiveRequest
28 from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
29 from osm_lcm.frontend_grpc import FrontendExecutorStub
30 from osm_lcm.lcm_utils import LcmBase
31
32 from osm_lcm.data_utils.database.database import Database
33 from osm_lcm.data_utils.filesystem.filesystem import Filesystem
34
35 from n2vc.n2vc_conn import N2VCConnector
36 from n2vc.k8s_helm_conn import K8sHelmConnector
37 from n2vc.k8s_helm3_conn import K8sHelm3Connector
38 from n2vc.exceptions import (
39 N2VCBadArgumentsException,
40 N2VCException,
41 N2VCExecutionException,
42 )
43
44 from osm_lcm.lcm_utils import deep_get
45
46
47 def retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay"):
48 def wrapper(func):
49 retry_exceptions = ConnectionRefusedError
50
51 @functools.wraps(func)
52 async def wrapped(*args, **kwargs):
53 # default values for wait time and delay_time
54 delay_time = 10
55 max_wait_time = 300
56
57 # obtain arguments from variable names
58 self = args[0]
59 if self.__dict__.get(max_wait_time_var):
60 max_wait_time = self.__dict__.get(max_wait_time_var)
61 if self.__dict__.get(delay_time_var):
62 delay_time = self.__dict__.get(delay_time_var)
63
64 wait_time = max_wait_time
65 while wait_time > 0:
66 try:
67 return await func(*args, **kwargs)
68 except retry_exceptions:
69 wait_time = wait_time - delay_time
70 await asyncio.sleep(delay_time)
71 continue
72 else:
73 raise ConnectionRefusedError("maximum retry time exceeded")
74
75 return wrapped
76
77 return wrapper
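# Usage sketch (illustrative only; ExampleClient and ping_ee are hypothetical names):
# the decorator reads the retry window and the delay from attributes of the decorated
# object, so a class only needs to expose those two instance variables, e.g.:
#
#     class ExampleClient:
#         _initial_retry_time = 60  # seconds to keep retrying on ConnectionRefusedError
#         _retry_delay = 5          # seconds to sleep between attempts
#
#         @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
#         async def ping_ee(self):
#             ...  # any ConnectionRefusedError raised here triggers another attempt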
78
79
80 class LCMHelmConn(N2VCConnector, LcmBase):
81 _KUBECTL_OSM_NAMESPACE = "osm"
82 _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
83 _EE_SERVICE_PORT = 50050
84
85 # Initial max retry time
86 _MAX_INITIAL_RETRY_TIME = 600
87 # Max retry time for normal operations
88 _MAX_RETRY_TIME = 30
89 # Time between retries, i.e. the delay applied after a connection error is raised
90 _EE_RETRY_DELAY = 10
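# Note: _MAX_INITIAL_RETRY_TIME and _MAX_RETRY_TIME are defaults only; __init__ below
# overrides them with the vca_config keys "eegrpcinittimeout" and "eegrpctimeout" when present.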
91
92 def __init__(
93 self,
94 log: object = None,
95 loop: object = None,
96 vca_config: dict = None,
97 on_update_db=None,
98 ):
99 """
100 Initialize EE helm connector.
101 """
102
103 self.db = Database().instance.db
104 self.fs = Filesystem().instance.fs
105
106 # parent class constructor
107 N2VCConnector.__init__(
108 self, log=log, loop=loop, on_update_db=on_update_db, db=self.db, fs=self.fs
109 )
110
111 self.vca_config = vca_config
112 self.log.debug("Initialize helm N2VC connector")
113 self.log.debug("initial vca_config: {}".format(vca_config))
114
115 # TODO - Obtain data from configuration
116 self._ee_service_port = self._EE_SERVICE_PORT
117
118 self._retry_delay = self._EE_RETRY_DELAY
119
120 if self.vca_config and self.vca_config.get("eegrpcinittimeout"):
121 self._initial_retry_time = self.vca_config.get("eegrpcinittimeout")
122 self.log.debug("Initial retry time: {}".format(self._initial_retry_time))
123 else:
124 self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME
125 self.log.debug(
126 "Applied default retry time: {}".format(self._initial_retry_time)
127 )
128
129 if self.vca_config and self.vca_config.get("eegrpctimeout"):
130 self._max_retry_time = self.vca_config.get("eegrpctimeout")
131 self.log.debug("Retry time: {}".format(self._max_retry_time))
132 else:
133 self._max_retry_time = self._MAX_RETRY_TIME
134 self.log.debug(
135 "Applied default retry time: {}".format(self._max_retry_time)
136 )
137
138 # initialize helm connector for helmv2 and helmv3
139 self._k8sclusterhelm2 = K8sHelmConnector(
140 kubectl_command=self.vca_config.get("kubectlpath"),
141 helm_command=self.vca_config.get("helmpath"),
142 fs=self.fs,
143 db=self.db,
144 log=self.log,
145 on_update_db=None,
146 )
147
148 self._k8sclusterhelm3 = K8sHelm3Connector(
149 kubectl_command=self.vca_config.get("kubectlpath"),
150 helm_command=self.vca_config.get("helm3path"),
151 fs=self.fs,
152 log=self.log,
153 db=self.db,
154 on_update_db=None,
155 )
156
157 self._system_cluster_id = None
158 self.log.info("Helm N2VC connector initialized")
159
160 # TODO - reuse_ee_id?
161 async def create_execution_environment(
162 self,
163 namespace: str,
164 db_dict: dict,
165 reuse_ee_id: str = None,
166 progress_timeout: float = None,
167 total_timeout: float = None,
168 config: dict = None,
169 artifact_path: str = None,
170 chart_model: str = None,
171 vca_type: str = None,
172 *kargs,
173 **kwargs,
174 ) -> (str, dict):
175 """
176 Creates a new helm execution environment deploying the helm chart indicated in the
177 artifact_path
178 :param str namespace: This param is not used, all helm charts are deployed in the osm
179 system namespace
180 :param dict db_dict: where to write to database when the status changes.
181 It contains a dictionary with {collection: str, filter: {}, path: str},
182 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
183 "_admin.deployed.VCA.3"}
184 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
185 :param float progress_timeout:
186 :param float total_timeout:
187 :param dict config: General variables to instantiate KDU
188 :param str artifact_path: path of package content
189 :param str chart_model: helm chart/reference (string), which can be either
190 of these options:
191 - a name of chart available via the repos known by OSM
192 (e.g. stable/openldap, stable/openldap:1.2.4)
193 - a path to a packaged chart (e.g. mychart.tgz)
194 - a path to an unpacked chart directory or a URL (e.g. mychart)
195 :param str vca_type: Type of vca, must be type helm or helm-v3
196 :returns str, dict: id of the new execution environment, in the form namespace.helm_id,
197 and a credentials object set to None, as all credentials are taken from the OSM Kubernetes .kubeconfig
198 """
199
200 self.log.info(
201 "create_execution_environment: namespace: {}, artifact_path: {}, "
202 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
203 namespace, artifact_path, chart_model, db_dict, reuse_ee_id
204 )
205 )
206
207 # Validate artifact-path is provided
208 if artifact_path is None or len(artifact_path) == 0:
209 raise N2VCBadArgumentsException(
210 message="artifact_path is mandatory", bad_args=["artifact_path"]
211 )
212
213 # Validate artifact-path exists and sync path
214 from_path = os.path.split(artifact_path)[0]
215 self.fs.sync(from_path)
216
217 # collapse duplicated "/" in the charm path
218 while artifact_path.find("//") >= 0:
219 artifact_path = artifact_path.replace("//", "/")
220
221 # check charm path
222 if self.fs.file_exists(artifact_path):
223 helm_chart_path = artifact_path
224 else:
225 msg = "artifact path does not exist: {}".format(artifact_path)
226 raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])
227
228 if artifact_path.startswith("/"):
229 full_path = self.fs.path + helm_chart_path
230 else:
231 full_path = self.fs.path + "/" + helm_chart_path
232
233 while full_path.find("//") >= 0:
234 full_path = full_path.replace("//", "/")
235
236 # By default, the KDU is expected to be a file
237 kdu_model = full_path
238 # If the chart_model includes a "/", then it is a reference:
239 # e.g. (stable/openldap; stable/openldap:1.2.4)
240 if chart_model and chart_model.find("/") >= 0:
241 kdu_model = chart_model
242
243 try:
244 # Call helm conn install
245 # Obtain system cluster id from database
246 system_cluster_uuid = await self._get_system_cluster_id()
247 # Add parameter osm if exist to global
248 if config and config.get("osm"):
249 if not config.get("global"):
250 config["global"] = {}
251 config["global"]["osm"] = config.get("osm")
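# Illustrative effect of the merge above: config == {"osm": {...}} becomes
# {"osm": {...}, "global": {"osm": {...}}}, so the same values are also visible
# to subcharts through Helm's .Values.global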
252
253 self.log.debug("install helm chart: {}".format(full_path))
254 if vca_type == "helm":
255 helm_id = self._k8sclusterhelm2.generate_kdu_instance_name(
256 db_dict=db_dict,
257 kdu_model=kdu_model,
258 )
259 await self._k8sclusterhelm2.install(
260 system_cluster_uuid,
261 kdu_model=kdu_model,
262 kdu_instance=helm_id,
263 namespace=self._KUBECTL_OSM_NAMESPACE,
264 params=config,
265 db_dict=db_dict,
266 timeout=progress_timeout,
267 )
268 else:
269 helm_id = self._k8sclusterhelm3.generate_kdu_instance_name(
270 db_dict=db_dict,
271 kdu_model=kdu_model,
272 )
273 await self._k8sclusterhelm3.install(
274 system_cluster_uuid,
275 kdu_model=kdu_model,
276 kdu_instance=helm_id,
277 namespace=self._KUBECTL_OSM_NAMESPACE,
278 params=config,
279 db_dict=db_dict,
280 timeout=progress_timeout,
281 )
282
283 ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
284 return ee_id, None
285 except N2VCException:
286 raise
287 except Exception as e:
288 self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
289 raise N2VCException("Error deploying chart ee: {}".format(e))
290
291 async def register_execution_environment(
292 self,
293 namespace: str,
294 credentials: dict,
295 db_dict: dict,
296 progress_timeout: float = None,
297 total_timeout: float = None,
298 *kargs,
299 **kwargs,
300 ) -> str:
301 # nothing to do
302 pass
303
304 async def install_configuration_sw(self, *args, **kwargs):
305 # nothing to do
306 pass
307
308 async def add_relation(self, *args, **kwargs):
309 # nothing to do
310 pass
311
312 async def remove_relation(self):
313 # nothing to do
314 pass
315
316 async def get_status(self, *args, **kwargs):
317 # not used for this connector
318 pass
319
320 async def get_ee_ssh_public__key(
321 self,
322 ee_id: str,
323 db_dict: dict,
324 progress_timeout: float = None,
325 total_timeout: float = None,
326 **kwargs,
327 ) -> str:
328 """
329 Obtains ssh-public key from ee executing GetSShKey method from the ee.
330
331 :param str ee_id: the id of the execution environment returned by
332 create_execution_environment or register_execution_environment
333 :param dict db_dict:
334 :param float progress_timeout:
335 :param float total_timeout:
336 :returns: public key of the execution environment
337 """
338
339 self.log.info(
340 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id, db_dict)
341 )
342
343 # check arguments
344 if ee_id is None or len(ee_id) == 0:
345 raise N2VCBadArgumentsException(
346 message="ee_id is mandatory", bad_args=["ee_id"]
347 )
348
349 try:
350 # Obtain the ip_addr of the ee service; Kubernetes DNS resolves it from the ee name
351 version, namespace, helm_id = self._get_ee_id_parts(ee_id)
352 ip_addr = socket.gethostbyname(helm_id)
353
354 # Obtain the ssh_key from the ee; this method retries to give the ee time to
355 # install its libraries and start successfully
356 ssh_key = await self._get_ssh_key(ip_addr)
357 return ssh_key
358 except Exception as e:
359 self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
360 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e))
361
362 async def upgrade_charm(
363 self,
364 ee_id: str = None,
365 path: str = None,
366 charm_id: str = None,
367 charm_type: str = None,
368 timeout: float = None,
369 ) -> str:
370 """This method upgrade charms in VNFs
371
372 This method does not support KDUs deployed with Helm.
373
374 Args:
375 ee_id: Execution environment id
376 path: Local path to the charm
377 charm_id: charm-id
378 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
379 timeout: (Float) Timeout for the ns update operation
380
381 Returns:
382 the output of the update operation if status equals to "completed"
383
384 """
385 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
386
387 async def exec_primitive(
388 self,
389 ee_id: str,
390 primitive_name: str,
391 params_dict: dict,
392 db_dict: dict = None,
393 progress_timeout: float = None,
394 total_timeout: float = None,
395 **kwargs,
396 ) -> str:
397 """
398 Execute a primitive in the execution environment
399
400 :param str ee_id: the one returned by create_execution_environment or
401 register_execution_environment with the format namespace.helm_id
402 :param str primitive_name: must be one defined in the software. There is one
403 called 'config', where, for the proxy case, the 'credentials' of VM are
404 provided
405 :param dict params_dict: parameters of the action
406 :param dict db_dict: where to write into database when the status changes.
407 It contains a dict with
408 {collection: <str>, filter: {}, path: <str>},
409 e.g. {collection: "nslcmops", filter:
410 {_id: <nslcmop_id>, path: "_admin.VCA"}
411 It will be used to store information about intermediate notifications
412 :param float progress_timeout:
413 :param float total_timeout:
414 :returns str: primitive result, if ok. It raises exceptions in case of fail
415 """
416
417 self.log.info(
418 "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
419 ee_id, primitive_name, params_dict, db_dict
420 )
421 )
422
423 # check arguments
424 if ee_id is None or len(ee_id) == 0:
425 raise N2VCBadArgumentsException(
426 message="ee_id is mandatory", bad_args=["ee_id"]
427 )
428 if primitive_name is None or len(primitive_name) == 0:
429 raise N2VCBadArgumentsException(
430 message="action_name is mandatory", bad_args=["action_name"]
431 )
432 if params_dict is None:
433 params_dict = dict()
434
435 try:
436 version, namespace, helm_id = self._get_ee_id_parts(ee_id)
437 ip_addr = socket.gethostbyname(helm_id)
438 except Exception as e:
439 self.log.error("Error getting ee ip ee: {}".format(e))
440 raise N2VCException("Error getting ee ip ee: {}".format(e))
441
442 if primitive_name == "config":
443 try:
444 # Execute the config primitive; a higher timeout covers the case where the ee is still starting
445 status, detailed_message = await self._execute_config_primitive(
446 ip_addr, params_dict, db_dict=db_dict
447 )
448 self.log.debug(
449 "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
450 ee_id, status, detailed_message
451 )
452 )
453 if status != "OK":
454 self.log.error(
455 "Error configuring helm ee, status: {}, message: {}".format(
456 status, detailed_message
457 )
458 )
459 raise N2VCExecutionException(
460 message="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
461 ee_id, status, detailed_message
462 ),
463 primitive_name=primitive_name,
464 )
465 except Exception as e:
466 self.log.error("Error configuring helm ee: {}".format(e))
467 raise N2VCExecutionException(
468 message="Error configuring helm ee_id: {}, {}".format(ee_id, e),
469 primitive_name=primitive_name,
470 )
471 return "CONFIG OK"
472 else:
473 try:
474 # Execute primitive
475 status, detailed_message = await self._execute_primitive(
476 ip_addr, primitive_name, params_dict, db_dict=db_dict
477 )
478 self.log.debug(
479 "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
480 primitive_name, ee_id, status, detailed_message
481 )
482 )
483 if status != "OK" and status != "PROCESSING":
484 self.log.error(
485 "Execute primitive {} returned not ok status: {}, message: {}".format(
486 primitive_name, status, detailed_message
487 )
488 )
489 raise N2VCExecutionException(
490 message="Execute primitive {} returned not ok status: {}, message: {}".format(
491 primitive_name, status, detailed_message
492 ),
493 primitive_name=primitive_name,
494 )
495 except Exception as e:
496 self.log.error(
497 "Error executing primitive {}: {}".format(primitive_name, e)
498 )
499 raise N2VCExecutionException(
500 message="Error executing primitive {} into ee={} : {}".format(
501 primitive_name, ee_id, e
502 ),
503 primitive_name=primitive_name,
504 )
505 return detailed_message
506
507 async def deregister_execution_environments(self):
508 # nothing to be done
509 pass
510
511 async def delete_execution_environment(
512 self,
513 ee_id: str,
514 db_dict: dict = None,
515 total_timeout: float = None,
516 **kwargs,
517 ):
518 """
519 Delete an execution environment
520 :param str ee_id: id of the execution environment to delete, including namespace.helm_id
521 :param dict db_dict: where to write into database when the status changes.
522 It contains a dict with
523 {collection: <str>, filter: {}, path: <str>},
524 e.g. {collection: "nsrs", filter:
525 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
526 :param float total_timeout:
527 """
528
529 self.log.info("ee_id: {}".format(ee_id))
530
531 # check arguments
532 if ee_id is None:
533 raise N2VCBadArgumentsException(
534 message="ee_id is mandatory", bad_args=["ee_id"]
535 )
536
537 try:
538
539 # Obtain cluster_uuid
540 system_cluster_uuid = await self._get_system_cluster_id()
541
542 # Get helm_id
543 version, namespace, helm_id = self._get_ee_id_parts(ee_id)
544
545 # Uninstall chart, for backward compatibility we must assume that if there is no
546 # version it is helm-v2
547 if version == "helm-v3":
548 await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
549 else:
550 await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
551 self.log.info("ee_id: {} deleted".format(ee_id))
552 except N2VCException:
553 raise
554 except Exception as e:
555 self.log.error(
556 "Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True
557 )
558 raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))
559
560 async def delete_namespace(
561 self, namespace: str, db_dict: dict = None, total_timeout: float = None
562 ):
563 # method not implemented for this connector, execution environments must be deleted individually
564 pass
565
566 async def install_k8s_proxy_charm(
567 self,
568 charm_name: str,
569 namespace: str,
570 artifact_path: str,
571 db_dict: dict,
572 progress_timeout: float = None,
573 total_timeout: float = None,
574 config: dict = None,
575 *kargs,
576 **kwargs,
577 ) -> str:
578 pass
579
580 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
581 async def _get_ssh_key(self, ip_addr):
582 channel = Channel(ip_addr, self._ee_service_port)
583 try:
584 stub = FrontendExecutorStub(channel)
585 self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
586 reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
587 return reply.message
588 finally:
589 channel.close()
590
591 @retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
592 async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
593 return await self._execute_primitive_internal(
594 ip_addr, "config", params, db_dict=db_dict
595 )
596
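# Note: the "config" primitive above retries for up to _initial_retry_time (600 s by default)
# because the ee may still be starting, while other primitives use the shorter
# _max_retry_time window (30 s by default).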
597 @retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
598 async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
599 return await self._execute_primitive_internal(
600 ip_addr, primitive_name, params, db_dict=db_dict
601 )
602
603 async def _execute_primitive_internal(
604 self, ip_addr, primitive_name, params, db_dict=None
605 ):
606
607 channel = Channel(ip_addr, self._ee_service_port)
608 try:
609 stub = FrontendExecutorStub(channel)
610 async with stub.RunPrimitive.open() as stream:
611 primitive_id = str(uuid.uuid1())
612 result = None
613 self.log.debug(
614 "Execute primitive internal: id:{}, name:{}, params: {}".format(
615 primitive_id, primitive_name, params
616 )
617 )
618 await stream.send_message(
619 PrimitiveRequest(
620 id=primitive_id, name=primitive_name, params=yaml.dump(params)
621 ),
622 end=True,
623 )
624 async for reply in stream:
625 self.log.debug("Received reply: {}".format(reply))
626 result = reply
627 # If db_dict is provided, write notifications to the database
628 if db_dict:
629 self._write_op_detailed_status(
630 db_dict, reply.status, reply.detailed_message
631 )
632 if result:
633 return result.status, result.detailed_message
634 else:
635 return "ERROR", "No result received"
636 finally:
637 channel.close()
638
639 def _write_op_detailed_status(self, db_dict, status, detailed_message):
640
641 # write detailed-status to the database document referenced by db_dict
642 try:
643 the_table = db_dict["collection"]
644 the_filter = db_dict["filter"]
645 update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
646 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
647 self.db.set_one(
648 table=the_table,
649 q_filter=the_filter,
650 update_dict=update_dict,
651 fail_on_empty=True,
652 )
653 except asyncio.CancelledError:
654 raise
655 except Exception as e:
656 self.log.error("Error writing detailedStatus to database: {}".format(e))
657
658 async def _get_system_cluster_id(self):
659 if not self._system_cluster_id:
660 db_k8cluster = self.db.get_one(
661 "k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME}
662 )
663 k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
664 if not k8s_hc_id:
665 try:
666 # backward compatibility for existing clusters that have not been initialized for helm v3
667 cluster_id = db_k8cluster.get("_id")
668 k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
669 k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(
670 k8s_credentials, reuse_cluster_uuid=cluster_id
671 )
672 db_k8scluster_update = {
673 "_admin.helm-chart-v3.error_msg": None,
674 "_admin.helm-chart-v3.id": k8s_hc_id,
675 "_admin.helm-chart-v3}.created": uninstall_sw,
676 "_admin.helm-chart-v3.operationalState": "ENABLED",
677 }
678 self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
679 except Exception as e:
680 self.log.error(
681 "error initializing helm-v3 cluster: {}".format(str(e))
682 )
683 raise N2VCException(
684 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
685 cluster_id
686 )
687 )
688 self._system_cluster_id = k8s_hc_id
689 return self._system_cluster_id
690
691 def _get_ee_id_parts(self, ee_id):
692 """
693 Parses the ee_id stored in the database, which can be either 'version:namespace.helm_id' or only
694 'namespace.helm_id' for backward compatibility
695 If present, the helm version can be helm-v3 or helm (the old helm-v2)
696 """
697 version, _, part_id = ee_id.rpartition(":")
698 namespace, _, helm_id = part_id.rpartition(".")
699 return version, namespace, helm_id
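# Parsing examples (illustrative):
#     "helm-v3:osm.myhelm-1234" -> ("helm-v3", "osm", "myhelm-1234")
#     "osm.myhelm-1234" -> ("", "osm", "myhelm-1234")  # legacy format, treated as helm-v2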