Feature 10239: Distributed VCA
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class: it declares the API that concrete VCA connectors must
    implement and provides a few shared helpers (local ssh key handling,
    namespace parsing and status write-back to the database).
    """

    """
    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################
    """

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        on_update_db=None,
        **kwargs,
    ):
        """Initialize N2VC abstract connector. It defines the API for VCA connectors

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio (default current thread loop)
        :param on_update_db: callback called when n2vc connector updates database.
            Received arguments:
            table: e.g. "nsrs"
            filter: e.g. {_id: <nsd-id> }
            path: e.g. "_admin.deployed.VCA.3."
            updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
        :param kwargs: extra keyword arguments; accepted but not used by this base
            class
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.on_update_db = on_update_db

        # key-pair paths are unknown until get_public_key() is called
        self.private_key_path = None
        self.public_key_path = None

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: returns a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from local machine, to be injected into virtual
        machines to be managed by the VCA.
        First run, a ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets self.private_key_path and self.public_key_path, creates
        the ".ssh" directory under $HOME (or under /tmp when HOME is not set) and,
        if the private key file does not exist, runs ssh-keygen to create the pair.

        :returns str: first line of the public key file, as read (the trailing
            newline, if any, is not stripped)
        :raises subprocess.CalledProcessError: if ssh-keygen exits with an error
        :raises OSError: if the directory cannot be created or the public key file
            cannot be read
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = "{}/.ssh".format(homedir)
        # exist_ok avoids the check-then-create race of exists() + mkdir()
        os.makedirs(sshdir, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # Arguments are passed as a list (no shell-style string splitting) so
            # that a key path containing whitespace or quotes stays one argument.
            cmd = [
                "ssh-keygen",
                "-t",
                "rsa",
                "-b",
                "4096",
                "-N",
                "",  # empty passphrase
                "-f",
                self.private_key_path,
            ]
            # run command with arguments
            subprocess.check_output(cmd)

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as file:
            public_key = file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separate string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path:
            "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contains hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contains hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param charm_name: Name of the charm being deployed
        :param namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :param config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """
        Remove a relation (presumably the counterpart of add_relation).

        The contract is not specified yet: no parameters are declared here.
        """

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """
        Deregister execution environments (presumably the counterpart of
        register_execution_environment).

        The contract is not specified yet: no parameters are declared here.
        """

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments
        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment
        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of fail
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        No-op in this base class; it is not abstract, so subclasses may override it.
        """

    """
    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################
    """

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are missing or empty are returned as None. The fourth
        component is additionally split on "-": when it contains at least one "-",
        the text before the first "-" is the vdu id and the text between the first
        and second "-" is the vdu count.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if not namespace:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # split namespace components; pad to four entries so that components
        # missing at the end are treated like empty ones
        parts = namespace.split(".")
        padded = parts[:4] + [""] * (4 - len(parts))
        nsi_id, ns_id, vnf_id, vdu_id = (part or None for part in padded)

        vdu_count = None
        if vdu_id:
            vdu_parts = vdu_id.split("-")
            if len(vdu_parts) > 1:
                vdu_id = vdu_parts[0]
                vdu_count = vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
        vca_id: str = None,
    ):
        """
        Write application status to database

        Builds an update dictionary whose keys are db_dict["path"] followed by
        "status", "detailed-status", "VCA-status", "entity-type" and "status-time",
        writes it with self.db.set_one() and then invokes the on_update_db callback
        (awaited when it is a coroutine function). A DbException is logged and not
        re-raised.

        :param: db_dict: DB dictionary. Must contain the keys "collection", "filter"
            and "path". If empty or None nothing is written.
        :param: status: Status of the application
        :param: detailed_status: Detailed status
        :param: vca_status: VCA status
        :param: entity_type: Entity type ("application", "machine", and "action")
        :param: vca_id: Id of the VCA. If None, the default VCA will be used.
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        # self.log.debug('status={} / detailed-status={} / VCA-status={}/entity_type={}'
        # .format(str(status.value), detailed_status, vca_status, entity_type))

        try:

            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # Make the path usable as a key prefix. An empty path is left as is
            # (the fields are then written without prefix) instead of being
            # indexed, which would raise IndexError.
            if the_path and not the_path.endswith("."):
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )
                else:
                    self.on_update_db(
                        the_table, the_filter, the_path, update_dict, vca_id=vca_id
                    )

        except DbException as e:
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: str, status: str) -> N2VCDeploymentStatus:
        """
        Translate a Juju status into the OSM deployment status

        :param entity_type: first-level key of JujuStatusToOSM
        :param status: status to look up for that entity type
        :return: the mapped value, or N2VCDeploymentStatus.UNKNOWN (after logging a
            warning) when status is not found for that entity type
        """
        if status not in JujuStatusToOSM[entity_type]:
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
        return JujuStatusToOSM[entity_type][status]
484
485
def obj_to_yaml(obj: object) -> str:
    """Dump obj as block-style YAML with "!!python/object" tags removed.

    Every line of the dump is cut at the first occurrence of "!!python/object"
    (the tag and anything after it on that line are dropped) and terminated
    with a newline.

    :param obj: object to serialize
    :return: the resulting YAML text
    """
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition() yields the whole line as head when the tag is not present
    cleaned_rows = (
        row.partition("!!python/object")[0] for row in dumped.splitlines()
    )
    return "".join(row + "\n" for row in cleaned_rows)
499
500
def obj_to_dict(obj: object) -> dict:
    """Convert obj into plain data by dumping it to YAML and parsing it back.

    :param obj: object to convert
    :return: the structure parsed from the text produced by obj_to_yaml(obj)
    """
    # NOTE(review): yaml.Loader is the full (unsafe) loader. The text parsed here
    # is generated locally by obj_to_yaml(), not read from an external source;
    # confirm that no caller passes objects built from untrusted data.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)