Pin black version in tox.ini to 23.12.1
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 from shlex import quote
28 import os
29 import shlex
30 import subprocess
31 import time
32
33 from n2vc.exceptions import N2VCBadArgumentsException
34 from osm_common.dbmongo import DbException
35 import yaml
36
37 from n2vc.loggable import Loggable
38 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
39
40
41 class N2VCConnector(abc.ABC, Loggable):
42 """Generic N2VC connector
43
44 Abstract class
45 """
46
47 """
48 ####################################################################################
49 ################################### P U B L I C ####################################
50 ####################################################################################
51 """
52
53 def __init__(
54 self,
55 db: object,
56 fs: object,
57 log: object,
58 on_update_db=None,
59 **kwargs,
60 ):
61 """Initialize N2VC abstract connector. It defines de API for VCA connectors
62
63 :param object db: Mongo object managing the MongoDB (repo common DbBase)
64 :param object fs: FileSystem object managing the package artifacts (repo common
65 FsBase)
66 :param object log: the logging object to log to
67 :param on_update_db: callback called when n2vc connector updates database.
68 Received arguments:
69 table: e.g. "nsrs"
70 filter: e.g. {_id: <nsd-id> }
71 path: e.g. "_admin.deployed.VCA.3."
72 updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
73 """
74
75 # parent class
76 Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")
77
78 # check arguments
79 if db is None:
80 raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
81 if fs is None:
82 raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])
83
84 # store arguments into self
85 self.db = db
86 self.fs = fs
87 self.on_update_db = on_update_db
88
89 # generate private/public key-pair
90 self.private_key_path = None
91 self.public_key_path = None
92
93 @abc.abstractmethod
94 async def get_status(self, namespace: str, yaml_format: bool = True):
95 """Get namespace status
96
97 :param namespace: we obtain ns from namespace
98 :param yaml_format: returns a yaml string
99 """
100
101 # TODO: review which public key
102 def get_public_key(self) -> str:
103 """Get the VCA ssh-public-key
104
105 Returns the SSH public key from local mahine, to be injected into virtual
106 machines to be managed by the VCA.
107 First run, a ssh keypair will be created.
108 The public key is injected into a VM so that we can provision the
109 machine with Juju, after which Juju will communicate with the VM
110 directly via the juju agent.
111 """
112
113 # Find the path where we expect our key lives (~/.ssh)
114 homedir = os.environ.get("HOME")
115 if not homedir:
116 self.log.warning("No HOME environment variable, using /tmp")
117 homedir = "/tmp"
118 sshdir = "{}/.ssh".format(homedir)
119 sshdir = os.path.realpath(os.path.normpath(os.path.abspath(sshdir)))
120 if not os.path.exists(sshdir):
121 os.mkdir(sshdir)
122
123 self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
124 self.private_key_path = os.path.realpath(
125 os.path.normpath(os.path.abspath(self.private_key_path))
126 )
127 self.public_key_path = "{}.pub".format(self.private_key_path)
128 self.public_key_path = os.path.realpath(
129 os.path.normpath(os.path.abspath(self.public_key_path))
130 )
131
132 # If we don't have a key generated, then we have to generate it using ssh-keygen
133 if not os.path.exists(self.private_key_path):
134 command = "ssh-keygen -t {} -b {} -N '' -f {}".format(
135 "rsa", "4096", quote(self.private_key_path)
136 )
137 # run command with arguments
138 args = shlex.split(command)
139 subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
140
141 # Read the public key. Only one public key (one line) in the file
142 with open(self.public_key_path, "r") as file:
143 public_key = file.readline()
144
145 return public_key
146
147 @abc.abstractmethod
148 async def create_execution_environment(
149 self,
150 namespace: str,
151 db_dict: dict,
152 reuse_ee_id: str = None,
153 progress_timeout: float = None,
154 total_timeout: float = None,
155 ) -> tuple[str, dict]:
156 """Create an Execution Environment. Returns when it is created or raises an
157 exception on failing
158
159 :param str namespace: Contains a dot separate string.
160 LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
161 :param dict db_dict: where to write to database when the status changes.
162 It contains a dictionary with {collection: str, filter: {}, path: str},
163 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path:
164 "_admin.deployed.VCA.3"}
165 :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
166 older environment
167 :param float progress_timeout:
168 :param float total_timeout:
169 :returns str, dict: id of the new execution environment and credentials for it
170 (credentials can contains hostname, username, etc depending on
171 underlying cloud)
172 """
173
174 @abc.abstractmethod
175 async def register_execution_environment(
176 self,
177 namespace: str,
178 credentials: dict,
179 db_dict: dict,
180 progress_timeout: float = None,
181 total_timeout: float = None,
182 ) -> str:
183 """
184 Register an existing execution environment at the VCA
185
186 :param str namespace: same as create_execution_environment method
187 :param dict credentials: credentials to access the existing execution
188 environment
189 (it can contains hostname, username, path to private key, etc depending on
190 underlying cloud)
191 :param dict db_dict: where to write to database when the status changes.
192 It contains a dictionary with {collection: str, filter: {}, path: str},
193 e.g. {collection: "nsrs", filter:
194 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
195 :param float progress_timeout:
196 :param float total_timeout:
197 :returns str: id of the execution environment
198 """
199
200 @abc.abstractmethod
201 async def install_configuration_sw(
202 self,
203 ee_id: str,
204 artifact_path: str,
205 db_dict: dict,
206 progress_timeout: float = None,
207 total_timeout: float = None,
208 ):
209 """
210 Install the software inside the execution environment identified by ee_id
211
212 :param str ee_id: the id of the execution environment returned by
213 create_execution_environment or register_execution_environment
214 :param str artifact_path: where to locate the artifacts (parent folder) using
215 the self.fs
216 the final artifact path will be a combination of this artifact_path and
217 additional string from the config_dict (e.g. charm name)
218 :param dict db_dict: where to write into database when the status changes.
219 It contains a dict with
220 {collection: <str>, filter: {}, path: <str>},
221 e.g. {collection: "nsrs", filter:
222 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
223 :param float progress_timeout:
224 :param float total_timeout:
225 """
226
227 @abc.abstractmethod
228 async def install_k8s_proxy_charm(
229 self,
230 charm_name: str,
231 namespace: str,
232 artifact_path: str,
233 db_dict: dict,
234 progress_timeout: float = None,
235 total_timeout: float = None,
236 config: dict = None,
237 ) -> str:
238 """
239 Install a k8s proxy charm
240
241 :param charm_name: Name of the charm being deployed
242 :param namespace: collection of all the uuids related to the charm.
243 :param str artifact_path: where to locate the artifacts (parent folder) using
244 the self.fs
245 the final artifact path will be a combination of this artifact_path and
246 additional string from the config_dict (e.g. charm name)
247 :param dict db_dict: where to write into database when the status changes.
248 It contains a dict with
249 {collection: <str>, filter: {}, path: <str>},
250 e.g. {collection: "nsrs", filter:
251 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
252 :param float progress_timeout:
253 :param float total_timeout:
254 :param config: Dictionary with additional configuration
255
256 :returns ee_id: execution environment id.
257 """
258
259 @abc.abstractmethod
260 async def get_ee_ssh_public__key(
261 self,
262 ee_id: str,
263 db_dict: dict,
264 progress_timeout: float = None,
265 total_timeout: float = None,
266 ) -> str:
267 """
268 Generate a priv/pub key pair in the execution environment and return the public
269 key
270
271 :param str ee_id: the id of the execution environment returned by
272 create_execution_environment or register_execution_environment
273 :param dict db_dict: where to write into database when the status changes.
274 It contains a dict with
275 {collection: <str>, filter: {}, path: <str>},
276 e.g. {collection: "nsrs", filter:
277 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
278 :param float progress_timeout:
279 :param float total_timeout:
280 :returns: public key of the execution environment
281 For the case of juju proxy charm ssh-layered, it is the one
282 returned by 'get-ssh-public-key' primitive.
283 It raises a N2VC exception if fails
284 """
285
286 @abc.abstractmethod
287 async def add_relation(
288 self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
289 ):
290 """
291 Add a relation between two Execution Environments (using their associated
292 endpoints).
293
294 :param str ee_id_1: The id of the first execution environment
295 :param str ee_id_2: The id of the second execution environment
296 :param str endpoint_1: The endpoint in the first execution environment
297 :param str endpoint_2: The endpoint in the second execution environment
298 """
299
300 # TODO
301 @abc.abstractmethod
302 async def remove_relation(self):
303 """ """
304
305 # TODO
306 @abc.abstractmethod
307 async def deregister_execution_environments(self):
308 """ """
309
310 @abc.abstractmethod
311 async def delete_namespace(
312 self, namespace: str, db_dict: dict = None, total_timeout: float = None
313 ):
314 """
315 Remove a network scenario and its execution environments
316 :param namespace: [<nsi-id>].<ns-id>
317 :param dict db_dict: where to write into database when the status changes.
318 It contains a dict with
319 {collection: <str>, filter: {}, path: <str>},
320 e.g. {collection: "nsrs", filter:
321 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
322 :param float total_timeout:
323 """
324
325 @abc.abstractmethod
326 async def delete_execution_environment(
327 self, ee_id: str, db_dict: dict = None, total_timeout: float = None
328 ):
329 """
330 Delete an execution environment
331 :param str ee_id: id of the execution environment to delete
332 :param dict db_dict: where to write into database when the status changes.
333 It contains a dict with
334 {collection: <str>, filter: {}, path: <str>},
335 e.g. {collection: "nsrs", filter:
336 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
337 :param float total_timeout:
338 """
339
340 @abc.abstractmethod
341 async def upgrade_charm(
342 self,
343 ee_id: str = None,
344 path: str = None,
345 charm_id: str = None,
346 charm_type: str = None,
347 timeout: float = None,
348 ) -> str:
349 """This method upgrade charms in VNFs
350
351 Args:
352 ee_id: Execution environment id
353 path: Local path to the charm
354 charm_id: charm-id
355 charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
356 timeout: (Float) Timeout for the ns update operation
357
358 Returns:
359 The output of the update operation if status equals to "completed"
360 """
361
362 @abc.abstractmethod
363 async def exec_primitive(
364 self,
365 ee_id: str,
366 primitive_name: str,
367 params_dict: dict,
368 db_dict: dict = None,
369 progress_timeout: float = None,
370 total_timeout: float = None,
371 ) -> str:
372 """
373 Execute a primitive in the execution environment
374
375 :param str ee_id: the one returned by create_execution_environment or
376 register_execution_environment
377 :param str primitive_name: must be one defined in the software. There is one
378 called 'config', where, for the proxy case, the 'credentials' of VM are
379 provided
380 :param dict params_dict: parameters of the action
381 :param dict db_dict: where to write into database when the status changes.
382 It contains a dict with
383 {collection: <str>, filter: {}, path: <str>},
384 e.g. {collection: "nsrs", filter:
385 {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
386 :param float progress_timeout:
387 :param float total_timeout:
388 :returns str: primitive result, if ok. It raises exceptions in case of fail
389 """
390
391 async def disconnect(self):
392 """
393 Disconnect from VCA
394 """
395
396 """
397 ####################################################################################
398 ################################### P R I V A T E ##################################
399 ####################################################################################
400 """
401
402 def _get_namespace_components(
403 self, namespace: str
404 ) -> tuple[str, str, str, str, str]:
405 """
406 Split namespace components
407
408 :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
409 :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
410 """
411
412 # check parameters
413 if namespace is None or len(namespace) == 0:
414 raise N2VCBadArgumentsException(
415 "Argument namespace is mandatory", ["namespace"]
416 )
417
418 # split namespace components
419 parts = namespace.split(".")
420 nsi_id = None
421 ns_id = None
422 vnf_id = None
423 vdu_id = None
424 vdu_count = None
425 if len(parts) > 0 and len(parts[0]) > 0:
426 nsi_id = parts[0]
427 if len(parts) > 1 and len(parts[1]) > 0:
428 ns_id = parts[1]
429 if len(parts) > 2 and len(parts[2]) > 0:
430 vnf_id = parts[2]
431 if len(parts) > 3 and len(parts[3]) > 0:
432 vdu_id = parts[3]
433 vdu_parts = parts[3].split("-")
434 if len(vdu_parts) > 1:
435 vdu_id = vdu_parts[0]
436 vdu_count = vdu_parts[1]
437
438 return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
439
440 async def write_app_status_to_db(
441 self,
442 db_dict: dict,
443 status: N2VCDeploymentStatus,
444 detailed_status: str,
445 vca_status: str,
446 entity_type: str,
447 vca_id: str = None,
448 ):
449 """
450 Write application status to database
451
452 :param: db_dict: DB dictionary
453 :param: status: Status of the application
454 :param: detailed_status: Detailed status
455 :param: vca_status: VCA status
456 :param: entity_type: Entity type ("application", "machine, and "action")
457 :param: vca_id: Id of the VCA. If None, the default VCA will be used.
458 """
459 if not db_dict:
460 self.log.debug("No db_dict => No database write")
461 return
462
463 # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}'
464 # .format(str(status.value), detailed_status, vca_status, entity_type))
465
466 try:
467 the_table = db_dict["collection"]
468 the_filter = db_dict["filter"]
469 the_path = db_dict["path"]
470 if not the_path[-1] == ".":
471 the_path = the_path + "."
472 update_dict = {
473 the_path + "status": str(status.value),
474 the_path + "detailed-status": detailed_status,
475 the_path + "VCA-status": vca_status,
476 the_path + "entity-type": entity_type,
477 the_path + "status-time": str(time.time()),
478 }
479
480 self.db.set_one(
481 table=the_table,
482 q_filter=the_filter,
483 update_dict=update_dict,
484 fail_on_empty=True,
485 )
486
487 # database callback
488 if self.on_update_db:
489 if asyncio.iscoroutinefunction(self.on_update_db):
490 await self.on_update_db(
491 the_table, the_filter, the_path, update_dict, vca_id=vca_id
492 )
493 else:
494 self.on_update_db(
495 the_table, the_filter, the_path, update_dict, vca_id=vca_id
496 )
497
498 except DbException as e:
499 if e.http_code == HTTPStatus.NOT_FOUND:
500 self.log.error(
501 "NOT_FOUND error: Exception writing status to database: {}".format(
502 e
503 )
504 )
505 else:
506 self.log.info("Exception writing status to database: {}".format(e))
507
508 def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
509 if status not in JujuStatusToOSM[entity_type]:
510 self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
511 return N2VCDeploymentStatus.UNKNOWN
512 return JujuStatusToOSM[entity_type][status]
513
514
515 def obj_to_yaml(obj: object) -> str:
516 # dump to yaml
517 dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
518 # split lines
519 lines = dump_text.splitlines()
520 # remove !!python/object tags
521 yaml_text = ""
522 for line in lines:
523 index = line.find("!!python/object")
524 if index >= 0:
525 line = line[:index]
526 yaml_text += line + "\n"
527 return yaml_text
528
529
530 def obj_to_dict(obj: object) -> dict:
531 # convert obj to yaml
532 yaml_text = obj_to_yaml(obj)
533 # parse to dict
534 return yaml.load(yaml_text, Loader=yaml.SafeLoader)