00b1bc156fa026e4d8d2ea1d791a6650208e07df
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
33 from n2vc.exceptions \
34 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
35 N2VCExecutionException, N2VCInvalidCertificate
36 from n2vc.juju_observer import JujuModelObserver
37
38 from juju.controller import Controller
39 from juju.model import Model
40 from juju.application import Application
41 from juju.action import Action
42 from juju.machine import Machine
43 from juju.client import client
44
45 from n2vc.provisioner import SSHProvisioner
46
47
48 class N2VCJujuConnector(N2VCConnector):
49
50 """
51 ##################################################################################################
52 ########################################## P U B L I C ###########################################
53 ##################################################################################################
54 """
55
def __init__(
    self,
    db: object,
    fs: object,
    log: object = None,
    loop: object = None,
    url: str = '127.0.0.1:17070',
    username: str = 'admin',
    vca_config: dict = None,
    on_update_db=None
):
    """Initialize juju N2VC connector

    Every argument is first forwarded unchanged to the N2VCConnector
    constructor and only afterwards validated here.

    :param db: forwarded to the parent constructor
    :param fs: forwarded to the parent constructor
    :param log: forwarded to the parent constructor
    :param loop: forwarded to the parent constructor
    :param url: juju endpoint formatted as '<host>:<port>' (port must be an integer)
    :param username: juju user name (mandatory)
    :param vca_config: mandatory dict. Keys read here: 'secret' (mandatory), 'public_key',
        'ca_cert' (base64 encoded), 'api_proxy', 'enable_os_upgrade' (defaults to True)
        and 'apt_mirror'
    :param on_update_db: forwarded to the parent constructor
    :raises N2VCBadArgumentsException: url, username, vca_config or vca_config.secret
        is missing or malformed
    :raises N2VCInvalidCertificate: vca_config.ca_cert cannot be decoded
    """

    # parent class constructor
    N2VCConnector.__init__(
        self,
        db=db,
        fs=fs,
        log=log,
        loop=loop,
        url=url,
        username=username,
        vca_config=vca_config,
        on_update_db=on_update_db
    )

    # silence websocket traffic log
    logging.getLogger('websockets.protocol').setLevel(logging.INFO)
    logging.getLogger('juju.client.connection').setLevel(logging.WARN)
    logging.getLogger('model').setLevel(logging.WARN)

    self.info('Initializing N2VC juju connector...')

    ##############################################################
    # check arguments
    ##############################################################

    # juju URL
    if url is None:
        raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
    url_parts = url.split(':')
    if len(url_parts) != 2:
        raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
    self.hostname = url_parts[0]
    try:
        self.port = int(url_parts[1])
    except ValueError:
        raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])

    # juju USERNAME
    if username is None:
        raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])

    # juju CONFIGURATION
    if vca_config is None:
        raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])

    if 'secret' in vca_config:
        self.secret = vca_config['secret']
    else:
        raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])

    # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
    # if exists, it will be written in lcm container: _create_juju_public_key()
    self.public_key = vca_config.get('public_key')

    # TODO: Verify ca_cert is valid before using. VCA will crash
    # if the ca_cert isn't formatted correctly.
    def base64_to_cacert(b64string):
        """Decode the base64-encoded VCA CA certificate into text.

        Literal backslash-n sequences found in the decoded text are
        turned into real newlines.

        :raises N2VCInvalidCertificate: input is not valid base64 or
            does not decode to UTF-8 text
        """
        try:
            cacert = base64.b64decode(b64string).decode("utf-8")
        except (binascii.Error, UnicodeDecodeError) as e:
            # a payload that is valid base64 but not UTF-8 is just as
            # unusable as malformed base64: report both the same way
            self.debug("Caught decoding error: {}".format(e))
            raise N2VCInvalidCertificate(message="Invalid CA Certificate")

        return re.sub(r'\\n', r'\n', cacert)

    self.ca_cert = vca_config.get('ca_cert')
    if self.ca_cert:
        self.ca_cert = base64_to_cacert(vca_config['ca_cert'])

    if 'api_proxy' in vca_config:
        self.api_proxy = vca_config['api_proxy']
        self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
    else:
        # always define the attribute: other methods test "if not self.api_proxy"
        # and would otherwise fail with AttributeError
        self.api_proxy = None
        self.warning('api_proxy is not configured. Support for native charms is disabled')

    self.enable_os_upgrade = vca_config.get('enable_os_upgrade', True)
    self.apt_mirror = vca_config.get('apt_mirror')

    self.debug('Arguments have been checked')

    # juju data
    self.controller = None  # it will be filled when connect to juju
    self.juju_models = {}  # model objects for every model_name
    self.juju_observers = {}  # model observers for every model_name
    self._connecting = False  # while connecting to juju (to avoid duplicate connections)
    self._authenticated = False  # it will be True when juju connection is established
    self._creating_model = False  # True during model creation

    # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
    self._create_juju_public_key()

    self.info('N2VC juju connector initialized')
