Add missing argument in notify_callback
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.exceptions \
33 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
34 N2VCExecutionException, N2VCInvalidCertificate
35 from n2vc.juju_observer import JujuModelObserver
36
37 from juju.controller import Controller
38 from juju.model import Model
39 from juju.application import Application
40 from juju.action import Action
41 from juju.machine import Machine
42
43
44 class N2VCJujuConnector(N2VCConnector):
45
46 """
47 ##################################################################################################
48 ########################################## P U B L I C ###########################################
49 ##################################################################################################
50 """
51
52 def __init__(
53 self,
54 db: object,
55 fs: object,
56 log: object = None,
57 loop: object = None,
58 url: str = '127.0.0.1:17070',
59 username: str = 'admin',
60 vca_config: dict = None,
61 on_update_db=None,
62 api_proxy=None
63 ):
64 """Initialize juju N2VC connector
65 """
66
67 # parent class constructor
68 N2VCConnector.__init__(
69 self,
70 db=db,
71 fs=fs,
72 log=log,
73 loop=loop,
74 url=url,
75 username=username,
76 vca_config=vca_config,
77 on_update_db=on_update_db
78 )
79
80 # silence websocket traffic log
81 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
82 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
83 logging.getLogger('model').setLevel(logging.WARN)
84
85 self.info('Initializing N2VC juju connector...')
86
87 """
88 ##############################################################
89 # check arguments
90 ##############################################################
91 """
92
93 # juju URL
94 if url is None:
95 raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
96 url_parts = url.split(':')
97 if len(url_parts) != 2:
98 raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
99 self.hostname = url_parts[0]
100 try:
101 self.port = int(url_parts[1])
102 except ValueError:
103 raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])
104
105 # juju USERNAME
106 if username is None:
107 raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])
108
109 # juju CONFIGURATION
110 if vca_config is None:
111 raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])
112
113 if 'secret' in vca_config:
114 self.secret = vca_config['secret']
115 else:
116 raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])
117
118 # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
119 # if exists, it will be written in lcm container: _create_juju_public_key()
120 if 'public_key' in vca_config:
121 self.public_key = vca_config['public_key']
122 else:
123 self.public_key = None
124
125 # TODO: Verify ca_cert is valid before using. VCA will crash
126 # if the ca_cert isn't formatted correctly.
127 def base64_to_cacert(b64string):
128 """Convert the base64-encoded string containing the VCA CACERT.
129
130 The input string....
131
132 """
133 try:
134 cacert = base64.b64decode(b64string).decode("utf-8")
135
136 cacert = re.sub(
137 r'\\n',
138 r'\n',
139 cacert,
140 )
141 except binascii.Error as e:
142 self.debug("Caught binascii.Error: {}".format(e))
143 raise N2VCInvalidCertificate(message="Invalid CA Certificate")
144
145 return cacert
146
147 self.ca_cert = vca_config.get('ca_cert')
148 if self.ca_cert:
149 self.ca_cert = base64_to_cacert(vca_config['ca_cert'])
150
151 if api_proxy:
152 self.api_proxy = api_proxy
153 else:
154 self.warning('api_proxy is not configured. Support for native charms is disabled')
155
156 self.debug('Arguments have been checked')
157
158 # juju data
159 self.controller = None # it will be filled when connect to juju
160 self.juju_models = {} # model objects for every model_name
161 self.juju_observers = {} # model observers for every model_name
162 self._connecting = False # while connecting to juju (to avoid duplicate connections)
163 self._authenticated = False # it will be True when juju connection be stablished
164 self._creating_model = False # True during model creation
165
166 # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
167 self._create_juju_public_key()
168
169 self.info('N2VC juju connector initialized')
170
171 async def get_status(self, namespace: str):
172 self.info('Getting NS status. namespace: {}'.format(namespace))
173
174 if not self._authenticated:
175 await self._juju_login()
176
177 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
178 # model name is ns_id
179 model_name = ns_id
180 if model_name is None:
181 msg = 'Namespace {} not valid'.format(namespace)
182 self.error(msg)
183 raise N2VCBadArgumentsException(msg, ['namespace'])
184
185 # get juju model (create model if needed)
186 model = await self._juju_get_model(model_name=model_name)
187
188 status = await model.get_status()
189
190 return status
191
192 async def create_execution_environment(
193 self,
194 namespace: str,
195 db_dict: dict,
196 reuse_ee_id: str = None,
197 progress_timeout: float = None,
198 total_timeout: float = None
199 ) -> (str, dict):
200
201 self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))
202
203 if not self._authenticated:
204 await self._juju_login()
205
206 machine_id = None
207 if reuse_ee_id:
208 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
209 else:
210 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
211 # model name is ns_id
212 model_name = ns_id
213 # application name
214 application_name = self._get_application_name(namespace=namespace)
215
216 self.debug('model name: {}, application name: {}, machine_id: {}'
217 .format(model_name, application_name, machine_id))
218
219 # create or reuse a new juju machine
220 try:
221 machine = await self._juju_create_machine(
222 model_name=model_name,
223 application_name=application_name,
224 machine_id=machine_id,
225 db_dict=db_dict,
226 progress_timeout=progress_timeout,
227 total_timeout=total_timeout
228 )
229 except Exception as e:
230 message = 'Error creating machine on juju: {}'.format(e)
231 self.error(message)
232 raise N2VCException(message=message)
233
234 # id for the execution environment
235 ee_id = N2VCJujuConnector._build_ee_id(
236 model_name=model_name,
237 application_name=application_name,
238 machine_id=str(machine.entity_id)
239 )
240 self.debug('ee_id: {}'.format(ee_id))
241
242 # new machine credentials
243 credentials = dict()
244 credentials['hostname'] = machine.dns_name
245
246 self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))
247
248 return ee_id, credentials
249
250 async def register_execution_environment(
251 self,
252 namespace: str,
253 credentials: dict,
254 db_dict: dict,
255 progress_timeout: float = None,
256 total_timeout: float = None
257 ) -> str:
258
259 if not self._authenticated:
260 await self._juju_login()
261
262 self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))
263
264 if credentials is None:
265 raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])
266 if 'hostname' in credentials:
267 hostname = credentials['hostname']
268 else:
269 raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])
270 if 'username' in credentials:
271 username = credentials['username']
272 else:
273 raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])
274 if 'private_key_path' in credentials:
275 private_key_path = credentials['private_key_path']
276 else:
277 # if not passed as argument, use generated private key path
278 private_key_path = self.private_key_path
279
280 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
281
282 # model name
283 model_name = ns_id
284 # application name
285 application_name = self._get_application_name(namespace=namespace)
286
287 # register machine on juju
288 try:
289 machine = await self._juju_provision_machine(
290 model_name=model_name,
291 hostname=hostname,
292 username=username,
293 private_key_path=private_key_path,
294 db_dict=db_dict,
295 progress_timeout=progress_timeout,
296 total_timeout=total_timeout
297 )
298 except Exception as e:
299 self.error('Error registering machine: {}'.format(e))
300 raise N2VCException(message='Error registering machine on juju: {}'.format(e))
301 self.info('Machine registered')
302
303 # id for the execution environment
304 ee_id = N2VCJujuConnector._build_ee_id(
305 model_name=model_name,
306 application_name=application_name,
307 machine_id=str(machine.entity_id)
308 )
309
310 self.info('Execution environment registered. ee_id: {}'.format(ee_id))
311
312 return ee_id
313
314 async def install_configuration_sw(
315 self,
316 ee_id: str,
317 artifact_path: str,
318 db_dict: dict,
319 progress_timeout: float = None,
320 total_timeout: float = None
321 ):
322
323 self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
324 .format(ee_id, artifact_path, db_dict))
325
326 if not self._authenticated:
327 await self._juju_login()
328
329 # check arguments
330 if ee_id is None or len(ee_id) == 0:
331 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
332 if artifact_path is None or len(artifact_path) == 0:
333 raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
334 if db_dict is None:
335 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
336
337 try:
338 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
339 self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
340 except Exception as e:
341 raise N2VCBadArgumentsException(
342 message='ee_id={} is not a valid execution environment id'.format(ee_id),
343 bad_args=['ee_id']
344 )
345
346 # remove // in charm path
347 while artifact_path.find('//') >= 0:
348 artifact_path = artifact_path.replace('//', '/')
349
350 # check charm path
351 if not self.fs.file_exists(artifact_path, mode="dir"):
352 msg = 'artifact path does not exist: {}'.format(artifact_path)
353 raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])
354
355 if artifact_path.startswith('/'):
356 full_path = self.fs.path + artifact_path
357 else:
358 full_path = self.fs.path + '/' + artifact_path
359
360 try:
361 application, retries = await self._juju_deploy_charm(
362 model_name=model_name,
363 application_name=application_name,
364 charm_path=full_path,
365 machine_id=machine_id,
366 db_dict=db_dict,
367 progress_timeout=progress_timeout,
368 total_timeout=total_timeout
369 )
370 except Exception as e:
371 raise N2VCException(message='Error desploying charm into ee={} : {}'.format(ee_id, e))
372
373 self.info('Configuration sw installed')
374
375 async def get_ee_ssh_public__key(
376 self,
377 ee_id: str,
378 db_dict: dict,
379 progress_timeout: float = None,
380 total_timeout: float = None
381 ) -> str:
382
383 self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))
384
385 if not self._authenticated:
386 await self._juju_login()
387
388 # check arguments
389 if ee_id is None or len(ee_id) == 0:
390 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
391 if db_dict is None:
392 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
393
394 try:
395 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
396 self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
397 except Exception as e:
398 raise N2VCBadArgumentsException(
399 message='ee_id={} is not a valid execution environment id'.format(ee_id),
400 bad_args=['ee_id']
401 )
402
403 # try to execute ssh layer primitives (if exist):
404 # generate-ssh-key
405 # get-ssh-public-key
406
407 output = None
408
409 # execute action: generate-ssh-key
410 try:
411 output, status = await self._juju_execute_action(
412 model_name=model_name,
413 application_name=application_name,
414 action_name='generate-ssh-key',
415 db_dict=db_dict,
416 progress_timeout=progress_timeout,
417 total_timeout=total_timeout
418 )
419 except Exception as e:
420 self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))
421
422 # execute action: get-ssh-public-key
423 try:
424 output, status = await self._juju_execute_action(
425 model_name=model_name,
426 application_name=application_name,
427 action_name='get-ssh-public-key',
428 db_dict=db_dict,
429 progress_timeout=progress_timeout,
430 total_timeout=total_timeout
431 )
432 except Exception as e:
433 msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
434 self.info(msg)
435 raise e
436
437 # return public key if exists
438 return output
439
440 async def add_relation(
441 self,
442 ee_id_1: str,
443 ee_id_2: str,
444 endpoint_1: str,
445 endpoint_2: str
446 ):
447
448 self.debug('adding new relation between {} and {}, endpoints: {}, {}'
449 .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))
450
451 if not self._authenticated:
452 await self._juju_login()
453
454 # get model, application and machines
455 model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1)
456 model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2)
457
458 # model must be the same
459 if model_1 != model_2:
460 message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
461 self.error(message)
462 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])
463
464 # add juju relations between two applications
465 try:
466 self._juju_add_relation()
467 except Exception as e:
468 message = 'Error adding relation between {} and {}'.format(ee_id_1, ee_id_2)
469 self.error(message)
470 raise N2VCException(message=message)
471
472 async def remove_relation(
473 self
474 ):
475 if not self._authenticated:
476 await self._juju_login()
477 # TODO
478 self.info('Method not implemented yet')
479 raise NotImplemented()
480
481 async def deregister_execution_environments(
482 self
483 ):
484 if not self._authenticated:
485 await self._juju_login()
486 # TODO
487 self.info('Method not implemented yet')
488 raise NotImplemented()
489
490 async def delete_namespace(
491 self,
492 namespace: str,
493 db_dict: dict = None,
494 total_timeout: float = None
495 ):
496 self.info('Deleting namespace={}'.format(namespace))
497
498 if not self._authenticated:
499 await self._juju_login()
500
501 # check arguments
502 if namespace is None:
503 raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])
504
505 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
506 if ns_id is not None:
507 self.debug('Deleting model {}'.format(ns_id))
508 try:
509 await self._juju_destroy_model(
510 model_name=ns_id,
511 total_timeout=total_timeout
512 )
513 except Exception as e:
514 raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))
515 else:
516 raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])
517
518 self.info('Namespace {} deleted'.format(namespace))
519
520 async def delete_execution_environment(
521 self,
522 ee_id: str,
523 db_dict: dict = None,
524 total_timeout: float = None
525 ):
526 self.info('Deleting execution environment ee_id={}'.format(ee_id))
527
528 if not self._authenticated:
529 await self._juju_login()
530
531 # check arguments
532 if ee_id is None:
533 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
534
535 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)
536
537 # destroy the application
538 try:
539 await self._juju_destroy_application(model_name=model_name, application_name=application_name)
540 except Exception as e:
541 raise N2VCException(message='Error deleting execution environment {} (application {}) : {}'
542 .format(ee_id, application_name, e))
543
544 # destroy the machine
545 try:
546 await self._juju_destroy_machine(
547 model_name=model_name,
548 machine_id=machine_id,
549 total_timeout=total_timeout
550 )
551 except Exception as e:
552 raise N2VCException(message='Error deleting execution environment {} (machine {}) : {}'
553 .format(ee_id, machine_id, e))
554
555 self.info('Execution environment {} deleted'.format(ee_id))
556
557 async def exec_primitive(
558 self,
559 ee_id: str,
560 primitive_name: str,
561 params_dict: dict,
562 db_dict: dict = None,
563 progress_timeout: float = None,
564 total_timeout: float = None
565 ) -> str:
566
567 self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))
568
569 if not self._authenticated:
570 await self._juju_login()
571
572 # check arguments
573 if ee_id is None or len(ee_id) == 0:
574 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
575 if primitive_name is None or len(primitive_name) == 0:
576 raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
577 if params_dict is None:
578 params_dict = dict()
579
580 try:
581 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
582 except Exception:
583 raise N2VCBadArgumentsException(
584 message='ee_id={} is not a valid execution environment id'.format(ee_id),
585 bad_args=['ee_id']
586 )
587
588 if primitive_name == 'config':
589 # Special case: config primitive
590 try:
591 await self._juju_configure_application(
592 model_name=model_name,
593 application_name=application_name,
594 config=params_dict,
595 db_dict=db_dict,
596 progress_timeout=progress_timeout,
597 total_timeout=total_timeout
598 )
599 except Exception as e:
600 self.error('Error configuring juju application: {}'.format(e))
601 raise N2VCExecutionException(
602 message='Error configuring application into ee={} : {}'.format(ee_id, e),
603 primitive_name=primitive_name
604 )
605 return 'CONFIG OK'
606 else:
607 try:
608 output, status = await self._juju_execute_action(
609 model_name=model_name,
610 application_name=application_name,
611 action_name=primitive_name,
612 db_dict=db_dict,
613 progress_timeout=progress_timeout,
614 total_timeout=total_timeout,
615 **params_dict
616 )
617 if status == 'completed':
618 return output
619 else:
620 raise Exception('status is not completed: {}'.format(status))
621 except Exception as e:
622 self.error('Error executing primitive {}: {}'.format(primitive_name, e))
623 raise N2VCExecutionException(
624 message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
625 primitive_name=primitive_name
626 )
627
628 async def disconnect(self):
629 self.info('closing juju N2VC...')
630 await self._juju_logout()
631
632 """
633 ##################################################################################################
634 ########################################## P R I V A T E #########################################
635 ##################################################################################################
636 """
637
638 def _write_ee_id_db(
639 self,
640 db_dict: dict,
641 ee_id: str
642 ):
643
644 # write ee_id to database: _admin.deployed.VCA.x
645 try:
646 the_table = db_dict['collection']
647 the_filter = db_dict['filter']
648 the_path = db_dict['path']
649 if not the_path[-1] == '.':
650 the_path = the_path + '.'
651 update_dict = {the_path + 'ee_id': ee_id}
652 self.debug('Writing ee_id to database: {}'.format(the_path))
653 self.db.set_one(
654 table=the_table,
655 q_filter=the_filter,
656 update_dict=update_dict,
657 fail_on_empty=True
658 )
659 except Exception as e:
660 self.error('Error writing ee_id to database: {}'.format(e))
661
662 @staticmethod
663 def _build_ee_id(
664 model_name: str,
665 application_name: str,
666 machine_id: str
667 ):
668 """
669 Build an execution environment id form model, application and machine
670 :param model_name:
671 :param application_name:
672 :param machine_id:
673 :return:
674 """
675 # id for the execution environment
676 return '{}.{}.{}'.format(model_name, application_name, machine_id)
677
678 @staticmethod
679 def _get_ee_id_components(
680 ee_id: str
681 ) -> (str, str, str):
682 """
683 Get model, application and machine components from an execution environment id
684 :param ee_id:
685 :return: model_name, application_name, machine_id
686 """
687
688 if ee_id is None:
689 return None, None, None
690
691 # split components of id
692 parts = ee_id.split('.')
693 model_name = parts[0]
694 application_name = parts[1]
695 machine_id = parts[2]
696 return model_name, application_name, machine_id
697
698 def _get_application_name(self, namespace: str) -> str:
699 """
700 Build application name from namespace
701 :param namespace:
702 :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>
703 """
704
705 # split namespace components
706 _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
707
708 if vnf_id is None or len(vnf_id) == 0:
709 vnf_id = ''
710 else:
711 vnf_id = 'vnf-' + vnf_id
712
713 if vdu_id is None or len(vdu_id) == 0:
714 vdu_id = ''
715 else:
716 vdu_id = '-vdu-' + vdu_id
717
718 if vdu_count is None or len(vdu_count) == 0:
719 vdu_count = ''
720 else:
721 vdu_count = '-cnt-' + vdu_count
722
723 application_name = 'app-{}{}{}'.format(vnf_id, vdu_id, vdu_count)
724
725 return N2VCJujuConnector._format_app_name(application_name)
726
727 async def _juju_create_machine(
728 self,
729 model_name: str,
730 application_name: str,
731 machine_id: str = None,
732 db_dict: dict = None,
733 progress_timeout: float = None,
734 total_timeout: float = None
735 ) -> Machine:
736
737 self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))
738
739 # get juju model and observer (create model if needed)
740 model = await self._juju_get_model(model_name=model_name)
741 observer = self.juju_observers[model_name]
742
743 # find machine id in model
744 machine = None
745 if machine_id is not None:
746 self.debug('Finding existing machine id {} in model'.format(machine_id))
747 # get juju existing machines in the model
748 existing_machines = await model.get_machines()
749 if machine_id in existing_machines:
750 self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
751 machine = model.machines[machine_id]
752
753 if machine is None:
754 self.debug('Creating a new machine in juju...')
755 # machine does not exist, create it and wait for it
756 machine = await model.add_machine(
757 spec=None,
758 constraints=None,
759 disks=None,
760 series='xenial'
761 )
762
763 # register machine with observer
764 observer.register_machine(machine=machine, db_dict=db_dict)
765
766 # id for the execution environment
767 ee_id = N2VCJujuConnector._build_ee_id(
768 model_name=model_name,
769 application_name=application_name,
770 machine_id=str(machine.entity_id)
771 )
772
773 # write ee_id in database
774 self._write_ee_id_db(
775 db_dict=db_dict,
776 ee_id=ee_id
777 )
778
779 # wait for machine creation
780 await observer.wait_for_machine(
781 machine_id=str(machine.entity_id),
782 progress_timeout=progress_timeout,
783 total_timeout=total_timeout
784 )
785
786 else:
787
788 self.debug('Reusing old machine pending')
789
790 # register machine with observer
791 observer.register_machine(machine=machine, db_dict=db_dict)
792
793 # machine does exist, but it is in creation process (pending), wait for create finalisation
794 await observer.wait_for_machine(
795 machine_id=machine.entity_id,
796 progress_timeout=progress_timeout,
797 total_timeout=total_timeout)
798
799 self.debug("Machine ready at " + str(machine.dns_name))
800 return machine
801
802 async def _juju_provision_machine(
803 self,
804 model_name: str,
805 hostname: str,
806 username: str,
807 private_key_path: str,
808 db_dict: dict = None,
809 progress_timeout: float = None,
810 total_timeout: float = None
811 ) -> Machine:
812
813 self.debug('provisioning machine. model: {}, hostname: {}'.format(model_name, hostname))
814
815 if not self._authenticated:
816 await self._juju_login()
817
818 # get juju model and observer
819 model = await self._juju_get_model(model_name=model_name)
820 observer = self.juju_observers[model_name]
821
822 spec = 'ssh:{}@{}:{}'.format(username, hostname, private_key_path)
823 self.debug('provisioning machine {}'.format(spec))
824 try:
825 machine = await model.add_machine(spec=spec)
826 except Exception as e:
827 import sys
828 import traceback
829 traceback.print_exc(file=sys.stdout)
830 print('-' * 60)
831 raise e
832
833 # register machine with observer
834 observer.register_machine(machine=machine, db_dict=db_dict)
835
836 # wait for machine creation
837 self.debug('waiting for provision completed... {}'.format(machine.entity_id))
838 await observer.wait_for_machine(
839 machine=machine,
840 progress_timeout=progress_timeout,
841 total_timeout=total_timeout
842 )
843
844 self.debug("Machine provisioned {}".format(machine.entity_id))
845 return machine
846
847 async def _juju_deploy_charm(
848 self,
849 model_name: str,
850 application_name: str,
851 charm_path: str,
852 machine_id: str,
853 db_dict: dict,
854 progress_timeout: float = None,
855 total_timeout: float = None
856 ) -> (Application, int):
857
858 # get juju model and observer
859 model = await self._juju_get_model(model_name=model_name)
860 observer = self.juju_observers[model_name]
861
862 # check if application already exists
863 application = None
864 if application_name in model.applications:
865 application = model.applications[application_name]
866
867 if application is None:
868
869 # application does not exist, create it and wait for it
870 self.debug('deploying application {} to machine {}, model {}'
871 .format(application_name, machine_id, model_name))
872 self.debug('charm: {}'.format(charm_path))
873 application = await model.deploy(
874 entity_url=charm_path,
875 application_name=application_name,
876 channel='stable',
877 num_units=1,
878 series='xenial',
879 to=machine_id
880 )
881
882 # register application with observer
883 observer.register_application(application=application, db_dict=db_dict)
884
885 self.debug('waiting for application deployed... {}'.format(application.entity_id))
886 retries = await observer.wait_for_application(
887 application_id=application.entity_id,
888 progress_timeout=progress_timeout,
889 total_timeout=total_timeout)
890 self.debug('application deployed')
891
892 else:
893
894 # register application with observer
895 observer.register_application(application=application, db_dict=db_dict)
896
897 # application already exists, but not finalised
898 self.debug('application already exists, waiting for deployed...')
899 retries = await observer.wait_for_application(
900 application_id=application.entity_id,
901 progress_timeout=progress_timeout,
902 total_timeout=total_timeout)
903 self.debug('application deployed')
904
905 return application, retries
906
907 async def _juju_execute_action(
908 self,
909 model_name: str,
910 application_name: str,
911 action_name: str,
912 db_dict: dict,
913 progress_timeout: float = None,
914 total_timeout: float = None,
915 **kwargs
916 ) -> Action:
917
918 # get juju model and observer
919 model = await self._juju_get_model(model_name=model_name)
920 observer = self.juju_observers[model_name]
921
922 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
923
924 self.debug('trying to execute action {}'.format(action_name))
925 unit = application.units[0]
926 if unit is not None:
927 actions = await application.get_actions()
928 if action_name in actions:
929 self.debug('executing action {} with params {}'.format(action_name, kwargs))
930 action = await unit.run_action(action_name, **kwargs)
931
932 # register action with observer
933 observer.register_action(action=action, db_dict=db_dict)
934
935 self.debug(' waiting for action completed or error...')
936 await observer.wait_for_action(
937 action_id=action.entity_id,
938 progress_timeout=progress_timeout,
939 total_timeout=total_timeout)
940 self.debug('action completed with status: {}'.format(action.status))
941 output = await model.get_action_output(action_uuid=action.entity_id)
942 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
943 if action.entity_id in status:
944 status = status[action.entity_id]
945 else:
946 status = 'failed'
947 return output, status
948
949 raise N2VCExecutionException(
950 message='Cannot execute action on charm',
951 primitive_name=action_name
952 )
953
954 async def _juju_configure_application(
955 self,
956 model_name: str,
957 application_name: str,
958 config: dict,
959 db_dict: dict,
960 progress_timeout: float = None,
961 total_timeout: float = None
962 ):
963
964 # get juju model
965 model = await self._juju_get_model(model_name=model_name)
966
967 # get the application
968 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
969
970 self.debug('configuring the application {} -> {}'.format(application_name, config))
971 res = await application.set_config(config)
972 self.debug('application {} configured. res={}'.format(application_name, res))
973
974 # Verify the config is set
975 new_conf = await application.get_config()
976 for key in config:
977 value = new_conf[key]['value']
978 self.debug(' {} = {}'.format(key, value))
979 if config[key] != value:
980 raise N2VCException(
981 message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
982 )
983
984 # check if 'verify-ssh-credentials' action exists
985 unit = application.units[0]
986 actions = await application.get_actions()
987 if 'verify-ssh-credentials' not in actions:
988 msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
989 return False
990
991 # execute verify-credentials
992 num_retries = 20
993 retry_timeout = 15.0
994 for i in range(num_retries):
995 try:
996 self.debug('Executing action verify-ssh-credentials...')
997 output, ok = await self._juju_execute_action(
998 model_name=model_name,
999 application_name=application_name,
1000 action_name='verify-ssh-credentials',
1001 db_dict=db_dict,
1002 progress_timeout=progress_timeout,
1003 total_timeout=total_timeout
1004 )
1005 self.debug('Result: {}, output: {}'.format(ok, output))
1006 return True
1007 except Exception as e:
1008 self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
1009 await asyncio.sleep(retry_timeout)
1010 else:
1011 self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
1012 return False
1013
1014 async def _juju_get_application(
1015 self,
1016 model_name: str,
1017 application_name: str
1018 ):
1019 """Get the deployed application."""
1020
1021 model = await self._juju_get_model(model_name=model_name)
1022
1023 application_name = N2VCJujuConnector._format_app_name(application_name)
1024
1025 if model.applications and application_name in model.applications:
1026 return model.applications[application_name]
1027 else:
1028 raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))
1029
1030 async def _juju_get_model(self, model_name: str) -> Model:
1031 """ Get a model object from juju controller
1032
1033 :param str model_name: name of the model
1034 :returns Model: model obtained from juju controller or Exception
1035 """
1036
1037 # format model name
1038 model_name = N2VCJujuConnector._format_model_name(model_name)
1039
1040 if model_name in self.juju_models:
1041 return self.juju_models[model_name]
1042
1043 if self._creating_model:
1044 self.debug('Another coroutine is creating a model. Wait...')
1045 while self._creating_model:
1046 # another coroutine is creating a model, wait
1047 await asyncio.sleep(0.1)
1048 # retry (perhaps another coroutine has created the model meanwhile)
1049 if model_name in self.juju_models:
1050 return self.juju_models[model_name]
1051
1052 try:
1053 self._creating_model = True
1054
1055 # get juju model names from juju
1056 model_list = await self.controller.list_models()
1057
1058 if model_name not in model_list:
1059 self.info('Model {} does not exist. Creating new model...'.format(model_name))
1060 model = await self.controller.add_model(
1061 model_name=model_name,
1062 config={'authorized-keys': self.public_key}
1063 )
1064 self.info('New model created, name={}'.format(model_name))
1065 else:
1066 self.debug('Model already exists in juju. Getting model {}'.format(model_name))
1067 model = await self.controller.get_model(model_name)
1068 self.debug('Existing model in juju, name={}'.format(model_name))
1069
1070 self.juju_models[model_name] = model
1071 self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
1072 return model
1073
1074 except Exception as e:
1075 msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
1076 self.error(msg)
1077 raise N2VCException(msg)
1078 finally:
1079 self._creating_model = False
1080
1081 async def _juju_add_relation(
1082 self,
1083 model_name: str,
1084 application_name_1: str,
1085 application_name_2: str,
1086 relation_1: str,
1087 relation_2: str
1088 ):
1089
1090 self.debug('adding relation')
1091
1092 # get juju model and observer
1093 model = await self._juju_get_model(model_name=model_name)
1094
1095 r1 = '{}:{}'.format(application_name_1, relation_1)
1096 r2 = '{}:{}'.format(application_name_2, relation_2)
1097 await model.add_relation(relation1=r1, relation2=r2)
1098
1099 async def _juju_destroy_application(
1100 self,
1101 model_name: str,
1102 application_name: str
1103 ):
1104
1105 self.debug('Destroying application {} in model {}'.format(application_name, model_name))
1106
1107 # get juju model and observer
1108 model = await self._juju_get_model(model_name=model_name)
1109
1110 application = model.applications.get(application_name)
1111 if application:
1112 await application.destroy()
1113 else:
1114 self.debug('Application not found: {}'.format(application_name))
1115
1116 async def _juju_destroy_machine(
1117 self,
1118 model_name: str,
1119 machine_id: str,
1120 total_timeout: float = None
1121 ):
1122
1123 self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))
1124
1125 if total_timeout is None:
1126 total_timeout = 3600
1127
1128 # get juju model and observer
1129 model = await self._juju_get_model(model_name=model_name)
1130
1131 machines = await model.get_machines()
1132 if machine_id in machines:
1133 machine = model.machines[machine_id]
1134 await machine.destroy(force=True)
1135 # max timeout
1136 end = time.time() + total_timeout
1137 # wait for machine removal
1138 machines = await model.get_machines()
1139 while machine_id in machines and time.time() < end:
1140 self.debug('Waiting for machine {} is destroyed'.format(machine_id))
1141 await asyncio.sleep(0.5)
1142 machines = await model.get_machines()
1143 self.debug('Machine destroyed: {}'.format(machine_id))
1144 else:
1145 self.debug('Machine not found: {}'.format(machine_id))
1146
1147 async def _juju_destroy_model(
1148 self,
1149 model_name: str,
1150 total_timeout: float = None
1151 ):
1152
1153 self.debug('Destroying model {}'.format(model_name))
1154
1155 if total_timeout is None:
1156 total_timeout = 3600
1157
1158 model = await self._juju_get_model(model_name=model_name)
1159 uuid = model.info.uuid
1160
1161 self.debug('disconnecting model {}...'.format(model_name))
1162 await self._juju_disconnect_model(model_name=model_name)
1163 self.juju_models[model_name] = None
1164 self.juju_observers[model_name] = None
1165
1166 self.debug('destroying model {}...'.format(model_name))
1167 await self.controller.destroy_model(uuid)
1168
1169 # wait for model is completely destroyed
1170 end = time.time() + total_timeout
1171 while time.time() < end:
1172 self.debug('waiting for model is destroyed...')
1173 try:
1174 await self.controller.get_model(uuid)
1175 except Exception:
1176 self.debug('model destroyed')
1177 return
1178 await asyncio.sleep(1.0)
1179
1180 async def _juju_login(self):
1181 """Connect to juju controller
1182
1183 """
1184
1185 # if already authenticated, exit function
1186 if self._authenticated:
1187 return
1188
1189 # if connecting, wait for finish
1190 # another task could be trying to connect in parallel
1191 while self._connecting:
1192 await asyncio.sleep(0.1)
1193
1194 # double check after other task has finished
1195 if self._authenticated:
1196 return
1197
1198 try:
1199 self._connecting = True
1200 self.info(
1201 'connecting to juju controller: {} {}:{} ca_cert: {}'
1202 .format(self.url, self.username, self.secret, '\n'+self.ca_cert if self.ca_cert else 'None'))
1203
1204 # Create controller object
1205 self.controller = Controller(loop=self.loop)
1206 # Connect to controller
1207 await self.controller.connect(
1208 endpoint=self.url,
1209 username=self.username,
1210 password=self.secret,
1211 cacert=self.ca_cert
1212 )
1213 self._authenticated = True
1214 self.info('juju controller connected')
1215 except Exception as e:
1216 message = 'Exception connecting to juju: {}'.format(e)
1217 self.error(message)
1218 raise N2VCConnectionException(
1219 message=message,
1220 url=self.url
1221 )
1222 finally:
1223 self._connecting = False
1224
1225 async def _juju_logout(self):
1226 """Logout of the Juju controller."""
1227 if not self._authenticated:
1228 return False
1229
1230 # disconnect all models
1231 for model_name in self.juju_models:
1232 try:
1233 await self._juju_disconnect_model(model_name)
1234 except Exception as e:
1235 self.error('Error disconnecting model {} : {}'.format(model_name, e))
1236 # continue with next model...
1237
1238 self.info("Disconnecting controller")
1239 try:
1240 await self.controller.disconnect()
1241 except Exception as e:
1242 raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(e), url=self.url)
1243
1244 self.controller = None
1245 self._authenticated = False
1246 self.info('disconnected')
1247
1248 async def _juju_disconnect_model(
1249 self,
1250 model_name: str
1251 ):
1252 self.debug("Disconnecting model {}".format(model_name))
1253 if model_name in self.juju_models:
1254 await self.juju_models[model_name].disconnect()
1255 self.juju_models[model_name] = None
1256 self.juju_observers[model_name] = None
1257
1258 def _create_juju_public_key(self):
1259 """Recreate the Juju public key on lcm container, if needed
1260 Certain libjuju commands expect to be run from the same machine as Juju
1261 is bootstrapped to. This method will write the public key to disk in
1262 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1263 """
1264
1265 # Make sure that we have a public key before writing to disk
1266 if self.public_key is None or len(self.public_key) == 0:
1267 if 'OSMLCM_VCA_PUBKEY' in os.environ:
1268 self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
1269 if len(self.public_key) == 0:
1270 return
1271 else:
1272 return
1273
1274 pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
1275 file_path = "{}/juju_id_rsa.pub".format(pk_path)
1276 self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))
1277 if not os.path.exists(pk_path):
1278 # create path and write file
1279 os.makedirs(pk_path)
1280 with open(file_path, 'w') as f:
1281 self.debug('Creating juju public key file: {}'.format(file_path))
1282 f.write(self.public_key)
1283 else:
1284 self.debug('juju public key file already exists: {}'.format(file_path))
1285
1286 @staticmethod
1287 def _format_model_name(name: str) -> str:
1288 """Format the name of the model.
1289
1290 Model names may only contain lowercase letters, digits and hyphens
1291 """
1292
1293 return name.replace('_', '-').replace(' ', '-').lower()
1294
1295 @staticmethod
1296 def _format_app_name(name: str) -> str:
1297 """Format the name of the application (in order to assure valid application name).
1298
1299 Application names have restrictions (run juju deploy --help):
1300 - contains lowercase letters 'a'-'z'
1301 - contains numbers '0'-'9'
1302 - contains hyphens '-'
1303 - starts with a lowercase letter
1304 - not two or more consecutive hyphens
1305 - after a hyphen, not a group with all numbers
1306 """
1307
1308 def all_numbers(s: str) -> bool:
1309 for c in s:
1310 if not c.isdigit():
1311 return False
1312 return True
1313
1314 new_name = name.replace('_', '-')
1315 new_name = new_name.replace(' ', '-')
1316 new_name = new_name.lower()
1317 while new_name.find('--') >= 0:
1318 new_name = new_name.replace('--', '-')
1319 groups = new_name.split('-')
1320
1321 # find 'all numbers' groups and prefix them with a letter
1322 app_name = ''
1323 for i in range(len(groups)):
1324 group = groups[i]
1325 if all_numbers(group):
1326 group = 'z' + group
1327 if i > 0:
1328 app_name += '-'
1329 app_name += group
1330
1331 if app_name[0].isdigit():
1332 app_name = 'z' + app_name
1333
1334 return app_name