Fix flake8
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.exceptions \
33 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
34 N2VCExecutionException, N2VCInvalidCertificate
35 from n2vc.juju_observer import JujuModelObserver
36
37 from juju.controller import Controller
38 from juju.model import Model
39 from juju.application import Application
40 from juju.action import Action
41 from juju.machine import Machine
42 from juju.client import client
43
44 from n2vc.provisioner import SSHProvisioner
45
46
47 class N2VCJujuConnector(N2VCConnector):
48
49 """
50 ##################################################################################################
51 ########################################## P U B L I C ###########################################
52 ##################################################################################################
53 """
54
def __init__(
    self,
    db: object,
    fs: object,
    log: object = None,
    loop: object = None,
    url: str = '127.0.0.1:17070',
    username: str = 'admin',
    vca_config: dict = None,
    on_update_db=None
):
    """Initialize juju N2VC connector

    Validates the arguments, stores the connection settings and prepares the
    (still empty) juju state holders. No connection to juju is opened here.

    :param db: forwarded as-is to the parent connector
    :param fs: forwarded as-is to the parent connector
    :param log: forwarded as-is to the parent connector
    :param loop: forwarded as-is to the parent connector
    :param url: juju endpoint, formatted as '<host>:<port>'
    :param username: juju user name (mandatory)
    :param vca_config: VCA settings. 'secret' is mandatory; 'public_key',
        'ca_cert' (base64 encoded) and 'api_proxy' are optional
    :param on_update_db: forwarded as-is to the parent connector
    :raises N2VCBadArgumentsException: a mandatory argument is missing or the
        url is malformed
    :raises N2VCInvalidCertificate: 'ca_cert' cannot be decoded
    """

    # parent class constructor
    N2VCConnector.__init__(
        self,
        db=db,
        fs=fs,
        log=log,
        loop=loop,
        url=url,
        username=username,
        vca_config=vca_config,
        on_update_db=on_update_db
    )

    # silence websocket traffic log
    logging.getLogger('websockets.protocol').setLevel(logging.INFO)
    logging.getLogger('juju.client.connection').setLevel(logging.WARN)
    logging.getLogger('model').setLevel(logging.WARN)

    self.info('Initializing N2VC juju connector...')

    ##############################################################
    # check arguments
    ##############################################################

    # juju URL
    if url is None:
        raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
    url_parts = url.split(':')
    if len(url_parts) != 2:
        raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
    self.hostname = url_parts[0]
    try:
        self.port = int(url_parts[1])
    except ValueError:
        raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])

    # juju USERNAME
    if username is None:
        raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])

    # juju CONFIGURATION
    if vca_config is None:
        raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])

    if 'secret' in vca_config:
        self.secret = vca_config['secret']
    else:
        raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])

    # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
    # if exists, it will be written in lcm container: _create_juju_public_key()
    self.public_key = vca_config.get('public_key')

    # TODO: Verify ca_cert is valid before using. VCA will crash
    # if the ca_cert isn't formatted correctly.
    def base64_to_cacert(b64string):
        """Decode the base64-encoded VCA CA certificate.

        Literal backslash-n sequences found in the decoded text are turned
        into real line breaks.

        :param b64string: base64 text of the certificate
        :return: decoded certificate text
        :raises N2VCInvalidCertificate: the input is not valid base64 or the
            decoded bytes are not valid UTF-8
        """
        try:
            cacert = base64.b64decode(b64string).decode("utf-8")

            cacert = re.sub(
                r'\\n',
                r'\n',
                cacert,
            )
        except (binascii.Error, UnicodeDecodeError) as e:
            # both failures mean the configured certificate is unusable
            self.debug("Caught {}: {}".format(type(e).__name__, e))
            raise N2VCInvalidCertificate(message="Invalid CA Certificate")

        return cacert

    self.ca_cert = vca_config.get('ca_cert')
    if self.ca_cert:
        self.ca_cert = base64_to_cacert(vca_config['ca_cert'])

    if 'api_proxy' in vca_config:
        self.api_proxy = vca_config['api_proxy']
        self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
    else:
        # always define the attribute: later code tests ``self.api_proxy``
        # and would otherwise fail with AttributeError
        self.api_proxy = None
        self.warning('api_proxy is not configured. Support for native charms is disabled')

    self.debug('Arguments have been checked')

    # juju data
    self.controller = None          # it will be filled when connecting to juju
    self.juju_models = {}           # model objects for every model_name
    self.juju_observers = {}        # model observers for every model_name
    self._connecting = False        # while connecting to juju (to avoid duplicate connections)
    self._authenticated = False     # it will be True once the juju connection is established
    self._creating_model = False    # True during model creation

    # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
    self._create_juju_public_key()

    self.info('N2VC juju connector initialized')
173
async def get_status(self, namespace: str):
    """Return the juju status of the model that belongs to a namespace.

    The model name is the NS id component of the namespace.

    :param namespace: namespace to inspect
    :return: whatever ``model.get_status()`` returns
    :raises N2VCBadArgumentsException: the namespace carries no NS id
    """
    self.info('Getting NS status. namespace: {}'.format(namespace))

    if not self._authenticated:
        await self._juju_login()

    # only the second component (NS id) is needed: it is the model name
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    if model_name is None:
        error_text = 'Namespace {} not valid'.format(namespace)
        self.error(error_text)
        raise N2VCBadArgumentsException(error_text, ['namespace'])

    # get juju model (create model if needed)
    juju_model = await self._juju_get_model(model_name=model_name)
    return await juju_model.get_status()
