2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 from grpclib
.client
import Channel
27 from osm_lcm
.frontend_pb2
import PrimitiveRequest
28 from osm_lcm
.frontend_pb2
import SshKeyRequest
, SshKeyReply
29 from osm_lcm
.frontend_grpc
import FrontendExecutorStub
30 from osm_lcm
.lcm_utils
import LcmBase
32 from osm_lcm
.data_utils
.database
.database
import Database
33 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
35 from n2vc
.n2vc_conn
import N2VCConnector
36 from n2vc
.k8s_helm_conn
import K8sHelmConnector
37 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
38 from n2vc
.exceptions
import N2VCBadArgumentsException
, N2VCException
, N2VCExecutionException
40 from osm_lcm
.lcm_utils
import deep_get
43 def retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay"):
46 ConnectionRefusedError
49 @functools.wraps(func
)
50 async def wrapped(*args
, **kwargs
):
51 # default values for wait time and delay_time
55 # obtain arguments from variable names
57 if self
.__dict
__.get(max_wait_time_var
):
58 max_wait_time
= self
.__dict
__.get(max_wait_time_var
)
59 if self
.__dict
__.get(delay_time_var
):
60 delay_time
= self
.__dict
__.get(delay_time_var
)
62 wait_time
= max_wait_time
65 return await func(*args
, **kwargs
)
66 except retry_exceptions
:
67 wait_time
= wait_time
- delay_time
68 await asyncio
.sleep(delay_time
)
71 return ConnectionRefusedError
76 class LCMHelmConn(N2VCConnector
, LcmBase
):
77 _KUBECTL_OSM_NAMESPACE
= "osm"
78 _KUBECTL_OSM_CLUSTER_NAME
= "_system-osm-k8s"
79 _EE_SERVICE_PORT
= 50050
81 # Initial max retry time
82 _MAX_INITIAL_RETRY_TIME
= 600
83 # Max retry time for normal operations
# Time between retries, retry time after a connection error is raised
91 vca_config
: dict = None,
94 Initialize EE helm connector.
97 self
.db
= Database().instance
.db
98 self
.fs
= Filesystem().instance
.fs
100 # parent class constructor
101 N2VCConnector
.__init
__(
105 on_update_db
=on_update_db
,
110 self
.vca_config
= vca_config
111 self
.log
.debug("Initialize helm N2VC connector")
112 self
.log
.debug("initial vca_config: {}".format(vca_config
))
114 # TODO - Obtain data from configuration
115 self
._ee
_service
_port
= self
._EE
_SERVICE
_PORT
117 self
._retry
_delay
= self
._EE
_RETRY
_DELAY
119 if self
.vca_config
and self
.vca_config
.get("eegrpcinittimeout"):
120 self
._initial
_retry
_time
= self
.vca_config
.get("eegrpcinittimeout")
121 self
.log
.debug("Initial retry time: {}".format(self
._initial
_retry
_time
))
123 self
._initial
_retry
_time
= self
._MAX
_INITIAL
_RETRY
_TIME
124 self
.log
.debug("Applied default retry time: {}".format(self
._initial
_retry
_time
))
126 if self
.vca_config
and self
.vca_config
.get("eegrpctimeout"):
127 self
._max
_retry
_time
= self
.vca_config
.get("eegrpctimeout")
128 self
.log
.debug("Retry time: {}".format(self
._max
_retry
_time
))
130 self
._max
_retry
_time
= self
._MAX
_RETRY
_TIME
131 self
.log
.debug("Applied default retry time: {}".format(self
._max
_retry
_time
))
133 # initialize helm connector for helmv2 and helmv3
134 self
._k
8sclusterhelm
2 = K8sHelmConnector(
135 kubectl_command
=self
.vca_config
.get("kubectlpath"),
136 helm_command
=self
.vca_config
.get("helmpath"),
143 self
._k
8sclusterhelm
3 = K8sHelm3Connector(
144 kubectl_command
=self
.vca_config
.get("kubectlpath"),
145 helm_command
=self
.vca_config
.get("helm3path"),
152 self
._system
_cluster
_id
= None
153 self
.log
.info("Helm N2VC connector initialized")
155 # TODO - ¿reuse_ee_id?
156 async def create_execution_environment(self
,
159 reuse_ee_id
: str = None,
160 progress_timeout
: float = None,
161 total_timeout
: float = None,
163 artifact_path
: str = None,
164 vca_type
: str = None,
165 *kargs
, **kwargs
) -> (str, dict):
167 Creates a new helm execution environment deploying the helm-chat indicated in the
169 :param str namespace: This param is not used, all helm charts are deployed in the osm
171 :param dict db_dict: where to write to database when the status changes.
172 It contains a dictionary with {collection: str, filter: {}, path: str},
173 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
174 "_admin.deployed.VCA.3"}
175 :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used
176 :param float progress_timeout:
177 :param float total_timeout:
178 :param dict config: General variables to instantiate KDU
179 :param str artifact_path: path of package content
180 :param str vca_type: Type of vca, must be type helm or helm-v3
181 :returns str, dict: id of the new execution environment including namespace.helm_id
182 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
186 "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
187 "reuse_ee_id: {}".format(
188 namespace
, artifact_path
, db_dict
, reuse_ee_id
)
191 # Validate artifact-path is provided
192 if artifact_path
is None or len(artifact_path
) == 0:
193 raise N2VCBadArgumentsException(
194 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
197 # Validate artifact-path exists and sync path
198 from_path
= os
.path
.split(artifact_path
)[0]
199 self
.fs
.sync(from_path
)
201 # remove / in charm path
202 while artifact_path
.find("//") >= 0:
203 artifact_path
= artifact_path
.replace("//", "/")
206 if self
.fs
.file_exists(artifact_path
):
207 helm_chart_path
= artifact_path
209 msg
= "artifact path does not exist: {}".format(artifact_path
)
210 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
212 if artifact_path
.startswith("/"):
213 full_path
= self
.fs
.path
+ helm_chart_path
215 full_path
= self
.fs
.path
+ "/" + helm_chart_path
217 while full_path
.find("//") >= 0:
218 full_path
= full_path
.replace("//", "/")
221 # Call helm conn install
222 # Obtain system cluster id from database
223 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
224 # Add parameter osm if exist to global
225 if config
and config
.get("osm"):
226 if not config
.get("global"):
227 config
["global"] = {}
228 config
["global"]["osm"] = config
.get("osm")
230 self
.log
.debug("install helm chart: {}".format(full_path
))
231 if vca_type
== "helm":
232 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
236 await self
._k
8sclusterhelm
2.install(system_cluster_uuid
, kdu_model
=full_path
,
237 kdu_instance
=helm_id
,
238 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
241 timeout
=progress_timeout
)
243 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
247 await self
._k
8sclusterhelm
3.install(system_cluster_uuid
, kdu_model
=full_path
,
248 kdu_instance
=helm_id
,
249 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
252 timeout
=progress_timeout
)
254 ee_id
= "{}:{}.{}".format(vca_type
, self
._KUBECTL
_OSM
_NAMESPACE
, helm_id
)
256 except N2VCException
:
258 except Exception as e
:
259 self
.log
.error("Error deploying chart ee: {}".format(e
), exc_info
=True)
260 raise N2VCException("Error deploying chart ee: {}".format(e
))
262 async def register_execution_environment(self
, namespace
: str, credentials
: dict, db_dict
: dict,
263 progress_timeout
: float = None, total_timeout
: float = None,
264 *kargs
, **kwargs
) -> str:
268 async def install_configuration_sw(self
,
272 progress_timeout
: float = None,
273 total_timeout
: float = None,
281 async def add_relation(self
, ee_id_1
: str, ee_id_2
: str, endpoint_1
: str, endpoint_2
: str):
285 async def remove_relation(self
):
289 async def get_status(self
, namespace
: str, yaml_format
: bool = True):
290 # not used for this connector
293 async def get_ee_ssh_public__key(self
, ee_id
: str, db_dict
: dict, progress_timeout
: float = None,
294 total_timeout
: float = None) -> str:
296 Obtains ssh-public key from ee executing GetSShKey method from the ee.
298 :param str ee_id: the id of the execution environment returned by
299 create_execution_environment or register_execution_environment
301 :param float progress_timeout:
302 :param float total_timeout:
303 :returns: public key of the execution environment
307 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(
312 if ee_id
is None or len(ee_id
) == 0:
313 raise N2VCBadArgumentsException(
314 message
="ee_id is mandatory", bad_args
=["ee_id"]
318 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
319 version
, namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
320 ip_addr
= socket
.gethostbyname(helm_id
)
322 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
323 # install libraries and start successfully
324 ssh_key
= await self
._get
_ssh
_key
(ip_addr
)
326 except Exception as e
:
327 self
.log
.error("Error obtaining ee ssh_key: {}".format(e
), exc_info
=True)
328 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e
))
330 async def exec_primitive(self
, ee_id
: str, primitive_name
: str, params_dict
: dict, db_dict
: dict = None,
331 progress_timeout
: float = None, total_timeout
: float = None) -> str:
333 Execute a primitive in the execution environment
335 :param str ee_id: the one returned by create_execution_environment or
336 register_execution_environment with the format namespace.helm_id
337 :param str primitive_name: must be one defined in the software. There is one
338 called 'config', where, for the proxy case, the 'credentials' of VM are
340 :param dict params_dict: parameters of the action
341 :param dict db_dict: where to write into database when the status changes.
342 It contains a dict with
343 {collection: <str>, filter: {}, path: <str>},
344 e.g. {collection: "nslcmops", filter:
345 {_id: <nslcmop_id>, path: "_admin.VCA"}
346 It will be used to store information about intermediate notifications
347 :param float progress_timeout:
348 :param float total_timeout:
349 :returns str: primitive result, if ok. It raises exceptions in case of fail
352 self
.log
.info("exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
353 ee_id
, primitive_name
, params_dict
, db_dict
357 if ee_id
is None or len(ee_id
) == 0:
358 raise N2VCBadArgumentsException(
359 message
="ee_id is mandatory", bad_args
=["ee_id"]
361 if primitive_name
is None or len(primitive_name
) == 0:
362 raise N2VCBadArgumentsException(
363 message
="action_name is mandatory", bad_args
=["action_name"]
365 if params_dict
is None:
369 version
, namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
370 ip_addr
= socket
.gethostbyname(helm_id
)
371 except Exception as e
:
372 self
.log
.error("Error getting ee ip ee: {}".format(e
))
373 raise N2VCException("Error getting ee ip ee: {}".format(e
))
375 if primitive_name
== "config":
377 # Execute config primitive, higher timeout to check the case ee is starting
378 status
, detailed_message
= await self
._execute
_config
_primitive
(ip_addr
, params_dict
, db_dict
=db_dict
)
379 self
.log
.debug("Executed config primitive ee_id_ {}, status: {}, message: {}".format(
380 ee_id
, status
, detailed_message
))
382 self
.log
.error("Error configuring helm ee, status: {}, message: {}".format(
383 status
, detailed_message
))
384 raise N2VCExecutionException(
385 message
="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
386 ee_id
, status
, detailed_message
388 primitive_name
=primitive_name
,
390 except Exception as e
:
391 self
.log
.error("Error configuring helm ee: {}".format(e
))
392 raise N2VCExecutionException(
393 message
="Error configuring helm ee_id: {}, {}".format(
396 primitive_name
=primitive_name
,
402 status
, detailed_message
= await self
._execute
_primitive
(ip_addr
, primitive_name
,
403 params_dict
, db_dict
=db_dict
)
404 self
.log
.debug("Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
405 primitive_name
, ee_id
, status
, detailed_message
))
406 if status
!= "OK" and status
!= "PROCESSING":
408 "Execute primitive {} returned not ok status: {}, message: {}".format(
409 primitive_name
, status
, detailed_message
)
411 raise N2VCExecutionException(
412 message
="Execute primitive {} returned not ok status: {}, message: {}".format(
413 primitive_name
, status
, detailed_message
415 primitive_name
=primitive_name
,
417 except Exception as e
:
419 "Error executing primitive {}: {}".format(primitive_name
, e
)
421 raise N2VCExecutionException(
422 message
="Error executing primitive {} into ee={} : {}".format(
423 primitive_name
, ee_id
, e
425 primitive_name
=primitive_name
,
427 return detailed_message
429 async def deregister_execution_environments(self
):
433 async def delete_execution_environment(
436 db_dict
: dict = None,
437 total_timeout
: float = None,
441 Delete an execution environment
442 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
443 :param dict db_dict: where to write into database when the status changes.
444 It contains a dict with
445 {collection: <str>, filter: {}, path: <str>},
446 e.g. {collection: "nsrs", filter:
447 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
448 :param float total_timeout:
451 self
.log
.info("ee_id: {}".format(ee_id
))
455 raise N2VCBadArgumentsException(
456 message
="ee_id is mandatory", bad_args
=["ee_id"]
461 # Obtain cluster_uuid
462 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
465 version
, namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
467 # Uninstall chart, for backward compatibility we must assume that if there is no
468 # version it is helm-v2
469 if version
== "helm-v3":
470 await self
._k
8sclusterhelm
3.uninstall(system_cluster_uuid
, helm_id
)
472 await self
._k
8sclusterhelm
2.uninstall(system_cluster_uuid
, helm_id
)
473 self
.log
.info("ee_id: {} deleted".format(ee_id
))
474 except N2VCException
:
476 except Exception as e
:
477 self
.log
.error("Error deleting ee id: {}: {}".format(ee_id
, e
), exc_info
=True)
478 raise N2VCException("Error deleting ee id {}: {}".format(ee_id
, e
))
480 async def delete_namespace(self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None):
481 # method not implemented for this connector, execution environments must be deleted individually
484 async def install_k8s_proxy_charm(
490 progress_timeout
: float = None,
491 total_timeout
: float = None,
497 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
498 async def _get_ssh_key(self
, ip_addr
):
499 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
501 stub
= FrontendExecutorStub(channel
)
502 self
.log
.debug("get ssh key, ip_addr: {}".format(ip_addr
))
503 reply
: SshKeyReply
= await stub
.GetSshKey(SshKeyRequest())
@retryer(max_wait_time_var="_initial_retry_time", delay_time_var="_retry_delay")
async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
    """
    Run the "config" primitive on the execution environment reachable at ip_addr.

    The call is delegated to _execute_primitive_internal with the primitive name fixed to
    "config". The retryer decorator is handed the attribute names "_initial_retry_time"
    (maximum wait) and "_retry_delay" (delay between attempts), i.e. the initial retry
    budget rather than the one used for regular primitives.

    :param ip_addr: address of the execution environment service
    :param params: parameters of the config primitive
    :param db_dict: optional database reference forwarded unchanged to the internal call
    :returns: whatever _execute_primitive_internal returns (status, detailed message)
    """
    outcome = await self._execute_primitive_internal(
        ip_addr,
        "config",
        params,
        db_dict=db_dict,
    )
    return outcome
@retryer(max_wait_time_var="_max_retry_time", delay_time_var="_retry_delay")
async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
    """
    Run a named primitive on the execution environment reachable at ip_addr.

    The call is delegated to _execute_primitive_internal. The retryer decorator is handed
    the attribute names "_max_retry_time" (maximum wait) and "_retry_delay" (delay between
    attempts).

    :param ip_addr: address of the execution environment service
    :param primitive_name: name of the primitive to execute
    :param params: parameters of the primitive
    :param db_dict: optional database reference forwarded unchanged to the internal call
    :returns: whatever _execute_primitive_internal returns (status, detailed message)
    """
    outcome = await self._execute_primitive_internal(
        ip_addr,
        primitive_name,
        params,
        db_dict=db_dict,
    )
    return outcome
516 async def _execute_primitive_internal(self
, ip_addr
, primitive_name
, params
, db_dict
=None):
518 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
520 stub
= FrontendExecutorStub(channel
)
521 async with stub
.RunPrimitive
.open() as stream
:
522 primitive_id
= str(uuid
.uuid1())
524 self
.log
.debug("Execute primitive internal: id:{}, name:{}, params: {}".
525 format(primitive_id
, primitive_name
, params
))
526 await stream
.send_message(
527 PrimitiveRequest(id=primitive_id
, name
=primitive_name
, params
=yaml
.dump(params
)), end
=True)
528 async for reply
in stream
:
529 self
.log
.debug("Received reply: {}".format(reply
))
531 # If db_dict provided write notifs in database
533 self
._write
_op
_detailed
_status
(db_dict
, reply
.status
, reply
.detailed_message
)
535 return reply
.status
, reply
.detailed_message
537 return "ERROR", "No result received"
541 def _write_op_detailed_status(self
, db_dict
, status
, detailed_message
):
543 # write ee_id to database: _admin.deployed.VCA.x
545 the_table
= db_dict
["collection"]
546 the_filter
= db_dict
["filter"]
547 update_dict
= {"detailed-status": "{}: {}".format(status
, detailed_message
)}
548 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
552 update_dict
=update_dict
,
555 except asyncio
.CancelledError
:
557 except Exception as e
:
558 self
.log
.error("Error writing detailedStatus to database: {}".format(e
))
async def _get_system_cluster_id(self):
    """
    Return the helm-v3 id of the OSM system k8s cluster, caching it on the instance.

    On the first call the "k8sclusters" record named _KUBECTL_OSM_CLUSTER_NAME is read and
    its "_admin.helm-chart-v3.id" value is used. When that value is missing the cluster is
    initialized through the helm v3 connector (init_env) and the record is updated with the
    resulting id, the "created" flag and an "ENABLED" operational state. Later calls return
    the cached self._system_cluster_id without touching the database.

    :returns: id of the system cluster for helm-chart-v3
    :raises N2VCException: if the helm v3 initialization of the system cluster fails
    """
    if not self._system_cluster_id:
        db_k8cluster = self.db.get_one("k8sclusters", {"name": self._KUBECTL_OSM_CLUSTER_NAME})
        k8s_hc_id = deep_get(db_k8cluster, ("_admin", "helm-chart-v3", "id"))
        # NOTE(review): guard reconstructed from the backward-compatibility comment below;
        # confirm it matches the original condition.
        if not k8s_hc_id:
            try:
                # backward compatibility for existing clusters that have not been initialized for helm v3
                cluster_id = db_k8cluster.get("_id")
                k8s_credentials = yaml.safe_dump(db_k8cluster.get("credentials"))
                k8s_hc_id, uninstall_sw = await self._k8sclusterhelm3.init_env(k8s_credentials,
                                                                               reuse_cluster_uuid=cluster_id)
                # The "created" key previously carried a stray "}" ("_admin.helm-chart-v3}.created"),
                # which stored the flag under a malformed field name instead of next to the
                # other helm-chart-v3 fields.
                db_k8scluster_update = {"_admin.helm-chart-v3.error_msg": None,
                                        "_admin.helm-chart-v3.id": k8s_hc_id,
                                        "_admin.helm-chart-v3.created": uninstall_sw,
                                        "_admin.helm-chart-v3.operationalState": "ENABLED"}
                self.update_db_2("k8sclusters", cluster_id, db_k8scluster_update)
            except Exception as e:
                self.log.error("error initializing helm-v3 cluster: {}".format(str(e)))
                # NOTE(review): the format argument is presumed to be cluster_id - confirm.
                raise N2VCException("K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
                    cluster_id)) from e
        self._system_cluster_id = k8s_hc_id
    return self._system_cluster_id
583 def _get_ee_id_parts(self
, ee_id
):
585 Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only
586 namespace.helm_id for backward compatibility
587 If exists helm version can be helm-v3 or helm (helm-v2 old version)
589 version
, _
, part_id
= ee_id
.rpartition(':')
590 namespace
, _
, helm_id
= part_id
.rpartition('.')
591 return version
, namespace
, helm_id