Fix bug 1636: remove the default 30s timeout in retry
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class defining the API that VCA connectors must implement.
    It also provides a few concrete helpers shared by the connectors: local SSH
    key handling, namespace parsing and status write-back to the database.
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        on_update_db=None,
        **kwargs,
    ):
        """Initialize the N2VC abstract connector.

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo
            common FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio; when falsy, the loop
            returned by ``asyncio.get_event_loop()`` is used
        :param on_update_db: callback (plain function or coroutine function)
            called after the connector updates the database.
            Received arguments:
                table: e.g. "nsrs"
                filter: e.g. {_id: <nsd-id>}
                path: e.g. "_admin.deployed.VCA.3."
                updated_data: e.g. {"_admin.deployed.VCA.3.status": "xxx", ...}
                vca_id: passed as keyword argument
        :param kwargs: accepted for signature compatibility; not used by this
            base class
        :raises N2VCBadArgumentsException: if ``db`` or ``fs`` is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.on_update_db = on_update_db

        # key-pair paths are only resolved (and the pair generated, if needed)
        # when get_public_key() is called
        self.private_key_path = None
        self.public_key_path = None

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: returns a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from the local machine, to be injected into
        virtual machines to be managed by the VCA.
        On the first run, an ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets ``self.private_key_path`` and ``self.public_key_path``,
        creates the ``.ssh`` directory if missing and runs ``ssh-keygen`` when
        the private key file does not exist.

        :returns str: first line of the public key file, exactly as read
            (``readline()`` keeps any trailing newline)
        :raises subprocess.CalledProcessError: if ``ssh-keygen`` fails
        :raises OSError: if the directory cannot be created or the public key
            file cannot be read
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = "{}/.ssh".format(homedir)
        # makedirs(exist_ok=True) avoids the check-then-create race of
        # "if not exists: mkdir" and also creates missing parent directories
        os.makedirs(sshdir, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # Pass the arguments as a list so that a key path containing
            # whitespace or quote characters is not re-tokenized
            cmd = [
                "ssh-keygen",
                "-t",
                "rsa",
                "-b",
                "4096",
                "-N",
                "",  # empty passphrase
                "-f",
                self.private_key_path,
            ]
            subprocess.check_output(cmd)

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as file:
            public_key = file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separated string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse
            an older environment
        :param float progress_timeout: progress timeout (semantics defined by the
            implementing connector)
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        :returns str, dict: id of the new execution environment and credentials for
            it (credentials can contain hostname, username, etc. depending on the
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contain hostname, username, path to private key, etc. depending
            on the underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout: progress timeout (semantics defined by the
            implementing connector)
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder)
            using the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout: progress timeout (semantics defined by the
            implementing connector)
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param charm_name: Name of the charm being deployed
        :param namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder)
            using the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout: progress timeout (semantics defined by the
            implementing connector)
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        :param config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the
        public key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout: progress timeout (semantics defined by the
            implementing connector)
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if it fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """Remove a relation (signature and behaviour still to be defined)."""

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """Deregister execution environments (signature and behaviour still to be
        defined)."""

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments

        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment

        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout: progress timeout (semantics defined by the
            implementing connector)
        :param float total_timeout: total timeout (semantics defined by the
            implementing connector)
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        The base implementation does nothing; it is not abstract, so subclasses
        may override it when they have something to release.
        """

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are absent or empty are returned as None.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if ``namespace`` is None or empty
        """

        # check parameters
        if namespace is None or len(namespace) == 0:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # split namespace components
        parts = namespace.split(".")
        nsi_id = None
        ns_id = None
        vnf_id = None
        vdu_id = None
        vdu_count = None
        if len(parts) > 0 and len(parts[0]) > 0:
            nsi_id = parts[0]
        if len(parts) > 1 and len(parts[1]) > 0:
            ns_id = parts[1]
        if len(parts) > 2 and len(parts[2]) > 0:
            vnf_id = parts[2]
        if len(parts) > 3 and len(parts[3]) > 0:
            vdu_id = parts[3]
            # "<vdu-id>-<count>": only the first two dash-separated pieces are
            # used, anything after a second dash is discarded
            vdu_parts = parts[3].split("-")
            if len(vdu_parts) > 1:
                vdu_id = vdu_parts[0]
                vdu_count = vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
        vca_id: str = None,
    ):
        """
        Write application status to database

        Updates the ``status``, ``detailed-status``, ``VCA-status``,
        ``entity-type`` and ``status-time`` fields under ``db_dict["path"]`` and
        then invokes the ``on_update_db`` callback, if one was configured.
        ``DbException`` raised while doing so is logged and not propagated.

        :param: db_dict: DB dictionary with the keys "collection", "filter" and
            "path". If falsy, nothing is written.
        :param: status: Status of the application
        :param: detailed_status: Detailed status
        :param: vca_status: VCA status
        :param: entity_type: Entity type ("application", "machine", and "action")
        :param: vca_id: Id of the VCA, forwarded to the ``on_update_db`` callback.
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # endswith() rather than the_path[-1]: an empty path must not raise
            # IndexError; it is left empty so the fields are written unprefixed
            if the_path and not the_path.endswith("."):
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )
                else:
                    self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )

        except DbException as e:
            # status write-back is best effort: log and carry on
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
        """
        Translate a status through the JujuStatusToOSM table

        :param entity_type: first-level key of JujuStatusToOSM
        :param status: status to look up for that entity type
        :return: the mapped value, or N2VCDeploymentStatus.UNKNOWN (after logging a
            warning) when ``status`` has no entry for ``entity_type``
        """
        if status not in JujuStatusToOSM[entity_type]:
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
        return JujuStatusToOSM[entity_type][status]
484
485
def obj_to_yaml(obj: object) -> str:
    """Dump ``obj`` as block-style YAML with python object tags stripped.

    The object is serialized with ``yaml.dump`` (block style, indent of 2).
    Each output line is then cut at the first occurrence of
    ``!!python/object``, dropping the tag and everything after it on that line.

    :param obj: object to serialize
    :return: the cleaned YAML text, every line terminated by a newline
    """
    tag = "!!python/object"
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the whole line when the tag is absent and the text
    # preceding the tag otherwise
    cleaned = (row.partition(tag)[0] for row in dumped.splitlines())
    return "".join(row + "\n" for row in cleaned)
499
500
def obj_to_dict(obj: object) -> dict:
    """Convert ``obj`` to plain data by round-tripping it through YAML.

    The object is serialized with ``obj_to_yaml`` and the resulting text is
    parsed back with ``yaml.load``.

    :param obj: object to convert
    :return: the parsed YAML document (presumably a dict for object-like input;
        the actual type follows the YAML produced for ``obj``)
    """
    # NOTE(review): yaml.Loader is the full, unsafe loader. The text parsed here
    # is generated locally by obj_to_yaml, but any python tag that survives the
    # stripping would still be honoured - consider yaml.SafeLoader if callers
    # never rely on that.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)