194
async def create_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    reuse_ee_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (str, dict):
    """Create (or reuse) a juju machine acting as execution environment.

    :param namespace: namespace used to derive model and application names
        when no ``reuse_ee_id`` is given
    :param db_dict: database location handed over to the machine creation
    :param reuse_ee_id: id of an existing execution environment to reuse
    :param progress_timeout: forwarded to the machine creation
    :param total_timeout: forwarded to the machine creation
    :return: tuple (ee_id, credentials); credentials holds the 'hostname'
    :raises N2VCException: the machine could not be created
    """

    self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))

    if not self._authenticated:
        await self._juju_login()

    if reuse_ee_id:
        # everything comes from the id of the environment being reused
        model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
    else:
        machine_id = None
        # model name is the NS id
        _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
        application_name = self._get_application_name(namespace=namespace)

    self.debug('model name: {}, application name: {}, machine_id: {}'
               .format(model_name, application_name, machine_id))

    # create or reuse a new juju machine
    try:
        machine = await self._juju_create_machine(
            model_name=model_name,
            application_name=application_name,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as exc:
        failure = 'Error creating machine on juju: {}'.format(exc)
        self.error(failure)
        raise N2VCException(message=failure)

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine.entity_id)
    )
    self.debug('ee_id: {}'.format(ee_id))

    # new machine credentials
    credentials = {'hostname': machine.dns_name}

    self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))

    return ee_id, credentials
252
async def register_execution_environment(
    self,
    namespace: str,
    credentials: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Register an already existing machine as execution environment.

    :param namespace: namespace used to derive model and application names
    :param credentials: must hold 'hostname' and 'username'; may hold
        'private_key_path' (``self.private_key_path`` is used otherwise)
    :param db_dict: database location handed over to the provisioning
    :param progress_timeout: forwarded to the provisioning
    :param total_timeout: forwarded to the provisioning
    :return: id of the registered execution environment
    :raises N2VCBadArgumentsException: credentials are missing or incomplete
    :raises N2VCException: the machine could not be provisioned
    """

    if not self._authenticated:
        await self._juju_login()

    self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))

    # validate credentials, failing on the first missing item
    if credentials is None:
        raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])

    hostname = credentials.get('hostname')
    if not hostname:
        raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])

    username = credentials.get('username')
    if not username:
        raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])

    # if not passed as argument, use generated private key path
    private_key_path = (
        credentials['private_key_path']
        if 'private_key_path' in credentials
        else self.private_key_path
    )

    # model name is the NS id
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    application_name = self._get_application_name(namespace=namespace)

    # register machine on juju
    try:
        machine_id = await self._juju_provision_machine(
            model_name=model_name,
            hostname=hostname,
            username=username,
            private_key_path=private_key_path,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as exc:
        self.error('Error registering machine: {}'.format(exc))
        raise N2VCException(message='Error registering machine on juju: {}'.format(exc))

    self.info('Machine registered: {}'.format(machine_id))

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine_id)
    )

    self.info('Execution environment registered. ee_id: {}'.format(ee_id))

    return ee_id
317
async def install_configuration_sw(
    self,
    ee_id: str,
    artifact_path: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
):
    """Deploy the charm found at ``artifact_path`` into an execution environment.

    :param ee_id: execution environment id ('<model>.<application>.<machine>')
    :param artifact_path: charm directory, relative to ``self.fs.path``
    :param db_dict: database location handed over to the charm deployment
    :param progress_timeout: forwarded to the charm deployment
    :param total_timeout: forwarded to the charm deployment
    :raises N2VCBadArgumentsException: a mandatory argument is missing, the
        ee_id cannot be parsed or the artifact path does not exist
    :raises N2VCException: the charm could not be deployed
    """

    self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
              .format(ee_id, artifact_path, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if artifact_path is None or len(artifact_path) == 0:
        raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception as e:
        # keep the parsing error attached as the cause of the reported one
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        ) from e

    # remove // in charm path (repeat until no doubled slash is left)
    while '//' in artifact_path:
        artifact_path = artifact_path.replace('//', '/')

    # check charm path
    if not self.fs.file_exists(artifact_path, mode="dir"):
        msg = 'artifact path does not exist: {}'.format(artifact_path)
        raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])

    # absolute location of the charm, avoiding a doubled separator
    if artifact_path.startswith('/'):
        full_path = self.fs.path + artifact_path
    else:
        full_path = self.fs.path + '/' + artifact_path

    try:
        # the returned (application, retries) pair is not needed here
        await self._juju_deploy_charm(
            model_name=model_name,
            application_name=application_name,
            charm_path=full_path,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error desploying charm into ee={} : {}'.format(ee_id, e)) from e

    self.info('Configuration sw installed')
378
async def get_ee_ssh_public__key(
    self,
    ee_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Generate an ssh key pair in the execution environment and return the public key.

    Runs the 'generate-ssh-key' action (best effort) and then the
    'get-ssh-public-key' action, whose output is returned.

    :param ee_id: execution environment id ('<model>.<application>.<machine>')
    :param db_dict: database location handed over to the action execution
    :param progress_timeout: forwarded to the action execution
    :param total_timeout: forwarded to the action execution
    :return: output of the 'get-ssh-public-key' action
    :raises N2VCBadArgumentsException: a mandatory argument is missing or the
        ee_id cannot be parsed
    :raises Exception: whatever the 'get-ssh-public-key' action raised
    """

    self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception as e:
        # keep the parsing error attached as the cause of the reported one
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        ) from e

    # try to execute ssh layer primitives (if exist):
    #       generate-ssh-key
    #       get-ssh-public-key

    # execute action: generate-ssh-key
    # deliberately best effort: a failure is logged and the flow continues
    try:
        await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name='generate-ssh-key',
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))

    # execute action: get-ssh-public-key
    try:
        output, _ = await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name='get-ssh-public-key',
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
        self.info(msg)
        # bare raise re-raises the same exception with its traceback intact
        raise

    # return public key if exists
    return output
