2 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
25 from grpclib
.client
import Channel
27 from osm_lcm
.frontend_pb2
import PrimitiveRequest
28 from osm_lcm
.frontend_pb2
import SshKeyRequest
, SshKeyReply
29 from osm_lcm
.frontend_grpc
import FrontendExecutorStub
31 from n2vc
.n2vc_conn
import N2VCConnector
32 from n2vc
.k8s_helm_conn
import K8sHelmConnector
33 from n2vc
.exceptions
import N2VCBadArgumentsException
, N2VCException
, N2VCExecutionException
35 from osm_lcm
.lcm_utils
import deep_get
def retryer(max_wait_time=60, delay_time=10):
    """Build a decorator that retries a coroutine while the connection is refused.

    The decorated coroutine is awaited again every time it raises
    ``ConnectionRefusedError``. After each refused attempt ``delay_time`` is
    subtracted from the remaining budget (initially ``max_wait_time``) and the
    wrapper sleeps ``delay_time`` seconds before trying again.

    :param max_wait_time: total retry budget, in seconds
    :param delay_time: seconds slept (and consumed from the budget) after each
        refused attempt
    :returns: a decorator to apply to coroutine functions
    :raises ConnectionRefusedError: from the wrapped coroutine when the budget
        is exhausted without a successful call
    """
    def wrapper(func):
        retry_exceptions = (ConnectionRefusedError,)

        @functools.wraps(func)
        async def wrapped(*args, **kwargs):
            wait_time = max_wait_time
            last_error = None
            while wait_time > 0:
                try:
                    return await func(*args, **kwargs)
                except retry_exceptions as e:
                    last_error = e
                    wait_time = wait_time - delay_time
                    await asyncio.sleep(delay_time)
            # Budget exhausted: raise instead of returning the exception class,
            # otherwise the caller would treat the class object as a valid result.
            if last_error is not None:
                raise last_error
            raise ConnectionRefusedError("retry time exhausted, no attempt was made")
        return wrapped
    return wrapper
