Fix bug 1050: Make provisioner asynchronous
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from enum import Enum
27 from http import HTTPStatus
28 import os
29 import shlex
30 import subprocess
31 import time
32
33 from n2vc.exceptions import N2VCBadArgumentsException
34 from osm_common.dbmongo import DbException
35 import yaml
36
37 from n2vc.loggable import Loggable
38
39
class N2VCDeploymentStatus(Enum):
    """Deployment states reported by N2VC.

    Each member's value is the lower-case string form of its name.
    """

    # not started yet
    PENDING = "pending"
    # work in progress
    RUNNING = "running"
    # finished successfully
    COMPLETED = "completed"
    # finished with an error
    FAILED = "failed"
    # state could not be mapped to any of the above
    UNKNOWN = "unknown"
46
47
48 class N2VCConnector(abc.ABC, Loggable):
49 """Generic N2VC connector
50
51 Abstract class
52 """
53
54 """
55 ####################################################################################
56 ################################### P U B L I C ####################################
57 ####################################################################################
58 """
59
def __init__(
    self,
    db: object,
    fs: object,
    log: object,
    loop: object,
    url: str,
    username: str,
    vca_config: dict,
    on_update_db=None,
):
    """Set up the state shared by every VCA connector.

    :param object db: database object managing MongoDB (repo common DbBase);
        mandatory
    :param object fs: file-system object managing the package artifacts (repo
        common FsBase); mandatory
    :param object log: logging object to log to
    :param object loop: asyncio loop to use; when falsy, the loop returned by
        asyncio.get_event_loop() is used
    :param str url: string describing how to connect to the VCA (if needed, IP
        and port can be obtained from it)
    :param str username: user name to authenticate with the VCA
    :param dict vca_config: additional parameters for the specific VCA. For
        example, for juju it will contain:
            secret: the password to authenticate with
            public_key: the contents of the juju public SSH key
            ca_cert: the CA certificate used to authenticate
    :param on_update_db: optional callback invoked when the connector updates
        the database. Received arguments:
            table: e.g. "nsrs"
            filter: e.g. {_id: <nsd-id>}
            path: e.g. "_admin.deployed.VCA.3."
            updated_data: e.g. {_admin.deployed.VCA.3.status: 'xxx', ...}
    :raises N2VCBadArgumentsException: if db or fs is None
    """

    # initialise the logging parent class (presumably this is what provides
    # self.log, which is used right below)
    Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

    # db and fs are the only arguments validated here
    if db is None:
        raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
    if fs is None:
        raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

    # log the configuration without connection details and credentials
    hidden_keys = ("host", "port", "user", "secret", "public_key", "ca_cert")
    loggable_config = {
        key: value for key, value in vca_config.items() if key not in hidden_keys
    }
    self.log.info(
        "url={}, username={}, vca_config={}".format(url, username, loggable_config)
    )

    # keep the arguments for later use
    self.db = db
    self.fs = fs
    self.loop = loop if loop else asyncio.get_event_loop()
    self.url = url
    self.username = username
    self.vca_config = vca_config
    self.on_update_db = on_update_db

    # get_public_key() fills in both paths and creates the key pair on first run
    self.private_key_path = None
    self.public_key_path = None
    self.get_public_key()
129
@abc.abstractmethod
async def get_status(self, namespace: str, yaml_format: bool = True):
    """Get the status of a namespace.

    Abstract: connector implementations must override it.

    :param str namespace: namespace from which the ns is obtained
    :param bool yaml_format: when True (default) the status is returned as a
        YAML string
    """
