a9f9b764b1c419a1f89299b9bfc4dc03bef763fa
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.exceptions \
33 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
34 N2VCExecutionException, N2VCInvalidCertificate
35 from n2vc.juju_observer import JujuModelObserver
36
37 from juju.controller import Controller
38 from juju.model import Model
39 from juju.application import Application
40 from juju.action import Action
41 from juju.machine import Machine
42 from juju.client import client
43
44 from n2vc.provisioner import SSHProvisioner
45
46
47 class N2VCJujuConnector(N2VCConnector):
48
49 """
50 ##################################################################################################
51 ########################################## P U B L I C ###########################################
52 ##################################################################################################
53 """
54
def __init__(
    self,
    db: object,
    fs: object,
    log: object = None,
    loop: object = None,
    url: str = '127.0.0.1:17070',
    username: str = 'admin',
    vca_config: dict = None,
    on_update_db=None
):
    """Initialize juju N2VC connector

    Validates the arguments, stores the juju endpoint, secret, public key,
    CA certificate and api_proxy on the instance and resets the connection
    state (no controller, no cached models, not authenticated).

    :param db: forwarded unchanged to the N2VCConnector constructor
    :param fs: forwarded unchanged to the N2VCConnector constructor
    :param log: forwarded unchanged to the N2VCConnector constructor
    :param loop: forwarded unchanged to the N2VCConnector constructor
    :param url: juju endpoint with the format '<host>:<port>'
    :param username: juju user name
    :param vca_config: dict with a mandatory 'secret' key and the optional
        keys 'public_key', 'ca_cert' (base64 encoded) and 'api_proxy'
    :param on_update_db: forwarded unchanged to the N2VCConnector constructor
    :raises N2VCBadArgumentsException: if url, username, vca_config or
        vca_config['secret'] is missing, or if url is malformed
    :raises N2VCInvalidCertificate: if vca_config['ca_cert'] cannot be decoded
    """

    # parent class constructor
    N2VCConnector.__init__(
        self,
        db=db,
        fs=fs,
        log=log,
        loop=loop,
        url=url,
        username=username,
        vca_config=vca_config,
        on_update_db=on_update_db
    )

    # silence websocket traffic log
    logging.getLogger('websockets.protocol').setLevel(logging.INFO)
    logging.getLogger('juju.client.connection').setLevel(logging.WARN)
    logging.getLogger('model').setLevel(logging.WARN)

    self.info('Initializing N2VC juju connector...')

    ##############################################################
    # check arguments
    ##############################################################

    # juju URL
    if url is None:
        raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
    url_parts = url.split(':')
    if len(url_parts) != 2:
        raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
    self.hostname = url_parts[0]
    try:
        self.port = int(url_parts[1])
    except ValueError:
        raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])

    # juju USERNAME
    if username is None:
        raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])

    # juju CONFIGURATION
    if vca_config is None:
        raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])

    if 'secret' not in vca_config:
        raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])
    self.secret = vca_config['secret']

    # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
    # if exists, it will be written in lcm container: _create_juju_public_key()
    self.public_key = vca_config.get('public_key')

    # TODO: Verify ca_cert is valid before using. VCA will crash
    # if the ca_cert isn't formatted correctly.
    def base64_to_cacert(b64string):
        """Decode the base64-encoded string containing the VCA CACERT.

        Literal backslash-n sequences found in the decoded text are replaced
        by real line breaks.
        """
        try:
            cacert = base64.b64decode(b64string).decode("utf-8")
        except (binascii.Error, UnicodeDecodeError) as e:
            # both bad base64 and non UTF-8 payloads mean an unusable certificate
            self.debug("Error decoding CA certificate: {}".format(e))
            raise N2VCInvalidCertificate(message="Invalid CA Certificate")

        return re.sub(r'\\n', r'\n', cacert)

    self.ca_cert = vca_config.get('ca_cert')
    if self.ca_cert:
        self.ca_cert = base64_to_cacert(vca_config['ca_cert'])

    if 'api_proxy' in vca_config:
        self.api_proxy = vca_config['api_proxy']
        self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
    else:
        # always define the attribute: _juju_provision_machine() tests it
        self.api_proxy = None
        self.warning('api_proxy is not configured. Support for native charms is disabled')

    self.debug('Arguments have been checked')

    # juju data
    self.controller = None          # it will be filled when connecting to juju
    self.juju_models = {}           # model objects for every model_name
    self.juju_observers = {}        # model observers for every model_name
    self._connecting = False        # while connecting to juju (to avoid duplicate connections)
    self._authenticated = False     # it will be True when the juju connection is established
    self._creating_model = False    # True during model creation

    # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
    self._create_juju_public_key()

    self.info('N2VC juju connector initialized')
173
async def get_status(self, namespace: str):
    """Return the juju status of the model that belongs to a namespace.

    The ns_id component of the namespace is used as the juju model name.

    :param namespace: namespace to get the status from
    :return: whatever model.get_status() returns
    :raises N2VCBadArgumentsException: if the namespace has no ns_id component
    """
    self.info('Getting NS status. namespace: {}'.format(namespace))

    # log in to juju only once
    if not self._authenticated:
        await self._juju_login()

    # model name is ns_id, the other components are not needed here
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    if model_name is None:
        error_text = 'Namespace {} not valid'.format(namespace)
        self.error(error_text)
        raise N2VCBadArgumentsException(error_text, ['namespace'])

    # get juju model (create model if needed)
    juju_model = await self._juju_get_model(model_name=model_name)
    return await juju_model.get_status()
