Initial refactor of N2VC
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class defining the API that every VCA connector implements,
    together with a few concrete helpers shared by all of them: local SSH
    key-pair handling, namespace parsing and status reporting to the database.
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        url: str,
        username: str,
        vca_config: dict,
        on_update_db=None,
    ):
        """Initialize N2VC abstract connector. It defines the API for VCA connectors

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio (default current thread loop)
        :param str url: a string that describes how to connect to the VCA (if needed,
            IP and port can be obtained from there)
        :param str username: the username to authenticate with VCA
        :param dict vca_config: Additional parameters for the specific VCA. For example,
            for juju it will contain:
            secret: The password to authenticate with
            public_key: The contents of the juju public SSH key
            ca_cert str: The CA certificate used to authenticate
        :param on_update_db: callback called when n2vc connector updates database.
            It may be a plain function or a coroutine function.
            Received arguments:
            table: e.g. "nsrs"
            filter: e.g. {_id: <nsd-id> }
            path: e.g. "_admin.deployed.VCA.3."
            updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # connection details and credentials are kept out of the log line
        hidden_keys = ("host", "port", "user", "secret", "public_key", "ca_cert")
        self.log.info(
            "url={}, username={}, vca_config={}".format(
                url,
                username,
                {k: v for k, v in vca_config.items() if k not in hidden_keys},
            )
        )

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.url = url
        self.username = username
        self.vca_config = vca_config
        self.on_update_db = on_update_db

        # generate private/public key-pair (get_public_key fills in both paths)
        self.private_key_path = None
        self.public_key_path = None
        self.get_public_key()

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: returns a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from the local machine, to be injected into
        virtual machines to be managed by the VCA.
        On the first run, an ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets self.private_key_path and self.public_key_path, and
        creates the ~/.ssh directory and the key files when they are missing.

        :returns str: first line of the public key file, as read from disk
        :raises subprocess.CalledProcessError: if ssh-keygen fails
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = "{}/.ssh".format(homedir)
        # exist_ok avoids the check-then-create race of exists() + mkdir()
        os.makedirs(sshdir, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # argv is passed as a list so that a key path containing spaces or
            # quotes is handed to ssh-keygen untouched; -N "" means no passphrase
            subprocess.check_output(
                [
                    "ssh-keygen",
                    "-t",
                    "rsa",
                    "-b",
                    "4096",
                    "-N",
                    "",
                    "-f",
                    self.private_key_path,
                ]
            )

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as file:
            public_key = file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separated string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path:
            "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contain hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contain hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """
        Remove a relation. TODO: parameters and behaviour are not specified yet
        """

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """
        Deregister execution environments. TODO: parameters and behaviour are not
        specified yet
        """

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments

        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment

        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        The base implementation does nothing.
        """

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are missing or empty are returned as None.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if namespace is None or len(namespace) == 0:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # keep the first four dot-separated components, padding the missing ones
        # with "" so that absent and empty components are treated alike
        parts = namespace.split(".")[:4]
        parts += [""] * (4 - len(parts))
        nsi_id, ns_id, vnf_id, vdu_id = (part or None for part in parts)

        vdu_count = None
        if vdu_id is not None:
            # NOTE(review): only the first two hyphen-separated pieces are used, so
            # a vdu id that itself contains "-" is truncated here -- confirm that
            # callers never pass such ids before changing this.
            vdu_parts = vdu_id.split("-")
            if len(vdu_parts) > 1:
                vdu_id = vdu_parts[0]
                vdu_count = vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
    ):
        """
        Write the status of an entity into the database and notify on_update_db

        Nothing is written when db_dict is empty or None. A DbException raised
        while writing is logged and not propagated.

        :param dict db_dict: where to write into database. It contains a dict with
            {collection: <str>, filter: {}, path: <str>}
        :param N2VCDeploymentStatus status: stored as str(status.value)
        :param str detailed_status: stored under "detailed-status"
        :param str vca_status: stored under "VCA-status"
        :param str entity_type: stored under "entity-type"
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # field names are appended to the path, so it has to end with a dot;
            # an empty path is left empty (fields go at the document root)
            # instead of failing on the_path[-1]
            if the_path and not the_path.endswith("."):
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback (either a coroutine function or a plain callable)
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict
                    )
                else:
                    self.on_update_db(the_table, the_filter, the_path, update_dict)

        except DbException as e:
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus:
        """
        Translate a juju status into the OSM deployment status

        :param EntityType entity_type: key selecting the table in JujuStatusToOSM
        :param str status: status to look up in that table
        :returns: the mapped status, or N2VCDeploymentStatus.UNKNOWN (after logging
            a warning) when the status is not in the table
        """
        if status not in JujuStatusToOSM[entity_type]:
            # the placeholder used to be logged literally: .format() was missing
            self.log.warning(
                "Status {} not found in JujuStatusToOSM.".format(status)
            )
            return N2VCDeploymentStatus.UNKNOWN
        return JujuStatusToOSM[entity_type][status]
468
469
# DEPRECATED
def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
    """Map a juju status string to an N2VCDeploymentStatus (deprecated helper).

    :param str statustype: "application", "unit", "action" or "machine"
    :param str status: the juju status reported for that kind of entity
    :returns: the mapped status; UNKNOWN when the status is not recognised for a
        known statustype, FAILED when the statustype itself is not recognised
    """
    if statustype in ("application", "unit"):
        rules = (
            (("waiting", "maintenance", "blocked"), N2VCDeploymentStatus.RUNNING),
            (("error",), N2VCDeploymentStatus.FAILED),
            (("active",), N2VCDeploymentStatus.COMPLETED),
        )
    elif statustype == "action":
        rules = (
            (("running",), N2VCDeploymentStatus.RUNNING),
            (("completed",), N2VCDeploymentStatus.COMPLETED),
        )
    elif statustype == "machine":
        rules = (
            (("pending",), N2VCDeploymentStatus.PENDING),
            (("started",), N2VCDeploymentStatus.COMPLETED),
        )
    else:
        # unrecognised kind of entity
        return N2VCDeploymentStatus.FAILED

    for juju_names, osm_status in rules:
        if status in juju_names:
            return osm_status
    return N2VCDeploymentStatus.UNKNOWN
499
500
def obj_to_yaml(obj: object) -> str:
    """Dump obj as block-style YAML with the python object tags removed.

    Every dumped line is cut at the first "!!python/object" it contains, and
    each resulting line is terminated with a newline.

    :param object obj: the object to serialise with yaml.dump
    :returns str: the cleaned YAML text
    """
    tag = "!!python/object"
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the whole line when the tag is absent
    untagged = (row.partition(tag)[0] for row in dumped.splitlines())
    return "".join(row + "\n" for row in untagged)
514
515
def obj_to_dict(obj: object) -> dict:
    """Convert obj into plain data by dumping it to YAML and parsing it back.

    :param object obj: the object to convert
    :returns: whatever yaml.load builds from the text produced by obj_to_yaml
    """
    # NOTE(review): yaml.Loader is the full, unsafe loader. The text parsed here
    # is generated locally by obj_to_yaml, but consider a safe loader if every
    # tag left in that text can be handled by it -- confirm before switching.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)