Fix bug 2088 by quoting inputs for commands
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 from shlex import quote
28 import os
29 import shlex
30 import subprocess
31 import time
32
33 from n2vc.exceptions import N2VCBadArgumentsException
34 from osm_common.dbmongo import DbException
35 import yaml
36
37 from n2vc.loggable import Loggable
38 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
39
40
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class: it defines the API that every VCA connector has to
    implement and provides a few helpers shared by all of them.
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        on_update_db=None,
        **kwargs,
    ):
        """Initialize N2VC abstract connector. It defines the API for VCA connectors

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio (default current thread loop)
        :param on_update_db: callback (plain function or coroutine function) called
            when the n2vc connector updates the database.
            Received arguments:
                table: e.g. "nsrs"
                filter: e.g. {_id: <nsd-id> }
                path: e.g. "_admin.deployed.VCA.3."
                updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
                vca_id (keyword): id of the VCA
        :param kwargs: accepted for subclasses; not used by this base class
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.on_update_db = on_update_db

        # key-pair paths; they are only filled in by get_public_key()
        self.private_key_path = None
        self.public_key_path = None

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: returns a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from the local machine, to be injected into
        virtual machines to be managed by the VCA.
        On the first run, an ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets self.private_key_path and self.public_key_path, and
        creates <HOME>/.ssh and the key pair when they do not exist yet.

        :returns str: first line of the public key file, exactly as read from disk
        :raises subprocess.CalledProcessError: if ssh-keygen exits with an error
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = "{}/.ssh".format(homedir)
        # exist_ok avoids the check-then-create race and missing parents are created
        os.makedirs(sshdir, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # The argument vector is passed as a list, so no shell is involved and
            # the key path needs no quoting. The empty string is the passphrase.
            keygen_cmd = ["ssh-keygen", "-t", "rsa", "-b", "4096", "-N", ""]
            keygen_cmd += ["-f", self.private_key_path]
            subprocess.check_output(keygen_cmd)

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as file:
            public_key = file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separated string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path:
            "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contain hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contain hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param charm_name: Name of the charm being deployed
        :param namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :param config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if it fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """Remove a relation. TODO: arguments and behaviour are not defined yet."""

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """Deregister execution environments.

        TODO: arguments and behaviour are not defined yet.
        """

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments

        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment

        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def upgrade_charm(
        self,
        ee_id: str = None,
        path: str = None,
        charm_id: str = None,
        charm_type: str = None,
        timeout: float = None,
    ) -> str:
        """This method upgrades charms in VNFs

        :param str ee_id: Execution environment id
        :param str path: Local path to the charm
        :param str charm_id: charm-id
        :param str charm_type: Charm type can be lxc-proxy-charm, native-charm or
            k8s-proxy-charm
        :param float timeout: Timeout for the ns update operation
        :returns str: The output of the update operation if status equals to
            "completed"
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        The base implementation does nothing; it is not abstract, so connectors
        only override it when they have something to release.
        """

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are missing or empty are returned as None.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if not namespace:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # split namespace components
        parts = namespace.split(".")

        def component(position):
            # absent and empty components are both reported as None
            if len(parts) > position and parts[position]:
                return parts[position]
            return None

        nsi_id, ns_id, vnf_id, vdu_id = (component(i) for i in range(4))

        vdu_count = None
        if vdu_id is not None:
            # NOTE(review): the vdu component is cut at its FIRST hyphen and anything
            # after the second hyphen is dropped, so a hyphenated vdu id is truncated.
            # Kept as is because callers may rely on the current values.
            vdu_parts = vdu_id.split("-")
            if len(vdu_parts) > 1:
                vdu_id, vdu_count = vdu_parts[0], vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
        vca_id: str = None,
    ):
        """
        Write application status to database

        The status fields are written under db_dict["path"] with self.db.set_one and
        then the on_update_db callback (if any) is invoked. A DbException is logged
        and not propagated.

        :param: db_dict: DB dictionary with the keys "collection", "filter" and
            "path". If empty or None nothing is written.
        :param: status: Status of the application
        :param: detailed_status: Detailed status
        :param: vca_status: VCA status
        :param: entity_type: Entity type ("application", "machine", and "action")
        :param: vca_id: Id of the VCA. If None, the default VCA will be used. It is
            forwarded to the on_update_db callback.
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # every field is stored as "<path>.<field>", so make the path end with "."
            if the_path[-1] != ".":
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback: it may be a coroutine function or a plain function
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )
                else:
                    self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )

        except DbException as e:
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
        """Translate a Juju status into the OSM deployment status

        :param entity_type: key selecting the status table inside JujuStatusToOSM
        :param status: status to translate
        :return: the mapped status, or N2VCDeploymentStatus.UNKNOWN (after logging a
            warning) when the status is not in the table of that entity type
        """
        status_table = JujuStatusToOSM[entity_type]
        if status not in status_table:
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
        return status_table[status]
506
507
def obj_to_yaml(obj: object) -> str:
    """Dump obj as block-style YAML with the "!!python/object" tags removed.

    Every dumped line is cut at the first occurrence of "!!python/object"
    (the marker and the rest of that line are dropped) and is terminated with
    a newline.

    :param obj: object to serialize
    :return: the cleaned YAML text
    """
    tag_marker = "!!python/object"
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the untouched line when the marker is absent
    return "".join(
        row.partition(tag_marker)[0] + "\n" for row in dumped.splitlines()
    )
521
522
def obj_to_dict(obj: object) -> dict:
    """Convert obj to plain data by dumping it to YAML and parsing it back.

    :param obj: object to convert
    :return: the structure parsed from the output of obj_to_yaml(obj)
    """
    # NOTE(review): yaml.Loader is the full (unsafe) loader. The text parsed here
    # is produced by obj_to_yaml, which only strips "!!python/object" tags, so other
    # python tags can still reach the loader - confirm whether yaml.SafeLoader
    # would be enough for the objects passed in.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)