443
async def add_relation(
    self,
    ee_id_1: str,
    ee_id_2: str,
    endpoint_1: str,
    endpoint_2: str
):
    """Add a juju relation between two execution environments of the same model.

    :param ee_id_1: first execution environment id
    :param ee_id_2: second execution environment id
    :param endpoint_1: endpoint of the first application (only logged here)
    :param endpoint_2: endpoint of the second application (only logged here)
    :raises N2VCBadArgumentsException: both ids do not share the same model
    :raises N2VCException: the relation could not be added
    """

    self.debug('adding new relation between {} and {}, endpoints: {}, {}'
               .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))

    if not self._authenticated:
        await self._juju_login()

    # only the model component of each id is needed here
    model_1, _, _ = self._get_ee_id_components(ee_id_1)
    model_2, _, _ = self._get_ee_id_components(ee_id_2)

    # model must be the same
    if model_1 != model_2:
        message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
        self.error(message)
        raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])

    # add juju relations between two applications
    try:
        # NOTE(review): the helper is called without arguments and without
        # ``await``; its definition is not visible from here - confirm that
        # this is the intended way of invoking it.
        self._juju_add_relation()
    except Exception as e:
        message = 'Error adding relation between {} and {}'.format(ee_id_1, ee_id_2)
        self.error(message)
        # chain the original error so the root cause is not lost
        raise N2VCException(message=message) from e
475
476 async def remove_relation(
477 self
478 ):
479 if not self._authenticated:
480 await self._juju_login()
481 # TODO
482 self.info('Method not implemented yet')
483 raise NotImplemented()
484
485 async def deregister_execution_environments(
486 self
487 ):
488 if not self._authenticated:
489 await self._juju_login()
490 # TODO
491 self.info('Method not implemented yet')
492 raise NotImplemented()
493
async def delete_namespace(
    self,
    namespace: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """Delete the juju model that backs a namespace.

    :param namespace: namespace to delete; it must carry an NS id
    :param db_dict: accepted but not used by this method
    :param total_timeout: forwarded to the model destruction
    :raises N2VCBadArgumentsException: the namespace is missing or has no NS id
    :raises N2VCException: the model could not be destroyed
    """
    self.info('Deleting namespace={}'.format(namespace))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if namespace is None:
        raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])

    _, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)

    # without an NS id there is no model to remove
    if ns_id is None:
        raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])

    self.debug('Deleting model {}'.format(ns_id))
    try:
        await self._juju_destroy_model(
            model_name=ns_id,
            total_timeout=total_timeout
        )
    except Exception as exc:
        raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, exc))

    self.info('Namespace {} deleted'.format(namespace))
523
async def delete_execution_environment(
    self,
    ee_id: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """Destroy the application and then the machine of an execution environment.

    :param ee_id: execution environment id ('<model>.<application>.<machine>')
    :param db_dict: accepted but not used by this method
    :param total_timeout: forwarded to the machine destruction
    :raises N2VCBadArgumentsException: ee_id is missing
    :raises N2VCException: the application or the machine could not be destroyed
    """
    self.info('Deleting execution environment ee_id={}'.format(ee_id))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])

    model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)

    # first step: destroy the application
    try:
        await self._juju_destroy_application(model_name=model_name, application_name=application_name)
    except Exception as exc:
        app_failure = 'Error deleting execution environment {} (application {}) : {}'.format(
            ee_id, application_name, exc)
        raise N2VCException(message=app_failure)

    # second step: destroy the machine
    try:
        await self._juju_destroy_machine(
            model_name=model_name,
            machine_id=machine_id,
            total_timeout=total_timeout
        )
    except Exception as exc:
        machine_failure = 'Error deleting execution environment {} (machine {}) : {}'.format(
            ee_id, machine_id, exc)
        raise N2VCException(message=machine_failure)

    self.info('Execution environment {} deleted'.format(ee_id))
