2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 from grpclib
.client
import Channel
27 from osm_lcm
.frontend_pb2
import PrimitiveRequest
28 from osm_lcm
.frontend_pb2
import SshKeyRequest
, SshKeyReply
29 from osm_lcm
.frontend_grpc
import FrontendExecutorStub
30 from osm_lcm
.lcm_utils
import LcmBase
32 from osm_lcm
.data_utils
.database
.database
import Database
33 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
35 from n2vc
.n2vc_conn
import N2VCConnector
36 from n2vc
.k8s_helm_conn
import K8sHelmConnector
37 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
38 from n2vc
.exceptions
import (
39 N2VCBadArgumentsException
,
41 N2VCExecutionException
,
44 from osm_lcm
.lcm_utils
import deep_get
47 def retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay"):
49 retry_exceptions
= ConnectionRefusedError
51 @functools.wraps(func
)
52 async def wrapped(*args
, **kwargs
):
53 # default values for wait time and delay_time
57 # obtain arguments from variable names
59 if self
.__dict
__.get(max_wait_time_var
):
60 max_wait_time
= self
.__dict
__.get(max_wait_time_var
)
61 if self
.__dict
__.get(delay_time_var
):
62 delay_time
= self
.__dict
__.get(delay_time_var
)
64 wait_time
= max_wait_time
67 return await func(*args
, **kwargs
)
68 except retry_exceptions
:
69 wait_time
= wait_time
- delay_time
70 await asyncio
.sleep(delay_time
)
73 return ConnectionRefusedError
80 class LCMHelmConn(N2VCConnector
, LcmBase
):
81 _KUBECTL_OSM_NAMESPACE
= "osm"
82 _KUBECTL_OSM_CLUSTER_NAME
= "_system-osm-k8s"
83 _EE_SERVICE_PORT
= 50050
85 # Initial max retry time
86 _MAX_INITIAL_RETRY_TIME
= 600
87 # Max retry time for normal operations
89 # Time beetween retries, retry time after a connection error is raised
96 vca_config
: dict = None,
100 Initialize EE helm connector.
103 self
.db
= Database().instance
.db
104 self
.fs
= Filesystem().instance
.fs
106 # parent class constructor
107 N2VCConnector
.__init
__(
108 self
, log
=log
, loop
=loop
, on_update_db
=on_update_db
, db
=self
.db
, fs
=self
.fs
111 self
.vca_config
= vca_config
112 self
.log
.debug("Initialize helm N2VC connector")
113 self
.log
.debug("initial vca_config: {}".format(vca_config
))
115 # TODO - Obtain data from configuration
116 self
._ee
_service
_port
= self
._EE
_SERVICE
_PORT
118 self
._retry
_delay
= self
._EE
_RETRY
_DELAY
120 if self
.vca_config
and self
.vca_config
.get("eegrpcinittimeout"):
121 self
._initial
_retry
_time
= self
.vca_config
.get("eegrpcinittimeout")
122 self
.log
.debug("Initial retry time: {}".format(self
._initial
_retry
_time
))
124 self
._initial
_retry
_time
= self
._MAX
_INITIAL
_RETRY
_TIME
126 "Applied default retry time: {}".format(self
._initial
_retry
_time
)
129 if self
.vca_config
and self
.vca_config
.get("eegrpctimeout"):
130 self
._max
_retry
_time
= self
.vca_config
.get("eegrpctimeout")
131 self
.log
.debug("Retry time: {}".format(self
._max
_retry
_time
))
133 self
._max
_retry
_time
= self
._MAX
_RETRY
_TIME
135 "Applied default retry time: {}".format(self
._max
_retry
_time
)
138 # initialize helm connector for helmv2 and helmv3
139 self
._k
8sclusterhelm
2 = K8sHelmConnector(
140 kubectl_command
=self
.vca_config
.get("kubectlpath"),
141 helm_command
=self
.vca_config
.get("helmpath"),
148 self
._k
8sclusterhelm
3 = K8sHelm3Connector(
149 kubectl_command
=self
.vca_config
.get("kubectlpath"),
150 helm_command
=self
.vca_config
.get("helm3path"),
157 self
._system
_cluster
_id
= None
158 self
.log
.info("Helm N2VC connector initialized")
160 # TODO - ¿reuse_ee_id?
161 async def create_execution_environment(
165 reuse_ee_id
: str = None,
166 progress_timeout
: float = None,
167 total_timeout
: float = None,
169 artifact_path
: str = None,
170 vca_type
: str = None,
175 Creates a new helm execution environment deploying the helm-chat indicated in the
177 :param str namespace: This param is not used, all helm charts are deployed in the osm
179 :param dict db_dict: where to write to database when the status changes.
180 It contains a dictionary with {collection: str, filter: {}, path: str},
181 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
182 "_admin.deployed.VCA.3"}
183 :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used
184 :param float progress_timeout:
185 :param float total_timeout:
186 :param dict config: General variables to instantiate KDU
187 :param str artifact_path: path of package content
188 :param str vca_type: Type of vca, must be type helm or helm-v3
189 :returns str, dict: id of the new execution environment including namespace.helm_id
190 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
194 "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
195 "reuse_ee_id: {}".format(namespace
, artifact_path
, db_dict
, reuse_ee_id
)
198 # Validate artifact-path is provided
199 if artifact_path
is None or len(artifact_path
) == 0:
200 raise N2VCBadArgumentsException(
201 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
204 # Validate artifact-path exists and sync path
205 from_path
= os
.path
.split(artifact_path
)[0]
206 self
.fs
.sync(from_path
)
208 # remove / in charm path
209 while artifact_path
.find("//") >= 0:
210 artifact_path
= artifact_path
.replace("//", "/")
213 if self
.fs
.file_exists(artifact_path
):
214 helm_chart_path
= artifact_path
216 msg
= "artifact path does not exist: {}".format(artifact_path
)
217 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
219 if artifact_path
.startswith("/"):
220 full_path
= self
.fs
.path
+ helm_chart_path
222 full_path
= self
.fs
.path
+ "/" + helm_chart_path
224 while full_path
.find("//") >= 0:
225 full_path
= full_path
.replace("//", "/")
228 # Call helm conn install
229 # Obtain system cluster id from database
230 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
231 # Add parameter osm if exist to global
232 if config
and config
.get("osm"):
233 if not config
.get("global"):
234 config
["global"] = {}
235 config
["global"]["osm"] = config
.get("osm")
237 self
.log
.debug("install helm chart: {}".format(full_path
))
238 if vca_type
== "helm":
239 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
243 await self
._k
8sclusterhelm
2.install(
246 kdu_instance
=helm_id
,
247 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
250 timeout
=progress_timeout
,
253 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
257 await self
._k
8sclusterhelm
3.install(
260 kdu_instance
=helm_id
,
261 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
264 timeout
=progress_timeout
,
267 ee_id
= "{}:{}.{}".format(vca_type
, self
._KUBECTL
_OSM
_NAMESPACE
, helm_id
)
269 except N2VCException
:
271 except Exception as e
:
272 self
.log
.error("Error deploying chart ee: {}".format(e
), exc_info
=True)
273 raise N2VCException("Error deploying chart ee: {}".format(e
))
275 async def register_execution_environment(
280 progress_timeout
: float = None,
281 total_timeout
: float = None,
288 async def install_configuration_sw(self
, *args
, **kwargs
):
292 async def add_relation(self
, *args
, **kwargs
):
296 async def remove_relation(self
):
300 async def get_status(self
, *args
, **kwargs
):
301 # not used for this connector
304 async def get_ee_ssh_public__key(
308 progress_timeout
: float = None,
309 total_timeout
: float = None,
313 Obtains ssh-public key from ee executing GetSShKey method from the ee.
315 :param str ee_id: the id of the execution environment returned by
316 create_execution_environment or register_execution_environment
318 :param float progress_timeout:
319 :param float total_timeout:
320 :returns: public key of the execution environment
324 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id
, db_dict
)
328 if ee_id
is None or len(ee_id
) == 0:
329 raise N2VCBadArgumentsException(
330 message
="ee_id is mandatory", bad_args
=["ee_id"]
334 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
335 version
, namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
336 ip_addr
= socket
.gethostbyname(helm_id
)
338 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
339 # install libraries and start successfully
340 ssh_key
= await self
._get
_ssh
_key
(ip_addr
)
342 except Exception as e
:
343 self
.log
.error("Error obtaining ee ssh_key: {}".format(e
), exc_info
=True)
344 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e
))
346 async def upgrade_charm(
350 charm_id
: str = None,
351 charm_type
: str = None,
352 timeout
: float = None,
354 """This method upgrade charms in VNFs
356 This method does not support KDU's deployed with Helm.
359 ee_id: Execution environment id
360 path: Local path to the charm
362 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
363 timeout: (Float) Timeout for the ns update operation
366 the output of the update operation if status equals to "completed"
369 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
371 async def exec_primitive(
376 db_dict
: dict = None,
377 progress_timeout
: float = None,
378 total_timeout
: float = None,
382 Execute a primitive in the execution environment
384 :param str ee_id: the one returned by create_execution_environment or
385 register_execution_environment with the format namespace.helm_id
386 :param str primitive_name: must be one defined in the software. There is one
387 called 'config', where, for the proxy case, the 'credentials' of VM are
389 :param dict params_dict: parameters of the action
390 :param dict db_dict: where to write into database when the status changes.
391 It contains a dict with
392 {collection: <str>, filter: {}, path: <str>},
393 e.g. {collection: "nslcmops", filter:
394 {_id: <nslcmop_id>, path: "_admin.VCA"}
395 It will be used to store information about intermediate notifications
396 :param float progress_timeout:
397 :param float total_timeout:
398 :returns str: primitive result, if ok. It raises exceptions in case of fail
402 "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
403 ee_id
, primitive_name
, params_dict
, db_dict
408 if ee_id
is None or len(ee_id
) == 0:
409 raise N2VCBadArgumentsException(
410 message
="ee_id is mandatory", bad_args
=["ee_id"]
412 if primitive_name
is None or len(primitive_name
) == 0:
413 raise N2VCBadArgumentsException(
414 message
="action_name is mandatory", bad_args
=["action_name"]
416 if params_dict
is None:
420 version
, namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
421 ip_addr
= socket
.gethostbyname(helm_id
)
422 except Exception as e
:
423 self
.log
.error("Error getting ee ip ee: {}".format(e
))
424 raise N2VCException("Error getting ee ip ee: {}".format(e
))
426 if primitive_name
== "config":
428 # Execute config primitive, higher timeout to check the case ee is starting
429 status
, detailed_message
= await self
._execute
_config
_primitive
(
430 ip_addr
, params_dict
, db_dict
=db_dict
433 "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
434 ee_id
, status
, detailed_message
439 "Error configuring helm ee, status: {}, message: {}".format(
440 status
, detailed_message
443 raise N2VCExecutionException(
444 message
="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
445 ee_id
, status
, detailed_message
447 primitive_name
=primitive_name
,
449 except Exception as e
:
450 self
.log
.error("Error configuring helm ee: {}".format(e
))
451 raise N2VCExecutionException(
452 message
="Error configuring helm ee_id: {}, {}".format(ee_id
, e
),
453 primitive_name
=primitive_name
,
459 status
, detailed_message
= await self
._execute
_primitive
(
460 ip_addr
, primitive_name
, params_dict
, db_dict
=db_dict
463 "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
464 primitive_name
, ee_id
, status
, detailed_message
467 if status
!= "OK" and status
!= "PROCESSING":
469 "Execute primitive {} returned not ok status: {}, message: {}".format(
470 primitive_name
, status
, detailed_message
473 raise N2VCExecutionException(
474 message
="Execute primitive {} returned not ok status: {}, message: {}".format(
475 primitive_name
, status
, detailed_message
477 primitive_name
=primitive_name
,
479 except Exception as e
:
481 "Error executing primitive {}: {}".format(primitive_name
, e
)
483 raise N2VCExecutionException(
484 message
="Error executing primitive {} into ee={} : {}".format(
485 primitive_name
, ee_id
, e
487 primitive_name
=primitive_name
,
489 return detailed_message
491 async def deregister_execution_environments(self
):
495 async def delete_execution_environment(
498 db_dict
: dict = None,
499 total_timeout
: float = None,
503 Delete an execution environment
504 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
505 :param dict db_dict: where to write into database when the status changes.
506 It contains a dict with
507 {collection: <str>, filter: {}, path: <str>},
508 e.g. {collection: "nsrs", filter:
509 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
510 :param float total_timeout:
513 self
.log
.info("ee_id: {}".format(ee_id
))
517 raise N2VCBadArgumentsException(
518 message
="ee_id is mandatory", bad_args
=["ee_id"]
523 # Obtain cluster_uuid
524 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
527 version
, namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
529 # Uninstall chart, for backward compatibility we must assume that if there is no
530 # version it is helm-v2
531 if version
== "helm-v3":
532 await self
._k
8sclusterhelm
3.uninstall(system_cluster_uuid
, helm_id
)
534 await self
._k
8sclusterhelm
2.uninstall(system_cluster_uuid
, helm_id
)
535 self
.log
.info("ee_id: {} deleted".format(ee_id
))
536 except N2VCException
:
538 except Exception as e
:
540 "Error deleting ee id: {}: {}".format(ee_id
, e
), exc_info
=True
542 raise N2VCException("Error deleting ee id {}: {}".format(ee_id
, e
))
544 async def delete_namespace(
545 self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None
547 # method not implemented for this connector, execution environments must be deleted individually
550 async def install_k8s_proxy_charm(
556 progress_timeout
: float = None,
557 total_timeout
: float = None,
564 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
565 async def _get_ssh_key(self
, ip_addr
):
566 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
568 stub
= FrontendExecutorStub(channel
)
569 self
.log
.debug("get ssh key, ip_addr: {}".format(ip_addr
))
570 reply
: SshKeyReply
= await stub
.GetSshKey(SshKeyRequest())
575 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
576 async def _execute_config_primitive(self
, ip_addr
, params
, db_dict
=None):
577 return await self
._execute
_primitive
_internal
(
578 ip_addr
, "config", params
, db_dict
=db_dict
581 @retryer(max_wait_time_var
="_max_retry_time", delay_time_var
="_retry_delay")
582 async def _execute_primitive(self
, ip_addr
, primitive_name
, params
, db_dict
=None):
583 return await self
._execute
_primitive
_internal
(
584 ip_addr
, primitive_name
, params
, db_dict
=db_dict
587 async def _execute_primitive_internal(
588 self
, ip_addr
, primitive_name
, params
, db_dict
=None
591 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
593 stub
= FrontendExecutorStub(channel
)
594 async with stub
.RunPrimitive
.open() as stream
:
595 primitive_id
= str(uuid
.uuid1())
598 "Execute primitive internal: id:{}, name:{}, params: {}".format(
599 primitive_id
, primitive_name
, params
602 await stream
.send_message(
604 id=primitive_id
, name
=primitive_name
, params
=yaml
.dump(params
)
608 async for reply
in stream
:
609 self
.log
.debug("Received reply: {}".format(reply
))
611 # If db_dict provided write notifs in database
613 self
._write
_op
_detailed
_status
(
614 db_dict
, reply
.status
, reply
.detailed_message
617 return reply
.status
, reply
.detailed_message
619 return "ERROR", "No result received"
623 def _write_op_detailed_status(self
, db_dict
, status
, detailed_message
):
625 # write ee_id to database: _admin.deployed.VCA.x
627 the_table
= db_dict
["collection"]
628 the_filter
= db_dict
["filter"]
629 update_dict
= {"detailed-status": "{}: {}".format(status
, detailed_message
)}
630 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
634 update_dict
=update_dict
,
637 except asyncio
.CancelledError
:
639 except Exception as e
:
640 self
.log
.error("Error writing detailedStatus to database: {}".format(e
))
642 async def _get_system_cluster_id(self
):
643 if not self
._system
_cluster
_id
:
644 db_k8cluster
= self
.db
.get_one(
645 "k8sclusters", {"name": self
._KUBECTL
_OSM
_CLUSTER
_NAME
}
647 k8s_hc_id
= deep_get(db_k8cluster
, ("_admin", "helm-chart-v3", "id"))
650 # backward compatibility for existing clusters that have not been initialized for helm v3
651 cluster_id
= db_k8cluster
.get("_id")
652 k8s_credentials
= yaml
.safe_dump(db_k8cluster
.get("credentials"))
653 k8s_hc_id
, uninstall_sw
= await self
._k
8sclusterhelm
3.init_env(
654 k8s_credentials
, reuse_cluster_uuid
=cluster_id
656 db_k8scluster_update
= {
657 "_admin.helm-chart-v3.error_msg": None,
658 "_admin.helm-chart-v3.id": k8s_hc_id
,
659 "_admin.helm-chart-v3}.created": uninstall_sw
,
660 "_admin.helm-chart-v3.operationalState": "ENABLED",
662 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
663 except Exception as e
:
665 "error initializing helm-v3 cluster: {}".format(str(e
))
668 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
672 self
._system
_cluster
_id
= k8s_hc_id
673 return self
._system
_cluster
_id
675 def _get_ee_id_parts(self
, ee_id
):
677 Parses ee_id stored at database that can be either 'version:namespace.helm_id' or only
678 namespace.helm_id for backward compatibility
679 If exists helm version can be helm-v3 or helm (helm-v2 old version)
681 version
, _
, part_id
= ee_id
.rpartition(":")
682 namespace
, _
, helm_id
= part_id
.rpartition(".")
683 return version
, namespace
, helm_id