K8s action support
[osm/N2VC.git] / n2vc / n2vc_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23
24 import abc
25 import asyncio
26 import os
27 import subprocess
28 import shlex
29 import time
30 from enum import Enum
31 from http import HTTPStatus
32 from n2vc.loggable import Loggable
33 from n2vc.exceptions import N2VCBadArgumentsException
34 import yaml
35
36 from osm_common.dbmongo import DbException
37
38
39 class N2VCDeploymentStatus(Enum):
40 PENDING = 'pending'
41 RUNNING = 'running'
42 COMPLETED = 'completed'
43 FAILED = 'failed'
44 UNKNOWN = 'unknown'
45
46
47 class N2VCConnector(abc.ABC, Loggable):
48 """Generic N2VC connector
49
50 Abstract class
51 """
52
53 """
54 ##################################################################################################
55 ########################################## P U B L I C ###########################################
56 ##################################################################################################
57 """
58
59 def __init__(
60 self,
61 db: object,
62 fs: object,
63 log: object,
64 loop: object,
65 url: str,
66 username: str,
67 vca_config: dict,
68 on_update_db=None
69 ):
70 """Initialize N2VC abstract connector. It defines de API for VCA connectors
71
72 :param object db: Mongo object managing the MongoDB (repo common DbBase)
73 :param object fs: FileSystem object managing the package artifacts (repo common FsBase)
74 :param object log: the logging object to log to
75 :param object loop: the loop to use for asyncio (default current thread loop)
76 :param str url: a string that how to connect to the VCA (if needed, IP and port can be obtained from there)
77 :param str username: the username to authenticate with VCA
78 :param dict vca_config: Additional parameters for the specific VCA. For example, for juju it will contain:
79 secret: The password to authenticate with
80 public_key: The contents of the juju public SSH key
81 ca_cert str: The CA certificate used to authenticate
82 :param on_update_db: callback called when n2vc connector updates database. Received arguments:
83 table: e.g. "nsrs"
84 filter: e.g. {_id: <nsd-id> }
85 path: e.g. "_admin.deployed.VCA.3."
86 updated_data: e.g. , "{ _admin.deployed.VCA.3.status: 'xxx', etc }"
87 """
88
89 # parent class
90 Loggable.__init__(self, log=log, log_to_console=True, prefix='\nN2VC')
91
92 # check arguments
93 if db is None:
94 raise N2VCBadArgumentsException('Argument db is mandatory', ['db'])
95 if fs is None:
96 raise N2VCBadArgumentsException('Argument fs is mandatory', ['fs'])
97
98 self.log.info('url={}, username={}, vca_config={}'.format(
99 url, username, {k: v for k, v in vca_config.items() if k not in ("host", "port", "user", "secret",
100 "public_key", "ca_cert")}))
101
102 # store arguments into self
103 self.db = db
104 self.fs = fs
105 self.loop = loop or asyncio.get_event_loop()
106 self.url = url
107 self.username = username
108 self.vca_config = vca_config
109 self.on_update_db = on_update_db
110
111 # generate private/public key-pair
112 self.private_key_path = None
113 self.public_key_path = None
114 self.get_public_key()
115
116 @abc.abstractmethod
117 async def get_status(self, namespace: str, yaml_format: bool = True):
118 """Get namespace status
119
120 :param namespace: we obtain ns from namespace
121 :param yaml_format: returns a yaml string
122 """
123
124 # TODO: review which public key
125 def get_public_key(self) -> str:
126 """Get the VCA ssh-public-key
127
128 Returns the SSH public key from local mahine, to be injected into virtual machines to
129 be managed by the VCA.
130 First run, a ssh keypair will be created.
131 The public key is injected into a VM so that we can provision the
132 machine with Juju, after which Juju will communicate with the VM
133 directly via the juju agent.
134 """
135
136 public_key = ''
137
138 # Find the path where we expect our key lives (~/.ssh)
139 homedir = os.environ.get('HOME')
140 if not homedir:
141 self.warning('No HOME environment variable, using /tmp')
142 homedir = '/tmp'
143 sshdir = "{}/.ssh".format(homedir)
144 if not os.path.exists(sshdir):
145 os.mkdir(sshdir)
146
147 self.private_key_path = "{}/id_n2vc_rsa".format(sshdir)
148 self.public_key_path = "{}.pub".format(self.private_key_path)
149
150 # If we don't have a key generated, then we have to generate it using ssh-keygen
151 if not os.path.exists(self.private_key_path):
152 cmd = "ssh-keygen -t {} -b {} -N '' -f {}".format(
153 "rsa",
154 "4096",
155 self.private_key_path
156 )
157 # run command with arguments
158 subprocess.check_output(shlex.split(cmd))
159
160 # Read the public key. Only one public key (one line) in the file
161 with open(self.public_key_path, "r") as file:
162 public_key = file.readline()
163
164 return public_key
165
166 @abc.abstractmethod
167 async def create_execution_environment(
168 self,
169 namespace: str,
170 db_dict: dict,
171 reuse_ee_id: str = None,
172 progress_timeout: float = None,
173 total_timeout: float = None
174 ) -> (str, dict):
175 """Create an Execution Environment. Returns when it is created or raises an exception on failing
176
177 :param str namespace: Contains a dot separate string.
178 LCM will use: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
179 :param dict db_dict: where to write to database when the status changes.
180 It contains a dictionary with {collection: str, filter: {}, path: str},
181 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
182 :param str reuse_ee_id: ee id from an older execution. It allows us to reuse an older environment
183 :param float progress_timeout:
184 :param float total_timeout:
185 :returns str, dict: id of the new execution environment and credentials for it
186 (credentials can contains hostname, username, etc depending on underlying cloud)
187 """
188
189 @abc.abstractmethod
190 async def register_execution_environment(
191 self,
192 namespace: str,
193 credentials: dict,
194 db_dict: dict,
195 progress_timeout: float = None,
196 total_timeout: float = None
197 ) -> str:
198 """
199 Register an existing execution environment at the VCA
200
201 :param str namespace: same as create_execution_environment method
202 :param dict credentials: credentials to access the existing execution environment
203 (it can contains hostname, username, path to private key, etc depending on underlying cloud)
204 :param dict db_dict: where to write to database when the status changes.
205 It contains a dictionary with {collection: str, filter: {}, path: str},
206 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
207 :param float progress_timeout:
208 :param float total_timeout:
209 :returns str: id of the execution environment
210 """
211
212 @abc.abstractmethod
213 async def install_configuration_sw(
214 self,
215 ee_id: str,
216 artifact_path: str,
217 db_dict: dict,
218 progress_timeout: float = None,
219 total_timeout: float = None
220 ):
221 """
222 Install the software inside the execution environment identified by ee_id
223
224 :param str ee_id: the id of the execution environment returned by create_execution_environment
225 or register_execution_environment
226 :param str artifact_path: where to locate the artifacts (parent folder) using the self.fs
227 the final artifact path will be a combination of this artifact_path and additional string from
228 the config_dict (e.g. charm name)
229 :param dict db_dict: where to write into database when the status changes.
230 It contains a dict with {collection: <str>, filter: {}, path: <str>},
231 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
232 :param float progress_timeout:
233 :param float total_timeout:
234 """
235
236 @abc.abstractmethod
237 async def get_ee_ssh_public__key(
238 self,
239 ee_id: str,
240 db_dict: dict,
241 progress_timeout: float = None,
242 total_timeout: float = None
243 ) -> str:
244 """
245 Generate a priv/pub key pair in the execution environment and return the public key
246
247 :param str ee_id: the id of the execution environment returned by create_execution_environment
248 or register_execution_environment
249 :param dict db_dict: where to write into database when the status changes.
250 It contains a dict with {collection: <str>, filter: {}, path: <str>},
251 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
252 :param float progress_timeout:
253 :param float total_timeout:
254 :returns: public key of the execution environment
255 For the case of juju proxy charm ssh-layered, it is the one returned by 'get-ssh-public-key'
256 primitive.
257 It raises a N2VC exception if fails
258 """
259
260 @abc.abstractmethod
261 async def add_relation(
262 self,
263 ee_id_1: str,
264 ee_id_2: str,
265 endpoint_1: str,
266 endpoint_2: str
267 ):
268 """
269 Add a relation between two Execution Environments (using their associated endpoints).
270
271 :param str ee_id_1: The id of the first execution environment
272 :param str ee_id_2: The id of the second execution environment
273 :param str endpoint_1: The endpoint in the first execution environment
274 :param str endpoint_2: The endpoint in the second execution environment
275 """
276
277 # TODO
278 @abc.abstractmethod
279 async def remove_relation(
280 self
281 ):
282 """
283 """
284
285 # TODO
286 @abc.abstractmethod
287 async def deregister_execution_environments(
288 self
289 ):
290 """
291 """
292
293 @abc.abstractmethod
294 async def delete_namespace(
295 self,
296 namespace: str,
297 db_dict: dict = None,
298 total_timeout: float = None
299 ):
300 """
301 Remove a network scenario and its execution environments
302 :param namespace: [<nsi-id>].<ns-id>
303 :param dict db_dict: where to write into database when the status changes.
304 It contains a dict with {collection: <str>, filter: {}, path: <str>},
305 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
306 :param float total_timeout:
307 """
308
309 @abc.abstractmethod
310 async def delete_execution_environment(
311 self,
312 ee_id: str,
313 db_dict: dict = None,
314 total_timeout: float = None
315 ):
316 """
317 Delete an execution environment
318 :param str ee_id: id of the execution environment to delete
319 :param dict db_dict: where to write into database when the status changes.
320 It contains a dict with {collection: <str>, filter: {}, path: <str>},
321 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
322 :param float total_timeout:
323 """
324
325 @abc.abstractmethod
326 async def exec_primitive(
327 self,
328 ee_id: str,
329 primitive_name: str,
330 params_dict: dict,
331 db_dict: dict = None,
332 progress_timeout: float = None,
333 total_timeout: float = None
334 ) -> str:
335 """
336 Execute a primitive in the execution environment
337
338 :param str ee_id: the one returned by create_execution_environment or register_execution_environment
339 :param str primitive_name: must be one defined in the software. There is one called 'config',
340 where, for the proxy case, the 'credentials' of VM are provided
341 :param dict params_dict: parameters of the action
342 :param dict db_dict: where to write into database when the status changes.
343 It contains a dict with {collection: <str>, filter: {}, path: <str>},
344 e.g. {collection: "nsrs", filter: {_id: <nsd-id>, path: "_admin.deployed.VCA.3"}
345 :param float progress_timeout:
346 :param float total_timeout:
347 :returns str: primitive result, if ok. It raises exceptions in case of fail
348 """
349
350 async def disconnect(self):
351 """
352 Disconnect from VCA
353 """
354
355 """
356 ##################################################################################################
357 ########################################## P R I V A T E #########################################
358 ##################################################################################################
359 """
360
361 def _get_namespace_components(self, namespace: str) -> (str, str, str, str, str):
362 """
363 Split namespace components
364
365 :param namespace: [<nsi-id>].<ns-id>.<vnf-id>.<vdu-id>[-<count>]
366 :return: nsi_id, ns_id, vnf_id, vdu_id, vdu_count
367 """
368
369 # check parameters
370 if namespace is None or len(namespace) == 0:
371 raise N2VCBadArgumentsException('Argument namespace is mandatory', ['namespace'])
372
373 # split namespace components
374 parts = namespace.split('.')
375 nsi_id = None
376 ns_id = None
377 vnf_id = None
378 vdu_id = None
379 vdu_count = None
380 if len(parts) > 0 and len(parts[0]) > 0:
381 nsi_id = parts[0]
382 if len(parts) > 1 and len(parts[1]) > 0:
383 ns_id = parts[1]
384 if len(parts) > 2 and len(parts[2]) > 0:
385 vnf_id = parts[2]
386 if len(parts) > 3 and len(parts[3]) > 0:
387 vdu_id = parts[3]
388 vdu_parts = parts[3].split('-')
389 if len(vdu_parts) > 1:
390 vdu_id = vdu_parts[0]
391 vdu_count = vdu_parts[1]
392
393 return nsi_id, ns_id, vnf_id, vdu_id, vdu_count
394
395 async def write_app_status_to_db(
396 self,
397 db_dict: dict,
398 status: N2VCDeploymentStatus,
399 detailed_status: str,
400 vca_status: str,
401 entity_type: str
402 ):
403 if not db_dict:
404 self.log.debug('No db_dict => No database write')
405 return
406
407 # self.log.debug('status={} / detailed-status={} / VCA-status={} / entity_type={}'
408 # .format(str(status.value), detailed_status, vca_status, entity_type))
409
410 try:
411
412 the_table = db_dict['collection']
413 the_filter = db_dict['filter']
414 the_path = db_dict['path']
415 if not the_path[-1] == '.':
416 the_path = the_path + '.'
417 update_dict = {
418 the_path + 'status': str(status.value),
419 the_path + 'detailed-status': detailed_status,
420 the_path + 'VCA-status': vca_status,
421 the_path + 'entity-type': entity_type,
422 the_path + 'status-time': str(time.time()),
423 }
424
425 self.db.set_one(
426 table=the_table,
427 q_filter=the_filter,
428 update_dict=update_dict,
429 fail_on_empty=True
430 )
431
432 # database callback
433 if self.on_update_db:
434 if asyncio.iscoroutinefunction(self.on_update_db):
435 await self.on_update_db(the_table, the_filter, the_path, update_dict)
436 else:
437 self.on_update_db(the_table, the_filter, the_path, update_dict)
438
439 except DbException as e:
440 if e.http_code == HTTPStatus.NOT_FOUND:
441 self.log.error('NOT_FOUND error: Exception writing status to database: {}'.format(e))
442 else:
443 self.log.info('Exception writing status to database: {}'.format(e))
444
445
446 def juju_status_2_osm_status(type: str, status: str) -> N2VCDeploymentStatus:
447 if type == 'application' or type == 'unit':
448 if status in ['waiting', 'maintenance']:
449 return N2VCDeploymentStatus.RUNNING
450 if status in ['error']:
451 return N2VCDeploymentStatus.FAILED
452 elif status in ['active']:
453 return N2VCDeploymentStatus.COMPLETED
454 elif status in ['blocked']:
455 return N2VCDeploymentStatus.RUNNING
456 else:
457 return N2VCDeploymentStatus.UNKNOWN
458 elif type == 'action':
459 if status in ['running']:
460 return N2VCDeploymentStatus.RUNNING
461 elif status in ['completed']:
462 return N2VCDeploymentStatus.COMPLETED
463 else:
464 return N2VCDeploymentStatus.UNKNOWN
465 elif type == 'machine':
466 if status in ['pending']:
467 return N2VCDeploymentStatus.PENDING
468 elif status in ['started']:
469 return N2VCDeploymentStatus.COMPLETED
470 else:
471 return N2VCDeploymentStatus.UNKNOWN
472
473 return N2VCDeploymentStatus.FAILED
474
475
476 def obj_to_yaml(obj: object) -> str:
477 # dump to yaml
478 dump_text = yaml.dump(obj, default_flow_style=False, indent=2)
479 # split lines
480 lines = dump_text.splitlines()
481 # remove !!python/object tags
482 yaml_text = ''
483 for line in lines:
484 index = line.find('!!python/object')
485 if index >= 0:
486 line = line[:index]
487 yaml_text += line + '\n'
488 return yaml_text
489
490
491 def obj_to_dict(obj: object) -> dict:
492 # convert obj to yaml
493 yaml_text = obj_to_yaml(obj)
494 # parse to dict
495 return yaml.load(yaml_text, Loader=yaml.Loader)