2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 from grpclib
.client
import Channel
27 from osm_lcm
.frontend_pb2
import PrimitiveRequest
28 from osm_lcm
.frontend_pb2
import SshKeyRequest
, SshKeyReply
29 from osm_lcm
.frontend_grpc
import FrontendExecutorStub
30 from osm_lcm
.lcm_utils
import LcmBase
, get_ee_id_parts
32 from osm_lcm
.data_utils
.database
.database
import Database
33 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
35 from n2vc
.n2vc_conn
import N2VCConnector
36 from n2vc
.k8s_helm_conn
import K8sHelmConnector
37 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
38 from n2vc
.exceptions
import (
39 N2VCBadArgumentsException
,
41 N2VCExecutionException
,
44 from osm_lcm
.lcm_utils
import deep_get
47 def retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay"):
49 retry_exceptions
= ConnectionRefusedError
51 @functools.wraps(func
)
52 async def wrapped(*args
, **kwargs
):
53 # default values for wait time and delay_time
57 # obtain arguments from variable names
59 if self
.__dict
__.get(max_wait_time_var
):
60 max_wait_time
= self
.__dict
__.get(max_wait_time_var
)
61 if self
.__dict
__.get(delay_time_var
):
62 delay_time
= self
.__dict
__.get(delay_time_var
)
64 wait_time
= max_wait_time
67 return await func(*args
, **kwargs
)
68 except retry_exceptions
:
69 wait_time
= wait_time
- delay_time
70 await asyncio
.sleep(delay_time
)
73 return ConnectionRefusedError
80 class LCMHelmConn(N2VCConnector
, LcmBase
):
81 _KUBECTL_OSM_NAMESPACE
= "osm"
82 _KUBECTL_OSM_CLUSTER_NAME
= "_system-osm-k8s"
83 _EE_SERVICE_PORT
= 50050
85 # Initial max retry time
86 _MAX_INITIAL_RETRY_TIME
= 600
87 # Max retry time for normal operations
89 # Time beetween retries, retry time after a connection error is raised
96 vca_config
: dict = None,
100 Initialize EE helm connector.
103 self
.db
= Database().instance
.db
104 self
.fs
= Filesystem().instance
.fs
106 # parent class constructor
107 N2VCConnector
.__init
__(
108 self
, log
=log
, loop
=loop
, on_update_db
=on_update_db
, db
=self
.db
, fs
=self
.fs
111 self
.vca_config
= vca_config
112 self
.log
.debug("Initialize helm N2VC connector")
113 self
.log
.debug("initial vca_config: {}".format(vca_config
))
115 # TODO - Obtain data from configuration
116 self
._ee
_service
_port
= self
._EE
_SERVICE
_PORT
118 self
._retry
_delay
= self
._EE
_RETRY
_DELAY
120 if self
.vca_config
and self
.vca_config
.get("eegrpcinittimeout"):
121 self
._initial
_retry
_time
= self
.vca_config
.get("eegrpcinittimeout")
122 self
.log
.debug("Initial retry time: {}".format(self
._initial
_retry
_time
))
124 self
._initial
_retry
_time
= self
._MAX
_INITIAL
_RETRY
_TIME
126 "Applied default retry time: {}".format(self
._initial
_retry
_time
)
129 if self
.vca_config
and self
.vca_config
.get("eegrpctimeout"):
130 self
._max
_retry
_time
= self
.vca_config
.get("eegrpctimeout")
131 self
.log
.debug("Retry time: {}".format(self
._max
_retry
_time
))
133 self
._max
_retry
_time
= self
._MAX
_RETRY
_TIME
135 "Applied default retry time: {}".format(self
._max
_retry
_time
)
138 # initialize helm connector for helmv2 and helmv3
139 self
._k
8sclusterhelm
2 = K8sHelmConnector(
140 kubectl_command
=self
.vca_config
.get("kubectlpath"),
141 helm_command
=self
.vca_config
.get("helmpath"),
148 self
._k
8sclusterhelm
3 = K8sHelm3Connector(
149 kubectl_command
=self
.vca_config
.get("kubectlpath"),
150 helm_command
=self
.vca_config
.get("helm3path"),
157 self
._system
_cluster
_id
= None
158 self
.log
.info("Helm N2VC connector initialized")
160 # TODO - ¿reuse_ee_id?
161 async def create_execution_environment(
165 reuse_ee_id
: str = None,
166 progress_timeout
: float = None,
167 total_timeout
: float = None,
169 artifact_path
: str = None,
170 chart_model
: str = None,
171 vca_type
: str = None,
176 Creates a new helm execution environment deploying the helm-chat indicated in the
178 :param str namespace: This param is not used, all helm charts are deployed in the osm
180 :param dict db_dict: where to write to database when the status changes.
181 It contains a dictionary with {collection: str, filter: {}, path: str},
182 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
183 "_admin.deployed.VCA.3"}
184 :param str reuse_ee_id: ee id from an older execution. TODO - right now this param is not used
185 :param float progress_timeout:
186 :param float total_timeout:
187 :param dict config: General variables to instantiate KDU
188 :param str artifact_path: path of package content
189 :param str chart_model: helm chart/reference (string), which can be either
191 - a name of chart available via the repos known by OSM
192 (e.g. stable/openldap, stable/openldap:1.2.4)
193 - a path to a packaged chart (e.g. mychart.tgz)
194 - a path to an unpacked chart directory or a URL (e.g. mychart)
195 :param str vca_type: Type of vca, must be type helm or helm-v3
196 :returns str, dict: id of the new execution environment including namespace.helm_id
197 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
201 "create_execution_environment: namespace: {}, artifact_path: {}, "
202 "chart_model: {}, db_dict: {}, reuse_ee_id: {}".format(
203 namespace
, artifact_path
, db_dict
, chart_model
, reuse_ee_id
207 # Validate artifact-path is provided
208 if artifact_path
is None or len(artifact_path
) == 0:
209 raise N2VCBadArgumentsException(
210 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
213 # Validate artifact-path exists and sync path
214 from_path
= os
.path
.split(artifact_path
)[0]
215 self
.fs
.sync(from_path
)
217 # remove / in charm path
218 while artifact_path
.find("//") >= 0:
219 artifact_path
= artifact_path
.replace("//", "/")
222 if self
.fs
.file_exists(artifact_path
):
223 helm_chart_path
= artifact_path
225 msg
= "artifact path does not exist: {}".format(artifact_path
)
226 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
228 if artifact_path
.startswith("/"):
229 full_path
= self
.fs
.path
+ helm_chart_path
231 full_path
= self
.fs
.path
+ "/" + helm_chart_path
233 while full_path
.find("//") >= 0:
234 full_path
= full_path
.replace("//", "/")
236 # By default, the KDU is expected to be a file
237 kdu_model
= full_path
238 # If the chart_model includes a "/", then it is a reference:
239 # e.g. (stable/openldap; stable/openldap:1.2.4)
240 if chart_model
.find("/") >= 0:
241 kdu_model
= chart_model
244 # Call helm conn install
245 # Obtain system cluster id from database
246 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
247 # Add parameter osm if exist to global
248 if config
and config
.get("osm"):
249 if not config
.get("global"):
250 config
["global"] = {}
251 config
["global"]["osm"] = config
.get("osm")
253 self
.log
.debug("install helm chart: {}".format(full_path
))
254 if vca_type
== "helm":
255 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
259 await self
._k
8sclusterhelm
2.install(
262 kdu_instance
=helm_id
,
263 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
266 timeout
=progress_timeout
,
269 helm_id
= self
._k
8sclusterhelm
2.generate_kdu_instance_name(
273 await self
._k
8sclusterhelm
3.install(
276 kdu_instance
=helm_id
,
277 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
280 timeout
=progress_timeout
,
283 ee_id
= "{}:{}.{}".format(vca_type
, self
._KUBECTL
_OSM
_NAMESPACE
, helm_id
)
285 except N2VCException
:
287 except Exception as e
:
288 self
.log
.error("Error deploying chart ee: {}".format(e
), exc_info
=True)
289 raise N2VCException("Error deploying chart ee: {}".format(e
))
291 async def upgrade_execution_environment(
296 progress_timeout
: float = None,
297 total_timeout
: float = None,
299 artifact_path
: str = None,
300 vca_type
: str = None,
305 Creates a new helm execution environment deploying the helm-chat indicated in the
307 :param str namespace: This param is not used, all helm charts are deployed in the osm
309 :param dict db_dict: where to write to database when the status changes.
310 It contains a dictionary with {collection: str, filter: {}, path: str},
311 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
312 "_admin.deployed.VCA.3"}
313 :param helm_id: unique name of the Helm release to upgrade
314 :param float progress_timeout:
315 :param float total_timeout:
316 :param dict config: General variables to instantiate KDU
317 :param str artifact_path: path of package content
318 :param str vca_type: Type of vca, must be type helm or helm-v3
319 :returns str, dict: id of the new execution environment including namespace.helm_id
320 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
324 "upgrade_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
327 # Validate helm_id is provided
328 if helm_id
is None or len(helm_id
) == 0:
329 raise N2VCBadArgumentsException(
330 message
="helm_id is mandatory", bad_args
=["helm_id"]
333 # Validate artifact-path is provided
334 if artifact_path
is None or len(artifact_path
) == 0:
335 raise N2VCBadArgumentsException(
336 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
339 # Validate artifact-path exists and sync path
340 from_path
= os
.path
.split(artifact_path
)[0]
341 self
.fs
.sync(from_path
)
343 # remove / in charm path
344 while artifact_path
.find("//") >= 0:
345 artifact_path
= artifact_path
.replace("//", "/")
348 if self
.fs
.file_exists(artifact_path
):
349 helm_chart_path
= artifact_path
351 msg
= "artifact path does not exist: {}".format(artifact_path
)
352 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
354 if artifact_path
.startswith("/"):
355 full_path
= self
.fs
.path
+ helm_chart_path
357 full_path
= self
.fs
.path
+ "/" + helm_chart_path
359 while full_path
.find("//") >= 0:
360 full_path
= full_path
.replace("//", "/")
363 # Call helm conn upgrade
364 # Obtain system cluster id from database
365 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
366 # Add parameter osm if exist to global
367 if config
and config
.get("osm"):
368 if not config
.get("global"):
369 config
["global"] = {}
370 config
["global"]["osm"] = config
.get("osm")
372 self
.log
.debug("Ugrade helm chart: {}".format(full_path
))
373 if vca_type
== "helm":
374 await self
._k
8sclusterhelm
2.upgrade(
377 kdu_instance
=helm_id
,
381 timeout
=progress_timeout
,
385 await self
._k
8sclusterhelm
3.upgrade(
388 kdu_instance
=helm_id
,
392 timeout
=progress_timeout
,
396 except N2VCException
:
398 except Exception as e
:
399 self
.log
.error("Error upgrading chart ee: {}".format(e
), exc_info
=True)
400 raise N2VCException("Error upgrading chart ee: {}".format(e
))
402 async def register_execution_environment(
407 progress_timeout
: float = None,
408 total_timeout
: float = None,
415 async def install_configuration_sw(self
, *args
, **kwargs
):
419 async def add_relation(self
, *args
, **kwargs
):
423 async def remove_relation(self
):
427 async def get_status(self
, *args
, **kwargs
):
428 # not used for this connector
431 async def get_ee_ssh_public__key(
435 progress_timeout
: float = None,
436 total_timeout
: float = None,
440 Obtains ssh-public key from ee executing GetSShKey method from the ee.
442 :param str ee_id: the id of the execution environment returned by
443 create_execution_environment or register_execution_environment
445 :param float progress_timeout:
446 :param float total_timeout:
447 :returns: public key of the execution environment
451 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(ee_id
, db_dict
)
455 if ee_id
is None or len(ee_id
) == 0:
456 raise N2VCBadArgumentsException(
457 message
="ee_id is mandatory", bad_args
=["ee_id"]
461 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
462 version
, namespace
, helm_id
= get_ee_id_parts(ee_id
)
463 ip_addr
= socket
.gethostbyname(helm_id
)
465 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
466 # install libraries and start successfully
467 ssh_key
= await self
._get
_ssh
_key
(ip_addr
)
469 except Exception as e
:
470 self
.log
.error("Error obtaining ee ssh_key: {}".format(e
), exc_info
=True)
471 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e
))
473 async def upgrade_charm(
477 charm_id
: str = None,
478 charm_type
: str = None,
479 timeout
: float = None,
481 """This method upgrade charms in VNFs
483 This method does not support KDU's deployed with Helm.
486 ee_id: Execution environment id
487 path: Local path to the charm
489 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
490 timeout: (Float) Timeout for the ns update operation
493 the output of the update operation if status equals to "completed"
496 raise N2VCException("KDUs deployed with Helm do not support charm upgrade")
498 async def exec_primitive(
503 db_dict
: dict = None,
504 progress_timeout
: float = None,
505 total_timeout
: float = None,
509 Execute a primitive in the execution environment
511 :param str ee_id: the one returned by create_execution_environment or
512 register_execution_environment with the format namespace.helm_id
513 :param str primitive_name: must be one defined in the software. There is one
514 called 'config', where, for the proxy case, the 'credentials' of VM are
516 :param dict params_dict: parameters of the action
517 :param dict db_dict: where to write into database when the status changes.
518 It contains a dict with
519 {collection: <str>, filter: {}, path: <str>},
520 e.g. {collection: "nslcmops", filter:
521 {_id: <nslcmop_id>, path: "_admin.VCA"}
522 It will be used to store information about intermediate notifications
523 :param float progress_timeout:
524 :param float total_timeout:
525 :returns str: primitive result, if ok. It raises exceptions in case of fail
529 "exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
530 ee_id
, primitive_name
, params_dict
, db_dict
535 if ee_id
is None or len(ee_id
) == 0:
536 raise N2VCBadArgumentsException(
537 message
="ee_id is mandatory", bad_args
=["ee_id"]
539 if primitive_name
is None or len(primitive_name
) == 0:
540 raise N2VCBadArgumentsException(
541 message
="action_name is mandatory", bad_args
=["action_name"]
543 if params_dict
is None:
547 version
, namespace
, helm_id
= get_ee_id_parts(ee_id
)
548 ip_addr
= socket
.gethostbyname(helm_id
)
549 except Exception as e
:
550 self
.log
.error("Error getting ee ip ee: {}".format(e
))
551 raise N2VCException("Error getting ee ip ee: {}".format(e
))
553 if primitive_name
== "config":
555 # Execute config primitive, higher timeout to check the case ee is starting
556 status
, detailed_message
= await self
._execute
_config
_primitive
(
557 ip_addr
, params_dict
, db_dict
=db_dict
560 "Executed config primitive ee_id_ {}, status: {}, message: {}".format(
561 ee_id
, status
, detailed_message
566 "Error configuring helm ee, status: {}, message: {}".format(
567 status
, detailed_message
570 raise N2VCExecutionException(
571 message
="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
572 ee_id
, status
, detailed_message
574 primitive_name
=primitive_name
,
576 except Exception as e
:
577 self
.log
.error("Error configuring helm ee: {}".format(e
))
578 raise N2VCExecutionException(
579 message
="Error configuring helm ee_id: {}, {}".format(ee_id
, e
),
580 primitive_name
=primitive_name
,
586 status
, detailed_message
= await self
._execute
_primitive
(
587 ip_addr
, primitive_name
, params_dict
, db_dict
=db_dict
590 "Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
591 primitive_name
, ee_id
, status
, detailed_message
594 if status
!= "OK" and status
!= "PROCESSING":
596 "Execute primitive {} returned not ok status: {}, message: {}".format(
597 primitive_name
, status
, detailed_message
600 raise N2VCExecutionException(
601 message
="Execute primitive {} returned not ok status: {}, message: {}".format(
602 primitive_name
, status
, detailed_message
604 primitive_name
=primitive_name
,
606 except Exception as e
:
608 "Error executing primitive {}: {}".format(primitive_name
, e
)
610 raise N2VCExecutionException(
611 message
="Error executing primitive {} into ee={} : {}".format(
612 primitive_name
, ee_id
, e
614 primitive_name
=primitive_name
,
616 return detailed_message
618 async def deregister_execution_environments(self
):
622 async def delete_execution_environment(
625 db_dict
: dict = None,
626 total_timeout
: float = None,
630 Delete an execution environment
631 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
632 :param dict db_dict: where to write into database when the status changes.
633 It contains a dict with
634 {collection: <str>, filter: {}, path: <str>},
635 e.g. {collection: "nsrs", filter:
636 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
637 :param float total_timeout:
640 self
.log
.info("ee_id: {}".format(ee_id
))
644 raise N2VCBadArgumentsException(
645 message
="ee_id is mandatory", bad_args
=["ee_id"]
650 # Obtain cluster_uuid
651 system_cluster_uuid
= await self
._get
_system
_cluster
_id
()
654 version
, namespace
, helm_id
= get_ee_id_parts(ee_id
)
656 # Uninstall chart, for backward compatibility we must assume that if there is no
657 # version it is helm-v2
658 if version
== "helm-v3":
659 await self
._k
8sclusterhelm
3.uninstall(system_cluster_uuid
, helm_id
)
661 await self
._k
8sclusterhelm
2.uninstall(system_cluster_uuid
, helm_id
)
662 self
.log
.info("ee_id: {} deleted".format(ee_id
))
663 except N2VCException
:
665 except Exception as e
:
667 "Error deleting ee id: {}: {}".format(ee_id
, e
), exc_info
=True
669 raise N2VCException("Error deleting ee id {}: {}".format(ee_id
, e
))
671 async def delete_namespace(
672 self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None
674 # method not implemented for this connector, execution environments must be deleted individually
677 async def install_k8s_proxy_charm(
683 progress_timeout
: float = None,
684 total_timeout
: float = None,
691 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
692 async def _get_ssh_key(self
, ip_addr
):
693 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
695 stub
= FrontendExecutorStub(channel
)
696 self
.log
.debug("get ssh key, ip_addr: {}".format(ip_addr
))
697 reply
: SshKeyReply
= await stub
.GetSshKey(SshKeyRequest())
702 @retryer(max_wait_time_var
="_initial_retry_time", delay_time_var
="_retry_delay")
703 async def _execute_config_primitive(self
, ip_addr
, params
, db_dict
=None):
704 return await self
._execute
_primitive
_internal
(
705 ip_addr
, "config", params
, db_dict
=db_dict
708 @retryer(max_wait_time_var
="_max_retry_time", delay_time_var
="_retry_delay")
709 async def _execute_primitive(self
, ip_addr
, primitive_name
, params
, db_dict
=None):
710 return await self
._execute
_primitive
_internal
(
711 ip_addr
, primitive_name
, params
, db_dict
=db_dict
714 async def _execute_primitive_internal(
715 self
, ip_addr
, primitive_name
, params
, db_dict
=None
718 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
720 stub
= FrontendExecutorStub(channel
)
721 async with stub
.RunPrimitive
.open() as stream
:
722 primitive_id
= str(uuid
.uuid1())
725 "Execute primitive internal: id:{}, name:{}, params: {}".format(
726 primitive_id
, primitive_name
, params
729 await stream
.send_message(
731 id=primitive_id
, name
=primitive_name
, params
=yaml
.dump(params
)
735 async for reply
in stream
:
736 self
.log
.debug("Received reply: {}".format(reply
))
738 # If db_dict provided write notifs in database
740 self
._write
_op
_detailed
_status
(
741 db_dict
, reply
.status
, reply
.detailed_message
744 return reply
.status
, reply
.detailed_message
746 return "ERROR", "No result received"
750 def _write_op_detailed_status(self
, db_dict
, status
, detailed_message
):
752 # write ee_id to database: _admin.deployed.VCA.x
754 the_table
= db_dict
["collection"]
755 the_filter
= db_dict
["filter"]
756 update_dict
= {"detailed-status": "{}: {}".format(status
, detailed_message
)}
757 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
761 update_dict
=update_dict
,
764 except asyncio
.CancelledError
:
766 except Exception as e
:
767 self
.log
.error("Error writing detailedStatus to database: {}".format(e
))
769 async def _get_system_cluster_id(self
):
770 if not self
._system
_cluster
_id
:
771 db_k8cluster
= self
.db
.get_one(
772 "k8sclusters", {"name": self
._KUBECTL
_OSM
_CLUSTER
_NAME
}
774 k8s_hc_id
= deep_get(db_k8cluster
, ("_admin", "helm-chart-v3", "id"))
777 # backward compatibility for existing clusters that have not been initialized for helm v3
778 cluster_id
= db_k8cluster
.get("_id")
779 k8s_credentials
= yaml
.safe_dump(db_k8cluster
.get("credentials"))
780 k8s_hc_id
, uninstall_sw
= await self
._k
8sclusterhelm
3.init_env(
781 k8s_credentials
, reuse_cluster_uuid
=cluster_id
783 db_k8scluster_update
= {
784 "_admin.helm-chart-v3.error_msg": None,
785 "_admin.helm-chart-v3.id": k8s_hc_id
,
786 "_admin.helm-chart-v3}.created": uninstall_sw
,
787 "_admin.helm-chart-v3.operationalState": "ENABLED",
789 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
790 except Exception as e
:
792 "error initializing helm-v3 cluster: {}".format(str(e
))
795 "K8s system cluster '{}' has not been initialized for helm-chart-v3".format(
799 self
._system
_cluster
_id
= k8s_hc_id
800 return self
._system
_cluster
_id