Python3.10/Ubuntu 22.04 part 2
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class: it declares the API that every VCA connector has to
    implement and provides a few shared helpers (local SSH key handling,
    namespace parsing and status persistence into the database).
    """

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        on_update_db=None,
        **kwargs,
    ):
        """Initialize N2VC abstract connector. It defines the API for VCA connectors

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param on_update_db: callback called when n2vc connector updates database.
            It may be a plain function or a coroutine function.
            Received arguments:
                table: e.g. "nsrs"
                filter: e.g. {_id: <nsd-id> }
                path: e.g. "_admin.deployed.VCA.3."
                updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
                vca_id (keyword): id of the VCA, may be None
        :param kwargs: accepted for signature compatibility; not used here
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # store arguments into self
        self.db = db
        self.fs = fs
        self.on_update_db = on_update_db

        # key-pair locations are not known yet: get_public_key() fills them in
        # (and generates the key-pair on first use)
        self.private_key_path = None
        self.public_key_path = None

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: returns a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from local machine, to be injected into virtual
        machines to be managed by the VCA.
        First run, a ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets self.private_key_path and self.public_key_path, creates
        the ".ssh" directory under $HOME (or under /tmp when HOME is not set) and
        runs ssh-keygen when the private key file does not exist yet.

        :returns str: first line of the public key file, exactly as read (a trailing
            newline, if present, is kept)
        :raises subprocess.CalledProcessError: if ssh-keygen exits with an error
        :raises OSError: if the directory, ssh-keygen or the public key file cannot
            be accessed
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = os.path.join(homedir, ".ssh")
        # exist_ok avoids the check-then-create race of exists() + mkdir()
        os.makedirs(sshdir, exist_ok=True)

        self.private_key_path = os.path.join(sshdir, "id_n2vc_rsa")
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # The command is passed as an argument list (no string formatting and
            # re-splitting) so that a key path containing whitespace stays a single
            # argument. -N "" requests an empty passphrase.
            subprocess.check_output(
                [
                    "ssh-keygen",
                    "-t",
                    "rsa",
                    "-b",
                    "4096",
                    "-N",
                    "",
                    "-f",
                    self.private_key_path,
                ]
            )

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as file:
            public_key = file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separate string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contains hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contains hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param charm_name: Name of the charm being deployed
        :param namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :param config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """Remove a relation (signature and contract still to be defined)"""

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """Deregister execution environments (signature and contract still to be
        defined)"""

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments

        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment

        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrade charms in VNFs

        Args:
            ee_id:  Execution environment id
            path:   Local path to the charm
            charm_id:   charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or
                k8s-proxy-charm
            timeout: (Float)    Timeout for the ns update operation

        Returns:
            The output of the update operation if status equals to "completed"
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        Not abstract: the base implementation does nothing, subclasses may override.
        """

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are missing or empty are returned as None.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if not namespace:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # split namespace components; only the first four dot-separated fields are
        # used, absent ones are padded so they can be unpacked unconditionally
        parts = namespace.split(".")
        padded = parts[:4] + [None] * (4 - len(parts))
        nsi_id, ns_id, vnf_id, vdu_id = (part or None for part in padded)

        vdu_count = None
        if vdu_id is not None:
            # NOTE(review): only the first two "-"-separated tokens are kept, so a
            # vdu id that itself contains hyphens is truncated. Existing behaviour
            # is preserved here; confirm with callers before changing it.
            vdu_parts = vdu_id.split("-")
            if len(vdu_parts) > 1:
                vdu_id, vdu_count = vdu_parts[0], vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
        vca_id: str = None,
    ):
        """
        Write application status to database

        The fields "status", "detailed-status", "VCA-status", "entity-type" and
        "status-time" are written under db_dict["path"] with self.db.set_one, then
        the on_update_db callback (if any) is invoked; it is awaited when it is a
        coroutine function.

        DbException errors are logged and swallowed; any other exception (for
        instance a KeyError for a db_dict lacking a key) propagates to the caller.

        :param: db_dict: DB dictionary with the keys "collection", "filter" and
            "path". If empty or None nothing is written.
        :param: status: Status of the application
        :param: detailed_status: Detailed status
        :param: vca_status: VCA status
        :param: entity_type: Entity type ("application", "machine", and "action")
        :param: vca_id: Id of the VCA, forwarded to the on_update_db callback.
            If None, the default VCA is meant to be used.
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # Normalise the path into a key prefix ending with ".". An empty path
            # is left as is (fields go to the top level) instead of failing with
            # an IndexError when inspecting its last character.
            if the_path and not the_path.endswith("."):
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )
                else:
                    self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )

        except DbException as e:
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
        """
        Translate a Juju status into the OSM deployment status

        :param entity_type: first-level key of JujuStatusToOSM. It is looked up
            without a guard, so an unknown entity type raises whatever that lookup
            raises (presumably KeyError - verify against n2vc.utils).
        :param status: Juju status to translate
        :return: the mapped status, or N2VCDeploymentStatus.UNKNOWN (after logging
            a warning) when status has no mapping for that entity type
        """
        known_statuses = JujuStatusToOSM[entity_type]
        if status not in known_statuses:
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
        return known_statuses[status]
502
503
def obj_to_yaml(obj: object) -> str:
    """Dump ``obj`` as block-style YAML with ``!!python/object`` tags stripped.

    The object is dumped with ``yaml.dump`` (block style, indent of 2). On every
    line that contains the ``!!python/object`` marker, the marker and the rest of
    that line are dropped; other lines are kept unchanged.

    :param obj: object to serialize
    :return: the filtered YAML text, every line terminated by a newline
    """
    tag_marker = "!!python/object"
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the text before the marker, or the whole line when the
    # marker is absent, which is exactly the part that has to be kept.
    return "".join(
        raw_line.partition(tag_marker)[0] + "\n" for raw_line in dumped.splitlines()
    )
517
518
def obj_to_dict(obj: object) -> dict:
    """Convert ``obj`` into plain data by round-tripping it through YAML.

    The object is first serialized with ``obj_to_yaml`` and the resulting text
    is parsed back with the PyYAML safe loader.

    :param obj: object to convert
    :return: the value parsed from the YAML text
    """
    return yaml.safe_load(obj_to_yaml(obj))