Code Coverage

Cobertura Coverage Report > n2vc >

n2vc_conn.py

Trend

File Coverage summary

NameClassesLinesConditionals
n2vc_conn.py
100%
1/1
59%
75/128
100%
0/0

Coverage Breakdown by Class

NameLinesConditionals
n2vc_conn.py
59%
75/128
N/A

Source

n2vc/n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 1 import abc
25 1 import asyncio
26 1 from http import HTTPStatus
27 1 import os
28 1 import shlex
29 1 import subprocess
30 1 import time
31
32 1 from n2vc.exceptions import N2VCBadArgumentsException
33 1 from osm_common.dbmongo import DbException
34 1 import yaml
35
36 1 from n2vc.loggable import Loggable
37 1 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
38
39
40 1 class N2VCConnector(abc.ABC, Loggable):
41     """Generic N2VC connector
42
43     Abstract class
44     """
45
46     """
47     ####################################################################################
48     ################################### P U B L I C ####################################
49     ####################################################################################
50     """
51
52 1     def __init__(
53         self,
54         db: object,
55         fs: object,
56         log: object,
57         loop: object,
58         on_update_db=None,
59         **kwargs,
60     ):
61         """Initialize N2VC abstract connector. It defines de API for VCA connectors
62
63         :param object db: Mongo object managing the MongoDB (repo common DbBase)
64         :param object fs: FileSystem object managing the package artifacts (repo common
65             FsBase)
66         :param object log: the logging object to log to
67         :param object loop: the loop to use for asyncio (default current thread loop)
68         :param on_update_db: callback called when n2vc connector updates database.
69             Received arguments:
70             table: e.g. "nsrs"
71             filter: e.g. {_id: <nsd-id> }
72             path: e.g. "_admin.deployed.VCA.3."
73             updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
74         """
75
76         # parent class
77 1         Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")
78
79         # check arguments
80 1         if db is None:
81 0             raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
82 1         if fs is None:
83 0             raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
84
85         # store arguments into self
86 1         self.db = db
87 1         self.fs = fs
88 1         self.loop = loop or asyncio.get_event_loop()
89 1         self.on_update_db = on_update_db
90
91         # generate private/public key-pair
92 1         self.private_key_path = None
93 1         self.public_key_path = None
94
95 1     @abc.abstractmethod
96 1     async def get_status(self, namespace: str, yaml_format: bool = True):
97         """Get namespace status
98
99         :param namespace: we obtain ns from namespace
100         :param yaml_format: returns a yaml string
101         """
102
103     # TODO: review which public key
104 1     def get_public_key(self) -> str:
105         """Get the VCA ssh-public-key
106
107         Returns the SSH public key from local mahine, to be injected into virtual
108         machines to be managed by the VCA.
109         First run, a ssh keypair will be created.
110         The public key is injected into a VM so that we can provision the
111         machine with Juju, after which Juju will communicate with the VM
112         directly via the juju agent.
113         """
114
115         # Find the path where we expect our key lives (~/.ssh)
116 0         homedir = os.environ.get("HOME")
117 0         if not homedir:
118 0             self.log.warning("No HOME environment variable, using /tmp")
119 0             homedir = "/tmp"
120 0         sshdir = "{}/.ssh".format(homedir)
121 0         if not os.path.exists(sshdir):
122 0             os.mkdir(sshdir)
123
124 0         self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
125 0         self.public_key_path = "{}.pub".format(self.private_key_path)
126
127         # If we don't have a key generated, then we have to generate it using ssh-keygen
128 0         if not os.path.exists(self.private_key_path):
129 0             cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
130                 "rsa", "4096", self.private_key_path
131             )
132             # run command with arguments
133 0             subprocess.check_output(shlex.split(cmd))
134
135         # Read the public key. Only one public key (one line) in the file
136 0         with open(self.public_key_path, "r") as file:
137 0             public_key = file.readline()
138
139 0         return public_key
140
141 1     @abc.abstractmethod
142 1     async def create_execution_environment(
143         self,
144         namespace: str,
145         db_dict: dict,
146         reuse_ee_id: str = None,
147         progress_timeout: float = None,
148         total_timeout: float = None,
149     ) -> (str, dict):
150         """Create an Execution Environment. Returns when it is created or raises an
151         exception on failing
152
153         :param str namespace: Contains a dot separate string.
154                     LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
155         :param dict db_dict: where to write to database when the status changes.
156             It contains a dictionary with {collection: str, filter: {},  path: str},
157                 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
158                 "_admin.deployed.VCA.3"}
159         :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
160             older environment
161         :param float progress_timeout:
162         :param float total_timeout:
163         :returns str, dict: id of the new execution environment and credentials for it
164                     (credentials can contains hostname, username, etc depending on
165                     underlying cloud)
166         """
167
168 1     @abc.abstractmethod
169 1     async def register_execution_environment(
170         self,
171         namespace: str,
172         credentials: dict,
173         db_dict: dict,
174         progress_timeout: float = None,
175         total_timeout: float = None,
176     ) -> str:
177         """
178         Register an existing execution environment at the VCA
179
180         :param str namespace: same as create_execution_environment method
181         :param dict credentials: credentials to access the existing execution
182             environment
183             (it can contains hostname, username, path to private key, etc depending on
184             underlying cloud)
185         :param dict db_dict: where to write to database when the status changes.
186             It contains a dictionary with {collection: str, filter: {},  path: str},
187                 e.g. {collection: "nsrs", filter:
188                     {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
189         :param float progress_timeout:
190         :param float total_timeout:
191         :returns str: id of the execution environment
192         """
193
194 1     @abc.abstractmethod
195 1     async def install_configuration_sw(
196         self,
197         ee_id: str,
198         artifact_path: str,
199         db_dict: dict,
200         progress_timeout: float = None,
201         total_timeout: float = None,
202     ):
203         """
204         Install the software inside the execution environment identified by ee_id
205
206         :param str ee_id: the id of the execution environment returned by
207             create_execution_environment or register_execution_environment
208         :param str artifact_path: where to locate the artifacts (parent folder) using
209             the self.fs
210             the final artifact path will be a combination of this artifact_path and
211             additional string from the config_dict (e.g. charm name)
212         :param dict db_dict: where to write into database when the status changes.
213                         It contains a dict with
214                             {collection: <str>, filter: {},  path: <str>},
215                             e.g. {collection: "nsrs", filter:
216                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
217         :param float progress_timeout:
218         :param float total_timeout:
219         """
220
221 1     @abc.abstractmethod
222 1     async def install_k8s_proxy_charm(
223         self,
224         charm_name: str,
225         namespace: str,
226         artifact_path: str,
227         db_dict: dict,
228         progress_timeout: float = None,
229         total_timeout: float = None,
230         config: dict = None,
231     ) -> str:
232         """
233         Install a k8s proxy charm
234
235         :param charm_name: Name of the charm being deployed
236         :param namespace: collection of all the uuids related to the charm.
237         :param str artifact_path: where to locate the artifacts (parent folder) using
238             the self.fs
239             the final artifact path will be a combination of this artifact_path and
240             additional string from the config_dict (e.g. charm name)
241         :param dict db_dict: where to write into database when the status changes.
242                         It contains a dict with
243                             {collection: <str>, filter: {},  path: <str>},
244                             e.g. {collection: "nsrs", filter:
245                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
246         :param float progress_timeout:
247         :param float total_timeout:
248         :param config: Dictionary with additional configuration
249
250         :returns ee_id: execution environment id.
251         """
252
253 1     @abc.abstractmethod
254 1     async def get_ee_ssh_public__key(
255         self,
256         ee_id: str,
257         db_dict: dict,
258         progress_timeout: float = None,
259         total_timeout: float = None,
260     ) -> str:
261         """
262         Generate a priv/pub key pair in the execution environment and return the public
263         key
264
265         :param str ee_id: the id of the execution environment returned by
266             create_execution_environment or register_execution_environment
267         :param dict db_dict: where to write into database when the status changes.
268                         It contains a dict with
269                             {collection: <str>, filter: {},  path: <str>},
270                             e.g. {collection: "nsrs", filter:
271                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
272         :param float progress_timeout:
273         :param float total_timeout:
274         :returns: public key of the execution environment
275                     For the case of juju proxy charm ssh-layered, it is the one
276                     returned by 'get-ssh-public-key' primitive.
277                     It raises a N2VC exception if fails
278         """
279
280 1     @abc.abstractmethod
281 1     async def add_relation(
282         self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
283     ):
284         """
285         Add a relation between two Execution Environments (using their associated
286         endpoints).
287
288         :param str ee_id_1: The id of the first execution environment
289         :param str ee_id_2: The id of the second execution environment
290         :param str endpoint_1: The endpoint in the first execution environment
291         :param str endpoint_2: The endpoint in the second execution environment
292         """
293
294     # TODO
295 1     @abc.abstractmethod
296 1     async def remove_relation(self):
297         """ """
298
299     # TODO
300 1     @abc.abstractmethod
301 1     async def deregister_execution_environments(self):
302         """ """
303
304 1     @abc.abstractmethod
305 1     async def delete_namespace(
306         self, namespace: str, db_dict: dict = None, total_timeout: float = None
307     ):
308         """
309         Remove a network scenario and its execution environments
310         :param namespace: [<nsi-id>].<ns-id>
311         :param dict db_dict: where to write into database when the status changes.
312                         It contains a dict with
313                             {collection: <str>, filter: {},  path: <str>},
314                             e.g. {collection: "nsrs", filter:
315                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
316         :param float total_timeout:
317         """
318
319 1     @abc.abstractmethod
320 1     async def delete_execution_environment(
321         self, ee_id: str, db_dict: dict = None, total_timeout: float = None
322     ):
323         """
324         Delete an execution environment
325         :param str ee_id: id of the execution environment to delete
326         :param dict db_dict: where to write into database when the status changes.
327                         It contains a dict with
328                             {collection: <str>, filter: {},  path: <str>},
329                             e.g. {collection: "nsrs", filter:
330                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
331         :param float total_timeout:
332         """
333
334 1     @abc.abstractmethod
335 1     async def upgrade_charm(
336         self,
337         ee_id: str = None,
338         path: str = None,
339         charm_id: str = None,
340         charm_type: str = None,
341         timeout: float = None,
342     ) -> str:
343         """This method upgrade charms in VNFs
344
345         Args:
346             ee_id:  Execution environment id
347             path:   Local path to the charm
348             charm_id:   charm-id
349             charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
350             timeout: (Float)    Timeout for the ns update operation
351
352         Returns:
353             The output of the update operation if status equals to "completed"
354         """
355
356 1     @abc.abstractmethod
357 1     async def exec_primitive(
358         self,
359         ee_id: str,
360         primitive_name: str,
361         params_dict: dict,
362         db_dict: dict = None,
363         progress_timeout: float = None,
364         total_timeout: float = None,
365     ) -> str:
366         """
367         Execute a primitive in the execution environment
368
369         :param str ee_id: the one returned by create_execution_environment or
370             register_execution_environment
371         :param str primitive_name: must be one defined in the software. There is one
372             called 'config', where, for the proxy case, the 'credentials' of VM are
373             provided
374         :param dict params_dict: parameters of the action
375         :param dict db_dict: where to write into database when the status changes.
376                         It contains a dict with
377                             {collection: <str>, filter: {},  path: <str>},
378                             e.g. {collection: "nsrs", filter:
379                                 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
380         :param float progress_timeout:
381         :param float total_timeout:
382         :returns str: primitive result, if ok. It raises exceptions in case of fail
383         """
384
385 1     async def disconnect(self):
386         """
387         Disconnect from VCA
388         """
389
390     """
391     ####################################################################################
392     ################################### P R I V A T E ##################################
393     ####################################################################################
394     """
395
396 1     def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
397         """
398         Split namespace components
399
400         :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
401         :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
402         """
403
404         # check parameters
405 1         if namespace is None or len(namespace) == 0:
406 0             raise N2VCBadArgumentsException(
407                 "Argument namespace is mandatory", ["namespace"]
408             )
409
410         # split namespace components
411 1         parts = namespace.split(".")
412 1         nsi_id = None
413 1         ns_id = None
414 1         vnf_id = None
415 1         vdu_id = None
416 1         vdu_count = None
417 1         if len(parts) > 0 and len(parts[0]) > 0:
418 0             nsi_id = parts[0]
419 1         if len(parts) > 1 and len(parts[1]) > 0:
420 1             ns_id = parts[1]
421 1         if len(parts) > 2 and len(parts[2]) > 0:
422 1             vnf_id = parts[2]
423 1         if len(parts) > 3 and len(parts[3]) > 0:
424 1             vdu_id = parts[3]
425 1             vdu_parts = parts[3].split("-")
426 1             if len(vdu_parts) > 1:
427 1                 vdu_id = vdu_parts[0]
428 1                 vdu_count = vdu_parts[1]
429
430 1         return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
431
432 1     async def write_app_status_to_db(
433         self,
434         db_dict: dict,
435         status: N2VCDeploymentStatus,
436         detailed_status: str,
437         vca_status: str,
438         entity_type: str,
439         vca_id: str = None,
440     ):
441         """
442         Write application status to database
443
444         :param: db_dict: DB dictionary
445         :param: status: Status of the application
446         :param: detailed_status: Detailed status
447         :param: vca_status: VCA status
448         :param: entity_type: Entity type ("application", "machine, and "action")
449         :param: vca_id: Id of the VCA. If None, the default VCA will be used.
450         """
451 0         if not db_dict:
452 0             self.log.debug("No db_dict => No database write")
453 0             return
454
455         # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}'
456         #          .format(str(status.value), detailed_status, vca_status, entity_type))
457
458 0         try:
459 0             the_table = db_dict["collection"]
460 0             the_filter = db_dict["filter"]
461 0             the_path = db_dict["path"]
462 0             if not the_path[-1] == ".":
463 0                 the_path = the_path + "."
464 0             update_dict = {
465                 the_path + "status": str(status.value),
466                 the_path + "detailed-status": detailed_status,
467                 the_path + "VCA-status": vca_status,
468                 the_path + "entity-type": entity_type,
469                 the_path + "status-time": str(time.time()),
470             }
471
472 0             self.db.set_one(
473                 table=the_table,
474                 q_filter=the_filter,
475                 update_dict=update_dict,
476                 fail_on_empty=True,
477             )
478
479             # database callback
480 0             if self.on_update_db:
481 0                 if asyncio.iscoroutinefunction(self.on_update_db):
482 0                     await self.on_update_db(
483                         the_table, the_filter, the_path, update_dict, vca_id=vca_id
484                     )
485                 else:
486 0                     self.on_update_db(
487                         the_table, the_filter, the_path, update_dict, vca_id=vca_id
488                     )
489
490 0         except DbException as e:
491 0             if e.http_code == HTTPStatus.NOT_FOUND:
492 0                 self.log.error(
493                     "NOT_FOUND error: Exception writing status to database: {}".format(
494                         e
495                     )
496                 )
497             else:
498 0                 self.log.info("Exception writing status to database: {}".format(e))
499
500 1     def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
501 0         if status not in JujuStatusToOSM[entity_type]:
502 0             self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
503 0             return N2VCDeploymentStatus.UNKNOWN
504 0         return JujuStatusToOSM[entity_type][status]
505
506
507 1 def obj_to_yaml(obj: object) -> str:
508     # dump to yaml
509 0     dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
510     # split lines
511 0     lines = dump_text.splitlines()
512     # remove !!python/object tags
513 0     yaml_text = ""
514 0     for line in lines:
515 0         index = line.find("!!python/object")
516 0         if index >= 0:
517 0             line = line[:index]
518 0         yaml_text += line + "\n"
519 0     return yaml_text
520
521
522 1 def obj_to_dict(obj: object) -> dict:
523     # convert obj to yaml
524 0     yaml_text = obj_to_yaml(obj)
525     # parse to dict
526 0     return yaml.load(yaml_text, Loader=yaml.SafeLoader)