Added support for helm v3
[osm/LCM.git] / osm_lcm / lcm_helm_conn.py
##
# Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
##
import functools
import yaml
import asyncio
import socket
import uuid
import os

from grpclib.client import Channel

from osm_lcm.frontend_pb2 import PrimitiveRequest
from osm_lcm.frontend_pb2 import SshKeyRequest, SshKeyReply
from osm_lcm.frontend_grpc import FrontendExecutorStub
from osm_lcm.lcm_utils import LcmBase

from n2vc.n2vc_conn import N2VCConnector
from n2vc.k8s_helm_conn import K8sHelmConnector
from n2vc.k8s_helm3_conn import K8sHelm3Connector
from n2vc.exceptions import N2VCBadArgumentsException, N2VCException, N2VCExecutionException

from osm_lcm.lcm_utils import deep_get

def retryer(max_wait_time=60, delay_time=10):
    """Decorator that retries an async call on ConnectionRefusedError, waiting delay_time
    seconds between attempts, until max_wait_time seconds have been consumed."""
    def wrapper(func):
        retry_exceptions = (
            ConnectionRefusedError,
        )

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            wait_time = max_wait_time
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions:
                    # the execution environment may still be starting; wait and retry
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
                    continue
            else:
                # retries exhausted without a successful call
                raise ConnectionRefusedError(
                    "Maximum wait time exceeded contacting the execution environment"
                )
        return wrapped
    return wrapper

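# Illustrative use of the retryer decorator (the coroutine below is hypothetical, not part of
# this module): a call that may hit ConnectionRefusedError while the execution environment pod
# is still starting is retried every `delay_time` seconds, for at most `max_wait_time` seconds.
#
#     @retryer(max_wait_time=30, delay_time=5)
#     async def _probe_ee(ip_addr):
#         channel = Channel(ip_addr, 50050)
#         try:
#             return await FrontendExecutorStub(channel).GetSshKey(SshKeyRequest())
#         finally:
#             channel.close()
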
class LCMHelmConn(N2VCConnector, LcmBase):
    _KUBECTL_OSM_NAMESPACE = "osm"
    _KUBECTL_OSM_CLUSTER_NAME = "_system-osm-k8s"
    _EE_SERVICE_PORT = 50050

    # Time between retries
    _EE_RETRY_DELAY = 10
    # Maximum retry time for the initial connection to a new execution environment
    _MAX_INITIAL_RETRY_TIME = 300
    # Maximum retry time for other operations
    _MAX_RETRY_TIME = 30

    def __init__(self,
                 db: object,
                 fs: object,
                 log: object = None,
                 loop: object = None,
                 url: str = None,
                 username: str = None,
                 vca_config: dict = None,
                 on_update_db=None, ):
        """
        Initialize EE helm connector.
        """

        # parent class constructor
        N2VCConnector.__init__(
            self,
            db=db,
            fs=fs,
            log=log,
            loop=loop,
            url=url,
            username=username,
            vca_config=vca_config,
            on_update_db=on_update_db,
        )

        self.log.debug("Initialize helm N2VC connector")

        # TODO - Obtain data from configuration
        self._ee_service_port = self._EE_SERVICE_PORT

        self._retry_delay = self._EE_RETRY_DELAY
        self._max_retry_time = self._MAX_RETRY_TIME
        self._initial_retry_time = self._MAX_INITIAL_RETRY_TIME

        # initialize helm connectors for helm v2 and helm v3
        self._k8sclusterhelm2 = K8sHelmConnector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helmpath"),
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        self._k8sclusterhelm3 = K8sHelm3Connector(
            kubectl_command=self.vca_config.get("kubectlpath"),
            helm_command=self.vca_config.get("helm3path"),
            fs=self.fs,
            log=self.log,
            db=self.db,
            on_update_db=None,
        )

        self._system_cluster_id = None
        self.log.info("Helm N2VC connector initialized")

    # TODO - reuse_ee_id?
    async def create_execution_environment(self,
                                            namespace: str,
                                            db_dict: dict,
                                            reuse_ee_id: str = None,
                                            progress_timeout: float = None,
                                            total_timeout: float = None,
                                            config: dict = None,
                                            artifact_path: str = None,
                                            vca_type: str = None) -> (str, dict):
        """
        Creates a new helm execution environment deploying the helm chart indicated in the
        artifact_path
        :param str namespace: This param is not used, all helm charts are deployed in the osm
            system namespace
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
        :param float progress_timeout:
        :param float total_timeout:
        :param dict config: General variables to instantiate KDU
        :param str artifact_path: path of package content
        :param str vca_type: Type of vca, must be helm or helm-v3
        :returns str, dict: id of the new execution environment in the form
            <vca_type>:<namespace>.<helm_id> and a credentials object set to None, as all
            credentials are taken from the osm kubernetes .kubeconfig
        """

        self.log.info(
            "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
            "reuse_ee_id: {}".format(
                namespace, artifact_path, db_dict, reuse_ee_id)
        )

        # Validate artifact-path is provided
        if artifact_path is None or len(artifact_path) == 0:
            raise N2VCBadArgumentsException(
                message="artifact_path is mandatory", bad_args=["artifact_path"]
            )

        # Validate artifact-path exists and sync path
        from_path = os.path.split(artifact_path)[0]
        self.fs.sync(from_path)

        # remove duplicated slashes in the charm path
        while artifact_path.find("//") >= 0:
            artifact_path = artifact_path.replace("//", "/")

        # check charm path
        if self.fs.file_exists(artifact_path):
            helm_chart_path = artifact_path
        else:
            msg = "artifact path does not exist: {}".format(artifact_path)
            raise N2VCBadArgumentsException(message=msg, bad_args=["artifact_path"])

        if artifact_path.startswith("/"):
            full_path = self.fs.path + helm_chart_path
        else:
            full_path = self.fs.path + "/" + helm_chart_path

        try:
            # Call helm conn install
            # Obtain system cluster id from database
            system_cluster_uuid = await self._get_system_cluster_id()
            # Add the "osm" section of the config, if present, to the "global" section
            if config and config.get("osm"):
                if not config.get("global"):
                    config["global"] = {}
                config["global"]["osm"] = config.get("osm")

            self.log.debug("install helm chart: {}".format(full_path))
            if vca_type == "helm":
                helm_id = await self._k8sclusterhelm2.install(system_cluster_uuid, kdu_model=full_path,
                                                              namespace=self._KUBECTL_OSM_NAMESPACE,
                                                              params=config,
                                                              db_dict=db_dict,
                                                              timeout=progress_timeout)
            else:
                helm_id = await self._k8sclusterhelm3.install(system_cluster_uuid, kdu_model=full_path,
                                                              namespace=self._KUBECTL_OSM_NAMESPACE,
                                                              params=config,
                                                              db_dict=db_dict,
                                                              timeout=progress_timeout)

            ee_id = "{}:{}.{}".format(vca_type, self._KUBECTL_OSM_NAMESPACE, helm_id)
            return ee_id, None
        except N2VCException:
            raise
        except Exception as e:
            self.log.error("Error deploying chart ee: {}".format(e), exc_info=True)
            raise N2VCException("Error deploying chart ee: {}".format(e))

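    # Illustrative result (values are hypothetical): for vca_type "helm-v3" and a chart whose
    # generated release name is "eechart-0123456789", the returned execution environment id
    # would be "helm-v3:osm.eechart-0123456789", and the returned credentials object is None.
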
    async def register_execution_environment(self, namespace: str, credentials: dict, db_dict: dict,
                                             progress_timeout: float = None, total_timeout: float = None) -> str:
        # nothing to do
        pass

    async def install_configuration_sw(self,
                                       ee_id: str,
                                       artifact_path: str,
                                       db_dict: dict,
                                       progress_timeout: float = None,
                                       total_timeout: float = None,
                                       config: dict = None,
                                       num_units: int = 1,
                                       vca_type: str = None
                                       ):
        # nothing to do
        pass

    async def add_relation(self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str):
        # nothing to do
        pass

    async def remove_relation(self):
        # nothing to do
        pass

    async def get_status(self, namespace: str, yaml_format: bool = True):
        # not used for this connector
        pass

    async def get_ee_ssh_public__key(self, ee_id: str, db_dict: dict, progress_timeout: float = None,
                                     total_timeout: float = None) -> str:
        """
        Obtains the ssh public key from the ee, executing the GetSshKey method of the ee.

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict:
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
        """

        self.log.info(
            "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(
                ee_id, db_dict)
        )

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )

        try:
            # Obtain ip_addr for the ee service; kubernetes resolves it by dns from the ee name
            version, namespace, helm_id = self._get_ee_id_parts(ee_id)
            ip_addr = socket.gethostbyname(helm_id)

            # Obtain ssh_key from the ee; this call retries to give the ee time to install
            # its libraries and start successfully
            ssh_key = await self._get_ssh_key(ip_addr)
            return ssh_key
        except Exception as e:
            self.log.error("Error obtaining ee ssh_key: {}".format(e), exc_info=True)
            raise N2VCException("Error obtaining ee ssh_key: {}".format(e))

    async def exec_primitive(self, ee_id: str, primitive_name: str, params_dict: dict, db_dict: dict = None,
                             progress_timeout: float = None, total_timeout: float = None) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment with the format namespace.helm_id
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of the VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nslcmops", filter: {_id: <nslcmop_id>}, path: "_admin.VCA"}
            It will be used to store information about intermediate notifications
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result if ok; raises an exception in case of failure
        """

        self.log.info("exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
            ee_id, primitive_name, params_dict, db_dict
        ))

        # check arguments
        if ee_id is None or len(ee_id) == 0:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )
        if primitive_name is None or len(primitive_name) == 0:
            raise N2VCBadArgumentsException(
                message="action_name is mandatory", bad_args=["action_name"]
            )
        if params_dict is None:
            params_dict = dict()

        try:
            version, namespace, helm_id = self._get_ee_id_parts(ee_id)
            ip_addr = socket.gethostbyname(helm_id)
        except Exception as e:
            self.log.error("Error getting ee ip: {}".format(e))
            raise N2VCException("Error getting ee ip: {}".format(e))

        if primitive_name == "config":
            try:
                # Execute config primitive with a higher timeout to cover the case where the ee is still starting
                status, detailed_message = await self._execute_config_primitive(ip_addr, params_dict, db_dict=db_dict)
                self.log.debug("Executed config primitive ee_id: {}, status: {}, message: {}".format(
                    ee_id, status, detailed_message))
                if status != "OK":
                    self.log.error("Error configuring helm ee, status: {}, message: {}".format(
                        status, detailed_message))
                    raise N2VCExecutionException(
                        message="Error configuring helm ee_id: {}, status: {}, message: {}".format(
                            ee_id, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error("Error configuring helm ee: {}".format(e))
                raise N2VCExecutionException(
                    message="Error configuring helm ee_id: {}, {}".format(
                        ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return "CONFIG OK"
        else:
            try:
                # Execute primitive
                status, detailed_message = await self._execute_primitive(ip_addr, primitive_name,
                                                                         params_dict, db_dict=db_dict)
                self.log.debug("Executed primitive {} ee_id: {}, status: {}, message: {}".format(
                    primitive_name, ee_id, status, detailed_message))
                if status != "OK" and status != "PROCESSING":
                    self.log.error(
                        "Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message)
                    )
                    raise N2VCExecutionException(
                        message="Execute primitive {} returned not ok status: {}, message: {}".format(
                            primitive_name, status, detailed_message
                        ),
                        primitive_name=primitive_name,
                    )
            except Exception as e:
                self.log.error(
                    "Error executing primitive {}: {}".format(primitive_name, e)
                )
                raise N2VCExecutionException(
                    message="Error executing primitive {} into ee={} : {}".format(
                        primitive_name, ee_id, e
                    ),
                    primitive_name=primitive_name,
                )
            return detailed_message

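    # Illustrative call (the ee_id, primitive name and parameters below are hypothetical; the
    # available primitives and their parameters are defined by the deployed ee chart):
    #
    #     result = await helm_conn.exec_primitive(
    #         ee_id="helm-v3:osm.eechart-0123456789",
    #         primitive_name="touch",
    #         params_dict={"file-path": "/tmp/osm-touched"},
    #     )
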
    async def deregister_execution_environments(self):
        # nothing to be done
        pass

    async def delete_execution_environment(self, ee_id: str, db_dict: dict = None, total_timeout: float = None):
        """
        Delete an execution environment
        :param str ee_id: id of the execution environment to delete, including namespace.helm_id
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

        self.log.info("ee_id: {}".format(ee_id))

        # check arguments
        if ee_id is None:
            raise N2VCBadArgumentsException(
                message="ee_id is mandatory", bad_args=["ee_id"]
            )

        try:

            # Obtain cluster_uuid
            system_cluster_uuid = await self._get_system_cluster_id()

            # Get helm_id
            version, namespace, helm_id = self._get_ee_id_parts(ee_id)

            # Uninstall chart; for backward compatibility we must assume that if there is no
            # version it is helm-v2
            if version == "helm-v3":
                await self._k8sclusterhelm3.uninstall(system_cluster_uuid, helm_id)
            else:
                await self._k8sclusterhelm2.uninstall(system_cluster_uuid, helm_id)
            self.log.info("ee_id: {} deleted".format(ee_id))
        except N2VCException:
            raise
        except Exception as e:
            self.log.error("Error deleting ee id: {}: {}".format(ee_id, e), exc_info=True)
            raise N2VCException("Error deleting ee id {}: {}".format(ee_id, e))

    async def delete_namespace(self, namespace: str, db_dict: dict = None, total_timeout: float = None):
        # method not implemented for this connector, execution environments must be deleted individually
        pass

    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        # nothing to do for this connector
        pass

    @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY)
    async def _get_ssh_key(self, ip_addr):
        channel = Channel(ip_addr, self._ee_service_port)
        try:
            stub = FrontendExecutorStub(channel)
            self.log.debug("get ssh key, ip_addr: {}".format(ip_addr))
            reply: SshKeyReply = await stub.GetSshKey(SshKeyRequest())
            return reply.message
        finally:
            channel.close()

    @retryer(max_wait_time=_MAX_INITIAL_RETRY_TIME, delay_time=_EE_RETRY_DELAY)
    async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
        return await self._execute_primitive_internal(ip_addr, "config", params, db_dict=db_dict)

    @retryer(max_wait_time=_MAX_RETRY_TIME, delay_time=_EE_RETRY_DELAY)
    async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
        return await self._execute_primitive_internal(ip_addr, primitive_name, params, db_dict=db_dict)

    async def _execute_primitive_internal(self, ip_addr, primitive_name, params, db_dict=None):

        channel = Channel(ip_addr, self._ee_service_port)
        try:
            stub = FrontendExecutorStub(channel)
            async with stub.RunPrimitive.open() as stream:
                primitive_id = str(uuid.uuid1())
                result = None
                self.log.debug("Execute primitive internal: id:{}, name:{}, params: {}".
                               format(primitive_id, primitive_name, params))
                # send a single request with the params serialized as yaml and close the
                # sending side of the stream
                await stream.send_message(
                    PrimitiveRequest(id=primitive_id, name=primitive_name, params=yaml.dump(params)), end=True)
                async for reply in stream:
                    self.log.debug("Received reply: {}".format(reply))
                    result = reply
                    # If db_dict is provided, write the intermediate notifications in the database
                    if db_dict:
                        self._write_op_detailed_status(db_dict, reply.status, reply.detailed_message)
                if result:
                    return result.status, result.detailed_message
                else:
                    return "ERROR", "No result received"
        finally:
            channel.close()

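    # Wire protocol sketch, as implemented above: the client opens the RunPrimitive stream,
    # sends exactly one PrimitiveRequest (with the params serialized as YAML), then consumes
    # the streamed replies; each reply carries a status ("OK", "PROCESSING" or an error status)
    # plus a detailed_message, and the last reply received is the one returned to the caller.
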
    def _write_op_detailed_status(self, db_dict, status, detailed_message):

        # write the detailed-status to the database document indicated by db_dict
        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            update_dict = {"detailed-status": "{}: {}".format(status, detailed_message)}
            # self.log.debug('Writing detailed-status to database: {}'.format(update_dict))
            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.log.error("Error writing detailedStatus to database: {}".format(e))

    async def _get_system_cluster_id(self):
        if not self._system_cluster_id:
            db_k8cluster = self.db.get_one("k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME})
            k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
            if not k8s_hc_id:
                try:
                    # backward compatibility for existing clusters that have not been initialized for helm v3
                    cluster_id = db_k8cluster.get("_id")
                    k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
                    k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(k8s_credentials,
                                                                                   reuse_cluster_uuid=cluster_id)
                    db_k8scluster_update = {"_admin.helm-chart-v3.error_msg": None,
                                            "_admin.helm-chart-v3.id": k8s_hc_id,
                                            "_admin.helm-chart-v3.created": uninstall_sw,
                                            "_admin.helm-chart-v3.operationalState": "ENABLED"}
                    self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
                except Exception as e:
                    self.log.error("error initializing helm-v3 cluster: {}".format(str(e)))
                    raise N2VCException("K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
                        cluster_id))
            self._system_cluster_id = k8s_hc_id
        return self._system_cluster_id

    def _get_ee_id_parts(self, ee_id):
        """
        Parses an ee_id stored in the database, which can be either 'version:namespace.helm_id'
        or just 'namespace.helm_id' for backward compatibility.
        If present, the version can be helm-v3 or helm (the old helm-v2 format).
        """
        version, _, part_id = ee_id.rpartition(':')
        namespace, _, helm_id = part_id.rpartition('.')
        return version, namespace, helm_id
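
    # Examples of ee_id values handled above (release names are illustrative):
    #     "helm-v3:osm.eechart-0123456789" -> ("helm-v3", "osm", "eechart-0123456789")
    #     "osm.eechart-0123456789"         -> ("", "osm", "eechart-0123456789")   # legacy helm-v2 ee_id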