Store k8s controller information in Mongo, and remove controller attribute from K8sJu...
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 from http import HTTPStatus
27 import os
28 import shlex
29 import subprocess
30 import time
31
32 from n2vc.exceptions import N2VCBadArgumentsException
33 from osm_common.dbmongo import DbException
34 import yaml
35
36 from n2vc.loggable import Loggable
37 from n2vc.utils import EntityType, JujuStatusToOSM, N2VCDeploymentStatus
38
39
class N2VCConnector(abc.ABC, Loggable):
    """Generic N2VC connector.

    Abstract base class: it declares the API that VCA connectors implement and
    provides a few shared helpers (SSH key handling, namespace parsing, status
    persistence).
    """

    ####################################################################################
    ################################### P U B L I C ####################################
    ####################################################################################

    def __init__(
        self,
        db: object,
        fs: object,
        log: object,
        loop: object,
        url: str,
        username: str,
        vca_config: dict,
        on_update_db=None,
    ):
        """Initialize the N2VC abstract connector. It defines the API for VCA connectors.

        :param object db: Mongo object managing the MongoDB (repo common DbBase)
        :param object fs: FileSystem object managing the package artifacts (repo common
            FsBase)
        :param object log: the logging object to log to
        :param object loop: the loop to use for asyncio (default current thread loop)
        :param str url: a string that tells how to connect to the VCA (if needed, IP
            and port can be obtained from there)
        :param str username: the username to authenticate with VCA
        :param dict vca_config: Additional parameters for the specific VCA. For example,
            for juju it will contain:
            secret: The password to authenticate with
            public_key: The contents of the juju public SSH key
            ca_cert str: The CA certificate used to authenticate
        :param on_update_db: callback called when n2vc connector updates database.
            It may be a plain function or a coroutine function.
            Received arguments:
            table: e.g. "nsrs"
            filter: e.g. {_id: <nsd-id> }
            path: e.g. "_admin.deployed.VCA.3."
            updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
        :raises N2VCBadArgumentsException: if db or fs is None
        """

        # parent class
        Loggable.__init__(self, log=log, log_to_console=True, prefix="\nN2VC")

        # check arguments
        if db is None:
            raise N2VCBadArgumentsException("Argument db is mandatory", ["db"])
        if fs is None:
            raise N2VCBadArgumentsException("Argument fs is mandatory", ["fs"])

        # Connection details and credentials are never written to the log.
        # "vca_config or {}" keeps this purely informational log line from
        # crashing the constructor when no configuration is supplied.
        hidden_keys = ("host", "port", "user", "secret", "public_key", "ca_cert")
        loggable_config = {
            k: v for k, v in (vca_config or {}).items() if k not in hidden_keys
        }
        self.log.info(
            "url={}, username={}, vca_config={}".format(
                url, username, loggable_config
            )
        )

        # store arguments into self
        self.db = db
        self.fs = fs
        self.loop = loop or asyncio.get_event_loop()
        self.url = url
        self.username = username
        self.vca_config = vca_config
        self.on_update_db = on_update_db

        # generate private/public key-pair
        self.private_key_path = None
        self.public_key_path = None
        self.get_public_key()

    @abc.abstractmethod
    async def get_status(self, namespace: str, yaml_format: bool = True):
        """Get namespace status

        :param namespace: we obtain ns from namespace
        :param yaml_format: returns a yaml string
        """

    # TODO: review which public key
    def get_public_key(self) -> str:
        """Get the VCA ssh-public-key

        Returns the SSH public key from the local machine, to be injected into
        virtual machines to be managed by the VCA.
        On the first run, an ssh keypair will be created.
        The public key is injected into a VM so that we can provision the
        machine with Juju, after which Juju will communicate with the VM
        directly via the juju agent.

        Side effects: sets self.private_key_path and self.public_key_path.

        :returns str: first line of the public key file (as read by readline())
        """

        # Find the path where we expect our key lives (~/.ssh)
        homedir = os.environ.get("HOME")
        if not homedir:
            self.log.warning("No HOME environment variable, using /tmp")
            homedir = "/tmp"
        sshdir = "{}/.ssh".format(homedir)
        # exist_ok avoids the check-then-create race; missing parents are created
        os.makedirs(sshdir, exist_ok=True)

        self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
        self.public_key_path = "{}.pub".format(self.private_key_path)

        # If we don't have a key generated, then we have to generate it using ssh-keygen
        if not os.path.exists(self.private_key_path):
            # Argument list (no string splitting) so that a key path containing
            # whitespace or quotes is passed to ssh-keygen unchanged.
            # "-N ''" => empty passphrase.
            cmd = [
                "ssh-keygen",
                "-t",
                "rsa",
                "-b",
                "4096",
                "-N",
                "",
                "-f",
                self.private_key_path,
            ]
            subprocess.check_output(cmd)

        # Read the public key. Only one public key (one line) in the file
        with open(self.public_key_path, "r") as key_file:
            public_key = key_file.readline()

        return public_key

    @abc.abstractmethod
    async def create_execution_environment(
        self,
        namespace: str,
        db_dict: dict,
        reuse_ee_id: str = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> (str, dict):
        """Create an Execution Environment. Returns when it is created or raises an
        exception on failing

        :param str namespace: Contains a dot separated string.
            LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter: {_id: <nsd-id>}, path:
            "_admin.deployed.VCA.3"}
        :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an
            older environment
        :param float progress_timeout:
        :param float total_timeout:
        :returns str, dict: id of the new execution environment and credentials for it
            (credentials can contain hostname, username, etc depending on
            underlying cloud)
        """

    @abc.abstractmethod
    async def register_execution_environment(
        self,
        namespace: str,
        credentials: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Register an existing execution environment at the VCA

        :param str namespace: same as create_execution_environment method
        :param dict credentials: credentials to access the existing execution
            environment
            (it can contain hostname, username, path to private key, etc depending on
            underlying cloud)
        :param dict db_dict: where to write to database when the status changes.
            It contains a dictionary with {collection: str, filter: {}, path: str},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: id of the execution environment
        """

    @abc.abstractmethod
    async def install_configuration_sw(
        self,
        ee_id: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ):
        """
        Install the software inside the execution environment identified by ee_id

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def install_k8s_proxy_charm(
        self,
        charm_name: str,
        namespace: str,
        artifact_path: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        config: dict = None,
    ) -> str:
        """
        Install a k8s proxy charm

        :param charm_name: Name of the charm being deployed
        :param namespace: collection of all the uuids related to the charm.
        :param str artifact_path: where to locate the artifacts (parent folder) using
            the self.fs
            the final artifact path will be a combination of this artifact_path and
            additional string from the config_dict (e.g. charm name)
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :param config: Dictionary with additional configuration

        :returns ee_id: execution environment id.
        """

    @abc.abstractmethod
    async def get_ee_ssh_public__key(
        self,
        ee_id: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Generate a priv/pub key pair in the execution environment and return the public
        key

        :param str ee_id: the id of the execution environment returned by
            create_execution_environment or register_execution_environment
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns: public key of the execution environment
            For the case of juju proxy charm ssh-layered, it is the one
            returned by 'get-ssh-public-key' primitive.
            It raises a N2VC exception if it fails
        """

    @abc.abstractmethod
    async def add_relation(
        self, ee_id_1: str, ee_id_2: str, endpoint_1: str, endpoint_2: str
    ):
        """
        Add a relation between two Execution Environments (using their associated
        endpoints).

        :param str ee_id_1: The id of the first execution environment
        :param str ee_id_2: The id of the second execution environment
        :param str endpoint_1: The endpoint in the first execution environment
        :param str endpoint_2: The endpoint in the second execution environment
        """

    # TODO
    @abc.abstractmethod
    async def remove_relation(self):
        """
        Remove a relation (TODO: parameters and behaviour not specified yet)
        """

    # TODO
    @abc.abstractmethod
    async def deregister_execution_environments(self):
        """
        Deregister execution environments (TODO: parameters and behaviour not
        specified yet)
        """

    @abc.abstractmethod
    async def delete_namespace(
        self, namespace: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Remove a network scenario and its execution environments
        :param namespace: [<nsi-id>].<ns-id>
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def delete_execution_environment(
        self, ee_id: str, db_dict: dict = None, total_timeout: float = None
    ):
        """
        Delete an execution environment
        :param str ee_id: id of the execution environment to delete
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float total_timeout:
        """

    @abc.abstractmethod
    async def exec_primitive(
        self,
        ee_id: str,
        primitive_name: str,
        params_dict: dict,
        db_dict: dict = None,
        progress_timeout: float = None,
        total_timeout: float = None,
    ) -> str:
        """
        Execute a primitive in the execution environment

        :param str ee_id: the one returned by create_execution_environment or
            register_execution_environment
        :param str primitive_name: must be one defined in the software. There is one
            called 'config', where, for the proxy case, the 'credentials' of VM are
            provided
        :param dict params_dict: parameters of the action
        :param dict db_dict: where to write into database when the status changes.
            It contains a dict with
            {collection: <str>, filter: {}, path: <str>},
            e.g. {collection: "nsrs", filter:
            {_id: <nsd-id>}, path: "_admin.deployed.VCA.3"}
        :param float progress_timeout:
        :param float total_timeout:
        :returns str: primitive result, if ok. It raises exceptions in case of failure
        """

    async def disconnect(self):
        """
        Disconnect from VCA

        The base implementation does nothing.
        """

    ####################################################################################
    ################################### P R I V A T E ##################################
    ####################################################################################

    def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
        """
        Split namespace components

        Components that are missing or empty are returned as None.

        :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
        :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
        :raises N2VCBadArgumentsException: if namespace is None or empty
        """

        # check parameters
        if not namespace:
            raise N2VCBadArgumentsException(
                "Argument namespace is mandatory", ["namespace"]
            )

        # split namespace components
        parts = namespace.split(".")
        nsi_id = None
        ns_id = None
        vnf_id = None
        vdu_id = None
        vdu_count = None
        if len(parts) > 0 and parts[0]:
            nsi_id = parts[0]
        if len(parts) > 1 and parts[1]:
            ns_id = parts[1]
        if len(parts) > 2 and parts[2]:
            vnf_id = parts[2]
        if len(parts) > 3 and parts[3]:
            # NOTE(review): only the first two hyphen-separated tokens are used,
            # so "a-b-c" yields vdu_id="a", vdu_count="b" - confirm that vdu ids
            # never contain hyphens.
            vdu_parts = parts[3].split("-")
            vdu_id = vdu_parts[0]
            if len(vdu_parts) > 1:
                vdu_count = vdu_parts[1]

        return nsi_id, ns_id, vnf_id, vdu_id, vdu_count

    async def write_app_status_to_db(
        self,
        db_dict: dict,
        status: N2VCDeploymentStatus,
        detailed_status: str,
        vca_status: str,
        entity_type: str,
    ):
        """
        Write the status of an entity into the database and notify on_update_db

        :param dict db_dict: where to write; it must contain the keys "collection",
            "filter" and "path". If empty or None nothing is written.
        :param N2VCDeploymentStatus status: its .value is stored as a string
        :param str detailed_status: stored under "<path>.detailed-status"
        :param str vca_status: stored under "<path>.VCA-status"
        :param str entity_type: stored under "<path>.entity-type"

        DbException raised while writing is logged and not propagated.
        """
        if not db_dict:
            self.log.debug("No db_dict => No database write")
            return

        try:
            the_table = db_dict["collection"]
            the_filter = db_dict["filter"]
            the_path = db_dict["path"]
            # Make sure a non-empty path ends with the "." separator. An empty
            # path is left untouched (indexing its last character would raise
            # IndexError), so the keys are then written without any prefix.
            if the_path and not the_path.endswith("."):
                the_path = the_path + "."
            update_dict = {
                the_path + "status": str(status.value),
                the_path + "detailed-status": detailed_status,
                the_path + "VCA-status": vca_status,
                the_path + "entity-type": entity_type,
                the_path + "status-time": str(time.time()),
            }

            self.db.set_one(
                table=the_table,
                q_filter=the_filter,
                update_dict=update_dict,
                fail_on_empty=True,
            )

            # database callback (plain function or coroutine function)
            if self.on_update_db:
                if asyncio.iscoroutinefunction(self.on_update_db):
                    await self.on_update_db(
                        the_table, the_filter, the_path, update_dict
                    )
                else:
                    self.on_update_db(the_table, the_filter, the_path, update_dict)

        except DbException as e:
            if e.http_code == HTTPStatus.NOT_FOUND:
                self.log.error(
                    "NOT_FOUND error: Exception writing status to database: {}".format(
                        e
                    )
                )
            else:
                self.log.info("Exception writing status to database: {}".format(e))

    def osm_status(self, entity_type: EntityType, status: str) -> N2VCDeploymentStatus:
        """
        Translate a status string into an OSM deployment status using JujuStatusToOSM

        :param EntityType entity_type: key selecting the translation table
        :param str status: status to translate
        :returns: the mapped N2VCDeploymentStatus, or N2VCDeploymentStatus.UNKNOWN
            (after logging a warning) when the status is not in the table
        """
        if status not in JujuStatusToOSM[entity_type]:
            self.log.warning("Status {} not found in JujuStatusToOSM.".format(status))
            return N2VCDeploymentStatus.UNKNOWN
        return JujuStatusToOSM[entity_type][status]
500
501
502 # DEPRECATED
def juju_status_2_osm_status(statustype: str, status: str) -> N2VCDeploymentStatus:
    """Translate a (status type, status) pair into an N2VCDeploymentStatus.

    :param str statustype: "application", "unit", "action" or "machine"
    :param str status: the status string to translate
    :returns: the matching N2VCDeploymentStatus; UNKNOWN for an unrecognised
        status of a known type, FAILED for an unrecognised status type
    """
    if statustype in ("application", "unit"):
        # "blocked" is reported as still running, like "waiting"/"maintenance"
        if status in ("waiting", "maintenance", "blocked"):
            return N2VCDeploymentStatus.RUNNING
        if status == "error":
            return N2VCDeploymentStatus.FAILED
        if status == "active":
            return N2VCDeploymentStatus.COMPLETED
        return N2VCDeploymentStatus.UNKNOWN

    if statustype == "action":
        if status == "running":
            return N2VCDeploymentStatus.RUNNING
        if status == "completed":
            return N2VCDeploymentStatus.COMPLETED
        return N2VCDeploymentStatus.UNKNOWN

    if statustype == "machine":
        if status == "pending":
            return N2VCDeploymentStatus.PENDING
        if status == "started":
            return N2VCDeploymentStatus.COMPLETED
        return N2VCDeploymentStatus.UNKNOWN

    # unrecognised status type
    return N2VCDeploymentStatus.FAILED
531
532
def obj_to_yaml(obj: object) -> str:
    """Dump obj as block-style YAML with the "!!python/object" tags removed.

    Every dumped line is cut at the first occurrence of "!!python/object"
    (if any) and terminated with a newline.

    :param object obj: object to serialize
    :returns str: the YAML text
    """
    tag_marker = "!!python/object"
    dumped = yaml.dump(obj, default_flow_style=False, indent=2)
    # partition()[0] is the whole line when the marker is absent, otherwise
    # the text preceding it
    return "".join(
        row.partition(tag_marker)[0] + "\n" for row in dumped.splitlines()
    )
546
547
def obj_to_dict(obj: object) -> dict:
    """Convert obj into plain data by dumping it to YAML and parsing it back.

    :param object obj: object to convert
    :returns dict: result of parsing the text produced by obj_to_yaml(obj)
    """
    # NOTE(review): the text is parsed with the full yaml.Loader; it comes from
    # obj_to_yaml() above, not from an external source.
    return yaml.load(obj_to_yaml(obj), Loader=yaml.Loader)