bac0f2c7f9f9aa1e14ed61893796e51e76aad03f
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
33 from n2vc.exceptions \
34 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
35 N2VCExecutionException, N2VCInvalidCertificate, N2VCNotFound
36 from n2vc.juju_observer import JujuModelObserver
37
38 from juju.controller import Controller
39 from juju.model import Model
40 from juju.application import Application
41 from juju.action import Action
42 from juju.machine import Machine
43 from juju.client import client
44 from juju.errors import JujuAPIError
45
46 from n2vc.provisioner import SSHProvisioner
47
48
49 class N2VCJujuConnector(N2VCConnector):
50
51 """
52 ##################################################################################################
53 ########################################## P U B L I C ###########################################
54 ##################################################################################################
55 """
56
57 def __init__(
58 self,
59 db: object,
60 fs: object,
61 log: object = None,
62 loop: object = None,
63 url: str = '127.0.0.1:17070',
64 username: str = 'admin',
65 vca_config: dict = None,
66 on_update_db=None
67 ):
68 """Initialize juju N2VC connector
69 """
70
71 # parent class constructor
72 N2VCConnector.__init__(
73 self,
74 db=db,
75 fs=fs,
76 log=log,
77 loop=loop,
78 url=url,
79 username=username,
80 vca_config=vca_config,
81 on_update_db=on_update_db
82 )
83
84 # silence websocket traffic log
85 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
86 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
87 logging.getLogger('model').setLevel(logging.WARN)
88
89 self.log.info('Initializing N2VC juju connector...')
90
91 """
92 ##############################################################
93 # check arguments
94 ##############################################################
95 """
96
97 # juju URL
98 if url is None:
99 raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
100 url_parts = url.split(':')
101 if len(url_parts) != 2:
102 raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
103 self.hostname = url_parts[0]
104 try:
105 self.port = int(url_parts[1])
106 except ValueError:
107 raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])
108
109 # juju USERNAME
110 if username is None:
111 raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])
112
113 # juju CONFIGURATION
114 if vca_config is None:
115 raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])
116
117 if 'secret' in vca_config:
118 self.secret = vca_config['secret']
119 else:
120 raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])
121
122 # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
123 # if exists, it will be written in lcm container: _create_juju_public_key()
124 if 'public_key' in vca_config:
125 self.public_key = vca_config['public_key']
126 else:
127 self.public_key = None
128
129 # TODO: Verify ca_cert is valid before using. VCA will crash
130 # if the ca_cert isn't formatted correctly.
131 def base64_to_cacert(b64string):
132 """Convert the base64-encoded string containing the VCA CACERT.
133
134 The input string....
135
136 """
137 try:
138 cacert = base64.b64decode(b64string).decode("utf-8")
139
140 cacert = re.sub(
141 r'\\n',
142 r'\n',
143 cacert,
144 )
145 except binascii.Error as e:
146 self.log.debug("Caught binascii.Error: {}".format(e))
147 raise N2VCInvalidCertificate(message="Invalid CA Certificate")
148
149 return cacert
150
151 self.ca_cert = vca_config.get('ca_cert')
152 if self.ca_cert:
153 self.ca_cert = base64_to_cacert(vca_config['ca_cert'])
154
155 if 'api_proxy' in vca_config:
156 self.api_proxy = vca_config['api_proxy']
157 self.log.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
158 else:
159 self.warning('api_proxy is not configured. Support for native charms is disabled')
160
161 if 'enable_os_upgrade' in vca_config:
162 self.enable_os_upgrade = vca_config['enable_os_upgrade']
163 else:
164 self.enable_os_upgrade = True
165
166 if 'apt_mirror' in vca_config:
167 self.apt_mirror = vca_config['apt_mirror']
168 else:
169 self.apt_mirror = None
170
171 self.cloud = vca_config.get('cloud')
172 self.log.debug('Arguments have been checked')
173
174 # juju data
175 self.controller = None # it will be filled when connect to juju
176 self.juju_models = {} # model objects for every model_name
177 self.juju_observers = {} # model observers for every model_name
178 self._connecting = False # while connecting to juju (to avoid duplicate connections)
179 self._authenticated = False # it will be True when juju connection be stablished
180 self._creating_model = False # True during model creation
181
182 # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
183 self._create_juju_public_key()
184
185 self.log.info('N2VC juju connector initialized')
186
187 async def get_status(self, namespace: str, yaml_format: bool = True):
188
189 # self.log.info('Getting NS status. namespace: {}'.format(namespace))
190
191 if not self._authenticated:
192 await self._juju_login()
193
194 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
195 # model name is ns_id
196 model_name = ns_id
197 if model_name is None:
198 msg = 'Namespace {} not valid'.format(namespace)
199 self.log.error(msg)
200 raise N2VCBadArgumentsException(msg, ['namespace'])
201
202 # get juju model (create model if needed)
203 model = await self._juju_get_model(model_name=model_name)
204
205 status = await model.get_status()
206
207 if yaml_format:
208 return obj_to_yaml(status)
209 else:
210 return obj_to_dict(status)
211
212 async def create_execution_environment(
213 self,
214 namespace: str,
215 db_dict: dict,
216 reuse_ee_id: str = None,
217 progress_timeout: float = None,
218 total_timeout: float = None
219 ) -> (str, dict):
220
221 self.log.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))
222
223 if not self._authenticated:
224 await self._juju_login()
225
226 machine_id = None
227 if reuse_ee_id:
228 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
229 else:
230 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
231 # model name is ns_id
232 model_name = ns_id
233 # application name
234 application_name = self._get_application_name(namespace=namespace)
235
236 self.log.debug('model name: {}, application name: {}, machine_id: {}'
237 .format(model_name, application_name, machine_id))
238
239 # create or reuse a new juju machine
240 try:
241 machine = await self._juju_create_machine(
242 model_name=model_name,
243 application_name=application_name,
244 machine_id=machine_id,
245 db_dict=db_dict,
246 progress_timeout=progress_timeout,
247 total_timeout=total_timeout
248 )
249 except Exception as e:
250 message = 'Error creating machine on juju: {}'.format(e)
251 self.log.error(message)
252 raise N2VCException(message=message)
253
254 # id for the execution environment
255 ee_id = N2VCJujuConnector._build_ee_id(
256 model_name=model_name,
257 application_name=application_name,
258 machine_id=str(machine.entity_id)
259 )
260 self.log.debug('ee_id: {}'.format(ee_id))
261
262 # new machine credentials
263 credentials = dict()
264 credentials['hostname'] = machine.dns_name
265
266 self.log.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))
267
268 return ee_id, credentials
269
270 async def register_execution_environment(
271 self,
272 namespace: str,
273 credentials: dict,
274 db_dict: dict,
275 progress_timeout: float = None,
276 total_timeout: float = None
277 ) -> str:
278
279 if not self._authenticated:
280 await self._juju_login()
281
282 self.log.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))
283
284 if credentials is None:
285 raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])
286 if credentials.get('hostname'):
287 hostname = credentials['hostname']
288 else:
289 raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])
290 if credentials.get('username'):
291 username = credentials['username']
292 else:
293 raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])
294 if 'private_key_path' in credentials:
295 private_key_path = credentials['private_key_path']
296 else:
297 # if not passed as argument, use generated private key path
298 private_key_path = self.private_key_path
299
300 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
301
302 # model name
303 model_name = ns_id
304 # application name
305 application_name = self._get_application_name(namespace=namespace)
306
307 # register machine on juju
308 try:
309 machine_id = await self._juju_provision_machine(
310 model_name=model_name,
311 hostname=hostname,
312 username=username,
313 private_key_path=private_key_path,
314 db_dict=db_dict,
315 progress_timeout=progress_timeout,
316 total_timeout=total_timeout
317 )
318 except Exception as e:
319 self.log.error('Error registering machine: {}'.format(e))
320 raise N2VCException(message='Error registering machine on juju: {}'.format(e))
321
322 self.log.info('Machine registered: {}'.format(machine_id))
323
324 # id for the execution environment
325 ee_id = N2VCJujuConnector._build_ee_id(
326 model_name=model_name,
327 application_name=application_name,
328 machine_id=str(machine_id)
329 )
330
331 self.log.info('Execution environment registered. ee_id: {}'.format(ee_id))
332
333 return ee_id
334
335 async def install_configuration_sw(
336 self,
337 ee_id: str,
338 artifact_path: str,
339 db_dict: dict,
340 progress_timeout: float = None,
341 total_timeout: float = None,
342 config: dict = None,
343 ):
344
345 self.log.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
346 .format(ee_id, artifact_path, db_dict))
347
348 if not self._authenticated:
349 await self._juju_login()
350
351 # check arguments
352 if ee_id is None or len(ee_id) == 0:
353 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
354 if artifact_path is None or len(artifact_path) == 0:
355 raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
356 if db_dict is None:
357 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
358
359 try:
360 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
361 self.log.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
362 except Exception as e:
363 raise N2VCBadArgumentsException(
364 message='ee_id={} is not a valid execution environment id'.format(ee_id),
365 bad_args=['ee_id']
366 )
367
368 # remove // in charm path
369 while artifact_path.find('//') >= 0:
370 artifact_path = artifact_path.replace('//', '/')
371
372 # check charm path
373 if not self.fs.file_exists(artifact_path, mode="dir"):
374 msg = 'artifact path does not exist: {}'.format(artifact_path)
375 raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])
376
377 if artifact_path.startswith('/'):
378 full_path = self.fs.path + artifact_path
379 else:
380 full_path = self.fs.path + '/' + artifact_path
381
382 try:
383 application, retries = await self._juju_deploy_charm(
384 model_name=model_name,
385 application_name=application_name,
386 charm_path=full_path,
387 machine_id=machine_id,
388 db_dict=db_dict,
389 progress_timeout=progress_timeout,
390 total_timeout=total_timeout,
391 config=config
392 )
393 except Exception as e:
394 raise N2VCException(message='Error desploying charm into ee={} : {}'.format(ee_id, e))
395
396 self.log.info('Configuration sw installed')
397
398 async def get_ee_ssh_public__key(
399 self,
400 ee_id: str,
401 db_dict: dict,
402 progress_timeout: float = None,
403 total_timeout: float = None
404 ) -> str:
405
406 self.log.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))
407
408 if not self._authenticated:
409 await self._juju_login()
410
411 # check arguments
412 if ee_id is None or len(ee_id) == 0:
413 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
414 if db_dict is None:
415 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
416
417 try:
418 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
419 self.log.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
420 except Exception as e:
421 raise N2VCBadArgumentsException(
422 message='ee_id={} is not a valid execution environment id'.format(ee_id),
423 bad_args=['ee_id']
424 )
425
426 # try to execute ssh layer primitives (if exist):
427 # generate-ssh-key
428 # get-ssh-public-key
429
430 output = None
431
432 # execute action: generate-ssh-key
433 try:
434 output, status = await self._juju_execute_action(
435 model_name=model_name,
436 application_name=application_name,
437 action_name='generate-ssh-key',
438 db_dict=db_dict,
439 progress_timeout=progress_timeout,
440 total_timeout=total_timeout
441 )
442 except Exception as e:
443 self.log.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))
444
445 # execute action: get-ssh-public-key
446 try:
447 output, status = await self._juju_execute_action(
448 model_name=model_name,
449 application_name=application_name,
450 action_name='get-ssh-public-key',
451 db_dict=db_dict,
452 progress_timeout=progress_timeout,
453 total_timeout=total_timeout
454 )
455 except Exception as e:
456 msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
457 self.log.info(msg)
458 raise e
459
460 # return public key if exists
461 return output["pubkey"] if "pubkey" in output else output
462
463 async def add_relation(
464 self,
465 ee_id_1: str,
466 ee_id_2: str,
467 endpoint_1: str,
468 endpoint_2: str
469 ):
470
471 self.log.debug('adding new relation between {} and {}, endpoints: {}, {}'
472 .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))
473
474 # check arguments
475 if not ee_id_1:
476 message = 'EE 1 is mandatory'
477 self.log.error(message)
478 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1'])
479 if not ee_id_2:
480 message = 'EE 2 is mandatory'
481 self.log.error(message)
482 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_2'])
483 if not endpoint_1:
484 message = 'endpoint 1 is mandatory'
485 self.log.error(message)
486 raise N2VCBadArgumentsException(message=message, bad_args=['endpoint_1'])
487 if not endpoint_2:
488 message = 'endpoint 2 is mandatory'
489 self.log.error(message)
490 raise N2VCBadArgumentsException(message=message, bad_args=['endpoint_2'])
491
492 if not self._authenticated:
493 await self._juju_login()
494
495 # get the model, the applications and the machines from the ee_id's
496 model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1)
497 model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2)
498
499 # model must be the same
500 if model_1 != model_2:
501 message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
502 self.log.error(message)
503 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])
504
505 # add juju relations between two applications
506 try:
507 await self._juju_add_relation(
508 model_name=model_1,
509 application_name_1=app_1,
510 application_name_2=app_2,
511 relation_1=endpoint_1,
512 relation_2=endpoint_2
513 )
514 except Exception as e:
515 message = 'Error adding relation between {} and {}'.format(ee_id_1, ee_id_2)
516 self.log.error(message)
517 raise N2VCException(message=message)
518
519 async def remove_relation(
520 self
521 ):
522 if not self._authenticated:
523 await self._juju_login()
524 # TODO
525 self.log.info('Method not implemented yet')
526 raise NotImplemented()
527
528 async def deregister_execution_environments(
529 self
530 ):
531 if not self._authenticated:
532 await self._juju_login()
533 # TODO
534 self.log.info('Method not implemented yet')
535 raise NotImplemented()
536
537 async def delete_namespace(
538 self,
539 namespace: str,
540 db_dict: dict = None,
541 total_timeout: float = None
542 ):
543 self.log.info('Deleting namespace={}'.format(namespace))
544
545 if not self._authenticated:
546 await self._juju_login()
547
548 # check arguments
549 if namespace is None:
550 raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])
551
552 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
553 if ns_id is not None:
554 try:
555 await self._juju_destroy_model(
556 model_name=ns_id,
557 total_timeout=total_timeout
558 )
559 except N2VCNotFound:
560 raise
561 except Exception as e:
562 raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))
563 else:
564 raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])
565
566 self.log.info('Namespace {} deleted'.format(namespace))
567
568 async def delete_execution_environment(
569 self,
570 ee_id: str,
571 db_dict: dict = None,
572 total_timeout: float = None
573 ):
574 self.log.info('Deleting execution environment ee_id={}'.format(ee_id))
575
576 if not self._authenticated:
577 await self._juju_login()
578
579 # check arguments
580 if ee_id is None:
581 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
582
583 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)
584
585 # destroy the application
586 try:
587 await self._juju_destroy_application(model_name=model_name, application_name=application_name)
588 except Exception as e:
589 raise N2VCException(message='Error deleting execution environment {} (application {}) : {}'
590 .format(ee_id, application_name, e))
591
592 # destroy the machine
593 # try:
594 # await self._juju_destroy_machine(
595 # model_name=model_name,
596 # machine_id=machine_id,
597 # total_timeout=total_timeout
598 # )
599 # except Exception as e:
600 # raise N2VCException(message='Error deleting execution environment {} (machine {}) : {}'
601 # .format(ee_id, machine_id, e))
602
603 self.log.info('Execution environment {} deleted'.format(ee_id))
604
605 async def exec_primitive(
606 self,
607 ee_id: str,
608 primitive_name: str,
609 params_dict: dict,
610 db_dict: dict = None,
611 progress_timeout: float = None,
612 total_timeout: float = None
613 ) -> str:
614
615 self.log.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))
616
617 if not self._authenticated:
618 await self._juju_login()
619
620 # check arguments
621 if ee_id is None or len(ee_id) == 0:
622 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
623 if primitive_name is None or len(primitive_name) == 0:
624 raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
625 if params_dict is None:
626 params_dict = dict()
627
628 try:
629 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
630 except Exception:
631 raise N2VCBadArgumentsException(
632 message='ee_id={} is not a valid execution environment id'.format(ee_id),
633 bad_args=['ee_id']
634 )
635
636 if primitive_name == 'config':
637 # Special case: config primitive
638 try:
639 await self._juju_configure_application(
640 model_name=model_name,
641 application_name=application_name,
642 config=params_dict,
643 db_dict=db_dict,
644 progress_timeout=progress_timeout,
645 total_timeout=total_timeout
646 )
647 except Exception as e:
648 self.log.error('Error configuring juju application: {}'.format(e))
649 raise N2VCExecutionException(
650 message='Error configuring application into ee={} : {}'.format(ee_id, e),
651 primitive_name=primitive_name
652 )
653 return 'CONFIG OK'
654 else:
655 try:
656 output, status = await self._juju_execute_action(
657 model_name=model_name,
658 application_name=application_name,
659 action_name=primitive_name,
660 db_dict=db_dict,
661 progress_timeout=progress_timeout,
662 total_timeout=total_timeout,
663 **params_dict
664 )
665 if status == 'completed':
666 return output
667 else:
668 raise Exception('status is not completed: {}'.format(status))
669 except Exception as e:
670 self.log.error('Error executing primitive {}: {}'.format(primitive_name, e))
671 raise N2VCExecutionException(
672 message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
673 primitive_name=primitive_name
674 )
675
676 async def disconnect(self):
677 self.log.info('closing juju N2VC...')
678 await self._juju_logout()
679
680 """
681 ##################################################################################################
682 ########################################## P R I V A T E #########################################
683 ##################################################################################################
684 """
685
686 def _write_ee_id_db(
687 self,
688 db_dict: dict,
689 ee_id: str
690 ):
691
692 # write ee_id to database: _admin.deployed.VCA.x
693 try:
694 the_table = db_dict['collection']
695 the_filter = db_dict['filter']
696 the_path = db_dict['path']
697 if not the_path[-1] == '.':
698 the_path = the_path + '.'
699 update_dict = {the_path + 'ee_id': ee_id}
700 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
701 self.db.set_one(
702 table=the_table,
703 q_filter=the_filter,
704 update_dict=update_dict,
705 fail_on_empty=True
706 )
707 except Exception as e:
708 self.log.error('Error writing ee_id to database: {}'.format(e))
709
710 @staticmethod
711 def _build_ee_id(
712 model_name: str,
713 application_name: str,
714 machine_id: str
715 ):
716 """
717 Build an execution environment id form model, application and machine
718 :param model_name:
719 :param application_name:
720 :param machine_id:
721 :return:
722 """
723 # id for the execution environment
724 return '{}.{}.{}'.format(model_name, application_name, machine_id)
725
726 @staticmethod
727 def _get_ee_id_components(
728 ee_id: str
729 ) -> (str, str, str):
730 """
731 Get model, application and machine components from an execution environment id
732 :param ee_id:
733 :return: model_name, application_name, machine_id
734 """
735
736 if ee_id is None:
737 return None, None, None
738
739 # split components of id
740 parts = ee_id.split('.')
741 model_name = parts[0]
742 application_name = parts[1]
743 machine_id = parts[2]
744 return model_name, application_name, machine_id
745
746 def _get_application_name(self, namespace: str) -> str:
747 """
748 Build application name from namespace
749 :param namespace:
750 :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>
751 """
752
753 # TODO: Enforce the Juju 50-character application limit
754
755 # split namespace components
756 _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
757
758 if vnf_id is None or len(vnf_id) == 0:
759 vnf_id = ''
760 else:
761 # Shorten the vnf_id to its last twelve characters
762 vnf_id = 'vnf-' + vnf_id[-12:]
763
764 if vdu_id is None or len(vdu_id) == 0:
765 vdu_id = ''
766 else:
767 # Shorten the vdu_id to its last twelve characters
768 vdu_id = '-vdu-' + vdu_id[-12:]
769
770 if vdu_count is None or len(vdu_count) == 0:
771 vdu_count = ''
772 else:
773 vdu_count = '-cnt-' + vdu_count
774
775 application_name = 'app-{}{}{}'.format(vnf_id, vdu_id, vdu_count)
776
777 return N2VCJujuConnector._format_app_name(application_name)
778
779 async def _juju_create_machine(
780 self,
781 model_name: str,
782 application_name: str,
783 machine_id: str = None,
784 db_dict: dict = None,
785 progress_timeout: float = None,
786 total_timeout: float = None
787 ) -> Machine:
788
789 self.log.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))
790
791 # get juju model and observer (create model if needed)
792 model = await self._juju_get_model(model_name=model_name)
793 observer = self.juju_observers[model_name]
794
795 # find machine id in model
796 machine = None
797 if machine_id is not None:
798 self.log.debug('Finding existing machine id {} in model'.format(machine_id))
799 # get juju existing machines in the model
800 existing_machines = await model.get_machines()
801 if machine_id in existing_machines:
802 self.log.debug('Machine id {} found in model (reusing it)'.format(machine_id))
803 machine = model.machines[machine_id]
804
805 if machine is None:
806 self.log.debug('Creating a new machine in juju...')
807 # machine does not exist, create it and wait for it
808 machine = await model.add_machine(
809 spec=None,
810 constraints=None,
811 disks=None,
812 series='xenial'
813 )
814
815 # register machine with observer
816 observer.register_machine(machine=machine, db_dict=db_dict)
817
818 # id for the execution environment
819 ee_id = N2VCJujuConnector._build_ee_id(
820 model_name=model_name,
821 application_name=application_name,
822 machine_id=str(machine.entity_id)
823 )
824
825 # write ee_id in database
826 self._write_ee_id_db(
827 db_dict=db_dict,
828 ee_id=ee_id
829 )
830
831 # wait for machine creation
832 await observer.wait_for_machine(
833 machine_id=str(machine.entity_id),
834 progress_timeout=progress_timeout,
835 total_timeout=total_timeout
836 )
837
838 else:
839
840 self.log.debug('Reusing old machine pending')
841
842 # register machine with observer
843 observer.register_machine(machine=machine, db_dict=db_dict)
844
845 # machine does exist, but it is in creation process (pending), wait for create finalisation
846 await observer.wait_for_machine(
847 machine_id=machine.entity_id,
848 progress_timeout=progress_timeout,
849 total_timeout=total_timeout)
850
851 self.log.debug("Machine ready at " + str(machine.dns_name))
852 return machine
853
854 async def _juju_provision_machine(
855 self,
856 model_name: str,
857 hostname: str,
858 username: str,
859 private_key_path: str,
860 db_dict: dict = None,
861 progress_timeout: float = None,
862 total_timeout: float = None
863 ) -> str:
864
865 if not self.api_proxy:
866 msg = 'Cannot provision machine: api_proxy is not defined'
867 self.log.error(msg=msg)
868 raise N2VCException(message=msg)
869
870 self.log.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))
871
872 if not self._authenticated:
873 await self._juju_login()
874
875 # get juju model and observer
876 model = await self._juju_get_model(model_name=model_name)
877 observer = self.juju_observers[model_name]
878
879 # TODO check if machine is already provisioned
880 machine_list = await model.get_machines()
881
882 provisioner = SSHProvisioner(
883 host=hostname,
884 user=username,
885 private_key_path=private_key_path,
886 log=self.log
887 )
888
889 params = None
890 try:
891 params = provisioner.provision_machine()
892 except Exception as ex:
893 msg = "Exception provisioning machine: {}".format(ex)
894 self.log.error(msg)
895 raise N2VCException(message=msg)
896
897 params.jobs = ['JobHostUnits']
898
899 connection = model.connection()
900
901 # Submit the request.
902 self.log.debug("Adding machine to model")
903 client_facade = client.ClientFacade.from_connection(connection)
904 results = await client_facade.AddMachines(params=[params])
905 error = results.machines[0].error
906 if error:
907 msg = "Error adding machine: {}}".format(error.message)
908 self.log.error(msg=msg)
909 raise ValueError(msg)
910
911 machine_id = results.machines[0].machine
912
913 # Need to run this after AddMachines has been called,
914 # as we need the machine_id
915 self.log.debug("Installing Juju agent into machine {}".format(machine_id))
916 asyncio.ensure_future(provisioner.install_agent(
917 connection=connection,
918 nonce=params.nonce,
919 machine_id=machine_id,
920 api=self.api_proxy,
921 ))
922
923 # wait for machine in model (now, machine is not yet in model, so we must wait for it)
924 machine = None
925 for i in range(10):
926 machine_list = await model.get_machines()
927 if machine_id in machine_list:
928 self.log.debug('Machine {} found in model!'.format(machine_id))
929 machine = model.machines.get(machine_id)
930 break
931 await asyncio.sleep(2)
932
933 if machine is None:
934 msg = 'Machine {} not found in model'.format(machine_id)
935 self.log.error(msg=msg)
936 raise Exception(msg)
937
938 # register machine with observer
939 observer.register_machine(machine=machine, db_dict=db_dict)
940
941 # wait for machine creation
942 self.log.debug('waiting for provision finishes... {}'.format(machine_id))
943 await observer.wait_for_machine(
944 machine_id=machine_id,
945 progress_timeout=progress_timeout,
946 total_timeout=total_timeout
947 )
948
949 self.log.debug("Machine provisioned {}".format(machine_id))
950
951 return machine_id
952
953 async def _juju_deploy_charm(
954 self,
955 model_name: str,
956 application_name: str,
957 charm_path: str,
958 machine_id: str,
959 db_dict: dict,
960 progress_timeout: float = None,
961 total_timeout: float = None,
962 config: dict = None
963 ) -> (Application, int):
964
965 # get juju model and observer
966 model = await self._juju_get_model(model_name=model_name)
967 observer = self.juju_observers[model_name]
968
969 # check if application already exists
970 application = None
971 if application_name in model.applications:
972 application = model.applications[application_name]
973
974 if application is None:
975
976 # application does not exist, create it and wait for it
977 self.log.debug('deploying application {} to machine {}, model {}'
978 .format(application_name, machine_id, model_name))
979 self.log.debug('charm: {}'.format(charm_path))
980 series = 'xenial'
981 # series = None
982 application = await model.deploy(
983 entity_url=charm_path,
984 application_name=application_name,
985 channel='stable',
986 num_units=1,
987 series=series,
988 to=machine_id,
989 config=config
990 )
991
992 # register application with observer
993 observer.register_application(application=application, db_dict=db_dict)
994
995 self.log.debug('waiting for application deployed... {}'.format(application.entity_id))
996 retries = await observer.wait_for_application(
997 application_id=application.entity_id,
998 progress_timeout=progress_timeout,
999 total_timeout=total_timeout)
1000 self.log.debug('application deployed')
1001
1002 else:
1003
1004 # register application with observer
1005 observer.register_application(application=application, db_dict=db_dict)
1006
1007 # application already exists, but not finalised
1008 self.log.debug('application already exists, waiting for deployed...')
1009 retries = await observer.wait_for_application(
1010 application_id=application.entity_id,
1011 progress_timeout=progress_timeout,
1012 total_timeout=total_timeout)
1013 self.log.debug('application deployed')
1014
1015 return application, retries
1016
1017 async def _juju_execute_action(
1018 self,
1019 model_name: str,
1020 application_name: str,
1021 action_name: str,
1022 db_dict: dict,
1023 progress_timeout: float = None,
1024 total_timeout: float = None,
1025 **kwargs
1026 ) -> Action:
1027
1028 # get juju model and observer
1029 model = await self._juju_get_model(model_name=model_name)
1030 observer = self.juju_observers[model_name]
1031
1032 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
1033
1034 unit = None
1035 for u in application.units:
1036 if await u.is_leader_from_status():
1037 unit = u
1038 if unit is not None:
1039 actions = await application.get_actions()
1040 if action_name in actions:
1041 self.log.debug('executing action "{}" using params: {}'.format(action_name, kwargs))
1042 action = await unit.run_action(action_name, **kwargs)
1043
1044 # register action with observer
1045 observer.register_action(action=action, db_dict=db_dict)
1046
1047 await observer.wait_for_action(
1048 action_id=action.entity_id,
1049 progress_timeout=progress_timeout,
1050 total_timeout=total_timeout)
1051 self.log.debug('action completed with status: {}'.format(action.status))
1052 output = await model.get_action_output(action_uuid=action.entity_id)
1053 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1054 if action.entity_id in status:
1055 status = status[action.entity_id]
1056 else:
1057 status = 'failed'
1058 return output, status
1059
1060 raise N2VCExecutionException(
1061 message='Cannot execute action on charm',
1062 primitive_name=action_name
1063 )
1064
1065 async def _juju_configure_application(
1066 self,
1067 model_name: str,
1068 application_name: str,
1069 config: dict,
1070 db_dict: dict,
1071 progress_timeout: float = None,
1072 total_timeout: float = None
1073 ):
1074
1075 # get the application
1076 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
1077
1078 self.log.debug('configuring the application {} -> {}'.format(application_name, config))
1079 res = await application.set_config(config)
1080 self.log.debug('application {} configured. res={}'.format(application_name, res))
1081
1082 # Verify the config is set
1083 new_conf = await application.get_config()
1084 for key in config:
1085 value = new_conf[key]['value']
1086 self.log.debug(' {} = {}'.format(key, value))
1087 if config[key] != value:
1088 raise N2VCException(
1089 message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
1090 )
1091
1092 # check if 'verify-ssh-credentials' action exists
1093 # unit = application.units[0]
1094 actions = await application.get_actions()
1095 if 'verify-ssh-credentials' not in actions:
1096 msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
1097 self.log.debug(msg=msg)
1098 return False
1099
1100 # execute verify-credentials
1101 num_retries = 20
1102 retry_timeout = 15.0
1103 for i in range(num_retries):
1104 try:
1105 self.log.debug('Executing action verify-ssh-credentials...')
1106 output, ok = await self._juju_execute_action(
1107 model_name=model_name,
1108 application_name=application_name,
1109 action_name='verify-ssh-credentials',
1110 db_dict=db_dict,
1111 progress_timeout=progress_timeout,
1112 total_timeout=total_timeout
1113 )
1114 self.log.debug('Result: {}, output: {}'.format(ok, output))
1115 return True
1116 except Exception as e:
1117 self.log.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
1118 await asyncio.sleep(retry_timeout)
1119 else:
1120 self.log.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
1121 return False
1122
1123 async def _juju_get_application(
1124 self,
1125 model_name: str,
1126 application_name: str
1127 ):
1128 """Get the deployed application."""
1129
1130 model = await self._juju_get_model(model_name=model_name)
1131
1132 application_name = N2VCJujuConnector._format_app_name(application_name)
1133
1134 if model.applications and application_name in model.applications:
1135 return model.applications[application_name]
1136 else:
1137 raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))
1138
1139 async def _juju_get_model(self, model_name: str) -> Model:
1140 """ Get a model object from juju controller
1141 If the model does not exits, it creates it.
1142
1143 :param str model_name: name of the model
1144 :returns Model: model obtained from juju controller or Exception
1145 """
1146
1147 # format model name
1148 model_name = N2VCJujuConnector._format_model_name(model_name)
1149
1150 if model_name in self.juju_models:
1151 return self.juju_models[model_name]
1152
1153 if self._creating_model:
1154 self.log.debug('Another coroutine is creating a model. Wait...')
1155 while self._creating_model:
1156 # another coroutine is creating a model, wait
1157 await asyncio.sleep(0.1)
1158 # retry (perhaps another coroutine has created the model meanwhile)
1159 if model_name in self.juju_models:
1160 return self.juju_models[model_name]
1161
1162 try:
1163 self._creating_model = True
1164
1165 # get juju model names from juju
1166 model_list = await self.controller.list_models()
1167
1168 if model_name not in model_list:
1169 self.log.info('Model {} does not exist. Creating new model...'.format(model_name))
1170 config_dict = {'authorized-keys': self.public_key}
1171 if self.apt_mirror:
1172 config_dict['apt-mirror'] = self.apt_mirror
1173 if not self.enable_os_upgrade:
1174 config_dict['enable-os-refresh-update'] = False
1175 config_dict['enable-os-upgrade'] = False
1176
1177 model = await self.controller.add_model(
1178 model_name=model_name,
1179 config=config_dict,
1180 cloud_name=self.cloud,
1181 )
1182 self.log.info('New model created, name={}'.format(model_name))
1183 else:
1184 self.log.debug('Model already exists in juju. Getting model {}'.format(model_name))
1185 model = await self.controller.get_model(model_name)
1186 self.log.debug('Existing model in juju, name={}'.format(model_name))
1187
1188 self.juju_models[model_name] = model
1189 self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
1190 return model
1191
1192 except Exception as e:
1193 msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
1194 self.log.error(msg)
1195 raise N2VCException(msg)
1196 finally:
1197 self._creating_model = False
1198
1199 async def _juju_add_relation(
1200 self,
1201 model_name: str,
1202 application_name_1: str,
1203 application_name_2: str,
1204 relation_1: str,
1205 relation_2: str
1206 ):
1207
1208 # get juju model and observer
1209 model = await self._juju_get_model(model_name=model_name)
1210
1211 r1 = '{}:{}'.format(application_name_1, relation_1)
1212 r2 = '{}:{}'.format(application_name_2, relation_2)
1213
1214 self.log.debug('adding relation: {} -> {}'.format(r1, r2))
1215 try:
1216 await model.add_relation(relation1=r1, relation2=r2)
1217 except JujuAPIError as e:
1218 # If one of the applications in the relationship doesn't exist, or the relation has already been added,
1219 # let the operation fail silently.
1220 if 'not found' in e.message:
1221 return
1222 if 'already exists' in e.message:
1223 return
1224 # another execption, raise it
1225 raise e
1226
1227 async def _juju_destroy_application(
1228 self,
1229 model_name: str,
1230 application_name: str
1231 ):
1232
1233 self.log.debug('Destroying application {} in model {}'.format(application_name, model_name))
1234
1235 # get juju model and observer
1236 model = await self._juju_get_model(model_name=model_name)
1237 observer = self.juju_observers[model_name]
1238
1239 application = model.applications.get(application_name)
1240 if application:
1241 observer.unregister_application(application_name)
1242 await application.destroy()
1243 else:
1244 self.log.debug('Application not found: {}'.format(application_name))
1245
1246 async def _juju_destroy_machine(
1247 self,
1248 model_name: str,
1249 machine_id: str,
1250 total_timeout: float = None
1251 ):
1252
1253 self.log.debug('Destroying machine {} in model {}'.format(machine_id, model_name))
1254
1255 if total_timeout is None:
1256 total_timeout = 3600
1257
1258 # get juju model and observer
1259 model = await self._juju_get_model(model_name=model_name)
1260 observer = self.juju_observers[model_name]
1261
1262 machines = await model.get_machines()
1263 if machine_id in machines:
1264 machine = model.machines[machine_id]
1265 observer.unregister_machine(machine_id)
1266 # TODO: change this by machine.is_manual when this is upstreamed: https://github.com/juju/python-libjuju/pull/396
1267 if "instance-id" in machine.safe_data and machine.safe_data[
1268 "instance-id"
1269 ].startswith("manual:"):
1270 self.log.debug("machine.destroy(force=True) started.")
1271 await machine.destroy(force=True)
1272 self.log.debug("machine.destroy(force=True) passed.")
1273 # max timeout
1274 end = time.time() + total_timeout
1275 # wait for machine removal
1276 machines = await model.get_machines()
1277 while machine_id in machines and time.time() < end:
1278 self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
1279 await asyncio.sleep(0.5)
1280 machines = await model.get_machines()
1281 self.log.debug("Machine destroyed: {}".format(machine_id))
1282 else:
1283 self.log.debug('Machine not found: {}'.format(machine_id))
1284
1285 async def _juju_destroy_model(
1286 self,
1287 model_name: str,
1288 total_timeout: float = None
1289 ):
1290
1291 self.log.debug('Destroying model {}'.format(model_name))
1292
1293 if total_timeout is None:
1294 total_timeout = 3600
1295
1296 model = await self._juju_get_model(model_name=model_name)
1297
1298 if not model:
1299 raise N2VCNotFound(
1300 message="Model {} does not exist".format(model_name)
1301 )
1302
1303 uuid = model.info.uuid
1304
1305 # destroy applications
1306 for application_name in model.applications:
1307 try:
1308 await self._juju_destroy_application(model_name=model_name, application_name=application_name)
1309 except Exception as e:
1310 self.log.error(
1311 "Error destroying application {} in model {}: {}".format(
1312 application_name,
1313 model_name,
1314 e
1315 )
1316 )
1317
1318 # destroy machines
1319 machines = await model.get_machines()
1320 for machine_id in machines:
1321 try:
1322 await self._juju_destroy_machine(model_name=model_name, machine_id=machine_id)
1323 except Exception as e:
1324 # ignore exceptions destroying machine
1325 pass
1326
1327 await self._juju_disconnect_model(model_name=model_name)
1328
1329 self.log.debug('destroying model {}...'.format(model_name))
1330 await self.controller.destroy_model(uuid)
1331 self.log.debug('model destroy requested {}'.format(model_name))
1332
1333 # wait for model is completely destroyed
1334 end = time.time() + total_timeout
1335 while time.time() < end:
1336 self.log.debug('Waiting for model is destroyed...')
1337 try:
1338 # await self.controller.get_model(uuid)
1339 models = await self.controller.list_models()
1340 if model_name not in models:
1341 self.log.debug('The model {} ({}) was destroyed'.format(model_name, uuid))
1342 return
1343 except Exception as e:
1344 pass
1345 await asyncio.sleep(1.0)
1346
1347 async def _juju_login(self):
1348 """Connect to juju controller
1349
1350 """
1351
1352 # if already authenticated, exit function
1353 if self._authenticated:
1354 return
1355
1356 # if connecting, wait for finish
1357 # another task could be trying to connect in parallel
1358 while self._connecting:
1359 await asyncio.sleep(0.1)
1360
1361 # double check after other task has finished
1362 if self._authenticated:
1363 return
1364
1365 try:
1366 self._connecting = True
1367 self.log.info(
1368 'connecting to juju controller: {} {}:{} ca_cert: {}'
1369 .format(self.url, self.username, self.secret, '\n'+self.ca_cert if self.ca_cert else 'None'))
1370
1371 # Create controller object
1372 self.controller = Controller(loop=self.loop)
1373 # Connect to controller
1374 await self.controller.connect(
1375 endpoint=self.url,
1376 username=self.username,
1377 password=self.secret,
1378 cacert=self.ca_cert
1379 )
1380 self._authenticated = True
1381 self.log.info('juju controller connected')
1382 except Exception as e:
1383 message = 'Exception connecting to juju: {}'.format(e)
1384 self.log.error(message)
1385 raise N2VCConnectionException(
1386 message=message,
1387 url=self.url
1388 )
1389 finally:
1390 self._connecting = False
1391
1392 async def _juju_logout(self):
1393 """Logout of the Juju controller."""
1394 if not self._authenticated:
1395 return False
1396
1397 # disconnect all models
1398 for model_name in self.juju_models:
1399 try:
1400 await self._juju_disconnect_model(model_name)
1401 except Exception as e:
1402 self.log.error('Error disconnecting model {} : {}'.format(model_name, e))
1403 # continue with next model...
1404
1405 self.log.info("Disconnecting controller")
1406 try:
1407 await self.controller.disconnect()
1408 except Exception as e:
1409 raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(e), url=self.url)
1410
1411 self.controller = None
1412 self._authenticated = False
1413 self.log.info('disconnected')
1414
1415 async def _juju_disconnect_model(
1416 self,
1417 model_name: str
1418 ):
1419 self.log.debug("Disconnecting model {}".format(model_name))
1420 if model_name in self.juju_models:
1421 await self.juju_models[model_name].disconnect()
1422 self.juju_models[model_name] = None
1423 self.juju_observers[model_name] = None
1424 else:
1425 self.warning('Cannot disconnect model: {}'.format(model_name))
1426
1427 def _create_juju_public_key(self):
1428 """Recreate the Juju public key on lcm container, if needed
1429 Certain libjuju commands expect to be run from the same machine as Juju
1430 is bootstrapped to. This method will write the public key to disk in
1431 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1432 """
1433
1434 # Make sure that we have a public key before writing to disk
1435 if self.public_key is None or len(self.public_key) == 0:
1436 if 'OSMLCM_VCA_PUBKEY' in os.environ:
1437 self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
1438 if len(self.public_key) == 0:
1439 return
1440 else:
1441 return
1442
1443 pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
1444 file_path = "{}/juju_id_rsa.pub".format(pk_path)
1445 self.log.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))
1446 if not os.path.exists(pk_path):
1447 # create path and write file
1448 os.makedirs(pk_path)
1449 with open(file_path, 'w') as f:
1450 self.log.debug('Creating juju public key file: {}'.format(file_path))
1451 f.write(self.public_key)
1452 else:
1453 self.log.debug('juju public key file already exists: {}'.format(file_path))
1454
1455 @staticmethod
1456 def _format_model_name(name: str) -> str:
1457 """Format the name of the model.
1458
1459 Model names may only contain lowercase letters, digits and hyphens
1460 """
1461
1462 return name.replace('_', '-').replace(' ', '-').lower()
1463
1464 @staticmethod
1465 def _format_app_name(name: str) -> str:
1466 """Format the name of the application (in order to assure valid application name).
1467
1468 Application names have restrictions (run juju deploy --help):
1469 - contains lowercase letters 'a'-'z'
1470 - contains numbers '0'-'9'
1471 - contains hyphens '-'
1472 - starts with a lowercase letter
1473 - not two or more consecutive hyphens
1474 - after a hyphen, not a group with all numbers
1475 """
1476
1477 def all_numbers(s: str) -> bool:
1478 for c in s:
1479 if not c.isdigit():
1480 return False
1481 return True
1482
1483 new_name = name.replace('_', '-')
1484 new_name = new_name.replace(' ', '-')
1485 new_name = new_name.lower()
1486 while new_name.find('--') >= 0:
1487 new_name = new_name.replace('--', '-')
1488 groups = new_name.split('-')
1489
1490 # find 'all numbers' groups and prefix them with a letter
1491 app_name = ''
1492 for i in range(len(groups)):
1493 group = groups[i]
1494 if all_numbers(group):
1495 group = 'z' + group
1496 if i > 0:
1497 app_name += '-'
1498 app_name += group
1499
1500 if app_name[0].isdigit():
1501 app_name = 'z' + app_name
1502
1503 return app_name