60 class LCMHelmConn(N2VCConnector
):
61 _KUBECTL_OSM_NAMESPACE
= "osm"
62 _KUBECTL_OSM_CLUSTER_NAME
= "_system-osm-k8s"
63 _EE_SERVICE_PORT
= 50050
# Time between retries
67 # Initial max retry time
68 _MAX_INITIAL_RETRY_TIME
= 300
79 vca_config
: dict = None,
82 Initialize EE helm connector.
85 # parent class constructor
86 N2VCConnector
.__init
__(
94 vca_config
=vca_config
,
95 on_update_db
=on_update_db
,
98 self
.log
.debug("Initialize helm N2VC connector")
100 # TODO - Obtain data from configuration
101 self
._ee
_service
_port
= self
._EE
_SERVICE
_PORT
103 self
._retry
_delay
= self
._EE
_RETRY
_DELAY
104 self
._max
_retry
_time
= self
._MAX
_RETRY
_TIME
105 self
._initial
_retry
_time
= self
._MAX
_INITIAL
_RETRY
_TIME
107 # initialize helm connector
108 self
._k
8sclusterhelm
= K8sHelmConnector(
109 kubectl_command
=self
.vca_config
.get("kubectlpath"),
110 helm_command
=self
.vca_config
.get("helmpath"),
117 self
._system
_cluster
_id
= None
118 self
.log
.info("Helm N2VC connector initialized")
120 # TODO - ¿reuse_ee_id?
121 async def create_execution_environment(self
,
124 reuse_ee_id
: str = None,
125 progress_timeout
: float = None,
126 total_timeout
: float = None,
128 artifact_path
: str = None,
129 vca_type
: str = None) -> (str, dict):
131 Creates a new helm execution environment deploying the helm-chat indicated in the
133 :param str namespace: This param is not used, all helm charts are deployed in the osm
135 :param dict db_dict: where to write to database when the status changes.
136 It contains a dictionary with {collection: str, filter: {}, path: str},
137 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
138 "_admin.deployed.VCA.3"}
139 :param str reuse_ee_id: ee id from an older execution. TODO - right now this params is not used
140 :param float progress_timeout:
141 :param float total_timeout:
142 :param dict config: General variables to instantiate KDU
143 :param str artifact_path: path of package content
144 :param str vca_type: Type of vca, not used as assumed of type helm
145 :returns str, dict: id of the new execution environment including namespace.helm_id
146 and credentials object set to None as all credentials should be osm kubernetes .kubeconfig
150 "create_execution_environment: namespace: {}, artifact_path: {}, db_dict: {}, "
151 "reuse_ee_id: {}".format(
152 namespace
, artifact_path
, db_dict
, reuse_ee_id
)
155 # Validate artifact-path is provided
156 if artifact_path
is None or len(artifact_path
) == 0:
157 raise N2VCBadArgumentsException(
158 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
161 # Validate artifact-path exists and sync path
162 from_path
= os
.path
.split(artifact_path
)[0]
163 self
.fs
.sync(from_path
)
165 # remove / in charm path
166 while artifact_path
.find("//") >= 0:
167 artifact_path
= artifact_path
.replace("//", "/")
170 if self
.fs
.file_exists(artifact_path
):
171 helm_chart_path
= artifact_path
173 msg
= "artifact path does not exist: {}".format(artifact_path
)
174 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
176 if artifact_path
.startswith("/"):
177 full_path
= self
.fs
.path
+ helm_chart_path
179 full_path
= self
.fs
.path
+ "/" + helm_chart_path
182 # Call helm conn install
183 # Obtain system cluster id from database
184 system_cluster_uuid
= self
._get
_system
_cluster
_id
()
185 # Add parameter osm if exist to global
186 if config
and config
.get("osm"):
187 if not config
.get("global"):
188 config
["global"] = {}
189 config
["global"]["osm"] = config
.get("osm")
191 self
.log
.debug("install helm chart: {}".format(full_path
))
192 helm_id
= await self
._k
8sclusterhelm
.install(system_cluster_uuid
, kdu_model
=full_path
,
193 namespace
=self
._KUBECTL
_OSM
_NAMESPACE
,
196 timeout
=progress_timeout
)
198 ee_id
= "{}.{}".format(self
._KUBECTL
_OSM
_NAMESPACE
, helm_id
)
200 except N2VCException
:
202 except Exception as e
:
203 self
.log
.error("Error deploying chart ee: {}".format(e
), exc_info
=True)
204 raise N2VCException("Error deploying chart ee: {}".format(e
))
206 async def register_execution_environment(self
, namespace
: str, credentials
: dict, db_dict
: dict,
207 progress_timeout
: float = None, total_timeout
: float = None) -> str:
211 async def install_configuration_sw(self
,
215 progress_timeout
: float = None,
216 total_timeout
: float = None,
224 async def add_relation(self
, ee_id_1
: str, ee_id_2
: str, endpoint_1
: str, endpoint_2
: str):
228 async def remove_relation(self
):
232 async def get_status(self
, namespace
: str, yaml_format
: bool = True):
233 # not used for this connector
236 async def get_ee_ssh_public__key(self
, ee_id
: str, db_dict
: dict, progress_timeout
: float = None,
237 total_timeout
: float = None) -> str:
239 Obtains ssh-public key from ee executing GetSShKey method from the ee.
241 :param str ee_id: the id of the execution environment returned by
242 create_execution_environment or register_execution_environment
244 :param float progress_timeout:
245 :param float total_timeout:
246 :returns: public key of the execution environment
250 "get_ee_ssh_public_key: ee_id: {}, db_dict: {}".format(
255 if ee_id
is None or len(ee_id
) == 0:
256 raise N2VCBadArgumentsException(
257 message
="ee_id is mandatory", bad_args
=["ee_id"]
261 # Obtain ip_addr for the ee service, it is resolved by dns from the ee name by kubernetes
262 namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
263 ip_addr
= socket
.gethostbyname(helm_id
)
265 # Obtain ssh_key from the ee, this method will implement retries to allow the ee
266 # install libraries and start successfully
267 ssh_key
= await self
._get
_ssh
_key
(ip_addr
)
269 except Exception as e
:
270 self
.log
.error("Error obtaining ee ssh_key: {}".format(e
), exc_info
=True)
271 raise N2VCException("Error obtaining ee ssh_ke: {}".format(e
))
273 async def exec_primitive(self
, ee_id
: str, primitive_name
: str, params_dict
: dict, db_dict
: dict = None,
274 progress_timeout
: float = None, total_timeout
: float = None) -> str:
276 Execute a primitive in the execution environment
278 :param str ee_id: the one returned by create_execution_environment or
279 register_execution_environment with the format namespace.helm_id
280 :param str primitive_name: must be one defined in the software. There is one
281 called 'config', where, for the proxy case, the 'credentials' of VM are
283 :param dict params_dict: parameters of the action
284 :param dict db_dict: where to write into database when the status changes.
285 It contains a dict with
286 {collection: <str>, filter: {}, path: <str>},
287 e.g. {collection: "nslcmops", filter:
288 {_id: <nslcmop_id>, path: "_admin.VCA"}
289 It will be used to store information about intermediate notifications
290 :param float progress_timeout:
291 :param float total_timeout:
292 :returns str: primitive result, if ok. It raises exceptions in case of fail
295 self
.log
.info("exec primitive for ee_id : {}, primitive_name: {}, params_dict: {}, db_dict: {}".format(
296 ee_id
, primitive_name
, params_dict
, db_dict
300 if ee_id
is None or len(ee_id
) == 0:
301 raise N2VCBadArgumentsException(
302 message
="ee_id is mandatory", bad_args
=["ee_id"]
304 if primitive_name
is None or len(primitive_name
) == 0:
305 raise N2VCBadArgumentsException(
306 message
="action_name is mandatory", bad_args
=["action_name"]
308 if params_dict
is None:
312 namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
313 ip_addr
= socket
.gethostbyname(helm_id
)
314 except Exception as e
:
315 self
.log
.error("Error getting ee ip ee: {}".format(e
))
316 raise N2VCException("Error getting ee ip ee: {}".format(e
))
318 if primitive_name
== "config":
320 # Execute config primitive, higher timeout to check the case ee is starting
321 status
, detailed_message
= await self
._execute
_config
_primitive
(ip_addr
, params_dict
, db_dict
=db_dict
)
322 self
.log
.debug("Executed config primitive ee_id_ {}, status: {}, message: {}".format(
323 ee_id
, status
, detailed_message
))
325 self
.log
.error("Error configuring helm ee, status: {}, message: {}".format(
326 status
, detailed_message
))
327 raise N2VCExecutionException(
328 message
="Error configuring helm ee_id: {}, status: {}, message: {}: ".format(
329 ee_id
, status
, detailed_message
331 primitive_name
=primitive_name
,
333 except Exception as e
:
334 self
.log
.error("Error configuring helm ee: {}".format(e
))
335 raise N2VCExecutionException(
336 message
="Error configuring helm ee_id: {}, {}".format(
339 primitive_name
=primitive_name
,
345 status
, detailed_message
= await self
._execute
_primitive
(ip_addr
, primitive_name
,
346 params_dict
, db_dict
=db_dict
)
347 self
.log
.debug("Executed primitive {} ee_id_ {}, status: {}, message: {}".format(
348 primitive_name
, ee_id
, status
, detailed_message
))
349 if status
!= "OK" and status
!= "PROCESSING":
351 "Execute primitive {} returned not ok status: {}, message: {}".format(
352 primitive_name
, status
, detailed_message
)
354 raise N2VCExecutionException(
355 message
="Execute primitive {} returned not ok status: {}, message: {}".format(
356 primitive_name
, status
, detailed_message
358 primitive_name
=primitive_name
,
360 except Exception as e
:
362 "Error executing primitive {}: {}".format(primitive_name
, e
)
364 raise N2VCExecutionException(
365 message
="Error executing primitive {} into ee={} : {}".format(
366 primitive_name
, ee_id
, e
368 primitive_name
=primitive_name
,
370 return detailed_message
372 async def deregister_execution_environments(self
):
376 async def delete_execution_environment(self
, ee_id
: str, db_dict
: dict = None, total_timeout
: float = None):
378 Delete an execution environment
379 :param str ee_id: id of the execution environment to delete, included namespace.helm_id
380 :param dict db_dict: where to write into database when the status changes.
381 It contains a dict with
382 {collection: <str>, filter: {}, path: <str>},
383 e.g. {collection: "nsrs", filter:
384 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
385 :param float total_timeout:
388 self
.log
.info("ee_id: {}".format(ee_id
))
392 raise N2VCBadArgumentsException(
393 message
="ee_id is mandatory", bad_args
=["ee_id"]
398 # Obtain cluster_uuid
399 system_cluster_uuid
= self
._get
_system
_cluster
_id
()
402 namespace
, helm_id
= self
._get
_ee
_id
_parts
(ee_id
)
405 await self
._k
8sclusterhelm
.uninstall(system_cluster_uuid
, helm_id
)
406 self
.log
.info("ee_id: {} deleted".format(ee_id
))
407 except N2VCException
:
409 except Exception as e
:
410 self
.log
.error("Error deleting ee id: {}: {}".format(ee_id
, e
), exc_info
=True)
411 raise N2VCException("Error deleting ee id {}: {}".format(ee_id
, e
))
413 async def delete_namespace(self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None):
414 # method not implemented for this connector, execution environments must be deleted individually
417 async def install_k8s_proxy_charm(
423 progress_timeout
: float = None,
424 total_timeout
: float = None,
429 @retryer(max_wait_time
=_MAX_INITIAL_RETRY_TIME
, delay_time
=_EE_RETRY_DELAY
)
430 async def _get_ssh_key(self
, ip_addr
):
431 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
433 stub
= FrontendExecutorStub(channel
)
434 self
.log
.debug("get ssh key, ip_addr: {}".format(ip_addr
))
435 reply
: SshKeyReply
= await stub
.GetSshKey(SshKeyRequest())
@retryer(delay_time=_EE_RETRY_DELAY, max_wait_time=_MAX_INITIAL_RETRY_TIME)
async def _execute_config_primitive(self, ip_addr, params, db_dict=None):
    """Run the "config" primitive on the execution environment at ip_addr.

    Thin wrapper over _execute_primitive_internal, decorated with retryer
    using the initial (longer) retry budget _MAX_INITIAL_RETRY_TIME.

    :param ip_addr: address of the execution environment service
    :param params: parameters of the config primitive
    :param dict db_dict: forwarded unchanged to _execute_primitive_internal
    :returns: whatever _execute_primitive_internal returns
    """
    outcome = await self._execute_primitive_internal(
        ip_addr,
        "config",
        params,
        db_dict=db_dict,
    )
    return outcome
@retryer(delay_time=_EE_RETRY_DELAY, max_wait_time=_MAX_RETRY_TIME)
async def _execute_primitive(self, ip_addr, primitive_name, params, db_dict=None):
    """Run an arbitrary primitive on the execution environment at ip_addr.

    Thin wrapper over _execute_primitive_internal, decorated with retryer
    using the regular retry budget _MAX_RETRY_TIME.

    :param ip_addr: address of the execution environment service
    :param primitive_name: name of the primitive to execute
    :param params: parameters of the primitive
    :param dict db_dict: forwarded unchanged to _execute_primitive_internal
    :returns: whatever _execute_primitive_internal returns
    """
    outcome = await self._execute_primitive_internal(
        ip_addr,
        primitive_name,
        params,
        db_dict=db_dict,
    )
    return outcome
448 async def _execute_primitive_internal(self
, ip_addr
, primitive_name
, params
, db_dict
=None):
450 channel
= Channel(ip_addr
, self
._ee
_service
_port
)
452 stub
= FrontendExecutorStub(channel
)
453 async with stub
.RunPrimitive
.open() as stream
:
454 primitive_id
= str(uuid
.uuid1())
456 self
.log
.debug("Execute primitive internal: id:{}, name:{}, params: {}".
457 format(primitive_id
, primitive_name
, params
))
458 await stream
.send_message(
459 PrimitiveRequest(id=primitive_id
, name
=primitive_name
, params
=yaml
.dump(params
)), end
=True)
460 async for reply
in stream
:
461 self
.log
.debug("Received reply: {}".format(reply
))
463 # If db_dict provided write notifs in database
465 self
._write
_op
_detailed
_status
(db_dict
, reply
.status
, reply
.detailed_message
)
467 return reply
.status
, reply
.detailed_message
469 return "ERROR", "No result received"
473 def _write_op_detailed_status(self
, db_dict
, status
, detailed_message
):
475 # write ee_id to database: _admin.deployed.VCA.x
477 the_table
= db_dict
["collection"]
478 the_filter
= db_dict
["filter"]
479 update_dict
= {"detailed-status": "{}: {}".format(status
, detailed_message
)}
480 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
484 update_dict
=update_dict
,
487 except asyncio
.CancelledError
:
489 except Exception as e
:
490 self
.log
.error("Error writing detailedStatus to database: {}".format(e
))
492 def _get_system_cluster_id(self
):
493 if not self
._system
_cluster
_id
:
494 db_k8cluster
= self
.db
.get_one("k8sclusters", {"name": self
._KUBECTL
_OSM
_CLUSTER
_NAME
})
495 k8s_hc_id
= deep_get(db_k8cluster
, ("_admin", "helm-chart", "id"))
497 self
.log
.error("osm system cluster has not been properly initialized for helm connector, "
498 "helm-chart id is not defined")
499 raise N2VCException("osm system cluster has not been properly initialized for helm connector")
500 self
._system
_cluster
_id
= k8s_hc_id
501 return self
._system
_cluster
_id
503 def _get_ee_id_parts(self
, ee_id
):
504 namespace
, _
, helm_id
= ee_id
.partition('.')
505 return namespace
, helm_id