194
async def create_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    reuse_ee_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (str, dict):
    """Create (or reuse) a juju machine to be used as execution environment.

    :param namespace: namespace used to derive model and application names
        when reuse_ee_id is not given
    :param db_dict: passed through to _juju_create_machine()
    :param reuse_ee_id: id of an existing execution environment; when given,
        model, application and machine are taken from it
    :param progress_timeout: passed through to _juju_create_machine()
    :param total_timeout: passed through to _juju_create_machine()
    :return: tuple (ee_id, credentials), credentials holds the 'hostname' key
    :raises N2VCException: if the machine cannot be created
    """

    self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))

    if not self._authenticated:
        await self._juju_login()

    if reuse_ee_id:
        # everything comes from the execution environment to be reused
        model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
    else:
        # model name is ns_id; there is no machine yet
        _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
        application_name = self._get_application_name(namespace=namespace)
        machine_id = None

    self.debug('model name: {}, application name: {}, machine_id: {}'
               .format(model_name, application_name, machine_id))

    # create or reuse a new juju machine
    try:
        machine = await self._juju_create_machine(
            model_name=model_name,
            application_name=application_name,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        message = 'Error creating machine on juju: {}'.format(e)
        self.error(message)
        raise N2VCException(message=message)

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine.entity_id)
    )
    self.debug('ee_id: {}'.format(ee_id))

    # new machine credentials
    credentials = {'hostname': machine.dns_name}

    self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))

    return ee_id, credentials
252
async def register_execution_environment(
    self,
    namespace: str,
    credentials: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Register an already existing machine as execution environment.

    :param namespace: namespace used to derive model and application names
    :param credentials: dict with mandatory 'hostname' and 'username' keys
        and an optional 'private_key_path' key (self.private_key_path is
        used when it is absent)
    :param db_dict: passed through to _juju_provision_machine()
    :param progress_timeout: passed through to _juju_provision_machine()
    :param total_timeout: passed through to _juju_provision_machine()
    :return: id of the registered execution environment
    :raises N2VCBadArgumentsException: if credentials are missing or incomplete
    :raises N2VCException: if the machine cannot be provisioned
    """

    if not self._authenticated:
        await self._juju_login()

    self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))

    # check credentials
    if credentials is None:
        raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])

    hostname = credentials.get('hostname')
    if not hostname:
        raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])

    username = credentials.get('username')
    if not username:
        raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])

    # if not passed as argument, use generated private key path
    private_key_path = (
        credentials['private_key_path'] if 'private_key_path' in credentials else self.private_key_path
    )

    # model name is ns_id
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    application_name = self._get_application_name(namespace=namespace)

    # register machine on juju
    try:
        machine_id = await self._juju_provision_machine(
            model_name=model_name,
            hostname=hostname,
            username=username,
            private_key_path=private_key_path,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        self.error('Error registering machine: {}'.format(e))
        raise N2VCException(message='Error registering machine on juju: {}'.format(e))

    self.info('Machine registered: {}'.format(machine_id))

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine_id)
    )

    self.info('Execution environment registered. ee_id: {}'.format(ee_id))

    return ee_id
317
async def install_configuration_sw(
    self,
    ee_id: str,
    artifact_path: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
):
    """Deploy the charm found at artifact_path into an execution environment.

    :param ee_id: execution environment id (model.application.machine)
    :param artifact_path: charm directory, relative to self.fs.path
    :param db_dict: passed through to _juju_deploy_charm()
    :param progress_timeout: passed through to _juju_deploy_charm()
    :param total_timeout: passed through to _juju_deploy_charm()
    :raises N2VCBadArgumentsException: if an argument is missing, ee_id is
        malformed or the artifact path does not exist
    :raises N2VCException: if the charm cannot be deployed
    """

    self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
              .format(ee_id, artifact_path, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if artifact_path is None or len(artifact_path) == 0:
        raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception as e:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        ) from e

    # remove // in charm path
    while '//' in artifact_path:
        artifact_path = artifact_path.replace('//', '/')

    # check charm path
    if not self.fs.file_exists(artifact_path, mode="dir"):
        msg = 'artifact path does not exist: {}'.format(artifact_path)
        raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])

    # absolute path of the charm: exactly one '/' between both parts
    if artifact_path.startswith('/'):
        full_path = self.fs.path + artifact_path
    else:
        full_path = self.fs.path + '/' + artifact_path

    try:
        # the returned (application, retries) pair is not needed here
        await self._juju_deploy_charm(
            model_name=model_name,
            application_name=application_name,
            charm_path=full_path,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        # keep the original cause attached for troubleshooting
        raise N2VCException(message='Error deploying charm into ee={} : {}'.format(ee_id, e)) from e

    self.info('Configuration sw installed')
378
async def get_ee_ssh_public__key(
    self,
    ee_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Run the ssh layer primitives of a charm and return the public key.

    'generate-ssh-key' is executed first on a best-effort basis (a failure
    is only logged); 'get-ssh-public-key' is executed afterwards and its
    output is returned.

    :param ee_id: execution environment id (model.application.machine)
    :param db_dict: passed through to _juju_execute_action()
    :param progress_timeout: passed through to _juju_execute_action()
    :param total_timeout: passed through to _juju_execute_action()
    :return: output of the 'get-ssh-public-key' action
    :raises N2VCBadArgumentsException: if an argument is missing or ee_id is malformed
    """

    self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        )

    # arguments shared by both ssh layer primitives
    action_args = dict(
        model_name=model_name,
        application_name=application_name,
        db_dict=db_dict,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    # execute action: generate-ssh-key (best effort, its result is not used)
    try:
        _unused_output, _unused_status = await self._juju_execute_action(
            action_name='generate-ssh-key',
            **action_args
        )
    except Exception as e:
        self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))

    # execute action: get-ssh-public-key
    try:
        public_key, _status = await self._juju_execute_action(
            action_name='get-ssh-public-key',
            **action_args
        )
    except Exception as e:
        self.info('Cannot execute action get-ssh-public-key: {}\n'.format(e))
        raise

    # return public key if exists
    return public_key