184
async def get_status(self, namespace: str, yaml_format: bool = True):
    """
    Get the juju status of the model associated to a namespace
    :param namespace: namespace whose ns_id component is used as model name
    :param yaml_format: True to get the status through obj_to_yaml, False through obj_to_dict
    :return: the converted model status
    :raises N2VCBadArgumentsException: the namespace has no ns_id component
    """

    if not self._authenticated:
        await self._juju_login()

    # the NS id is the only component needed: it is the juju model name
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    if model_name is None:
        msg = 'Namespace {} not valid'.format(namespace)
        self.error(msg)
        raise N2VCBadArgumentsException(msg, ['namespace'])

    juju_model = await self._juju_get_model(model_name=model_name)
    model_status = await juju_model.get_status()

    convert = obj_to_yaml if yaml_format else obj_to_dict
    return convert(model_status)
209
async def create_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    reuse_ee_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (str, dict):
    """
    Create a juju machine (or reuse the one named in reuse_ee_id) as execution environment
    :param namespace: used to derive model and application names when reuse_ee_id is not given
    :param db_dict: passed through to the machine creation helper
    :param reuse_ee_id: existing execution environment id; when set, model, application
        and machine are taken from it
    :param progress_timeout: passed through to the machine creation helper
    :param total_timeout: passed through to the machine creation helper
    :return: ee_id and a credentials dict holding the machine 'hostname'
    :raises N2VCException: any failure while creating the machine
    """

    self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))

    if not self._authenticated:
        await self._juju_login()

    if reuse_ee_id:
        model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
    else:
        machine_id = None
        # model name is ns_id
        _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
        application_name = self._get_application_name(namespace=namespace)

    self.debug('model name: {}, application name: {}, machine_id: {}'
               .format(model_name, application_name, machine_id))

    # create or reuse a new juju machine
    try:
        machine = await self._juju_create_machine(
            model_name=model_name,
            application_name=application_name,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        message = 'Error creating machine on juju: {}'.format(e)
        self.error(message)
        raise N2VCException(message=message)

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine.entity_id)
    )
    self.debug('ee_id: {}'.format(ee_id))

    # new machine credentials
    credentials = {'hostname': machine.dns_name}

    self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))

    return ee_id, credentials
267
async def register_execution_environment(
    self,
    namespace: str,
    credentials: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Provision an existing host as juju machine and return its execution environment id
    :param namespace: used to derive model and application names
    :param credentials: dict with 'hostname' and 'username' (both mandatory) and an
        optional 'private_key_path' (self.private_key_path is used when absent)
    :param db_dict: passed through to the provisioning helper
    :param progress_timeout: passed through to the provisioning helper
    :param total_timeout: passed through to the provisioning helper
    :return: ee_id of the registered machine
    :raises N2VCBadArgumentsException: credentials, hostname or username missing
    :raises N2VCException: any failure while provisioning the machine
    """

    if not self._authenticated:
        await self._juju_login()

    self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))

    if credentials is None:
        raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])

    hostname = credentials.get('hostname')
    if not hostname:
        raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])

    username = credentials.get('username')
    if not username:
        raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])

    # if not passed as argument, use generated private key path
    private_key_path = (
        credentials['private_key_path'] if 'private_key_path' in credentials else self.private_key_path
    )

    # model name is the ns_id component of the namespace
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    application_name = self._get_application_name(namespace=namespace)

    # register machine on juju
    try:
        machine_id = await self._juju_provision_machine(
            model_name=model_name,
            hostname=hostname,
            username=username,
            private_key_path=private_key_path,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        self.error('Error registering machine: {}'.format(e))
        raise N2VCException(message='Error registering machine on juju: {}'.format(e))

    self.info('Machine registered: {}'.format(machine_id))

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine_id)
    )

    self.info('Execution environment registered. ee_id: {}'.format(ee_id))

    return ee_id
332
async def install_configuration_sw(
    self,
    ee_id: str,
    artifact_path: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
):
    """
    Deploy the charm found at artifact_path into an execution environment
    :param ee_id: execution environment id (model, application and machine are taken from it)
    :param artifact_path: charm directory, relative to self.fs.path
    :param db_dict: mandatory, passed through to the charm deployment helper
    :param progress_timeout: passed through to the charm deployment helper
    :param total_timeout: passed through to the charm deployment helper
    :raises N2VCBadArgumentsException: missing argument, invalid ee_id or
        artifact_path not found by self.fs
    :raises N2VCException: any failure while deploying the charm
    """

    self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
              .format(ee_id, artifact_path, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if artifact_path is None or len(artifact_path) == 0:
        raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception as e:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        ) from e

    # remove // in charm path (repeat until no doubled separator is left)
    while '//' in artifact_path:
        artifact_path = artifact_path.replace('//', '/')

    # check charm path
    if not self.fs.file_exists(artifact_path, mode="dir"):
        msg = 'artifact path does not exist: {}'.format(artifact_path)
        raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])

    # join with exactly one separator between base path and artifact path
    if artifact_path.startswith('/'):
        full_path = self.fs.path + artifact_path
    else:
        full_path = self.fs.path + '/' + artifact_path

    try:
        # the returned (application, retries) pair is not needed here
        await self._juju_deploy_charm(
            model_name=model_name,
            application_name=application_name,
            charm_path=full_path,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error deploying charm into ee={} : {}'.format(ee_id, e)) from e

    self.info('Configuration sw installed')
393
async def get_ee_ssh_public__key(
    self,
    ee_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Run the ssh layer actions of the charm and return the public key of the execution environment
    :param ee_id: execution environment id
    :param db_dict: mandatory, passed through to the action execution helper
    :param progress_timeout: passed through to the action execution helper
    :param total_timeout: passed through to the action execution helper
    :return: the 'pubkey' entry of the get-ssh-public-key output when present,
        otherwise the whole output
    :raises N2VCBadArgumentsException: missing argument or invalid ee_id
    """

    self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        )

    # arguments shared by both ssh layer actions
    common_args = dict(
        model_name=model_name,
        application_name=application_name,
        db_dict=db_dict,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    # generate-ssh-key is best effort: a failure is logged and ignored
    try:
        _ignored_output, _ignored_status = await self._juju_execute_action(
            action_name='generate-ssh-key',
            **common_args
        )
    except Exception as e:
        self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))

    # get-ssh-public-key must succeed: a failure is logged and propagated
    try:
        output, _ = await self._juju_execute_action(
            action_name='get-ssh-public-key',
            **common_args
        )
    except Exception as e:
        msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
        self.info(msg)
        raise

    # return public key if exists
    if "pubkey" in output:
        return output["pubkey"]
    return output