560
async def exec_primitive(
    self,
    ee_id: str,
    primitive_name: str,
    params_dict: dict,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Execute a primitive on the application of an execution environment.

    The primitive named 'config' configures the application; any other name
    is executed as a juju action.

    :param ee_id: execution environment id ('<model>.<application>.<machine>')
    :param primitive_name: 'config' or the name of the action to run
    :param params_dict: configuration values or action parameters
        (None is treated as an empty dict)
    :param db_dict: database location handed over to the execution
    :param progress_timeout: forwarded to the execution
    :param total_timeout: forwarded to the execution
    :return: 'CONFIG OK' for the config primitive, the action output otherwise
    :raises N2VCBadArgumentsException: a mandatory argument is missing or the
        ee_id cannot be parsed
    :raises N2VCExecutionException: the configuration or the action failed,
        or the action did not end with status 'completed'
    """

    self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if not ee_id:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if not primitive_name:
        raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
    if params_dict is None:
        params_dict = {}

    try:
        model_name, application_name, _ = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
    except Exception:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        )

    if primitive_name != 'config':
        # regular primitive: run it as a juju action
        try:
            output, status = await self._juju_execute_action(
                model_name=model_name,
                application_name=application_name,
                action_name=primitive_name,
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                **params_dict
            )
            if status != 'completed':
                raise Exception('status is not completed: {}'.format(status))
            return output
        except Exception as exc:
            self.error('Error executing primitive {}: {}'.format(primitive_name, exc))
            raise N2VCExecutionException(
                message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, exc),
                primitive_name=primitive_name
            )

    # Special case: config primitive
    try:
        await self._juju_configure_application(
            model_name=model_name,
            application_name=application_name,
            config=params_dict,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as exc:
        self.error('Error configuring juju application: {}'.format(exc))
        raise N2VCExecutionException(
            message='Error configuring application into ee={} : {}'.format(ee_id, exc),
            primitive_name=primitive_name
        )
    return 'CONFIG OK'
631
async def disconnect(self):
    """Log the shutdown and close the juju session through ``_juju_logout``."""
    closing_notice = 'closing juju N2VC...'
    self.info(closing_notice)
    await self._juju_logout()
635
636 """
637 ##################################################################################################
638 ########################################## P R I V A T E #########################################
639 ##################################################################################################
640 """
641
def _write_ee_id_db(self, db_dict: dict, ee_id: str):
    """Store the execution environment id in the database (best effort).

    The target is described by ``db_dict`` ('collection', 'filter' and
    'path'); the value is written under '<path>.ee_id'. Any failure is only
    logged, never propagated.

    :param db_dict: location of the record to update
    :param ee_id: execution environment id to store
    """

    # write ee_id to database: _admin.deployed.VCA.x
    try:
        collection = db_dict['collection']
        record_filter = db_dict['filter']
        key_prefix = db_dict['path']
        # make sure the prefix ends with the separator before appending the key
        if key_prefix[-1] != '.':
            key_prefix = key_prefix + '.'
        changes = {key_prefix + 'ee_id': ee_id}
        self.debug('Writing ee_id to database: {}'.format(key_prefix))
        self.db.set_one(
            table=collection,
            q_filter=record_filter,
            update_dict=changes,
            fail_on_empty=True
        )
    except Exception as exc:
        self.error('Error writing ee_id to database: {}'.format(exc))
665
666 @staticmethod
667 def _build_ee_id(
668 model_name: str,
669 application_name: str,
670 machine_id: str
671 ):
672 """
673 Build an execution environment id form model, application and machine
674 :param model_name:
675 :param application_name:
676 :param machine_id:
677 :return:
678 """
679 # id for the execution environment
680 return '{}.{}.{}'.format(model_name, application_name, machine_id)
681
682 @staticmethod
683 def _get_ee_id_components(
684 ee_id: str
685 ) -> (str, str, str):
686 """
687 Get model, application and machine components from an execution environment id
688 :param ee_id:
689 :return: model_name, application_name, machine_id
690 """
691
692 if ee_id is None:
693 return None, None, None
694
695 # split components of id
696 parts = ee_id.split('.')
697 model_name = parts[0]
698 application_name = parts[1]
699 machine_id = parts[2]
700 return model_name, application_name, machine_id
701
def _get_application_name(self, namespace: str) -> str:
    """
    Derive the juju application name from a namespace
    :param namespace: namespace holding the vnf id, vdu id and vdu count
    :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>, passed through
        ``_format_app_name``; components that are missing are left out
    """

    def tagged(tag: str, value):
        # an absent or empty component contributes nothing to the name
        if value is None or len(value) == 0:
            return ''
        return tag + value

    # only the last three namespace components take part in the name
    _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)

    vnf_part = tagged('vnf-', vnf_id)
    vdu_part = tagged('-vdu-', vdu_id)
    count_part = tagged('-cnt-', vdu_count)

    raw_name = 'app-{}{}{}'.format(vnf_part, vdu_part, count_part)

    return N2VCJujuConnector._format_app_name(raw_name)
730
async def _juju_create_machine(
    self,
    model_name: str,
    application_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> Machine:
    """Return a ready juju machine, reusing ``machine_id`` when it exists.

    When the machine is not found in the model (or no id is given) a new
    one is added and its execution environment id is written to the database.
    In both cases the machine is registered with the model observer and
    awaited.

    :param model_name: model that holds (or will hold) the machine
    :param application_name: used only to build the execution environment id
    :param machine_id: id of an existing machine to reuse, if any
    :param db_dict: database location handed over to the observer and to
        the ee_id write
    :param progress_timeout: forwarded to the observer wait
    :param total_timeout: forwarded to the observer wait
    :return: the machine object
    """

    self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))

    # get juju model and observer (create model if needed)
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # look for the requested machine in the model
    machine = None
    if machine_id is not None:
        self.debug('Finding existing machine id {} in model'.format(machine_id))
        # get juju existing machines in the model
        known_machines = await model.get_machines()
        if machine_id in known_machines:
            self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
            machine = model.machines[machine_id]

    if machine is not None:

        self.debug('Reusing old machine pending')

        # register machine with observer
        observer.register_machine(machine=machine, db_dict=db_dict)

        # machine does exist, but it is in creation process (pending), wait for create finalisation
        await observer.wait_for_machine(
            machine_id=machine.entity_id,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout)

    else:

        self.debug('Creating a new machine in juju...')
        # machine does not exist, create it and wait for it
        machine = await model.add_machine(
            spec=None,
            constraints=None,
            disks=None,
            series='xenial'
        )

        # register machine with observer
        observer.register_machine(machine=machine, db_dict=db_dict)

        new_machine_id = str(machine.entity_id)

        # write the id of the execution environment in database
        self._write_ee_id_db(
            db_dict=db_dict,
            ee_id=N2VCJujuConnector._build_ee_id(
                model_name=model_name,
                application_name=application_name,
                machine_id=new_machine_id
            )
        )

        # wait for machine creation
        await observer.wait_for_machine(
            machine_id=new_machine_id,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )

    self.debug("Machine ready at " + str(machine.dns_name))
    return machine
805
async def _juju_provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Provision an existing host over ssh and add it to a juju model.

    :param model_name: model the machine is added to
    :param hostname: host to provision
    :param username: ssh user on the host
    :param private_key_path: private key used for the ssh connection
    :param db_dict: database location handed over to the observer
    :param progress_timeout: forwarded to the observer wait
    :param total_timeout: forwarded to the observer wait
    :return: id of the machine inside the model
    :raises N2VCException: api_proxy is not configured or the ssh
        provisioning failed
    :raises ValueError: juju reported an error while adding the machine
    :raises Exception: the machine never showed up in the model
    """

    # the attribute is not guaranteed to exist on the instance, so read it
    # defensively instead of risking an AttributeError
    api_proxy = getattr(self, 'api_proxy', None)
    if not api_proxy:
        msg = 'Cannot provision machine: api_proxy is not defined'
        self.error(msg=msg)
        raise N2VCException(message=msg)

    self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))

    if not self._authenticated:
        await self._juju_login()

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # TODO check if machine is already provisioned
    machine_list = await model.get_machines()

    provisioner = SSHProvisioner(
        host=hostname,
        user=username,
        private_key_path=private_key_path,
        log=self.log
    )

    try:
        params = provisioner.provision_machine()
    except Exception as ex:
        msg = "Exception provisioning machine: {}".format(ex)
        self.log.error(msg)
        raise N2VCException(message=msg) from ex

    params.jobs = ['JobHostUnits']

    connection = model.connection()

    # Submit the request.
    self.debug("Adding machine to model")
    client_facade = client.ClientFacade.from_connection(connection)
    results = await client_facade.AddMachines(params=[params])
    error = results.machines[0].error
    if error:
        # a stray '}' in this format string used to raise ValueError from
        # str.format itself, hiding the real juju error message
        msg = "Error adding machine: {}".format(error.message)
        self.error(msg=msg)
        raise ValueError(msg)

    machine_id = results.machines[0].machine

    # Need to run this after AddMachines has been called,
    # as we need the machine_id
    self.debug("Installing Juju agent into machine {}".format(machine_id))
    asyncio.ensure_future(provisioner.install_agent(
        connection=connection,
        nonce=params.nonce,
        machine_id=machine_id,
        api=api_proxy,
    ))

    # wait for machine in model (now, machine is not yet in model, so we must wait for it)
    # poll up to 10 times, sleeping 2 seconds between attempts
    machine = None
    for _ in range(10):
        machine_list = await model.get_machines()
        if machine_id in machine_list:
            self.debug('Machine {} found in model!'.format(machine_id))
            machine = model.machines.get(machine_id)
            break
        await asyncio.sleep(2)

    if machine is None:
        msg = 'Machine {} not found in model'.format(machine_id)
        self.error(msg=msg)
        raise Exception(msg)

    # register machine with observer
    observer.register_machine(machine=machine, db_dict=db_dict)

    # wait for machine creation
    self.debug('waiting for provision finishes... {}'.format(machine_id))
    await observer.wait_for_machine(
        machine_id=machine_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    self.debug("Machine provisioned {}".format(machine_id))

    return machine_id
904
async def _juju_deploy_charm(
    self,
    model_name: str,
    application_name: str,
    charm_path: str,
    machine_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (Application, int):
    """Deploy a charm as an application (unless it already exists) and wait for it.

    :param model_name: model that holds the application
    :param application_name: name of the application to deploy or reuse
    :param charm_path: location of the charm to deploy
    :param machine_id: machine the application is placed on
    :param db_dict: database location handed over to the observer
    :param progress_timeout: forwarded to the observer wait
    :param total_timeout: forwarded to the observer wait
    :return: tuple (application, value returned by the observer wait)
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # check if application already exists
    if application_name in model.applications:
        application = model.applications[application_name]
    else:
        application = None

    if application is None:
        # application does not exist, create it and wait for it
        self.debug('deploying application {} to machine {}, model {}'
                   .format(application_name, machine_id, model_name))
        self.debug('charm: {}'.format(charm_path))
        application = await model.deploy(
            entity_url=charm_path,
            application_name=application_name,
            channel='stable',
            num_units=1,
            series='xenial',
            to=machine_id
        )
        newly_deployed = True
    else:
        # application already exists, but not finalised
        newly_deployed = False

    # register application with observer
    observer.register_application(application=application, db_dict=db_dict)

    # the progress message depends on whether the deployment just started
    if newly_deployed:
        self.debug('waiting for application deployed... {}'.format(application.entity_id))
    else:
        self.debug('application already exists, waiting for deployed...')

    retries = await observer.wait_for_application(
        application_id=application.entity_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
    self.debug('application deployed')

    return application, retries
966
async def _juju_execute_action(
    self,
    model_name: str,
    application_name: str,
    action_name: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
) -> tuple:
    """Run an action on the first unit of an application and wait for it.

    :param model_name: model that holds the application
    :param application_name: application whose first unit runs the action
    :param action_name: name of the action to run
    :param db_dict: database location handed over to the observer
    :param progress_timeout: forwarded to the observer wait
    :param total_timeout: forwarded to the observer wait
    :param kwargs: parameters passed to the action
    :return: tuple (output, status); status is 'failed' when the model
        reports no status for the action
    :raises N2VCExecutionException: the application has no unit or does not
        offer the requested action
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('trying to execute action {}'.format(action_name))

    # an application without units must end in the N2VCExecutionException
    # below rather than in an IndexError
    units = application.units
    unit = units[0] if units else None

    if unit is not None:
        actions = await application.get_actions()
        if action_name in actions:
            self.debug('executing action {} with params {}'.format(action_name, kwargs))
            action = await unit.run_action(action_name, **kwargs)

            # register action with observer
            observer.register_action(action=action, db_dict=db_dict)

            self.debug('  waiting for action completed or error...')
            await observer.wait_for_action(
                action_id=action.entity_id,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout)
            self.debug('action completed with status: {}'.format(action.status))
            output = await model.get_action_output(action_uuid=action.entity_id)
            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
            # the status query answers with a mapping keyed by action id
            if action.entity_id in status:
                status = status[action.entity_id]
            else:
                status = 'failed'
            return output, status

    raise N2VCExecutionException(
        message='Cannot execute action on charm',
        primitive_name=action_name
    )
1013
async def _juju_configure_application(
        self,
        model_name: str,
        application_name: str,
        config: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None
):
    """Apply a configuration to an application and run 'verify-ssh-credentials'.

    :param str model_name: name of the model that contains the application
    :param str application_name: name of the application to configure
    :param dict config: configuration keys and values to set
    :param dict db_dict: forwarded to _juju_execute_action
    :param float progress_timeout: forwarded to _juju_execute_action
    :param float total_timeout: forwarded to _juju_execute_action
    :returns bool: True when the 'verify-ssh-credentials' action ran without
        raising; False when the application does not provide that action or
        every retry raised
    :raises N2VCException: if a key reads back with a value different from
        the one that was set
    """

    # get the application
    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('configuring the application {} -> {}'.format(application_name, config))
    res = await application.set_config(config)
    self.debug('application {} configured. res={}'.format(application_name, res))

    # Verify the config is set
    new_conf = await application.get_config()
    for key in config:
        value = new_conf[key]['value']
        self.debug(' {} = {}'.format(key, value))
        if config[key] != value:
            raise N2VCException(
                message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
            )

    # check if 'verify-ssh-credentials' action exists
    actions = await application.get_actions()
    if 'verify-ssh-credentials' not in actions:
        msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
        self.debug(msg=msg)
        return False

    # execute verify-credentials
    num_retries = 20
    retry_timeout = 15.0
    for attempt in range(1, num_retries + 1):
        try:
            self.debug('Executing action verify-ssh-credentials...')
            output, ok = await self._juju_execute_action(
                model_name=model_name,
                application_name=application_name,
                action_name='verify-ssh-credentials',
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
            # NOTE(review): the action status in 'ok' is only logged, not checked
            self.debug('Result: {}, output: {}'.format(ok, output))
            return True
        except Exception as e:
            self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
            # no point in waiting once the last attempt has failed
            if attempt < num_retries:
                await asyncio.sleep(retry_timeout)

    self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
    return False
1071
async def _juju_get_application(
        self,
        model_name: str,
        application_name: str
):
    """Return the application object with the given name from a model.

    The application name is normalised with _format_app_name before the lookup.

    :param str model_name: name of the model to search
    :param str application_name: name of the application
    :raises N2VCException: if the model has no application with that name
    """

    model = await self._juju_get_model(model_name=model_name)
    application_name = N2VCJujuConnector._format_app_name(application_name)

    # guard clause: fail first, return the match afterwards
    if not model.applications or application_name not in model.applications:
        raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))

    return model.applications[application_name]
1087
async def _juju_get_model(self, model_name: str) -> Model:
    """ Get a model object from juju controller

    The model is looked up in the local cache first. If it is not cached it
    is fetched from the controller, or created there when it does not exist,
    and an observer is registered for it. Cache entries are stored under the
    formatted model name.

    :param str model_name: name of the model
    :returns Model: model obtained from juju controller
    :raises N2VCException: if the model cannot be obtained or created
    """

    # format model name
    model_name = N2VCJujuConnector._format_model_name(model_name)

    # a cached None marks a model that was disconnected; it must be fetched
    # again instead of being returned to the caller
    model = self.juju_models.get(model_name)
    if model is not None:
        return model

    if self._creating_model:
        self.debug('Another coroutine is creating a model. Wait...')
        while self._creating_model:
            # another coroutine is creating a model, wait
            await asyncio.sleep(0.1)
        # retry (perhaps another coroutine has created the model meanwhile)
        model = self.juju_models.get(model_name)
        if model is not None:
            return model

    try:
        self._creating_model = True

        # get juju model names from juju
        model_list = await self.controller.list_models()

        if model_name not in model_list:
            self.info('Model {} does not exist. Creating new model...'.format(model_name))
            model = await self.controller.add_model(
                model_name=model_name,
                config={'authorized-keys': self.public_key}
            )
            self.info('New model created, name={}'.format(model_name))
        else:
            self.debug('Model already exists in juju. Getting model {}'.format(model_name))
            model = await self.controller.get_model(model_name)
            self.debug('Existing model in juju, name={}'.format(model_name))

        self.juju_models[model_name] = model
        self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
        return model

    except Exception as e:
        msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
        self.error(msg)
        raise N2VCException(msg) from e
    finally:
        # always release the flag so waiting coroutines can continue
        self._creating_model = False
1138
async def _juju_add_relation(
        self,
        model_name: str,
        application_name_1: str,
        application_name_2: str,
        relation_1: str,
        relation_2: str
):
    """Add a relation between two application endpoints of a model.

    Each endpoint is passed to the model as '<application>:<relation>'.

    :param str model_name: name of the model that contains both applications
    :param str application_name_1: first application
    :param str application_name_2: second application
    :param str relation_1: relation name on the first application
    :param str relation_2: relation name on the second application
    """

    self.debug('adding relation')

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    endpoint = '{}:{}'.format
    await model.add_relation(
        relation1=endpoint(application_name_1, relation_1),
        relation2=endpoint(application_name_2, relation_2)
    )
1156
async def _juju_destroy_application(
        self,
        model_name: str,
        application_name: str
):
    """Destroy an application of a model, if the model knows it.

    When the application is not found only a debug message is written.

    :param str model_name: name of the model that contains the application
    :param str application_name: name of the application, used as given
    """

    self.debug('Destroying application {} in model {}'.format(application_name, model_name))

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    application = model.applications.get(application_name)
    if not application:
        self.debug('Application not found: {}'.format(application_name))
        return

    await application.destroy()
1173
async def _juju_destroy_machine(
        self,
        model_name: str,
        machine_id: str,
        total_timeout: float = None
):
    """Force the destruction of a machine and wait until the model stops listing it.

    The wait is best effort: when the timeout expires the method logs an
    error and returns without raising.

    :param str model_name: name of the model that contains the machine
    :param str machine_id: id of the machine to destroy
    :param float total_timeout: maximum time in seconds to wait for the
        removal (3600 when None)
    """

    self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))

    if total_timeout is None:
        total_timeout = 3600

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    machines = await model.get_machines()
    if machine_id not in machines:
        self.debug('Machine not found: {}'.format(machine_id))
        return

    machine = model.machines[machine_id]
    await machine.destroy(force=True)

    # max timeout
    end = time.time() + total_timeout
    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.debug('Waiting for machine {} is destroyed'.format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    # the loop also ends on timeout, so report what really happened
    if machine_id in machines:
        self.error('Timeout waiting for machine {} to be destroyed'.format(machine_id))
    else:
        self.debug('Machine destroyed: {}'.format(machine_id))
1204
async def _juju_destroy_model(
        self,
        model_name: str,
        total_timeout: float = None
):
    """Disconnect a model, destroy it in the controller and wait until it is gone.

    The model is considered destroyed as soon as the controller fails to
    return it. When the timeout expires first, an error is logged and the
    method returns without raising.

    :param str model_name: name of the model to destroy
    :param float total_timeout: maximum time in seconds to wait for the
        destruction (3600 when None)
    """

    self.debug('Destroying model {}'.format(model_name))

    if total_timeout is None:
        total_timeout = 3600

    model = await self._juju_get_model(model_name=model_name)
    uuid = model.info.uuid

    # _juju_get_model caches models and observers under the formatted name,
    # so the cached entries have to be addressed with that same key
    cache_key = N2VCJujuConnector._format_model_name(model_name)

    self.debug('disconnecting model {}...'.format(model_name))
    await self._juju_disconnect_model(model_name=cache_key)
    self.juju_models[cache_key] = None
    self.juju_observers[cache_key] = None

    self.debug('destroying model {}...'.format(model_name))
    await self.controller.destroy_model(uuid)

    # wait for model is completely destroyed
    end = time.time() + total_timeout
    while time.time() < end:
        self.debug('waiting for model is destroyed...')
        try:
            probe = await self.controller.get_model(uuid)
        except Exception:
            # the controller no longer returns the model
            self.debug('model destroyed')
            return
        # get_model hands back a model object like the ones cached by
        # _juju_get_model; disconnect it so polling does not pile up
        # open model connections
        try:
            await probe.disconnect()
        except Exception as e:
            self.debug('Error disconnecting model while waiting for destruction: {}'.format(e))
        await asyncio.sleep(1.0)

    self.error('Timeout waiting for model {} to be destroyed'.format(model_name))
1237
async def _juju_login(self):
    """Connect to juju controller

    Does nothing when already authenticated. If another task is connecting,
    waits for it to finish and reuses its result when it succeeded.

    :raises N2VCConnectionException: if the connection to the controller fails
    """

    # if already authenticated, exit function
    if self._authenticated:
        return

    # if connecting, wait for finish
    # another task could be trying to connect in parallel
    while self._connecting:
        await asyncio.sleep(0.1)

    # double check after other task has finished
    if self._authenticated:
        return

    try:
        self._connecting = True
        # never write the controller password to the log: it is masked
        self.info(
            'connecting to juju controller: {} {}:{} ca_cert: {}'
            .format(self.url, self.username, '********', '\n' + self.ca_cert if self.ca_cert else 'None'))

        # Create controller object
        self.controller = Controller(loop=self.loop)
        # Connect to controller
        await self.controller.connect(
            endpoint=self.url,
            username=self.username,
            password=self.secret,
            cacert=self.ca_cert
        )
        self._authenticated = True
        self.info('juju controller connected')
    except Exception as e:
        message = 'Exception connecting to juju: {}'.format(e)
        self.error(message)
        raise N2VCConnectionException(
            message=message,
            url=self.url
        ) from e
    finally:
        # release the flag so tasks waiting above can continue
        self._connecting = False
1282
async def _juju_logout(self):
    """Logout of the Juju controller.

    Disconnects every cached model (errors are logged and skipped) and then
    the controller itself.

    :returns: False when there is no authenticated session; None otherwise
    :raises N2VCConnectionException: if the controller cannot be disconnected
    """
    if not self._authenticated:
        return False

    # disconnect all models
    # iterate over a snapshot of the names: the awaits below let other
    # coroutines run, and they may add models to the dictionary meanwhile
    for model_name in list(self.juju_models):
        # a None entry is a model that has already been disconnected
        if self.juju_models.get(model_name) is None:
            continue
        try:
            await self._juju_disconnect_model(model_name)
        except Exception as e:
            self.error('Error disconnecting model {} : {}'.format(model_name, e))
            # continue with next model...

    self.info("Disconnecting controller")
    try:
        await self.controller.disconnect()
    except Exception as e:
        raise N2VCConnectionException(
            message='Error disconnecting controller: {}'.format(e),
            url=self.url
        ) from e

    self.controller = None
    self._authenticated = False
    self.info('disconnected')
1305
async def _juju_disconnect_model(
        self,
        model_name: str
):
    """Disconnect a cached model and clear its cache entries.

    The entries of the model and of its observer are kept in the
    dictionaries with value None. Calling this again for a model that is
    already disconnected is a no-op.

    :param str model_name: name of the model to disconnect
    """
    self.debug("Disconnecting model {}".format(model_name))

    # the cache is keyed by the formatted name (see _juju_get_model)
    model_name = N2VCJujuConnector._format_model_name(model_name)

    if model_name in self.juju_models:
        model = self.juju_models[model_name]
        # None means a previous call already disconnected this model
        if model is not None:
            await model.disconnect()
        self.juju_models[model_name] = None
        self.juju_observers[model_name] = None
1315
def _create_juju_public_key(self):
    """Recreate the Juju public key on lcm container, if needed
    Certain libjuju commands expect to be run from the same machine as Juju
    is bootstrapped to. This method will write the public key to disk in
    that location: ~/.local/share/juju/ssh/juju_id_rsa.pub

    When self.public_key is empty, the key is taken from the environment
    variable OSMLCM_VCA_PUBKEY; if no key is available nothing is written.
    An existing key file is never overwritten.
    """

    # Make sure that we have a public key before writing to disk
    if self.public_key is None or len(self.public_key) == 0:
        if 'OSMLCM_VCA_PUBKEY' not in os.environ:
            return
        self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
        if len(self.public_key) == 0:
            return

    pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
    file_path = "{}/juju_id_rsa.pub".format(pk_path)
    self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))

    # test the file itself: the directory may exist without the key file
    if os.path.exists(file_path):
        self.debug('juju public key file already exists: {}'.format(file_path))
        return

    # create path (if missing) and write file
    os.makedirs(pk_path, exist_ok=True)
    with open(file_path, 'w') as f:
        self.debug('Creating juju public key file: {}'.format(file_path))
        f.write(self.public_key)