443
async def add_relation(
    self,
    ee_id_1: str,
    ee_id_2: str,
    endpoint_1: str,
    endpoint_2: str
):
    """Add a juju relation between the applications of two execution environments.

    Both execution environments must live in the same juju model. The
    relation is added between '<application 1>:<endpoint_1>' and
    '<application 2>:<endpoint_2>'.

    :param ee_id_1: id of the first execution environment
    :param ee_id_2: id of the second execution environment
    :param endpoint_1: endpoint name of the first application
    :param endpoint_2: endpoint name of the second application
    :raises N2VCBadArgumentsException: if the models of both ids differ
    :raises N2VCException: if juju fails adding the relation
    """

    self.debug('adding new relation between {} and {}, endpoints: {}, {}'
               .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))

    if not self._authenticated:
        await self._juju_login()

    # get model and application of every execution environment (machines are not needed)
    model_1, app_1, _ = self._get_ee_id_components(ee_id_1)
    model_2, app_2, _ = self._get_ee_id_components(ee_id_2)

    # model must be the same
    if model_1 != model_2:
        message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
        self.error(message)
        raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])

    # add juju relations between two applications
    try:
        model = await self._juju_get_model(model_name=model_1)
        relation_1 = '{}:{}'.format(app_1, endpoint_1)
        relation_2 = '{}:{}'.format(app_2, endpoint_2)
        self.debug('adding relation: {} -> {}'.format(relation_1, relation_2))
        await model.add_relation(relation1=relation_1, relation2=relation_2)
    except Exception as e:
        message = 'Error adding relation between {} and {}: {}'.format(ee_id_1, ee_id_2, e)
        self.error(message)
        raise N2VCException(message=message) from e
475
476 async def remove_relation(
477 self
478 ):
479 if not self._authenticated:
480 await self._juju_login()
481 # TODO
482 self.info('Method not implemented yet')
483 raise NotImplemented()
484
485 async def deregister_execution_environments(
486 self
487 ):
488 if not self._authenticated:
489 await self._juju_login()
490 # TODO
491 self.info('Method not implemented yet')
492 raise NotImplemented()
493
async def delete_namespace(
    self,
    namespace: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """Delete the juju model that belongs to a namespace.

    The ns_id component of the namespace is the name of the model to destroy.

    :param namespace: namespace to delete
    :param db_dict: not used by this method
    :param total_timeout: passed through to _juju_destroy_model()
    :raises N2VCBadArgumentsException: if namespace is None or has no ns_id
    :raises N2VCException: if the model cannot be destroyed
    """
    self.info('Deleting namespace={}'.format(namespace))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if namespace is None:
        raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])

    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    if model_name is None:
        raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])

    self.debug('Deleting model {}'.format(model_name))
    try:
        await self._juju_destroy_model(
            model_name=model_name,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))

    self.info('Namespace {} deleted'.format(namespace))
523
async def delete_execution_environment(
    self,
    ee_id: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """Destroy the application and then the machine of an execution environment.

    :param ee_id: execution environment id (model.application.machine)
    :param db_dict: not used by this method
    :param total_timeout: passed through to _juju_destroy_machine()
    :raises N2VCBadArgumentsException: if ee_id is None
    :raises N2VCException: if the application or the machine cannot be destroyed
    """
    self.info('Deleting execution environment ee_id={}'.format(ee_id))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])

    model, app, machine = self._get_ee_id_components(ee_id=ee_id)

    # destroy the application
    try:
        await self._juju_destroy_application(model_name=model, application_name=app)
    except Exception as e:
        app_error = 'Error deleting execution environment {} (application {}) : {}'.format(ee_id, app, e)
        raise N2VCException(message=app_error)

    # destroy the machine
    try:
        await self._juju_destroy_machine(
            model_name=model,
            machine_id=machine,
            total_timeout=total_timeout
        )
    except Exception as e:
        machine_error = 'Error deleting execution environment {} (machine {}) : {}'.format(ee_id, machine, e)
        raise N2VCException(message=machine_error)

    self.info('Execution environment {} deleted'.format(ee_id))
