Coverage for n2vc/n2vc_conn.py: 57%
134 statements
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:03 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2024-06-29 09:03 +0000
1##
2# Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3# This file is part of OSM
4# All Rights Reserved.
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15# implied.
16# See the License for the specific language governing permissions and
17# limitations under the License.
18#
19# For those usages not covered by the Apache License, Version 2.0 please
20# contact with: nfvlabs@tid.es
21##
24import abc
25import asyncio
26from http import HTTPStatus
27from shlex import quote
28import os
29import shlex
30import subprocess
31import time
33from n2vc.exceptions import N2VCBadArgumentsException
34from osm_common.dbmongo import DbException
35import yaml
37from n2vc.loggable import Loggable
38from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
41class N2VCConnector(abc.ABC, Loggable):
42 """Generic N2VC connector
44 Abstract class
45 """
47 """
48 ####################################################################################
49 ################################### P U B L I C ####################################
50 ####################################################################################
51 """
53 def __init__(
54 self,
55 db: object,
56 fs: object,
57 log: object,
58 on_update_db=None,
59 **kwargs,
60 ):
61 """Initialize N2VC abstract connector. It defines de API for VCA connectors
63 :param object db: Mongo object managing the MongoDB (repo common DbBase)
64 :param object fs: FileSystem object managing the package artifacts (repo common
65 FsBase)
66 :param object log: the logging object to log to
67 :param on_update_db: callback called when n2vc connector updates database.
68 Received arguments:
69 table: e.g. "nsrs"
70 filter: e.g. {_id: <nsd-id> }
71 path: e.g. "_admin.deployed.VCA.3."
72 updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
73 """
75 # parent class
76 Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")
78 # check arguments
79 if db is None:
80 raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
81 if fs is None:
82 raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
84 # store arguments into self
85 self.db = db
86 self.fs = fs
87 self.on_update_db = on_update_db
89 # generate private/public key-pair
90 self.private_key_path = None
91 self.public_key_path = None
93 @abc.abstractmethod
94 async def get_status(self, namespace: str, yaml_format: bool = True):
95 """Get namespace status
97 :param namespace: we obtain ns from namespace
98 :param yaml_format: returns a yaml string
99 """
101 # TODO: review which public key
102 def get_public_key(self) -> str:
103 """Get the VCA ssh-public-key
105 Returns the SSH public key from local mahine, to be injected into virtual
106 machines to be managed by the VCA.
107 First run, a ssh keypair will be created.
108 The public key is injected into a VM so that we can provision the
109 machine with Juju, after which Juju will communicate with the VM
110 directly via the juju agent.
111 """
113 # Find the path where we expect our key lives (~/.ssh)
114 homedir = os.environ.get("HOME")
115 if not homedir:
116 self.log.warning("No HOME environment variable, using /tmp")
117 homedir = "/tmp"
118 sshdir = "{}/.ssh".format(homedir)
119 sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir)))
120 if not os.path.exists(sshdir):
121 os.mkdir(sshdir)
123 self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
124 self.private_key_path = os.path.realpath(
125 os.path.normpath(os.path.abspath(self.private_key_path))
126 )
127 self.public_key_path = "{}.pub".format(self.private_key_path)
128 self.public_key_path = os.path.realpath(
129 os.path.normpath(os.path.abspath(self.public_key_path))
130 )
132 # If we don't have a key generated, then we have to generate it using ssh-keygen
133 if not os.path.exists(self.private_key_path):
134 command = "ssh-keygen -t {} -b {} -N '' -f {}".format(
135 "rsa", "4096", quote(self.private_key_path)
136 )
137 # run command with arguments
138 args = shlex.split(command)
139 subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
141 # Read the public key. Only one public key (one line) in the file
142 with open(self.public_key_path, "r") as file:
143 public_key = file.readline()
145 return public_key
147 @abc.abstractmethod
148 async def create_execution_environment(
149 self,
150 namespace: str,
151 db_dict: dict,
152 reuse_ee_id: str = None,
153 progress_timeout: float = None,
154 total_timeout: float = None,
155 ) -> tuple[str, dict]:
156 """Create an Execution Environment. Returns when it is created or raises an
157 exception on failing
159 :param str namespace: Contains a dot separate string.
160 LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
161 :param dict db_dict: where to write to database when the status changes.
162 It contains a dictionary with {collection: str, filter: {}, path: str},
163 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
164 "_admin.deployed.VCA.3"}
165 :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
166 older environment
167 :param float progress_timeout:
168 :param float total_timeout:
169 :returns str, dict: id of the new execution environment and credentials for it
170 (credentials can contains hostname, username, etc depending on
171 underlying cloud)
172 """
174 @abc.abstractmethod
175 async def register_execution_environment(
176 self,
177 namespace: str,
178 credentials: dict,
179 db_dict: dict,
180 progress_timeout: float = None,
181 total_timeout: float = None,
182 ) -> str:
183 """
184 Register an existing execution environment at the VCA
186 :param str namespace: same as create_execution_environment method
187 :param dict credentials: credentials to access the existing execution
188 environment
189 (it can contains hostname, username, path to private key, etc depending on
190 underlying cloud)
191 :param dict db_dict: where to write to database when the status changes.
192 It contains a dictionary with {collection: str, filter: {}, path: str},
193 e.g. {collection: "nsrs", filter:
194 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
195 :param float progress_timeout:
196 :param float total_timeout:
197 :returns str: id of the execution environment
198 """
200 @abc.abstractmethod
201 async def install_configuration_sw(
202 self,
203 ee_id: str,
204 artifact_path: str,
205 db_dict: dict,
206 progress_timeout: float = None,
207 total_timeout: float = None,
208 ):
209 """
210 Install the software inside the execution environment identified by ee_id
212 :param str ee_id: the id of the execution environment returned by
213 create_execution_environment or register_execution_environment
214 :param str artifact_path: where to locate the artifacts (parent folder) using
215 the self.fs
216 the final artifact path will be a combination of this artifact_path and
217 additional string from the config_dict (e.g. charm name)
218 :param dict db_dict: where to write into database when the status changes.
219 It contains a dict with
220 {collection: <str>, filter: {}, path: <str>},
221 e.g. {collection: "nsrs", filter:
222 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
223 :param float progress_timeout:
224 :param float total_timeout:
225 """
227 @abc.abstractmethod
228 async def install_k8s_proxy_charm(
229 self,
230 charm_name: str,
231 namespace: str,
232 artifact_path: str,
233 db_dict: dict,
234 progress_timeout: float = None,
235 total_timeout: float = None,
236 config: dict = None,
237 ) -> str:
238 """
239 Install a k8s proxy charm
241 :param charm_name: Name of the charm being deployed
242 :param namespace: collection of all the uuids related to the charm.
243 :param str artifact_path: where to locate the artifacts (parent folder) using
244 the self.fs
245 the final artifact path will be a combination of this artifact_path and
246 additional string from the config_dict (e.g. charm name)
247 :param dict db_dict: where to write into database when the status changes.
248 It contains a dict with
249 {collection: <str>, filter: {}, path: <str>},
250 e.g. {collection: "nsrs", filter:
251 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
252 :param float progress_timeout:
253 :param float total_timeout:
254 :param config: Dictionary with additional configuration
256 :returns ee_id: execution environment id.
257 """
259 @abc.abstractmethod
260 async def get_ee_ssh_public__key(
261 self,
262 ee_id: str,
263 db_dict: dict,
264 progress_timeout: float = None,
265 total_timeout: float = None,
266 ) -> str:
267 """
268 Generate a priv/pub key pair in the execution environment and return the public
269 key
271 :param str ee_id: the id of the execution environment returned by
272 create_execution_environment or register_execution_environment
273 :param dict db_dict: where to write into database when the status changes.
274 It contains a dict with
275 {collection: <str>, filter: {}, path: <str>},
276 e.g. {collection: "nsrs", filter:
277 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
278 :param float progress_timeout:
279 :param float total_timeout:
280 :returns: public key of the execution environment
281 For the case of juju proxy charm ssh-layered, it is the one
282 returned by 'get-ssh-public-key' primitive.
283 It raises a N2VC exception if fails
284 """
286 @abc.abstractmethod
287 async def add_relation(
288 self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
289 ):
290 """
291 Add a relation between two Execution Environments (using their associated
292 endpoints).
294 :param str ee_id_1: The id of the first execution environment
295 :param str ee_id_2: The id of the second execution environment
296 :param str endpoint_1: The endpoint in the first execution environment
297 :param str endpoint_2: The endpoint in the second execution environment
298 """
300 # TODO
301 @abc.abstractmethod
302 async def remove_relation(self):
303 """ """
305 # TODO
306 @abc.abstractmethod
307 async def deregister_execution_environments(self):
308 """ """
310 @abc.abstractmethod
311 async def delete_namespace(
312 self, namespace: str, db_dict: dict = None, total_timeout: float = None
313 ):
314 """
315 Remove a network scenario and its execution environments
316 :param namespace: [<nsi-id>].<ns-id>
317 :param dict db_dict: where to write into database when the status changes.
318 It contains a dict with
319 {collection: <str>, filter: {}, path: <str>},
320 e.g. {collection: "nsrs", filter:
321 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
322 :param float total_timeout:
323 """
325 @abc.abstractmethod
326 async def delete_execution_environment(
327 self, ee_id: str, db_dict: dict = None, total_timeout: float = None
328 ):
329 """
330 Delete an execution environment
331 :param str ee_id: id of the execution environment to delete
332 :param dict db_dict: where to write into database when the status changes.
333 It contains a dict with
334 {collection: <str>, filter: {}, path: <str>},
335 e.g. {collection: "nsrs", filter:
336 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
337 :param float total_timeout:
338 """
340 @abc.abstractmethod
341 async def upgrade_charm(
342 self,
343 ee_id: str = None,
344 path: str = None,
345 charm_id: str = None,
346 charm_type: str = None,
347 timeout: float = None,
348 ) -> str:
349 """This method upgrade charms in VNFs
351 Args:
352 ee_id: Execution environment id
353 path: Local path to the charm
354 charm_id: charm-id
355 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
356 timeout: (Float) Timeout for the ns update operation
358 Returns:
359 The output of the update operation if status equals to "completed"
360 """
362 @abc.abstractmethod
363 async def exec_primitive(
364 self,
365 ee_id: str,
366 primitive_name: str,
367 params_dict: dict,
368 db_dict: dict = None,
369 progress_timeout: float = None,
370 total_timeout: float = None,
371 ) -> str:
372 """
373 Execute a primitive in the execution environment
375 :param str ee_id: the one returned by create_execution_environment or
376 register_execution_environment
377 :param str primitive_name: must be one defined in the software. There is one
378 called 'config', where, for the proxy case, the 'credentials' of VM are
379 provided
380 :param dict params_dict: parameters of the action
381 :param dict db_dict: where to write into database when the status changes.
382 It contains a dict with
383 {collection: <str>, filter: {}, path: <str>},
384 e.g. {collection: "nsrs", filter:
385 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
386 :param float progress_timeout:
387 :param float total_timeout:
388 :returns str: primitive result, if ok. It raises exceptions in case of fail
389 """
391 async def disconnect(self):
392 """
393 Disconnect from VCA
394 """
396 """
397 ####################################################################################
398 ################################### P R I V A T E ##################################
399 ####################################################################################
400 """
402 def _get_namespace_components(
403 self, namespace: str
404 ) -> tuple[str, str, str, str, str]:
405 """
406 Split namespace components
408 :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
409 :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
410 """
412 # check parameters
413 if namespace is None or len(namespace) == 0:
414 raise N2VCBadArgumentsException(
415 "Argument namespace is mandatory", ["namespace"]
416 )
418 # split namespace components
419 parts = namespace.split(".")
420 nsi_id = None
421 ns_id = None
422 vnf_id = None
423 vdu_id = None
424 vdu_count = None
425 if len(parts) > 0 and len(parts[0]) > 0:
426 nsi_id = parts[0]
427 if len(parts) > 1 and len(parts[1]) > 0:
428 ns_id = parts[1]
429 if len(parts) > 2 and len(parts[2]) > 0:
430 vnf_id = parts[2]
431 if len(parts) > 3 and len(parts[3]) > 0:
432 vdu_id = parts[3]
433 vdu_parts = parts[3].split("-")
434 if len(vdu_parts) > 1:
435 vdu_id = vdu_parts[0]
436 vdu_count = vdu_parts[1]
438 return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
440 async def write_app_status_to_db(
441 self,
442 db_dict: dict,
443 status: N2VCDeploymentStatus,
444 detailed_status: str,
445 vca_status: str,
446 entity_type: str,
447 vca_id: str = None,
448 ):
449 """
450 Write application status to database
452 :param: db_dict: DB dictionary
453 :param: status: Status of the application
454 :param: detailed_status: Detailed status
455 :param: vca_status: VCA status
456 :param: entity_type: Entity type ("application", "machine, and "action")
457 :param: vca_id: Id of the VCA. If None, the default VCA will be used.
458 """
459 if not db_dict:
460 self.log.debug("No db_dict => No database write")
461 return
463 # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}'
464 # .format(str(status.value), detailed_status, vca_status, entity_type))
466 try:
467 the_table = db_dict["collection"]
468 the_filter = db_dict["filter"]
469 the_path = db_dict["path"]
470 if not the_path[-1] == ".":
471 the_path = the_path + "."
472 update_dict = {
473 the_path + "status": str(status.value),
474 the_path + "detailed-status": detailed_status,
475 the_path + "VCA-status": vca_status,
476 the_path + "entity-type": entity_type,
477 the_path + "status-time": str(time.time()),
478 }
480 self.db.set_one(
481 table=the_table,
482 q_filter=the_filter,
483 update_dict=update_dict,
484 fail_on_empty=True,
485 )
487 # database callback
488 if self.on_update_db:
489 if asyncio.iscoroutinefunction(self.on_update_db):
490 await self.on_update_db(
491 the_table, the_filter, the_path, update_dict, vca_id=vca_id
492 )
493 else:
494 self.on_update_db(
495 the_table, the_filter, the_path, update_dict, vca_id=vca_id
496 )
498 except DbException as e:
499 if e.http_code == HTTPStatus.NOT_FOUND:
500 self.log.error(
501 "NOT_FOUND error: Exception writing status to database: {}".format(
502 e
503 )
504 )
505 else:
506 self.log.info("Exception writing status to database: {}".format(e))
508 def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
509 if status not in JujuStatusToOSM[entity_type]:
510 self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
511 return N2VCDeploymentStatus.UNKNOWN
512 return JujuStatusToOSM[entity_type][status]
515def obj_to_yaml(obj: object) -> str:
516 # dump to yaml
517 dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
518 # split lines
519 lines = dump_text.splitlines()
520 # remove !!python/object tags
521 yaml_text = ""
522 for line in lines:
523 index = line.find("!!python/object")
524 if index >= 0:
525 line = line[:index]
526 yaml_text += line + "\n"
527 return yaml_text
530def obj_to_dict(obj: object) -> dict:
531 # convert obj to yaml
532 yaml_text = obj_to_yaml(obj)
533 # parse to dict
534 return yaml.load(yaml_text, Loader=yaml.SafeLoader)