d38bcadbb6f8fb9b131c516fff5f40eb0d9c97e9
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class. It declares the API that every VCA connector has to
    implement and provides a few concrete helpers shared by the
    implementations: local SSH key handling, namespace parsing, status
    persistence and Juju-to-OSM status translation.
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        on_update_db=None,
        **kwargs,
    ):
        """Initialize N2VC abstract connector. It defines the API for VCA connectors

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio (default current thread loop)
        :param on_update_db: callback called when n2vc connector updates database.
            It may be a plain function or a coroutine function.
            Received arguments:
                table: e.g. "nsrs"
                filter: e.g. {_id: <nsd-id>}
                path: e.g. "_admin.deployed.VCA.3."
                updated_data: e.g. "{_admin.deployed.VCA.3.status: 'xxx', etc}"
            plus the keyword argument ``vca_id``.
        :param kwargs: extra keyword arguments; accepted but not used by this
            base class
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.on_update_db = on_update_db

        # Key paths are unknown at construction time; they are resolved (and
        # the key pair is created if missing) by get_public_key().
        self.private_key_path = None
        self.public_key_path = None

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: if True, the status is returned as a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from the local machine, to be injected into
        virtual machines to be managed by the VCA.
        On the first run, an ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        As a side effect, self.private_key_path and self.public_key_path are set.

        :returns str: first line of the public key file, exactly as read
            (no stripping is applied)
        :raises subprocess.CalledProcessError: if ssh-keygen fails
        :raises OSError: if the key directory cannot be created or the public
            key file cannot be read
        """

        # Find the path where we expect our key lives (~/.ssh)
        home_dir = os.environ.get("HOME")
        if not home_dir:
            self.log.warning("No HOME environment variable, using /tmp")
            home_dir = "/tmp"
        ssh_dir = "{}/.ssh".format(home_dir)
        # exist_ok avoids the check-then-create race; makedirs also copes with
        # a HOME directory that does not exist yet
        os.makedirs(ssh_dir, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(ssh_dir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # Pass an argv list instead of splitting a formatted string, so a
            # key path containing spaces or quotes is handled correctly.
            subprocess.check_output(
                [
                    "ssh-keygen",
                    "-t",
                    "rsa",
                    "-b",
                    "4096",
                    "-N",
                    "",
                    "-f",
                    self.private_key_path,
                ]
            )

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as key_file:
            return key_file.readline()

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separated string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path:
            "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contain hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contain hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param charm_name: Name of the charm being deployed
        :param namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :param config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if it fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """Remove a relation.

        TODO: the interface is not defined yet; no parameters are declared.
        """

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """Deregister execution environments.

        TODO: the interface is not defined yet; no parameters are declared.
        """

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments

        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment

        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrades charms in VNFs

        Args:
            ee_id:  Execution environment id
            path:   Local path to the charm
            charm_id:   charm-id
            charm_type: Charm type can be lxc-proxy-charm, native-charm or k8s-proxy-charm
            timeout: (Float)    Timeout for the ns update operation

        Returns:
            The output of the update operation if status equals to "completed"
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of failure
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        The base implementation does nothing; it is not abstract, so
        connectors only override it when they need to.
        """

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are missing or empty are returned as None.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if namespace is None or len(namespace) == 0:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # Pad to four dot-separated fields so that missing trailing components
        # are handled the same way as empty ones; extra fields are ignored.
        fields = (namespace.split(".") + [""] * 4)[:4]
        nsi_id, ns_id, vnf_id, vdu_id = (field or None for field in fields)

        vdu_count = None
        if vdu_id:
            # NOTE(review): the split is on every "-", so a vdu id that itself
            # contains hyphens is cut at the first one and the second piece is
            # reported as the count. Kept as-is; confirm against callers.
            vdu_parts = vdu_id.split("-")
            if len(vdu_parts) > 1:
                vdu_id, vdu_count = vdu_parts[0], vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
        vca_id: str = None,
    ):
        """
        Write application status to database

        The fields "status", "detailed-status", "VCA-status", "entity-type" and
        "status-time" are written under db_dict["path"] with self.db.set_one,
        and then the on_update_db callback (if any) is invoked. A DbException
        raised while doing so is logged and not propagated.

        :param: db_dict: DB dictionary with the keys "collection", "filter" and
            "path". If empty or None, nothing is written.
        :param: status: Status of the application
        :param: detailed_status: Detailed status
        :param: vca_status: VCA status
        :param: entity_type: Entity type ("application", "machine", and "action")
        :param: vca_id: Id of the VCA. If None, the default VCA will be used.
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            # An empty or None path means "document root": no prefix is added,
            # rather than failing while indexing the last character.
            the_path = db_dict["path"] or ""
            if the_path and not the_path.endswith("."):
                the_path += "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback: it may be either a coroutine function or a
            # plain function
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )
                else:
                    self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )

        except DbException as e:
            # Best effort: a failed status write must not abort the caller
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
        """
        Translate a Juju status into the equivalent OSM deployment status

        :param entity_type: first-level key of JujuStatusToOSM
        :param status: Juju status to look up for that entity type
        :return: the mapped status, or N2VCDeploymentStatus.UNKNOWN (after
            logging a warning) when there is no mapping
        """
        try:
            return JujuStatusToOSM[entity_type][status]
        except KeyError:
            # Covers both an unknown status and an unknown entity type, so the
            # UNKNOWN fallback applies uniformly instead of raising.
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
506
507
def obj_to_yaml(obj: object) -> str:
    """Dump *obj* as block-style YAML with ``!!python/object`` tags removed.

    Each dumped line is cut at the first occurrence of "!!python/object"
    (the tag and everything after it on that line is dropped), and every
    line of the result ends with a newline.
    """
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the whole line when the tag is absent, and the text
    # before its first occurrence otherwise
    cleaned = (row.partition("!!python/object")[0] for row in dumped.splitlines())
    return "".join(row + "\n" for row in cleaned)
521
522
def obj_to_dict(obj: object) -> dict:
    """Convert *obj* to plain data by a YAML round trip.

    The object is serialised with obj_to_yaml() and the resulting text is
    parsed back with the safe YAML loader.
    """
    return yaml.safe_load(obj_to_yaml(obj))