Code Coverage

Cobertura Coverage Report > n2vc >

n2vc_conn.py

Trend

File Coverage summary

NameClassesLinesConditionals
n2vc_conn.py
100%
1/1
57%
77/134
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
n2vc_conn.py
57%
77/134
N/A

Source

n2vc/n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 1 import abc
25 1 import asyncio
26 1 from http import HTTPStatus
27 1 from shlex import quote
28 1 import os
29 1 import shlex
30 1 import subprocess
31 1 import time
32
33 1 from n2vc.exceptions import N2VCBadArgumentsException
34 1 from osm_common.dbmongo import DbException
35 1 import yaml
36
37 1 from n2vc.loggable import Loggable
38 1 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
39
40
41 1 class N2VCConnector(abc.ABC, Loggable):
42     """Generic N2VC connector
43
44     Abstract class
45     """
46
47 1     """
48     ####################################################################################
49     ################################### P U B L I C ####################################
50     ####################################################################################
51     """
52
53 1     def __init__(
54         self,
55         db: object,
56         fs: object,
57         log: object,
58         on_update_db=None,
59         **kwargs,
60     ):
61         """Initialize N2VC abstract connector. It defines de API for VCA connectors
62
63         :param object db: Mongo object managing the MongoDB (repo common DbBase)
64         :param object fs: FileSystem object managing the package artifacts (repo common
65             FsBase)
66         :param object log: the logging object to log to
67         :param on_update_db: callback called when n2vc connector updates database.
68             Received arguments:
69             table: e.g. "nsrs"
70             filter: e.g. {_id: <nsd-id> }
71             path: e.g. "_admin.deployed.VCA.3."
72             updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
73         """
74
75         # parent class
76 1         Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")
77
78         # check arguments
79 1         if db is None:
80 0             raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
81 1         if fs is None:
82 0             raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
83
84         # store arguments into self
85 1         self.db = db
86 1         self.fs = fs
87 1         self.on_update_db = on_update_db
88
89         # generate private/public key-pair
90 1         self.private_key_path = None
91 1         self.public_key_path = None
92
93 1     @abc.abstractmethod
94 1     async def get_status(self, namespace: str, yaml_format: bool = True):
95         """Get namespace status
96
97         :param namespace: we obtain ns from namespace
98         :param yaml_format: returns a yaml string
99         """
100
101     # TODO: review which public key
102 1     def get_public_key(self) -> str:
103         """Get the VCA ssh-public-key
104
105         Returns the SSH public key from local mahine, to be injected into virtual
106         machines to be managed by the VCA.
107         First run, a ssh keypair will be created.
108         The public key is injected into a VM so that we can provision the
109         machine with Juju, after which Juju will communicate with the VM
110         directly via the juju agent.
111         """
112
113         # Find the path where we expect our key lives (~/.ssh)
114 0         homedir = os.environ.get("HOME")
115 0         if not homedir:
116 0             self.log.warning("No HOME environment variable, using /tmp")
117 0             homedir = "/tmp"
118 0         sshdir = "{}/.ssh".format(homedir)
119 0         sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir)))
120 0         if not os.path.exists(sshdir):
121 0             os.mkdir(sshdir)
122
123 0         self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
124 0         self.private_key_path = os.path.realpath(
125             os.path.normpath(os.path.abspath(self.private_key_path))
126         )
127 0         self.public_key_path = "{}.pub".format(self.private_key_path)
128 0         self.public_key_path = os.path.realpath(
129             os.path.normpath(os.path.abspath(self.public_key_path))
130         )
131
132         # If we don't have a key generated, then we have to generate it using ssh-keygen
133 0         if not os.path.exists(self.private_key_path):
134 0             command = "ssh-keygen -t {} -b {} -N '' -f {}".format(
135                 "rsa", "4096", quote(self.private_key_path)
136             )
137             # run command with arguments
138 0             args = shlex.split(command)
139 0             subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
140
141         # Read the public key. Only one public key (one line) in the file
142 0         with open(self.public_key_path, "r") as file:
143 0             public_key = file.readline()
144
145 0         return public_key
146
147 1     @abc.abstractmethod
148 1     async def create_execution_environment(
149         self,
150         namespace: str,
151         db_dict: dict,
152         reuse_ee_id: str = None,
153         progress_timeout: float = None,
154         total_timeout: float = None,
155     ) -> tuple[str, dict]:
156         """Create an Execution Environment. Returns when it is created or raises an
157         exception on failing
158
159         :param str namespace: Contains a dot separate string.
160                     LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
161         :param dict db_dict: where to write to database when the status changes.
162             It contains a dictionary with {collection: str, filter: {},  path: str},
163                 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
164                 "_admin.deployed.VCA.3"}
165         :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
166             older environment
167         :param float progress_timeout:
168         :param float total_timeout:
169         :returns str, dict: id of the new execution environment and credentials for it
170                     (credentials can contains hostname, username, etc depending on
171                     underlying cloud)
172         """
173
174 1     @abc.abstractmethod
175 1     async def register_execution_environment(
176         self,
177         namespace: str,
178         credentials: dict,
179         db_dict: dict,
180         progress_timeout: float = None,
181         total_timeout: float = None,
182     ) -> str:
183         """
184         Register an existing execution environment at the VCA
185
186         :param str namespace: same as create_execution_environment method
187         :param dict credentials: credentials to access the existing execution
188             environment
189             (it can contains hostname, username, path to private key, etc depending on
190             underlying cloud)
191         :param dict db_dict: where to write to database when the status changes.
192             It contains a dictionary with {collection: str, filter: {},  path: str},
193                 e.g. {collection: "nsrs", filter:
194                     {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
195         :param float progress_timeout:
196         :param float total_timeout:
197         :returns str: id of the execution environment
198         """
199
200 1     @abc.abstractmethod
201 1     async def install_configuration_sw(
202         self,
203         ee_id: str,
204         artifact_path: str,
205         db_dict: dict,
206         progress_timeout: float = None,
207         total_timeout: float = None,
208     ):
209         """
210         Install the software inside the execution environment identified by ee_id
211
212         :param str ee_id: the id of the execution environment returned by
213             create_execution_environment or register_execution_environment
214         :param str artifact_path: where to locate the artifacts (parent folder) using
215             the self.fs
216             the final artifact path will be a combination of this artifact_path and
217             additional string from the config_dict (e.g. charm name)
218         :param dict db_dict: where to write into database when the status changes.
219                         It contains a dict with
220                             {collection: <str>, filter: {},  path: <str>},
221                             e.g. {collection: "nsrs", filter:
222                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
223         :param float progress_timeout:
224         :param float total_timeout:
225         """
226
227 1     @abc.abstractmethod
228 1     async def install_k8s_proxy_charm(
229         self,
230         charm_name: str,
231         namespace: str,
232         artifact_path: str,
233         db_dict: dict,
234         progress_timeout: float = None,
235         total_timeout: float = None,
236         config: dict = None,
237     ) -> str:
238         """
239         Install a k8s proxy charm
240
241         :param charm_name: Name of the charm being deployed
242         :param namespace: collection of all the uuids related to the charm.
243         :param str artifact_path: where to locate the artifacts (parent folder) using
244             the self.fs
245             the final artifact path will be a combination of this artifact_path and
246             additional string from the config_dict (e.g. charm name)
247         :param dict db_dict: where to write into database when the status changes.
248                         It contains a dict with
249                             {collection: <str>, filter: {},  path: <str>},
250                             e.g. {collection: "nsrs", filter:
251                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
252         :param float progress_timeout:
253         :param float total_timeout:
254         :param config: Dictionary with additional configuration
255
256         :returns ee_id: execution environment id.
257         """
258
259 1     @abc.abstractmethod
260 1     async def get_ee_ssh_public__key(
261         self,
262         ee_id: str,
263         db_dict: dict,
264         progress_timeout: float = None,
265         total_timeout: float = None,
266     ) -> str:
267         """
268         Generate a priv/pub key pair in the execution environment and return the public
269         key
270
271         :param str ee_id: the id of the execution environment returned by
272             create_execution_environment or register_execution_environment
273         :param dict db_dict: where to write into database when the status changes.
274                         It contains a dict with
275                             {collection: <str>, filter: {},  path: <str>},
276                             e.g. {collection: "nsrs", filter:
277                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
278         :param float progress_timeout:
279         :param float total_timeout:
280         :returns: public key of the execution environment
281                     For the case of juju proxy charm ssh-layered, it is the one
282                     returned by 'get-ssh-public-key' primitive.
283                     It raises a N2VC exception if fails
284         """
285
286 1     @abc.abstractmethod
287 1     async def add_relation(
288         self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
289     ):
290         """
291         Add a relation between two Execution Environments (using their associated
292         endpoints).
293
294         :param str ee_id_1: The id of the first execution environment
295         :param str ee_id_2: The id of the second execution environment
296         :param str endpoint_1: The endpoint in the first execution environment
297         :param str endpoint_2: The endpoint in the second execution environment
298         """
299
300     # TODO
301 1     @abc.abstractmethod
302 1     async def remove_relation(self):
303         """ """
304
305     # TODO
306 1     @abc.abstractmethod
307 1     async def deregister_execution_environments(self):
308         """ """
309
310 1     @abc.abstractmethod
311 1     async def delete_namespace(
312         self, namespace: str, db_dict: dict = None, total_timeout: float = None
313     ):
314         """
315         Remove a network scenario and its execution environments
316         :param namespace: [<nsi-id>].<ns-id>
317         :param dict db_dict: where to write into database when the status changes.
318                         It contains a dict with
319                             {collection: <str>, filter: {},  path: <str>},
320                             e.g. {collection: "nsrs", filter:
321                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
322         :param float total_timeout:
323         """
324
325 1     @abc.abstractmethod
326 1     async def delete_execution_environment(
327         self, ee_id: str, db_dict: dict = None, total_timeout: float = None
328     ):
329         """
330         Delete an execution environment
331         :param str ee_id: id of the execution environment to delete
332         :param dict db_dict: where to write into database when the status changes.
333                         It contains a dict with
334                             {collection: <str>, filter: {},  path: <str>},
335                             e.g. {collection: "nsrs", filter:
336                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
337         :param float total_timeout:
338         """
339
340 1     @abc.abstractmethod
341 1     async def upgrade_charm(
342         self,
343         ee_id: str = None,
344         path: str = None,
345         charm_id: str = None,
346         charm_type: str = None,
347         timeout: float = None,
348     ) -> str:
349         """This method upgrade charms in VNFs
350
351         Args:
352             ee_id:  Execution environment id
353             path:   Local path to the charm
354             charm_id:   charm-id
355             charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
356             timeout: (Float)    Timeout for the ns update operation
357
358         Returns:
359             The output of the update operation if status equals to "completed"
360         """
361
362 1     @abc.abstractmethod
363 1     async def exec_primitive(
364         self,
365         ee_id: str,
366         primitive_name: str,
367         params_dict: dict,
368         db_dict: dict = None,
369         progress_timeout: float = None,
370         total_timeout: float = None,
371     ) -> str:
372         """
373         Execute a primitive in the execution environment
374
375         :param str ee_id: the one returned by create_execution_environment or
376             register_execution_environment
377         :param str primitive_name: must be one defined in the software. There is one
378             called 'config', where, for the proxy case, the 'credentials' of VM are
379             provided
380         :param dict params_dict: parameters of the action
381         :param dict db_dict: where to write into database when the status changes.
382                         It contains a dict with
383                             {collection: <str>, filter: {},  path: <str>},
384                             e.g. {collection: "nsrs", filter:
385                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
386         :param float progress_timeout:
387         :param float total_timeout:
388         :returns str: primitive result, if ok. It raises exceptions in case of fail
389         """
390
391 1     async def disconnect(self):
392         """
393         Disconnect from VCA
394         """
395
396 1     """
397     ####################################################################################
398     ################################### P R I V A T E ##################################
399     ####################################################################################
400     """
401
402 1     def _get_namespace_components(
403         self, namespace: str
404     ) -> tuple[str, str, str, str, str]:
405         """
406         Split namespace components
407
408         :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
409         :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
410         """
411
412         # check parameters
413 1         if namespace is None or len(namespace) == 0:
414 0             raise N2VCBadArgumentsException(
415                 "Argument namespace is mandatory", ["namespace"]
416             )
417
418         # split namespace components
419 1         parts = namespace.split(".")
420 1         nsi_id = None
421 1         ns_id = None
422 1         vnf_id = None
423 1         vdu_id = None
424 1         vdu_count = None
425 1         if len(parts) > 0 and len(parts[0]) > 0:
426 0             nsi_id = parts[0]
427 1         if len(parts) > 1 and len(parts[1]) > 0:
428 1             ns_id = parts[1]
429 1         if len(parts) > 2 and len(parts[2]) > 0:
430 1             vnf_id = parts[2]
431 1         if len(parts) > 3 and len(parts[3]) > 0:
432 1             vdu_id = parts[3]
433 1             vdu_parts = parts[3].split("-")
434 1             if len(vdu_parts) > 1:
435 1                 vdu_id = vdu_parts[0]
436 1                 vdu_count = vdu_parts[1]
437
438 1         return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
439
440 1     async def write_app_status_to_db(
441         self,
442         db_dict: dict,
443         status: N2VCDeploymentStatus,
444         detailed_status: str,
445         vca_status: str,
446         entity_type: str,
447         vca_id: str = None,
448     ):
449         """
450         Write application status to database
451
452         :param: db_dict: DB dictionary
453         :param: status: Status of the application
454         :param: detailed_status: Detailed status
455         :param: vca_status: VCA status
456         :param: entity_type: Entity type ("application", "machine, and "action")
457         :param: vca_id: Id of the VCA. If None, the default VCA will be used.
458         """
459 0         if not db_dict:
460 0             self.log.debug("No db_dict => No database write")
461 0             return
462
463         # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}'
464         #          .format(str(status.value), detailed_status, vca_status, entity_type))
465
466 0         try:
467 0             the_table = db_dict["collection"]
468 0             the_filter = db_dict["filter"]
469 0             the_path = db_dict["path"]
470 0             if not the_path[-1] == ".":
471 0                 the_path = the_path + "."
472 0             update_dict = {
473                 the_path + "status": str(status.value),
474                 the_path + "detailed-status": detailed_status,
475                 the_path + "VCA-status": vca_status,
476                 the_path + "entity-type": entity_type,
477                 the_path + "status-time": str(time.time()),
478             }
479
480 0             self.db.set_one(
481                 table=the_table,
482                 q_filter=the_filter,
483                 update_dict=update_dict,
484                 fail_on_empty=True,
485             )
486
487             # database callback
488 0             if self.on_update_db:
489 0                 if asyncio.iscoroutinefunction(self.on_update_db):
490 0                     await self.on_update_db(
491                         the_table, the_filter, the_path, update_dict, vca_id=vca_id
492                     )
493                 else:
494 0                     self.on_update_db(
495                         the_table, the_filter, the_path, update_dict, vca_id=vca_id
496                     )
497
498 0         except DbException as e:
499 0             if e.http_code == HTTPStatus.NOT_FOUND:
500 0                 self.log.error(
501                     "NOT_FOUND error: Exception writing status to database: {}".format(
502                         e
503                     )
504                 )
505             else:
506 0                 self.log.info("Exception writing status to database: {}".format(e))
507
508 1     def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
509 0         if status not in JujuStatusToOSM[entity_type]:
510 0             self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
511 0             return N2VCDeploymentStatus.UNKNOWN
512 0         return JujuStatusToOSM[entity_type][status]
513
514
515 1 def obj_to_yaml(obj: object) -> str:
516     # dump to yaml
517 0     dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
518     # split lines
519 0     lines = dump_text.splitlines()
520     # remove !!python/object tags
521 0     yaml_text = ""
522 0     for line in lines:
523 0         index = line.find("!!python/object")
524 0         if index >= 0:
525 0             line = line[:index]
526 0         yaml_text += line + "\n"
527 0     return yaml_text
528
529
530 1 def obj_to_dict(obj: object) -> dict:
531     # convert obj to yaml
532 0     yaml_text = obj_to_yaml(obj)
533     # parse to dict
534 0     return yaml.load(yaml_text, Loader=yaml.SafeLoader)