560
async def exec_primitive(
    self,
    ee_id: str,
    primitive_name: str,
    params_dict: dict,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Execute a primitive in the application of an execution environment.

    The 'config' primitive is a special case: it configures the application
    with params_dict. Any other primitive is executed as a juju action that
    receives params_dict as keyword arguments.

    :param ee_id: execution environment id (model.application.machine)
    :param primitive_name: 'config' or the name of the action to execute
    :param params_dict: parameters of the primitive (None means no parameters)
    :param db_dict: passed through to the juju helper methods
    :param progress_timeout: passed through to the juju helper methods
    :param total_timeout: passed through to the juju helper methods
    :return: 'CONFIG OK' for the config primitive, the action output otherwise
    :raises N2VCBadArgumentsException: if an argument is missing or ee_id is malformed
    :raises N2VCExecutionException: if the primitive fails or does not complete
    """

    self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if not ee_id:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if not primitive_name:
        raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
    params_dict = dict() if params_dict is None else params_dict

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
    except Exception:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        )

    # Special case: config primitive
    if primitive_name == 'config':
        try:
            await self._juju_configure_application(
                model_name=model_name,
                application_name=application_name,
                config=params_dict,
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
        except Exception as e:
            self.error('Error configuring juju application: {}'.format(e))
            raise N2VCExecutionException(
                message='Error configuring application into ee={} : {}'.format(ee_id, e),
                primitive_name=primitive_name
            )
        return 'CONFIG OK'

    # any other primitive is a juju action
    try:
        output, status = await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name=primitive_name,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            **params_dict
        )
        # an action that did not complete is reported through the handler below
        if status != 'completed':
            raise Exception('status is not completed: {}'.format(status))
        return output
    except Exception as e:
        self.error('Error executing primitive {}: {}'.format(primitive_name, e))
        raise N2VCExecutionException(
            message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
            primitive_name=primitive_name
        )
631
async def disconnect(self):
    """Log the shutdown and close the juju session through _juju_logout()."""
    closing_text = 'closing juju N2VC...'
    self.info(closing_text)
    await self._juju_logout()
635
636 """
637 ##################################################################################################
638 ########################################## P R I V A T E #########################################
639 ##################################################################################################
640 """
641
642 def _write_ee_id_db(
643 self,
644 db_dict: dict,
645 ee_id: str
646 ):
647
648 # write ee_id to database: _admin.deployed.VCA.x
649 try:
650 the_table = db_dict['collection']
651 the_filter = db_dict['filter']
652 the_path = db_dict['path']
653 if not the_path[-1] == '.':
654 the_path = the_path + '.'
655 update_dict = {the_path + 'ee_id': ee_id}
656 self.debug('Writing ee_id to database: {}'.format(the_path))
657 self.db.set_one(
658 table=the_table,
659 q_filter=the_filter,
660 update_dict=update_dict,
661 fail_on_empty=True
662 )
663 except Exception as e:
664 self.error('Error writing ee_id to database: {}'.format(e))
665
666 @staticmethod
667 def _build_ee_id(
668 model_name: str,
669 application_name: str,
670 machine_id: str
671 ):
672 """
673 Build an execution environment id form model, application and machine
674 :param model_name:
675 :param application_name:
676 :param machine_id:
677 :return:
678 """
679 # id for the execution environment
680 return '{}.{}.{}'.format(model_name, application_name, machine_id)
681
682 @staticmethod
683 def _get_ee_id_components(
684 ee_id: str
685 ) -> (str, str, str):
686 """
687 Get model, application and machine components from an execution environment id
688 :param ee_id:
689 :return: model_name, application_name, machine_id
690 """
691
692 if ee_id is None:
693 return None, None, None
694
695 # split components of id
696 parts = ee_id.split('.')
697 model_name = parts[0]
698 application_name = parts[1]
699 machine_id = parts[2]
700 return model_name, application_name, machine_id
701
def _get_application_name(self, namespace: str) -> str:
    """
    Build application name from namespace
    :param namespace: namespace holding the vnf id, vdu id and vdu count
    :return: 'app-' followed by 'vnf-<vnf id>', '-vdu-<vdu id>' and '-cnt-<vdu count>'
        (a missing or empty component is omitted), after going through _format_app_name()
    """

    def tagged(tag: str, value):
        # a missing or empty component contributes nothing to the name
        return tag + value if value is not None and len(value) > 0 else ''

    # split namespace components
    _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)

    vnf_part = tagged('vnf-', vnf_id)
    vdu_part = tagged('-vdu-', vdu_id)
    count_part = tagged('-cnt-', vdu_count)

    raw_name = 'app-{}{}{}'.format(vnf_part, vdu_part, count_part)
    return N2VCJujuConnector._format_app_name(raw_name)
730
async def _juju_create_machine(
    self,
    model_name: str,
    application_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> Machine:
    """Create a machine in a juju model, or reuse an existing one, and wait for it.

    When machine_id is given and found in the model that machine is reused;
    otherwise a new 'xenial' machine is added and its execution environment
    id is written to the database.

    :param model_name: juju model name
    :param application_name: only used to build the execution environment id
    :param machine_id: id of the machine to reuse (optional)
    :param db_dict: passed to the observer and to _write_ee_id_db()
    :param progress_timeout: passed through to observer.wait_for_machine()
    :param total_timeout: passed through to observer.wait_for_machine()
    :return: the juju machine
    """

    self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))

    # get juju model and observer (create model if needed)
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # find machine id in model
    machine = None
    if machine_id is not None:
        self.debug('Finding existing machine id {} in model'.format(machine_id))
        # get juju existing machines in the model
        if machine_id in await model.get_machines():
            self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
            machine = model.machines[machine_id]

    if machine is None:
        self.debug('Creating a new machine in juju...')
        # machine does not exist, create it and wait for it
        machine = await model.add_machine(
            spec=None,
            constraints=None,
            disks=None,
            series='xenial'
        )

        # register machine with observer
        observer.register_machine(machine=machine, db_dict=db_dict)

        # write the id of the new execution environment in database
        self._write_ee_id_db(
            db_dict=db_dict,
            ee_id=N2VCJujuConnector._build_ee_id(
                model_name=model_name,
                application_name=application_name,
                machine_id=str(machine.entity_id)
            )
        )

        awaited_id = str(machine.entity_id)
    else:
        self.debug('Reusing old machine pending')

        # register machine with observer
        observer.register_machine(machine=machine, db_dict=db_dict)

        # machine does exist, but it may be in creation process (pending)
        awaited_id = machine.entity_id

    # wait for machine creation (or for create finalisation when reusing)
    await observer.wait_for_machine(
        machine_id=awaited_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    self.debug("Machine ready at " + str(machine.dns_name))
    return machine
805
async def _juju_provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """Provision an existing host through ssh and add it to a juju model.

    :param model_name: juju model name
    :param hostname: host to provision
    :param username: ssh user of the host
    :param private_key_path: path of the ssh private key
    :param db_dict: passed to the observer when registering the machine
    :param progress_timeout: passed through to observer.wait_for_machine()
    :param total_timeout: passed through to observer.wait_for_machine()
    :return: id of the machine added to the model
    :raises N2VCException: if api_proxy is not configured or the host cannot be provisioned
    :raises ValueError: if juju reports an error adding the machine
    :raises Exception: if the machine does not show up in the model
    """

    # getattr: the attribute may be missing when api_proxy was never configured
    api_proxy = getattr(self, 'api_proxy', None)
    if not api_proxy:
        msg = 'Cannot provision machine: api_proxy is not defined'
        self.error(msg=msg)
        raise N2VCException(message=msg)

    self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))

    if not self._authenticated:
        await self._juju_login()

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # TODO check if machine is already provisioned

    provisioner = SSHProvisioner(
        host=hostname,
        user=username,
        private_key_path=private_key_path,
        log=self.log
    )

    try:
        params = provisioner.provision_machine()
    except Exception as ex:
        msg = "Exception provisioning machine: {}".format(ex)
        self.error(msg=msg)
        raise N2VCException(message=msg) from ex

    params.jobs = ['JobHostUnits']

    connection = model.connection()

    # Submit the request.
    self.debug("Adding machine to model")
    client_facade = client.ClientFacade.from_connection(connection)
    results = await client_facade.AddMachines(params=[params])
    error = results.machines[0].error
    if error:
        # note: an unbalanced '}' in this format string would raise ValueError in format()
        msg = "Error adding machine: {}".format(error.message)
        self.error(msg=msg)
        raise ValueError(msg)

    machine_id = results.machines[0].machine

    # Need to run this after AddMachines has been called,
    # as we need the machine_id
    self.debug("Installing Juju agent into machine {}".format(machine_id))
    # scheduled in background: the agent installation is not awaited here
    asyncio.ensure_future(provisioner.install_agent(
        connection=connection,
        nonce=params.nonce,
        machine_id=machine_id,
        api=api_proxy,
    ))

    # wait for machine in model (now, machine is not yet in model, so we must wait for it)
    machine = None
    for _ in range(10):
        machine_list = await model.get_machines()
        if machine_id in machine_list:
            self.debug('Machine {} found in model!'.format(machine_id))
            machine = model.machines.get(machine_id)
            break
        await asyncio.sleep(2)

    if machine is None:
        msg = 'Machine {} not found in model'.format(machine_id)
        self.error(msg=msg)
        raise Exception(msg)

    # register machine with observer
    observer.register_machine(machine=machine, db_dict=db_dict)

    # wait for machine creation
    self.debug('waiting for provision finishes... {}'.format(machine_id))
    await observer.wait_for_machine(
        machine_id=machine_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    self.debug("Machine provisioned {}".format(machine_id))

    return machine_id
904
async def _juju_deploy_charm(
    self,
    model_name: str,
    application_name: str,
    charm_path: str,
    machine_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (Application, int):
    """Deploy a charm as an application (unless it already exists) and wait for it.

    :param model_name: juju model name
    :param application_name: name of the application to deploy or to wait for
    :param charm_path: path of the charm, used as entity_url of the deployment
    :param machine_id: machine where the application is deployed
    :param db_dict: passed to the observer when registering the application
    :param progress_timeout: passed through to observer.wait_for_application()
    :param total_timeout: passed through to observer.wait_for_application()
    :return: tuple (application, value returned by observer.wait_for_application())
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # check if application already exists
    application = model.applications[application_name] if application_name in model.applications else None
    must_deploy = application is None

    if must_deploy:
        # application does not exist, create it
        self.debug('deploying application {} to machine {}, model {}'
                   .format(application_name, machine_id, model_name))
        self.debug('charm: {}'.format(charm_path))
        application = await model.deploy(
            entity_url=charm_path,
            application_name=application_name,
            channel='stable',
            num_units=1,
            series='xenial',
            to=machine_id
        )

    # register application with observer
    observer.register_application(application=application, db_dict=db_dict)

    if must_deploy:
        self.debug('waiting for application deployed... {}'.format(application.entity_id))
    else:
        # application already exists, but not finalised
        self.debug('application already exists, waiting for deployed...')

    retries = await observer.wait_for_application(
        application_id=application.entity_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout)
    self.debug('application deployed')

    return application, retries
966
async def _juju_execute_action(
    self,
    model_name: str,
    application_name: str,
    action_name: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
) -> tuple:
    """Run an action on the first unit of an application and wait for it.

    :param model_name: juju model name
    :param application_name: application that owns the action
    :param action_name: name of the action to execute
    :param db_dict: passed to the observer when registering the action
    :param progress_timeout: passed through to observer.wait_for_action()
    :param total_timeout: passed through to observer.wait_for_action()
    :param kwargs: parameters of the action
    :return: tuple (output, status); status is 'failed' when juju reports
        no status for the action
    :raises N2VCExecutionException: if the application has no units or does
        not provide the action
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('trying to execute action {}'.format(action_name))

    # an application without units must end in the controlled error below,
    # not in an IndexError
    units = application.units
    unit = units[0] if units else None

    if unit is not None:
        actions = await application.get_actions()
        if action_name in actions:
            self.debug('executing action {} with params {}'.format(action_name, kwargs))
            action = await unit.run_action(action_name, **kwargs)

            # register action with observer
            observer.register_action(action=action, db_dict=db_dict)

            self.debug(' waiting for action completed or error...')
            await observer.wait_for_action(
                action_id=action.entity_id,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout)
            self.debug('action completed with status: {}'.format(action.status))

            output = await model.get_action_output(action_uuid=action.entity_id)
            statuses = await model.get_action_status(uuid_or_prefix=action.entity_id)
            # an action missing from the status report is considered failed
            status = statuses[action.entity_id] if action.entity_id in statuses else 'failed'
            return output, status

    raise N2VCExecutionException(
        message='Cannot execute action on charm',
        primitive_name=action_name
    )
1013
async def _juju_configure_application(
    self,
    model_name: str,
    application_name: str,
    config: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
):
    """Apply a configuration to an application and verify its ssh credentials.

    The configuration is written, read back and compared key by key. After
    that the 'verify-ssh-credentials' action is run, retrying on any error.

    :param str model_name: name of the juju model that holds the application
    :param str application_name: name of the application to configure
    :param dict config: configuration keys and the values to set
    :param dict db_dict: forwarded to _juju_execute_action
    :param float progress_timeout: forwarded to _juju_execute_action
    :param float total_timeout: forwarded to _juju_execute_action
    :returns: True when the verification action ran without raising; False when
        the application has no such action or every retry failed
    :raises N2VCException: a key read back from juju differs from the requested value
    """

    # make sure the model is available (the handle itself is not used here)
    await self._juju_get_model(model_name=model_name)

    # get the application
    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('configuring the application {} -> {}'.format(application_name, config))
    res = await application.set_config(config)
    self.debug('application {} configured. res={}'.format(application_name, res))

    # Verify the config is set
    new_conf = await application.get_config()
    for key in config:
        value = new_conf[key]['value']
        self.debug(' {} = {}'.format(key, value))
        if config[key] != value:
            # report the value that was compared, not the whole config entry
            raise N2VCException(
                message='key {} is not configured correctly {} != {}'.format(key, config[key], value)
            )

    # check if 'verify-ssh-credentials' action exists
    actions = await application.get_actions()
    if 'verify-ssh-credentials' not in actions:
        msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
        self.debug(msg)
        return False

    # execute verify-credentials
    # NOTE(review): the status returned by the action is only logged; a
    # 'failed' status still yields True - confirm this is intended
    num_retries = 20
    retry_timeout = 15.0
    for i in range(num_retries):
        try:
            self.debug('Executing action verify-ssh-credentials...')
            output, ok = await self._juju_execute_action(
                model_name=model_name,
                application_name=application_name,
                action_name='verify-ssh-credentials',
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
            self.debug('Result: {}, output: {}'.format(ok, output))
            return True
        except Exception as e:
            self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
            await asyncio.sleep(retry_timeout)
    else:
        # reached only when every attempt raised
        self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
        return False
1073
async def _juju_get_application(
    self,
    model_name: str,
    application_name: str
):
    """Return the application deployed in a model under the given name.

    The application name is normalised with _format_app_name before the lookup.

    :param str model_name: name of the juju model to search
    :param str application_name: name of the application
    :raises N2VCException: the model has no application with that name
    """

    juju_model = await self._juju_get_model(model_name=model_name)

    application_name = N2VCJujuConnector._format_app_name(application_name)

    # guard clause: nothing deployed at all, or not under this name
    if not juju_model.applications or application_name not in juju_model.applications:
        raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))

    return juju_model.applications[application_name]
1089
async def _juju_get_model(self, model_name: str) -> Model:
    """ Get a model object from juju controller

    The model is looked up in the local cache first. If it is not there, it is
    fetched from the controller, or created when the controller does not list
    it. The model and a new JujuModelObserver are then cached under the
    formatted model name.

    :param str model_name: name of the model
    :returns Model: model obtained from juju controller or Exception
    :raises N2VCException: the model could not be obtained or created
    """

    # format model name
    model_name = N2VCJujuConnector._format_model_name(model_name)

    # disconnecting or destroying a model leaves a None entry in the cache;
    # such an entry must not be handed out as if it were a live model
    if model_name in self.juju_models and self.juju_models[model_name] is not None:
        return self.juju_models[model_name]

    if self._creating_model:
        self.debug('Another coroutine is creating a model. Wait...')
    while self._creating_model:
        # another coroutine is creating a model, wait
        await asyncio.sleep(0.1)
        # retry (perhaps another coroutine has created the model meanwhile)
        if model_name in self.juju_models and self.juju_models[model_name] is not None:
            return self.juju_models[model_name]

    try:
        self._creating_model = True

        # get juju model names from juju
        model_list = await self.controller.list_models()

        if model_name not in model_list:
            self.info('Model {} does not exist. Creating new model...'.format(model_name))
            model = await self.controller.add_model(
                model_name=model_name,
                config={'authorized-keys': self.public_key}
            )
            self.info('New model created, name={}'.format(model_name))
        else:
            self.debug('Model already exists in juju. Getting model {}'.format(model_name))
            model = await self.controller.get_model(model_name)
            self.debug('Existing model in juju, name={}'.format(model_name))

        self.juju_models[model_name] = model
        self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
        return model

    except Exception as e:
        msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
        self.error(msg)
        raise N2VCException(msg) from e
    finally:
        # always release the flag so waiting coroutines can proceed
        self._creating_model = False
1140
async def _juju_add_relation(
    self,
    model_name: str,
    application_name_1: str,
    application_name_2: str,
    relation_1: str,
    relation_2: str
):
    """Relate two applications of a model.

    Each side is passed to the model as '<application>:<relation>'.

    :param str model_name: name of the juju model holding both applications
    :param str application_name_1: application on the first side
    :param str application_name_2: application on the second side
    :param str relation_1: relation name on the first side
    :param str relation_2: relation name on the second side
    """

    self.debug('adding relation')

    # only the model is needed here
    juju_model = await self._juju_get_model(model_name=model_name)

    endpoint_template = '{}:{}'
    first_endpoint = endpoint_template.format(application_name_1, relation_1)
    second_endpoint = endpoint_template.format(application_name_2, relation_2)
    await juju_model.add_relation(relation1=first_endpoint, relation2=second_endpoint)
1158
async def _juju_destroy_application(
    self,
    model_name: str,
    application_name: str
):
    """Destroy an application of a model, if the model has it.

    The application is looked up under the given name first and then under
    the normalised name that _juju_get_application uses. A missing
    application is only logged.

    :param str model_name: name of the juju model holding the application
    :param str application_name: name of the application to destroy
    """

    self.debug('Destroying application {} in model {}'.format(application_name, model_name))

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    # a model without applications may expose an empty/None collection
    deployed = model.applications or {}
    application = deployed.get(application_name)
    if not application:
        # fall back to the normalised name used by _juju_get_application
        application = deployed.get(N2VCJujuConnector._format_app_name(application_name))

    if application:
        await application.destroy()
    else:
        self.debug('Application not found: {}'.format(application_name))
1175
async def _juju_destroy_machine(
    self,
    model_name: str,
    machine_id: str,
    total_timeout: float = None
):
    """Force-destroy a machine and wait until the model no longer lists it.

    :param str model_name: name of the juju model holding the machine
    :param str machine_id: identifier of the machine to destroy
    :param float total_timeout: maximum seconds to wait for the removal
        (3600 when None)
    """

    self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))

    if total_timeout is None:
        total_timeout = 3600

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    machines = await model.get_machines()
    if machine_id not in machines:
        self.debug('Machine not found: {}'.format(machine_id))
        return

    machine = model.machines[machine_id]
    await machine.destroy(force=True)
    # max timeout
    end = time.time() + total_timeout
    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.debug('Waiting for machine {} is destroyed'.format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    # the loop also ends on timeout: only report success when the machine is gone
    if machine_id in machines:
        self.error('Timeout waiting for machine {} to be destroyed'.format(machine_id))
    else:
        self.debug('Machine destroyed: {}'.format(machine_id))
1206
async def _juju_destroy_model(
    self,
    model_name: str,
    total_timeout: float = None
):
    """Disconnect and destroy a model, then wait until the controller no longer serves it.

    :param str model_name: name of the model to destroy
    :param float total_timeout: maximum seconds to wait for the destruction
        (3600 when None)
    """

    self.debug('Destroying model {}'.format(model_name))

    if total_timeout is None:
        total_timeout = 3600

    model = await self._juju_get_model(model_name=model_name)
    uuid = model.info.uuid

    # models and observers are cached under the formatted model name
    cache_key = N2VCJujuConnector._format_model_name(model_name)

    self.debug('disconnecting model {}...'.format(model_name))
    await self._juju_disconnect_model(model_name=cache_key)
    self.juju_models[cache_key] = None
    self.juju_observers[cache_key] = None

    self.debug('destroying model {}...'.format(model_name))
    await self.controller.destroy_model(uuid)

    # wait for model is completely destroyed
    end = time.time() + total_timeout
    while time.time() < end:
        self.debug('waiting for model is destroyed...')
        try:
            probe = await self.controller.get_model(uuid)
        except Exception:
            # the controller can no longer hand out the model
            self.debug('model destroyed')
            return
        # the model still exists; presumably get_model returned a connected
        # handle, so release it instead of piling one up per poll
        try:
            await probe.disconnect()
        except Exception as e:
            self.debug('Error disconnecting probe of model {}: {}'.format(model_name, e))
        await asyncio.sleep(1.0)

    self.error('Timeout waiting for model {} to be destroyed'.format(model_name))
1239
async def _juju_login(self):
    """Connect to juju controller

    Returns at once when already authenticated. While another task is
    connecting, waits for it and checks again. Otherwise creates the
    Controller and connects it with the stored url, username, secret and
    CA certificate.

    :raises N2VCConnectionException: the connection to the controller failed
    """

    # if already authenticated, exit function
    if self._authenticated:
        return

    # if connecting, wait for finish
    # another task could be trying to connect in parallel
    while self._connecting:
        await asyncio.sleep(0.1)

    # double check after other task has finished
    if self._authenticated:
        return

    try:
        self._connecting = True
        # never write the controller password to the log
        self.info(
            'connecting to juju controller: {} {}:{} ca_cert: {}'
            .format(self.url, self.username, '******', '\n'+self.ca_cert if self.ca_cert else 'None'))

        # Create controller object
        self.controller = Controller(loop=self.loop)
        # Connect to controller
        await self.controller.connect(
            endpoint=self.url,
            username=self.username,
            password=self.secret,
            cacert=self.ca_cert
        )
        self._authenticated = True
        self.info('juju controller connected')
    except Exception as e:
        message = 'Exception connecting to juju: {}'.format(e)
        self.error(message)
        raise N2VCConnectionException(
            message=message,
            url=self.url
        ) from e
    finally:
        # always release the flag so waiting tasks can proceed
        self._connecting = False
1284
async def _juju_logout(self):
    """Logout of the Juju controller.

    Disconnects every cached model (errors are logged and skipped), then
    disconnects the controller and clears the authenticated state.

    :returns: False when there is no authenticated session; None otherwise
    :raises N2VCConnectionException: the controller could not be disconnected
    """
    if not self._authenticated:
        return False

    # disconnect all models
    # iterate over a snapshot: the awaits below let other coroutines add
    # models, which would break iteration over the live dict
    for model_name in list(self.juju_models):
        try:
            await self._juju_disconnect_model(model_name)
        except Exception as e:
            self.error('Error disconnecting model {} : {}'.format(model_name, e))
            # continue with next model...

    self.info("Disconnecting controller")
    try:
        await self.controller.disconnect()
    except Exception as e:
        raise N2VCConnectionException(
            message='Error disconnecting controller: {}'.format(e), url=self.url
        ) from e

    self.controller = None
    self._authenticated = False
    self.info('disconnected')
1307
async def _juju_disconnect_model(
    self,
    model_name: str
):
    """Disconnect a cached model and blank its cache entries.

    Nothing happens for a model that is not cached. An entry that is already
    None (model disconnected before) is left as None without error.

    :param str model_name: name of the model to disconnect
    """
    self.debug("Disconnecting model {}".format(model_name))

    # _juju_get_model caches models and observers under the formatted name
    cache_key = N2VCJujuConnector._format_model_name(model_name)

    if cache_key in self.juju_models:
        model = self.juju_models[cache_key]
        # a previous disconnect leaves None behind; there is nothing to close then
        if model is not None:
            await model.disconnect()
        self.juju_models[cache_key] = None
        self.juju_observers[cache_key] = None
1317
def _create_juju_public_key(self):
    """Recreate the Juju public key on lcm container, if needed
    Certain libjuju commands expect to be run from the same machine as Juju
    is bootstrapped to. This method will write the public key to disk in
    that location: ~/.local/share/juju/ssh/juju_id_rsa.pub

    When self.public_key is empty, the key is taken from the
    OSMLCM_VCA_PUBKEY environment variable. Nothing is written when no key
    is available. An existing key file is left untouched.
    """

    # Make sure that we have a public key before writing to disk
    if not self.public_key:
        env_key = os.environ.get('OSMLCM_VCA_PUBKEY')
        if env_key is None:
            return
        self.public_key = env_key
        if not env_key:
            return

    pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
    file_path = "{}/juju_id_rsa.pub".format(pk_path)
    self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))

    # the directory may already exist without the key file in it, so the
    # decision to write is taken on the file, not on the directory
    os.makedirs(pk_path, exist_ok=True)
    if os.path.exists(file_path):
        self.debug('juju public key file already exists: {}'.format(file_path))
        return

    with open(file_path, 'w') as f:
        self.debug('Creating juju public key file: {}'.format(file_path))
        f.write(self.public_key)
1345
@staticmethod
def _format_model_name(name: str) -> str:
    """Return *name* in the form used for model names.

    Underscores and spaces become hyphens and the result is lowercased
    (model names may only contain lowercase letters, digits and hyphens).
    """

    # one pass over the lowercased text, mapping both separators to '-'
    separators_to_hyphen = str.maketrans('_ ', '--')
    return name.lower().translate(separators_to_hyphen)
1354
@staticmethod
def _format_app_name(name: str) -> str:
    """Turn *name* into a string acceptable as a juju application name.

    Restrictions on application names (run juju deploy --help):
    - contains lowercase letters 'a'-'z'
    - contains numbers '0'-'9'
    - contains hyphens '-'
    - starts with a lowercase letter
    - not two or more consecutive hyphens
    - after a hyphen, not a group with all numbers

    Underscores and spaces become hyphens, the text is lowercased, runs of
    hyphens are squeezed into one, and every hyphen-separated group made only
    of digits (an empty group included) is prefixed with 'z'. A result that
    still starts with a digit is prefixed with 'z' as well.
    """

    # normalise separators and case, then squeeze repeated hyphens
    normalized = name.replace('_', '-').replace(' ', '-').lower()
    normalized = re.sub('-{2,}', '-', normalized)

    # prefix the digit-only groups with a letter; all() is True for an
    # empty group, so leading/trailing hyphens yield a bare 'z' group
    pieces = [
        'z' + piece if all(ch.isdigit() for ch in piece) else piece
        for piece in normalized.split('-')
    ]
    app_name = '-'.join(pieces)

    # the whole name must not begin with a digit
    return 'z' + app_name if app_name[0].isdigit() else app_name