1343
@staticmethod
def _format_model_name(name: str) -> str:
    """Return the name converted to the form used for juju models.

    Underscores and blanks are turned into hyphens and the result is
    lowercased; any other character is kept as it is.
    """

    # single pass that maps both '_' and ' ' to '-'
    to_hyphen = str.maketrans('_ ', '--')
    return name.translate(to_hyphen).lower()
1352
@staticmethod
def _format_app_name(name: str) -> str:
    """Build a valid juju application name from an arbitrary name.

    Application names have restrictions (run juju deploy --help):
    - contains lowercase letters 'a'-'z'
    - contains numbers '0'-'9'
    - contains hyphens '-'
    - starts with a lowercase letter
    - not two or more consecutive hyphens
    - after a hyphen, not a group with all numbers

    The conversion turns underscores and blanks into hyphens, lowercases the
    name, collapses runs of hyphens and prefixes with 'z' every group made
    only of digits (an empty group included), as well as a result that
    starts with a digit.
    """

    def only_digits(text: str) -> bool:
        # True for the empty string too, like the explicit loop it replaces
        return all(char.isdigit() for char in text)

    normalized = name.replace('_', '-').replace(' ', '-').lower()
    while '--' in normalized:
        normalized = normalized.replace('--', '-')

    # find 'all numbers' groups and prefix them with a letter
    pieces = [
        'z' + piece if only_digits(piece) else piece
        for piece in normalized.split('-')
    ]
    app_name = '-'.join(pieces)

    # the first group is never empty here, so indexing is safe
    return 'z' + app_name if app_name[0].isdigit() else app_name