137
138 # TODO: review which public key
def get_public_key(self) -> str:
    """Get the VCA SSH public key, creating the key pair on first use.

    Returns the SSH public key from the local machine, to be injected into the
    virtual machines managed by the VCA so that they can be provisioned with
    Juju; after that Juju communicates with the VM through the juju agent.

    The pair is stored as ``<HOME>/.ssh/id_n2vc_rsa`` and
    ``<HOME>/.ssh/id_n2vc_rsa.pub`` (``/tmp`` replaces HOME when the variable
    is unset or empty). When the private key file does not exist, a 4096-bit
    RSA pair with an empty passphrase is generated with ``ssh-keygen``.

    Side effect: sets ``self.private_key_path`` and ``self.public_key_path``.

    :returns str: first line of the public key file exactly as read (the
        trailing newline, if any, is kept)
    :raises subprocess.CalledProcessError: if ssh-keygen exits with an error
    :raises OSError: if the directory or the key files cannot be created/read
    """

    # Find the directory where the key is expected to live (~/.ssh)
    homedir = os.environ.get("HOME")
    if not homedir:
        self.warning("No HOME environment variable, using /tmp")
        homedir = "/tmp"
    sshdir = "{}/.ssh".format(homedir)
    # exist_ok avoids the check-then-create race between concurrent callers,
    # and makedirs also creates missing parent directories
    os.makedirs(sshdir, exist_ok=True)

    self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
    self.public_key_path = "{}.pub".format(self.private_key_path)

    # Generate the pair only when no private key exists yet
    if not os.path.exists(self.private_key_path):
        # Pass argv as a list: building one string and re-splitting it would
        # break the key path into several arguments when HOME contains spaces
        # or quotes
        subprocess.check_output(
            [
                "ssh-keygen",
                "-t",
                "rsa",
                "-b",
                "4096",
                "-N",
                "",
                "-f",
                self.private_key_path,
            ]
        )

    # Only one public key (one line) is expected in the file
    with open(self.public_key_path, "r") as key_file:
        return key_file.readline()
175
@abc.abstractmethod
async def create_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    reuse_ee_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> (str, dict):
    """Create an Execution Environment.

    Returns when the environment is created, or raises an exception on
    failure. Abstract: connector implementations must override it.

    :param str namespace: dot-separated string.
        LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
    :param dict db_dict: where to write to the database when the status
        changes. It contains {collection: str, filter: {}, path: str},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param str reuse_ee_id: ee id from an older execution; it allows an older
        environment to be reused
    :param float progress_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :returns str, dict: id of the new execution environment and the
        credentials for it (credentials can contain hostname, username, etc.
        depending on the underlying cloud)
    """