458
async def add_relation(
    self,
    ee_id_1: str,
    ee_id_2: str,
    endpoint_1: str,
    endpoint_2: str
):
    """
    Add a juju relation between the applications of two execution environments
    :param ee_id_1: first execution environment id
    :param ee_id_2: second execution environment id
    :param endpoint_1: endpoint of the first application
    :param endpoint_2: endpoint of the second application
    :raises N2VCBadArgumentsException: both execution environments are not in the same model
    :raises N2VCException: any failure while adding the relation
    """

    self.debug('adding new relation between {} and {}, endpoints: {}, {}'
               .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))

    if not self._authenticated:
        await self._juju_login()

    # get model, application and machines
    model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1)
    model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2)

    # model must be the same
    if model_1 != model_2:
        message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
        self.error(message)
        raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])

    # add juju relations between two applications
    # NOTE(review): _juju_add_relation is defined outside the code visible here.
    # It was previously called with no arguments and never awaited, so the values
    # parsed above were unused. The keyword names below are assumed - confirm
    # them against the helper signature.
    try:
        await self._juju_add_relation(
            model_name=model_1,
            application_name_1=app_1,
            application_name_2=app_2,
            relation_1=endpoint_1,
            relation_2=endpoint_2
        )
    except Exception as e:
        message = 'Error adding relation between {} and {}: {}'.format(ee_id_1, ee_id_2, e)
        self.error(message)
        raise N2VCException(message=message) from e
490
async def remove_relation(
    self
):
    """
    Remove a relation (not implemented yet)
    :raises NotImplementedError: always, after logging in if needed
    """
    if not self._authenticated:
        await self._juju_login()
    # TODO
    self.info('Method not implemented yet')
    # NotImplemented is a non-callable constant, not an exception:
    # "raise NotImplemented()" fails with TypeError instead
    raise NotImplementedError('Method not implemented yet')
499
async def deregister_execution_environments(
    self
):
    """
    Deregister execution environments (not implemented yet)
    :raises NotImplementedError: always, after logging in if needed
    """
    if not self._authenticated:
        await self._juju_login()
    # TODO
    self.info('Method not implemented yet')
    # NotImplemented is a non-callable constant, not an exception:
    # "raise NotImplemented()" fails with TypeError instead
    raise NotImplementedError('Method not implemented yet')
508
async def delete_namespace(
    self,
    namespace: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """
    Destroy the juju model associated to a namespace
    :param namespace: namespace whose ns_id component names the model to destroy
    :param db_dict: not used by this method
    :param total_timeout: passed through to the model destruction helper
    :raises N2VCBadArgumentsException: namespace missing or without ns_id component
    :raises N2VCException: any failure while destroying the model
    """
    self.info('Deleting namespace={}'.format(namespace))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if namespace is None:
        raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])

    _, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)

    # only whole-NS deletion is supported: the ns_id is the model name
    if ns_id is None:
        raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])

    try:
        await self._juju_destroy_model(
            model_name=ns_id,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))

    self.info('Namespace {} deleted'.format(namespace))
537
async def delete_execution_environment(
    self,
    ee_id: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """
    Destroy the application and then the machine of an execution environment
    :param ee_id: execution environment id
    :param db_dict: not used by this method
    :param total_timeout: passed through to the machine destruction helper
    :raises N2VCBadArgumentsException: ee_id is None
    :raises N2VCException: any failure while destroying the application or the machine
    """
    self.info('Deleting execution environment ee_id={}'.format(ee_id))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])

    ee_parts = self._get_ee_id_components(ee_id=ee_id)
    model_name, application_name, machine_id = ee_parts

    # first step: the application
    try:
        await self._juju_destroy_application(model_name=model_name, application_name=application_name)
    except Exception as e:
        raise N2VCException(message='Error deleting execution environment {} (application {}) : {}'
                            .format(ee_id, application_name, e))

    # second step: the machine that hosted it
    try:
        await self._juju_destroy_machine(
            model_name=model_name,
            machine_id=machine_id,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error deleting execution environment {} (machine {}) : {}'
                            .format(ee_id, machine_id, e))

    self.info('Execution environment {} deleted'.format(ee_id))
