ad0dc286021ddc5f75d3d70763b0160666726fc3
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class that defines the API every VCA connector has to implement.
    Besides the abstract interface it provides a few shared helpers: SSH key-pair
    handling, namespace parsing, status write-back to the database and status
    translation.
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        url: str,
        username: str,
        vca_config: dict,
        on_update_db=None,
    ):
        """Initialize N2VC abstract connector. It defines the API for VCA connectors

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio (default current thread loop)
        :param str url: a string describing how to connect to the VCA (if needed, IP
            and port can be obtained from there)
        :param str username: the username to authenticate with VCA
        :param dict vca_config: Additional parameters for the specific VCA. For example,
            for juju it will contain:
            secret: The password to authenticate with
            public_key: The contents of the juju public SSH key
            ca_cert str: The CA certificate used to authenticate
        :param on_update_db: callback called when n2vc connector updates database.
            It may be a plain callable or a coroutine function.
            Received arguments:
            table: e.g. "nsrs"
            filter: e.g. {_id: <nsd-id> }
            path: e.g. "_admin.deployed.VCA.3."
            updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # connection details and credentials are deliberately kept out of the log
        hidden_keys = ("host", "port", "user", "secret", "public_key", "ca_cert")
        self.log.info(
            "url={}, username={}, vca_config={}".format(
                url,
                username,
                {k: v for k, v in vca_config.items() if k not in hidden_keys},
            )
        )

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.url = url
        self.username = username
        self.vca_config = vca_config
        self.on_update_db = on_update_db

        # key-pair locations are unknown until get_public_key() is called
        self.private_key_path = None
        self.public_key_path = None

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param str namespace: we obtain ns from namespace
        :param bool yaml_format: when True, the status is returned as a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from the local machine, to be injected into
        virtual machines to be managed by the VCA.
        On the first run, an ssh key-pair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets self.private_key_path and self.public_key_path, creates
        the ``.ssh`` directory if it is missing and runs ``ssh-keygen`` when there is
        no private key yet.

        :returns str: first line of the public key file, as read (trailing newline
            included)
        :raises subprocess.CalledProcessError: if ssh-keygen exits with an error
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = "{}/.ssh".format(homedir)
        # exist_ok avoids the check-then-create race of exists() + mkdir(); the
        # directory will hold a private key, so a new one is created owner-only
        os.makedirs(sshdir, mode=0o700, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # The arguments are passed as a list (no shell, no re-tokenising), so a
            # key path containing spaces or quotes reaches ssh-keygen intact.
            # -N "" asks for an empty passphrase.
            subprocess.check_output(
                [
                    "ssh-keygen",
                    "-t",
                    "rsa",
                    "-b",
                    "4096",
                    "-N",
                    "",
                    "-f",
                    self.private_key_path,
                ]
            )

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as file:
            public_key = file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separated string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contain hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contain hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param str charm_name: Name of the charm being deployed
        :param str namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :param dict config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if it fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """
        Remove a relation (presumably the counterpart of add_relation).

        TODO: the contract and parameters are not specified yet.
        """

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """
        Deregister execution environments (presumably the counterpart of
        register_execution_environment).

        TODO: the contract and parameters are not specified yet.
        """

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments

        :param str namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment

        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>},
            path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of failure
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        Not abstract: this base implementation does nothing.
        """

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        A component that is missing or empty is returned as None. The vdu component
        is further split on "-": when it contains a dash, the first piece is the vdu
        id and the second piece is the count (any further pieces are ignored).

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if not namespace:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # split namespace components; only the first four positions are meaningful
        parts = namespace.split(".")
        nsi_id, ns_id, vnf_id, vdu_id = (
            parts[i] if i < len(parts) and parts[i] else None for i in range(4)
        )

        vdu_count = None
        if vdu_id is not None:
            vdu_parts = vdu_id.split("-")
            if len(vdu_parts) > 1:
                vdu_id, vdu_count = vdu_parts[0], vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
    ):
        """
        Write the status of an entity into the database and notify on_update_db

        The values are stored under db_dict["path"] as the keys "status",
        "detailed-status", "VCA-status", "entity-type" and "status-time". A
        DbException raised while writing is logged and not propagated.

        :param dict db_dict: {collection: <str>, filter: {}, path: <str>}. When it is
            empty or None nothing is written
        :param N2VCDeploymentStatus status: its ``value`` is stored as a string
        :param str detailed_status: stored as is
        :param str vca_status: stored as is
        :param str entity_type: stored as is
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # the path is used as a key prefix, so it has to end with a dot
            if the_path[-1] != ".":
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback: supports both coroutine functions and plain callables
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict
                    )
                else:
                    self.on_update_db(the_table, the_filter, the_path, update_dict)

        except DbException as e:
            # best effort: a failed status write is only logged
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
        """
        Translate a Juju status into the OSM deployment status

        :param str entity_type: key selecting the status table inside JujuStatusToOSM.
            An unknown entity_type is not handled here; whatever the lookup raises
            is propagated
        :param str status: status to translate
        :returns N2VCDeploymentStatus: the mapped status, or
            N2VCDeploymentStatus.UNKNOWN (after logging a warning) when the status is
            not in the table
        """
        if status not in JujuStatusToOSM[entity_type]:
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
        return JujuStatusToOSM[entity_type][status]
499
500
def obj_to_yaml(obj: object) -> str:
    """Dump obj as block-style YAML text with ``!!python/object`` tags removed.

    Every line of the dump is cut at the first occurrence of ``!!python/object``
    (the rest of that line is dropped) and terminated with a newline.

    :param obj: object to serialize with yaml.dump
    :returns str: the cleaned YAML text
    """
    # dump to yaml
    dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the text before the tag, or the whole line when the tag is
    # absent; a single join avoids rebuilding the string once per line
    return "".join(
        line.partition("!!python/object")[0] + "\n" for line in dump_text.splitlines()
    )
514
515
def obj_to_dict(obj: object) -> dict:
    """Convert obj to plain data by dumping it to YAML and parsing the text back.

    :param obj: object to convert; it is serialized with obj_to_yaml first
    :returns: whatever the YAML parser builds from that text
    """
    # NOTE(review): yaml.Loader is PyYAML's full loader, which is unsafe for
    # untrusted input. Here the text comes from our own dump of obj; confirm before
    # switching to a safe loader, since remaining python-specific tags may need it.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)