202
@abc.abstractmethod
async def register_execution_environment(
    self,
    namespace: str,
    credentials: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """Register an existing execution environment at the VCA.

    Abstract: connector implementations must override it.

    :param str namespace: same as in create_execution_environment
    :param dict credentials: credentials to access the existing execution
        environment (it can contain hostname, username, path to private key,
        etc. depending on the underlying cloud)
    :param dict db_dict: where to write to the database when the status
        changes. It contains {collection: str, filter: {}, path: str},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param float progress_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :returns str: id of the execution environment
    """
228
@abc.abstractmethod
async def install_configuration_sw(
    self,
    ee_id: str,
    artifact_path: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
):
    """Install the software inside the execution environment ee_id.

    Abstract: connector implementations must override it.

    :param str ee_id: id of the execution environment, as returned by
        create_execution_environment or register_execution_environment
    :param str artifact_path: where to locate the artifacts (parent folder)
        using self.fs. The final artifact path is a combination of this
        artifact_path and an additional string from the config_dict
        (e.g. charm name)
    :param dict db_dict: where to write into the database when the status
        changes. It contains {collection: <str>, filter: {}, path: <str>},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param float progress_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    """
255
@abc.abstractmethod
async def get_ee_ssh_public__key(
    self,
    ee_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """Generate a key pair in the execution environment; return the public key.

    Abstract: connector implementations must override it. Note that the
    method name is spelled with a double underscore before "key".

    :param str ee_id: id of the execution environment, as returned by
        create_execution_environment or register_execution_environment
    :param dict db_dict: where to write into the database when the status
        changes. It contains {collection: <str>, filter: {}, path: <str>},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param float progress_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :returns: public key of the execution environment. For the juju proxy
        charm (ssh-layered) case it is the one returned by the
        'get-ssh-public-key' primitive.
    :raises: an N2VC exception on failure
    """
282
@abc.abstractmethod
async def add_relation(
    self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
):
    """Add a relation between two execution environments.

    The relation is established through the endpoint associated with each
    environment. Abstract: connector implementations must override it.

    :param str ee_id_1: id of the first execution environment
    :param str ee_id_2: id of the second execution environment
    :param str endpoint_1: endpoint in the first execution environment
    :param str endpoint_2: endpoint in the second execution environment
    """
296
297 # TODO
@abc.abstractmethod
async def remove_relation(self):
    """Remove a relation (interface not defined yet).

    Abstract placeholder: it takes no arguments so far. Presumably the
    counterpart of add_relation -- to be confirmed once it is specified.
    """
302
303 # TODO
@abc.abstractmethod
async def deregister_execution_environments(self):
    """Deregister execution environments (interface not defined yet).

    Abstract placeholder: it takes no arguments so far. Presumably the
    counterpart of register_execution_environment -- to be confirmed once it
    is specified.
    """
308
@abc.abstractmethod
async def delete_namespace(
    self, namespace: str, db_dict: dict = None, total_timeout: float = None
):
    """Remove a network scenario and its execution environments.

    Abstract: connector implementations must override it.

    :param str namespace: [<nsi-id>].<ns-id>
    :param dict db_dict: where to write into the database when the status
        changes. It contains {collection: <str>, filter: {}, path: <str>},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    """
323
@abc.abstractmethod
async def delete_execution_environment(
    self, ee_id: str, db_dict: dict = None, total_timeout: float = None
):
    """Delete an execution environment.

    Abstract: connector implementations must override it.

    :param str ee_id: id of the execution environment to delete
    :param dict db_dict: where to write into the database when the status
        changes. It contains {collection: <str>, filter: {}, path: <str>},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    """
338
@abc.abstractmethod
async def exec_primitive(
    self,
    ee_id: str,
    primitive_name: str,
    params_dict: dict,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """Execute a primitive in the execution environment.

    Abstract: connector implementations must override it.

    :param str ee_id: id returned by create_execution_environment or
        register_execution_environment
    :param str primitive_name: must be one defined in the software. There is
        one called 'config' where, for the proxy case, the 'credentials' of
        the VM are provided
    :param dict params_dict: parameters of the action
    :param dict db_dict: where to write into the database when the status
        changes. It contains {collection: <str>, filter: {}, path: <str>},
        e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
        path: "_admin.deployed.VCA.3"}
    :param float progress_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :param float total_timeout: not described here; meaning and units are
        left to the implementation (TODO document)
    :returns str: primitive result, if ok
    :raises: exceptions in case of failure
    """
367
async def disconnect(self):
    """Disconnect from the VCA.

    Not abstract: this default implementation does nothing, so connectors
    that hold a connection are expected to override it.
    """
372
373 """
374 ####################################################################################
375 ################################### P R I V A T E ##################################
376 ####################################################################################
377 """
378
379 def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
380 """
381 Split namespace components
382
383 :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
384 :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
385 """
386
387 # check parameters
388 if namespace is None or len(namespace) == 0:
389 raise N2VCBadArgumentsException(
390 "Argument namespace is mandatory", ["namespace"]
391 )
392
393 # split namespace components
394 parts = namespace.split(".")
395 nsi_id = None
396 ns_id = None
397 vnf_id = None
398 vdu_id = None
399 vdu_count = None
400 if len(parts) > 0 and len(parts[0]) > 0:
401 nsi_id = parts[0]
402 if len(parts) > 1 and len(parts[1]) > 0:
403 ns_id = parts[1]
404 if len(parts) > 2 and len(parts[2]) > 0:
405 vnf_id = parts[2]
406 if len(parts) > 3 and len(parts[3]) > 0:
407 vdu_id = parts[3]
408 vdu_parts = parts[3].split("-")
409 if len(vdu_parts) > 1:
410 vdu_id = vdu_parts[0]
411 vdu_count = vdu_parts[1]
412
413 return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
414
async def write_app_status_to_db(
    self,
    db_dict: dict,
    status: N2VCDeploymentStatus,
    detailed_status: str,
    vca_status: str,
    entity_type: str,
):
    """Write a status record to the database and notify the update callback.

    The fields written under ``db_dict["path"]`` are: status, detailed-status,
    VCA-status, entity-type and status-time (``str(time.time())``). After the
    write, ``self.on_update_db`` (if set) is called with
    (table, filter, path, update_dict); it is awaited when it is a coroutine
    function. A DbException is logged and not re-raised.

    :param dict db_dict: {collection: <str>, filter: {}, path: <str>}; when
        falsy nothing is written. An empty (or None) path addresses the
        document root, so the fields are written without any prefix.
    :param N2VCDeploymentStatus status: its ``value`` is stored as "status"
    :param str detailed_status: stored as "detailed-status"
    :param str vca_status: stored as "VCA-status"
    :param str entity_type: stored as "entity-type"
    :raises KeyError: if db_dict lacks "collection", "filter" or "path"
    """
    if not db_dict:
        self.log.debug("No db_dict => No database write")
        return

    try:
        the_table = db_dict["collection"]
        the_filter = db_dict["filter"]
        # None is treated like an empty path
        the_path = db_dict["path"] or ""
        # Normalise a non-empty path to "<path>." so field names can simply be
        # appended. endswith() is used instead of indexing the last character,
        # which raised an uncaught IndexError on an empty path.
        if the_path and not the_path.endswith("."):
            the_path = the_path + "."
        update_dict = {
            the_path + "status": str(status.value),
            the_path + "detailed-status": detailed_status,
            the_path + "VCA-status": vca_status,
            the_path + "entity-type": entity_type,
            the_path + "status-time": str(time.time()),
        }

        self.db.set_one(
            table=the_table,
            q_filter=the_filter,
            update_dict=update_dict,
            fail_on_empty=True,
        )

        # database callback: both plain functions and coroutine functions
        # are supported
        if self.on_update_db:
            if asyncio.iscoroutinefunction(self.on_update_db):
                await self.on_update_db(
                    the_table, the_filter, the_path, update_dict
                )
            else:
                self.on_update_db(the_table, the_filter, the_path, update_dict)

    except DbException as e:
        # status reporting is best effort: database errors are only logged
        if e.http_code == HTTPStatus.NOT_FOUND:
            self.log.error(
                "NOT_FOUND error: Exception writing status to database: {}".format(
                    e
                )
            )
        else:
            self.log.info("Exception writing status to database: {}".format(e))
470
471
def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
    """Translate a juju status into the equivalent N2VCDeploymentStatus.

    :param str statustype: kind of entity the status belongs to:
        "application", "unit", "action" or "machine"
    :param str status: status string reported for that entity
    :returns: the mapped status; UNKNOWN when the status is not recognised for
        a known statustype, FAILED when the statustype itself is not recognised
    """
    # applications and units share the same mapping
    workload_map = {
        "waiting": N2VCDeploymentStatus.RUNNING,
        "maintenance": N2VCDeploymentStatus.RUNNING,
        "error": N2VCDeploymentStatus.FAILED,
        "active": N2VCDeploymentStatus.COMPLETED,
        "blocked": N2VCDeploymentStatus.RUNNING,
    }
    per_type = {
        "application": workload_map,
        "unit": workload_map,
        "action": {
            "running": N2VCDeploymentStatus.RUNNING,
            "completed": N2VCDeploymentStatus.COMPLETED,
        },
        "machine": {
            "pending": N2VCDeploymentStatus.PENDING,
            "started": N2VCDeploymentStatus.COMPLETED,
        },
    }

    known_statuses = per_type.get(statustype)
    if known_statuses is None:
        # unrecognised entity type
        return N2VCDeploymentStatus.FAILED
    return known_statuses.get(status, N2VCDeploymentStatus.UNKNOWN)
500
501
def obj_to_yaml(obj: object) -> str:
    """Dump obj as block-style YAML with the python object tags stripped.

    Every output line is cut at the first occurrence of "!!python/object"
    (the marker and everything after it on that line are dropped).

    :param object obj: object to serialise with yaml.dump
    :returns str: the cleaned YAML text, each line terminated by a newline
    """
    tag_marker = "!!python/object"
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the text before the first marker, or the whole line
    # when the marker is absent
    cleaned_rows = (row.partition(tag_marker)[0] for row in dumped.splitlines())
    return "".join(row + "\n" for row in cleaned_rows)
515
516
def obj_to_dict(obj: object) -> dict:
    """Convert obj into plain data by a YAML round trip.

    The object is serialised with obj_to_yaml and the resulting text is parsed
    back with yaml.load.

    :param object obj: object to convert
    :returns dict: the parsed YAML document
    """
    # NOTE(review): yaml.Loader is PyYAML's full (non-safe) loader. The text
    # parsed here is produced locally by obj_to_yaml, not taken from external
    # input; never reuse this call for untrusted YAML.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)