574
async def exec_primitive(
    self,
    ee_id: str,
    primitive_name: str,
    params_dict: dict,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Execute a primitive in an execution environment
    :param ee_id: execution environment id
    :param primitive_name: 'config' configures the application with params_dict;
        any other name is executed as a charm action
    :param params_dict: configuration values or action parameters (None means no parameters)
    :param db_dict: passed through to the helper that is called
    :param progress_timeout: passed through to the helper that is called
    :param total_timeout: passed through to the helper that is called
    :return: 'CONFIG OK' for the config primitive, otherwise the action output
    :raises N2VCBadArgumentsException: missing argument or invalid ee_id
    :raises N2VCExecutionException: configuration failed, action failed or action
        status is not 'completed'
    """

    self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if primitive_name is None or len(primitive_name) == 0:
        raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
    if params_dict is None:
        params_dict = {}

    try:
        ee_parts = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        model_name, application_name, machine_id = ee_parts
    except Exception:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        )

    # Special case: config primitive
    if primitive_name == 'config':
        try:
            await self._juju_configure_application(
                model_name=model_name,
                application_name=application_name,
                config=params_dict,
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
        except Exception as e:
            self.error('Error configuring juju application: {}'.format(e))
            raise N2VCExecutionException(
                message='Error configuring application into ee={} : {}'.format(ee_id, e),
                primitive_name=primitive_name
            )
        return 'CONFIG OK'

    # any other primitive is a charm action
    try:
        output, status = await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name=primitive_name,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            **params_dict
        )
        # a status other than 'completed' is reported through the handler below
        if status != 'completed':
            raise Exception('status is not completed: {}'.format(status))
        return output
    except Exception as e:
        self.error('Error executing primitive {}: {}'.format(primitive_name, e))
        raise N2VCExecutionException(
            message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
            primitive_name=primitive_name
        )
645
async def disconnect(self):
    """
    Log the shutdown and close the juju session through _juju_logout
    """
    closing_message = 'closing juju N2VC...'
    self.info(closing_message)
    await self._juju_logout()
649
650 """
651 ##################################################################################################
652 ########################################## P R I V A T E #########################################
653 ##################################################################################################
654 """
655
656 def _write_ee_id_db(
657 self,
658 db_dict: dict,
659 ee_id: str
660 ):
661
662 # write ee_id to database: _admin.deployed.VCA.x
663 try:
664 the_table = db_dict['collection']
665 the_filter = db_dict['filter']
666 the_path = db_dict['path']
667 if not the_path[-1] == '.':
668 the_path = the_path + '.'
669 update_dict = {the_path + 'ee_id': ee_id}
670 # self.debug('Writing ee_id to database: {}'.format(the_path))
671 self.db.set_one(
672 table=the_table,
673 q_filter=the_filter,
674 update_dict=update_dict,
675 fail_on_empty=True
676 )
677 except Exception as e:
678 self.error('Error writing ee_id to database: {}'.format(e))
679
680 @staticmethod
681 def _build_ee_id(
682 model_name: str,
683 application_name: str,
684 machine_id: str
685 ):
686 """
687 Build an execution environment id form model, application and machine
688 :param model_name:
689 :param application_name:
690 :param machine_id:
691 :return:
692 """
693 # id for the execution environment
694 return '{}.{}.{}'.format(model_name, application_name, machine_id)
695
696 @staticmethod
697 def _get_ee_id_components(
698 ee_id: str
699 ) -> (str, str, str):
700 """
701 Get model, application and machine components from an execution environment id
702 :param ee_id:
703 :return: model_name, application_name, machine_id
704 """
705
706 if ee_id is None:
707 return None, None, None
708
709 # split components of id
710 parts = ee_id.split('.')
711 model_name = parts[0]
712 application_name = parts[1]
713 machine_id = parts[2]
714 return model_name, application_name, machine_id
715
def _get_application_name(self, namespace: str) -> str:
    """
    Derive the juju application name from the components of a namespace
    :param namespace: namespace providing vnf id, vdu id and vdu count
    :return: 'app-vnf-<vnf id>-vdu-<vdu id>-cnt-<vdu count>' passed through
        _format_app_name; components that are None or empty are left out
    """

    # TODO: Enforce the Juju 50-character application limit

    _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)

    # vnf and vdu ids are shortened to their last twelve characters
    vnf_part = ''
    if vnf_id is not None and len(vnf_id) != 0:
        vnf_part = 'vnf-' + vnf_id[-12:]

    vdu_part = ''
    if vdu_id is not None and len(vdu_id) != 0:
        vdu_part = '-vdu-' + vdu_id[-12:]

    count_part = ''
    if vdu_count is not None and len(vdu_count) != 0:
        count_part = '-cnt-' + vdu_count

    raw_name = 'app-{}{}{}'.format(vnf_part, vdu_part, count_part)

    return N2VCJujuConnector._format_app_name(raw_name)
748
async def _juju_create_machine(
    self,
    model_name: str,
    application_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> Machine:
    """
    Return a ready juju machine: the one identified by machine_id when it exists in the
    model, otherwise a newly added one
    :param model_name: juju model name
    :param application_name: only used to build the ee_id written to the database
    :param machine_id: id of a machine to reuse (optional)
    :param db_dict: passed to the observer registration and to _write_ee_id_db
    :param progress_timeout: passed through to observer.wait_for_machine
    :param total_timeout: passed through to observer.wait_for_machine
    :return: the machine object
    """

    self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))

    # get juju model and observer (create model if needed)
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # look for the requested machine in the model
    machine = None
    if machine_id is not None:
        self.debug('Finding existing machine id {} in model'.format(machine_id))
        known_machines = await model.get_machines()
        if machine_id in known_machines:
            self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
            machine = model.machines[machine_id]

    if machine is not None:

        self.debug('Reusing old machine pending')

        observer.register_machine(machine=machine, db_dict=db_dict)

        # the machine exists but may still be in creation process (pending): wait for it
        await observer.wait_for_machine(
            machine_id=machine.entity_id,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    else:

        self.debug('Creating a new machine in juju...')
        machine = await model.add_machine(
            spec=None,
            constraints=None,
            disks=None,
            series='xenial'
        )

        observer.register_machine(machine=machine, db_dict=db_dict)

        # the ee_id is written to the database before the machine is ready
        new_machine_id = str(machine.entity_id)
        ee_id = N2VCJujuConnector._build_ee_id(
            model_name=model_name,
            application_name=application_name,
            machine_id=new_machine_id
        )
        self._write_ee_id_db(
            db_dict=db_dict,
            ee_id=ee_id
        )

        # wait for machine creation
        await observer.wait_for_machine(
            machine_id=str(machine.entity_id),
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )

    self.debug("Machine ready at " + str(machine.dns_name))
    return machine
823
async def _juju_provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Provision a host through SSHProvisioner and add it as machine to a juju model
    :param model_name: juju model name
    :param hostname: host handed to SSHProvisioner
    :param username: user handed to SSHProvisioner
    :param private_key_path: private key path handed to SSHProvisioner
    :param db_dict: passed to the observer registration
    :param progress_timeout: passed through to observer.wait_for_machine
    :param total_timeout: passed through to observer.wait_for_machine
    :return: id of the machine added to the model
    :raises N2VCException: api_proxy is not configured or provisioning failed
    :raises ValueError: juju reported an error when adding the machine
    :raises Exception: the machine did not show up in the model after 10 polls
    """

    # getattr: the attribute may be missing when the connector was built
    # without 'api_proxy' in its configuration
    api_proxy = getattr(self, 'api_proxy', None)
    if not api_proxy:
        msg = 'Cannot provision machine: api_proxy is not defined'
        self.error(msg=msg)
        raise N2VCException(message=msg)

    self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))

    if not self._authenticated:
        await self._juju_login()

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # TODO check if machine is already provisioned
    machine_list = await model.get_machines()

    provisioner = SSHProvisioner(
        host=hostname,
        user=username,
        private_key_path=private_key_path,
        log=self.log
    )

    try:
        params = provisioner.provision_machine()
    except Exception as ex:
        msg = "Exception provisioning machine: {}".format(ex)
        self.log.error(msg)
        raise N2VCException(message=msg) from ex

    params.jobs = ['JobHostUnits']

    connection = model.connection()

    # Submit the request.
    self.debug("Adding machine to model")
    client_facade = client.ClientFacade.from_connection(connection)
    results = await client_facade.AddMachines(params=[params])
    error = results.machines[0].error
    if error:
        # the former template "{}}" made str.format itself fail with
        # "Single '}' encountered", hiding the message reported by juju
        msg = "Error adding machine: {}".format(error.message)
        self.error(msg=msg)
        raise ValueError(msg)

    machine_id = results.machines[0].machine

    # Need to run this after AddMachines has been called,
    # as we need the machine_id
    self.debug("Installing Juju agent into machine {}".format(machine_id))
    asyncio.ensure_future(provisioner.install_agent(
        connection=connection,
        nonce=params.nonce,
        machine_id=machine_id,
        api=api_proxy,
    ))

    # wait for machine in model (now, machine is not yet in model, so we must wait for it)
    # up to 10 polls, 2 seconds apart
    machine = None
    for _ in range(10):
        machine_list = await model.get_machines()
        if machine_id in machine_list:
            self.debug('Machine {} found in model!'.format(machine_id))
            machine = model.machines.get(machine_id)
            break
        await asyncio.sleep(2)

    if machine is None:
        msg = 'Machine {} not found in model'.format(machine_id)
        self.error(msg=msg)
        raise Exception(msg)

    # register machine with observer
    observer.register_machine(machine=machine, db_dict=db_dict)

    # wait for machine creation
    self.debug('waiting for provision finishes... {}'.format(machine_id))
    await observer.wait_for_machine(
        machine_id=machine_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    self.debug("Machine provisioned {}".format(machine_id))

    return machine_id
922
async def _juju_deploy_charm(
    self,
    model_name: str,
    application_name: str,
    charm_path: str,
    machine_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (Application, int):
    """
    Deploy a charm as application into a machine (unless the application is already in
    the model) and wait for the application through the model observer
    :param model_name: juju model name
    :param application_name: name of the application to deploy or to wait for
    :param charm_path: charm location, used as entity_url of the deployment
    :param machine_id: machine where the application is placed
    :param db_dict: passed to the observer registration
    :param progress_timeout: passed through to observer.wait_for_application
    :param total_timeout: passed through to observer.wait_for_application
    :return: the application and the value returned by observer.wait_for_application
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # check if application already exists
    known_applications = model.applications
    application = known_applications[application_name] if application_name in known_applications else None
    must_deploy = application is None

    if must_deploy:
        self.debug('deploying application {} to machine {}, model {}'
                   .format(application_name, machine_id, model_name))
        self.debug('charm: {}'.format(charm_path))
        application = await model.deploy(
            entity_url=charm_path,
            application_name=application_name,
            channel='stable',
            num_units=1,
            series='xenial',
            to=machine_id
        )

    # register application with observer
    observer.register_application(application=application, db_dict=db_dict)

    if must_deploy:
        self.debug('waiting for application deployed... {}'.format(application.entity_id))
    else:
        # application already exists, but not finalised
        self.debug('application already exists, waiting for deployed...')

    retries = await observer.wait_for_application(
        application_id=application.entity_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
    self.debug('application deployed')

    return application, retries
984
async def _juju_execute_action(
        self,
        model_name: str,
        application_name: str,
        action_name: str,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None,
        **kwargs
) -> tuple:
    """Run an action on the first unit of an application and wait for it to finish.

    :param str model_name: name of the model that holds the application
    :param str application_name: name of the application whose unit runs the action
    :param str action_name: action to run; it must be listed by application.get_actions()
    :param dict db_dict: handed to the observer when the action is registered
    :param float progress_timeout: handed to observer.wait_for_action
    :param float total_timeout: handed to observer.wait_for_action
    :param kwargs: parameters forwarded to the action
    :returns: tuple (output, status); status is 'failed' when the model does not
        report a status for the action id
    :raises N2VCExecutionException: if the application has no units or does not
        define the requested action
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    # NOTE(review): _juju_get_model stores observers under the formatted model
    # name while this lookup uses the name as received - confirm that callers
    # always pass an already formatted name
    observer = self.juju_observers[model_name]

    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    # an application without units cannot run anything: fall through to the
    # common error at the end instead of failing with IndexError
    units = application.units
    unit = units[0] if units else None
    if unit is not None:
        actions = await application.get_actions()
        if action_name in actions:
            self.debug('executing action "{}" using params: {}'.format(action_name, kwargs))
            action = await unit.run_action(action_name, **kwargs)

            # register action with observer
            observer.register_action(action=action, db_dict=db_dict)

            await observer.wait_for_action(
                action_id=action.entity_id,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout)
            self.debug('action completed with status: {}'.format(action.status))
            output = await model.get_action_output(action_uuid=action.entity_id)
            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
            # get_action_status answers with a mapping indexed by action id
            if action.entity_id in status:
                status = status[action.entity_id]
            else:
                status = 'failed'
            return output, status

    raise N2VCExecutionException(
        message='Cannot execute action on charm',
        primitive_name=action_name
    )
1029
async def _juju_configure_application(
        self,
        model_name: str,
        application_name: str,
        config: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None
):
    """Configure an application and run its 'verify-ssh-credentials' action.

    The configuration is written, read back and compared key by key. If the
    application offers the action 'verify-ssh-credentials', it is executed,
    retrying up to 20 times with 15 seconds between attempts.

    :param str model_name: name of the model that holds the application
    :param str application_name: name of the application to configure
    :param dict config: configuration keys and values to set
    :param dict db_dict: forwarded to _juju_execute_action
    :param float progress_timeout: forwarded to _juju_execute_action
    :param float total_timeout: forwarded to _juju_execute_action
    :returns bool: True when the action was executed without raising (whatever
        status it reported); False when the application does not define the
        action or every attempt raised
    :raises N2VCException: if a value read back differs from the requested one
    """

    # get the application
    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('configuring the application {} -> {}'.format(application_name, config))
    res = await application.set_config(config)
    self.debug('application {} configured. res={}'.format(application_name, res))

    # Verify the config is set
    new_conf = await application.get_config()
    for key in config:
        value = new_conf[key]['value']
        self.debug(' {} = {}'.format(key, value))
        if config[key] != value:
            raise N2VCException(
                message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
            )

    # check if 'verify-ssh-credentials' action exists
    actions = await application.get_actions()
    if 'verify-ssh-credentials' not in actions:
        msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
        self.debug(msg=msg)
        return False

    # execute verify-credentials
    num_retries = 20
    retry_timeout = 15.0
    for attempt in range(num_retries):
        try:
            self.debug('Executing action verify-ssh-credentials...')
            output, ok = await self._juju_execute_action(
                model_name=model_name,
                application_name=application_name,
                action_name='verify-ssh-credentials',
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
            self.debug('Result: {}, output: {}'.format(ok, output))
            return True
        except asyncio.CancelledError:
            # before python 3.8 CancelledError derives from Exception: without
            # this clause a cancellation would be retried instead of honoured
            raise
        except Exception as e:
            self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
            # waiting only makes sense when another attempt follows
            if attempt < num_retries - 1:
                await asyncio.sleep(retry_timeout)

    self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
    return False
1087
1088 async def _juju_get_application(
1089 self,
1090 model_name: str,
1091 application_name: str
1092 ):
1093 """Get the deployed application."""
1094
1095 model = await self._juju_get_model(model_name=model_name)
1096
1097 application_name = N2VCJujuConnector._format_app_name(application_name)
1098
1099 if model.applications and application_name in model.applications:
1100 return model.applications[application_name]
1101 else:
1102 raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))
1103
1104 async def _juju_get_model(self, model_name: str) -> Model:
1105 """ Get a model object from juju controller
1106 If the model does not exits, it creates it.
1107
1108 :param str model_name: name of the model
1109 :returns Model: model obtained from juju controller or Exception
1110 """
1111
1112 # format model name
1113 model_name = N2VCJujuConnector._format_model_name(model_name)
1114
1115 if model_name in self.juju_models:
1116 return self.juju_models[model_name]
1117
1118 if self._creating_model:
1119 self.debug('Another coroutine is creating a model. Wait...')
1120 while self._creating_model:
1121 # another coroutine is creating a model, wait
1122 await asyncio.sleep(0.1)
1123 # retry (perhaps another coroutine has created the model meanwhile)
1124 if model_name in self.juju_models:
1125 return self.juju_models[model_name]
1126
1127 try:
1128 self._creating_model = True
1129
1130 # get juju model names from juju
1131 model_list = await self.controller.list_models()
1132
1133 if model_name not in model_list:
1134 self.info('Model {} does not exist. Creating new model...'.format(model_name))
1135 config_dict = {'authorized-keys': self.public_key}
1136 if self.apt_mirror:
1137 config_dict['apt-mirror'] = self.apt_mirror
1138 if not self.enable_os_upgrade:
1139 config_dict['enable-os-refresh-update'] = False
1140 config_dict['enable-os-upgrade'] = False
1141
1142 model = await self.controller.add_model(
1143 model_name=model_name,
1144 config=config_dict
1145 )
1146 self.info('New model created, name={}'.format(model_name))
1147 else:
1148 self.debug('Model already exists in juju. Getting model {}'.format(model_name))
1149 model = await self.controller.get_model(model_name)
1150 self.debug('Existing model in juju, name={}'.format(model_name))
1151
1152 self.juju_models[model_name] = model
1153 self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
1154 return model
1155
1156 except Exception as e:
1157 msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
1158 self.error(msg)
1159 raise N2VCException(msg)
1160 finally:
1161 self._creating_model = False
1162
1163 async def _juju_add_relation(
1164 self,
1165 model_name: str,
1166 application_name_1: str,
1167 application_name_2: str,
1168 relation_1: str,
1169 relation_2: str
1170 ):
1171
1172 self.debug('adding relation')
1173
1174 # get juju model and observer
1175 model = await self._juju_get_model(model_name=model_name)
1176
1177 r1 = '{}:{}'.format(application_name_1, relation_1)
1178 r2 = '{}:{}'.format(application_name_2, relation_2)
1179 await model.add_relation(relation1=r1, relation2=r2)
1180
1181 async def _juju_destroy_application(
1182 self,
1183 model_name: str,
1184 application_name: str
1185 ):
1186
1187 self.debug('Destroying application {} in model {}'.format(application_name, model_name))
1188
1189 # get juju model and observer
1190 model = await self._juju_get_model(model_name=model_name)
1191
1192 application = model.applications.get(application_name)
1193 if application:
1194 await application.destroy()
1195 else:
1196 self.debug('Application not found: {}'.format(application_name))
1197
1198 async def _juju_destroy_machine(
1199 self,
1200 model_name: str,
1201 machine_id: str,
1202 total_timeout: float = None
1203 ):
1204
1205 self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))
1206
1207 if total_timeout is None:
1208 total_timeout = 3600
1209
1210 # get juju model and observer
1211 model = await self._juju_get_model(model_name=model_name)
1212
1213 machines = await model.get_machines()
1214 if machine_id in machines:
1215 machine = model.machines[machine_id]
1216 await machine.destroy(force=True)
1217 # max timeout
1218 end = time.time() + total_timeout
1219 # wait for machine removal
1220 machines = await model.get_machines()
1221 while machine_id in machines and time.time() < end:
1222 self.debug('Waiting for machine {} is destroyed'.format(machine_id))
1223 await asyncio.sleep(0.5)
1224 machines = await model.get_machines()
1225 self.debug('Machine destroyed: {}'.format(machine_id))
1226 else:
1227 self.debug('Machine not found: {}'.format(machine_id))
1228
1229 async def _juju_destroy_model(
1230 self,
1231 model_name: str,
1232 total_timeout: float = None
1233 ):
1234
1235 self.debug('Destroying model {}'.format(model_name))
1236
1237 if total_timeout is None:
1238 total_timeout = 3600
1239
1240 model = await self._juju_get_model(model_name=model_name)
1241 uuid = model.info.uuid
1242
1243 # destroy machines
1244 machines = await model.get_machines()
1245 for machine_id in machines:
1246 try:
1247 await self._juju_destroy_machine(model_name=model_name, machine_id=machine_id)
1248 except Exception as e:
1249 # ignore exceptions destroying machine
1250 pass
1251
1252 await self._juju_disconnect_model(model_name=model_name)
1253 self.juju_models[model_name] = None
1254 self.juju_observers[model_name] = None
1255
1256 self.debug('destroying model {}...'.format(model_name))
1257 await self.controller.destroy_model(uuid)
1258 self.debug('model destroy requested {}'.format(model_name))
1259
1260 # wait for model is completely destroyed
1261 end = time.time() + total_timeout
1262 while time.time() < end:
1263 self.debug('Waiting for model is destroyed...')
1264 try:
1265 # await self.controller.get_model(uuid)
1266 models = await self.controller.list_models()
1267 if model_name not in models:
1268 self.debug('The model {} ({}) was destroyed'.format(model_name, uuid))
1269 return
1270 except Exception as e:
1271 pass
1272 await asyncio.sleep(1.0)
1273
1274 async def _juju_login(self):
1275 """Connect to juju controller
1276
1277 """
1278
1279 # if already authenticated, exit function
1280 if self._authenticated:
1281 return
1282
1283 # if connecting, wait for finish
1284 # another task could be trying to connect in parallel
1285 while self._connecting:
1286 await asyncio.sleep(0.1)
1287
1288 # double check after other task has finished
1289 if self._authenticated:
1290 return
1291
1292 try:
1293 self._connecting = True
1294 self.info(
1295 'connecting to juju controller: {} {}:{} ca_cert: {}'
1296 .format(self.url, self.username, self.secret, '\n'+self.ca_cert if self.ca_cert else 'None'))
1297
1298 # Create controller object
1299 self.controller = Controller(loop=self.loop)
1300 # Connect to controller
1301 await self.controller.connect(
1302 endpoint=self.url,
1303 username=self.username,
1304 password=self.secret,
1305 cacert=self.ca_cert
1306 )
1307 self._authenticated = True
1308 self.info('juju controller connected')
1309 except Exception as e:
1310 message = 'Exception connecting to juju: {}'.format(e)
1311 self.error(message)
1312 raise N2VCConnectionException(
1313 message=message,
1314 url=self.url
1315 )
1316 finally:
1317 self._connecting = False
1318
1319 async def _juju_logout(self):
1320 """Logout of the Juju controller."""
1321 if not self._authenticated:
1322 return False
1323
1324 # disconnect all models
1325 for model_name in self.juju_models:
1326 try:
1327 await self._juju_disconnect_model(model_name)
1328 except Exception as e:
1329 self.error('Error disconnecting model {} : {}'.format(model_name, e))
1330 # continue with next model...
1331
1332 self.info("Disconnecting controller")
1333 try:
1334 await self.controller.disconnect()
1335 except Exception as e:
1336 raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(e), url=self.url)
1337
1338 self.controller = None
1339 self._authenticated = False
1340 self.info('disconnected')
1341
1342 async def _juju_disconnect_model(
1343 self,
1344 model_name: str
1345 ):
1346 self.debug("Disconnecting model {}".format(model_name))
1347 if model_name in self.juju_models:
1348 await self.juju_models[model_name].disconnect()
1349 self.juju_models[model_name] = None
1350 self.juju_observers[model_name] = None
1351 else:
1352 self.warning('Cannot disconnect model: {}'.format(model_name))
1353
1354 def _create_juju_public_key(self):
1355 """Recreate the Juju public key on lcm container, if needed
1356 Certain libjuju commands expect to be run from the same machine as Juju
1357 is bootstrapped to. This method will write the public key to disk in
1358 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1359 """
1360
1361 # Make sure that we have a public key before writing to disk
1362 if self.public_key is None or len(self.public_key) == 0:
1363 if 'OSMLCM_VCA_PUBKEY' in os.environ:
1364 self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
1365 if len(self.public_key) == 0:
1366 return
1367 else:
1368 return
1369
1370 pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
1371 file_path = "{}/juju_id_rsa.pub".format(pk_path)
1372 self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))
1373 if not os.path.exists(pk_path):
1374 # create path and write file
1375 os.makedirs(pk_path)
1376 with open(file_path, 'w') as f:
1377 self.debug('Creating juju public key file: {}'.format(file_path))
1378 f.write(self.public_key)
1379 else:
1380 self.debug('juju public key file already exists: {}'.format(file_path))
1381
1382 @staticmethod
1383 def _format_model_name(name: str) -> str:
1384 """Format the name of the model.
1385
1386 Model names may only contain lowercase letters, digits and hyphens
1387 """
1388
1389 return name.replace('_', '-').replace(' ', '-').lower()
1390
1391 @staticmethod
1392 def _format_app_name(name: str) -> str:
1393 """Format the name of the application (in order to assure valid application name).
1394
1395 Application names have restrictions (run juju deploy --help):
1396 - contains lowercase letters 'a'-'z'
1397 - contains numbers '0'-'9'
1398 - contains hyphens '-'
1399 - starts with a lowercase letter
1400 - not two or more consecutive hyphens
1401 - after a hyphen, not a group with all numbers
1402 """
1403
1404 def all_numbers(s: str) -> bool:
1405 for c in s:
1406 if not c.isdigit():
1407 return False
1408 return True
1409
1410 new_name = name.replace('_', '-')
1411 new_name = new_name.replace(' ', '-')
1412 new_name = new_name.lower()
1413 while new_name.find('--') >= 0:
1414 new_name = new_name.replace('--', '-')
1415 groups = new_name.split('-')
1416
1417 # find 'all numbers' groups and prefix them with a letter
1418 app_name = ''
1419 for i in range(len(groups)):
1420 group = groups[i]
1421 if all_numbers(group):
1422 group = 'z' + group
1423 if i > 0:
1424 app_name += '-'
1425 app_name += group
1426
1427 if app_name[0].isdigit():
1428 app_name = 'z' + app_name
1429
1430 return app_name