Fix bug 957: Return pubkey properly
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.exceptions \
33 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
34 N2VCExecutionException, N2VCInvalidCertificate
35 from n2vc.juju_observer import JujuModelObserver
36
37 from juju.controller import Controller
38 from juju.model import Model
39 from juju.application import Application
40 from juju.action import Action
41 from juju.machine import Machine
42 from juju.client import client
43
44 from n2vc.provisioner import SSHProvisioner
45
46
47 class N2VCJujuConnector(N2VCConnector):
48
49 """
50 ##################################################################################################
51 ########################################## P U B L I C ###########################################
52 ##################################################################################################
53 """
54
def __init__(
    self,
    db: object,
    fs: object,
    log: object = None,
    loop: object = None,
    url: str = '127.0.0.1:17070',
    username: str = 'admin',
    vca_config: dict = None,
    on_update_db=None
):
    """Initialize juju N2VC connector

    Calls the parent constructor, validates the arguments, stores the VCA
    settings on the instance and resets the juju connection bookkeeping.
    No juju connection is opened here.

    :param db: forwarded unchanged to N2VCConnector.__init__
    :param fs: forwarded unchanged to N2VCConnector.__init__
    :param log: forwarded unchanged to N2VCConnector.__init__
    :param loop: forwarded unchanged to N2VCConnector.__init__
    :param url: juju endpoint formatted as '<hostname>:<port>'
    :param username: juju user name (mandatory)
    :param vca_config: VCA settings; 'secret' is mandatory, while 'public_key',
        'ca_cert' (base64 encoded) and 'api_proxy' are optional
    :param on_update_db: forwarded unchanged to N2VCConnector.__init__
    :raises N2VCBadArgumentsException: a mandatory argument is missing or malformed
    :raises N2VCInvalidCertificate: 'ca_cert' cannot be decoded
    """

    # parent class constructor
    N2VCConnector.__init__(
        self,
        db=db,
        fs=fs,
        log=log,
        loop=loop,
        url=url,
        username=username,
        vca_config=vca_config,
        on_update_db=on_update_db
    )

    # silence websocket traffic log
    logging.getLogger('websockets.protocol').setLevel(logging.INFO)
    logging.getLogger('juju.client.connection').setLevel(logging.WARN)
    logging.getLogger('model').setLevel(logging.WARN)

    self.info('Initializing N2VC juju connector...')

    # ------------------------------------------------------------
    # check arguments
    # ------------------------------------------------------------

    # juju URL
    if url is None:
        raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
    url_parts = url.split(':')
    if len(url_parts) != 2:
        raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
    self.hostname = url_parts[0]
    try:
        self.port = int(url_parts[1])
    except ValueError:
        raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])

    # juju USERNAME
    if username is None:
        raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])

    # juju CONFIGURATION
    if vca_config is None:
        raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])

    if 'secret' in vca_config:
        self.secret = vca_config['secret']
    else:
        raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])

    # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
    # if exists, it will be written in lcm container: _create_juju_public_key()
    self.public_key = vca_config.get('public_key')

    # TODO: Verify ca_cert is valid before using. VCA will crash
    # if the ca_cert isn't formatted correctly.
    def base64_to_cacert(b64string):
        """Decode the base64-encoded string containing the VCA CACERT.

        The decoded text has every literal backslash-n sequence replaced
        by a real newline character.

        :raises N2VCInvalidCertificate: the input is not valid base64 or
            does not decode to UTF-8 text
        """
        try:
            cacert = base64.b64decode(b64string).decode("utf-8")

            cacert = re.sub(
                r'\\n',
                r'\n',
                cacert,
            )
        except (binascii.Error, UnicodeDecodeError) as e:
            # UnicodeDecodeError: valid base64 whose payload is not UTF-8 text
            self.debug("Caught {}: {}".format(type(e).__name__, e))
            raise N2VCInvalidCertificate(message="Invalid CA Certificate")

        return cacert

    self.ca_cert = vca_config.get('ca_cert')
    if self.ca_cert:
        self.ca_cert = base64_to_cacert(vca_config['ca_cert'])

    # Always define the attribute: _juju_provision_machine() reads
    # self.api_proxy and would otherwise fail with AttributeError.
    self.api_proxy = vca_config.get('api_proxy')
    if 'api_proxy' in vca_config:
        self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
    else:
        self.warning('api_proxy is not configured. Support for native charms is disabled')

    self.debug('Arguments have been checked')

    # juju data
    self.controller = None  # it will be filled when connect to juju
    self.juju_models = {}  # model objects for every model_name
    self.juju_observers = {}  # model observers for every model_name
    self._connecting = False  # while connecting to juju (to avoid duplicate connections)
    self._authenticated = False  # it will be True when the juju connection is established
    self._creating_model = False  # True during model creation

    # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
    self._create_juju_public_key()

    self.info('N2VC juju connector initialized')
173
async def get_status(self, namespace: str):
    """
    Return the juju status of the model that backs a namespace

    :param namespace: namespace string; its ns_id component is used as the model name
    :return: whatever the model's get_status() coroutine returns
    :raises N2VCBadArgumentsException: the namespace carries no ns_id
    """
    self.info('Getting NS status. namespace: {}'.format(namespace))

    if not self._authenticated:
        await self._juju_login()

    # only the ns_id component matters here: it doubles as the juju model name
    _, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)
    if ns_id is None:
        error_text = 'Namespace {} not valid'.format(namespace)
        self.error(error_text)
        raise N2VCBadArgumentsException(error_text, ['namespace'])

    # get juju model (create model if needed)
    juju_model = await self._juju_get_model(model_name=ns_id)

    return await juju_model.get_status()
194
async def create_execution_environment(
    self,
    namespace: str,
    db_dict: dict,
    reuse_ee_id: str = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> (str, dict):
    """
    Create (or reuse) a juju machine and return its execution environment id

    :param namespace: namespace used to derive model and application names
        when reuse_ee_id is not given
    :param db_dict: passed through to _juju_create_machine
    :param reuse_ee_id: existing execution environment id; when truthy, its
        model, application and machine components are used instead
    :param progress_timeout: passed through to _juju_create_machine
    :param total_timeout: passed through to _juju_create_machine
    :return: (ee_id, credentials) where credentials holds the machine 'hostname'
    :raises N2VCException: the machine could not be created
    """

    self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))

    if not self._authenticated:
        await self._juju_login()

    if reuse_ee_id:
        model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
    else:
        machine_id = None
        # model name is ns_id
        _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
        # application name
        application_name = self._get_application_name(namespace=namespace)

    self.debug('model name: {}, application name: {}, machine_id: {}'
               .format(model_name, application_name, machine_id))

    # create or reuse a new juju machine
    try:
        machine = await self._juju_create_machine(
            model_name=model_name,
            application_name=application_name,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        message = 'Error creating machine on juju: {}'.format(e)
        self.error(message)
        raise N2VCException(message=message)

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine.entity_id)
    )
    self.debug('ee_id: {}'.format(ee_id))

    # new machine credentials
    credentials = {'hostname': machine.dns_name}

    self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))

    return ee_id, credentials
252
async def register_execution_environment(
    self,
    namespace: str,
    credentials: dict,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Register an already existing machine in juju as an execution environment

    :param namespace: namespace used to derive model and application names
    :param credentials: must hold non-empty 'hostname' and 'username';
        'private_key_path' is optional and defaults to self.private_key_path
    :param db_dict: passed through to _juju_provision_machine
    :param progress_timeout: passed through to _juju_provision_machine
    :param total_timeout: passed through to _juju_provision_machine
    :return: id of the registered execution environment
    :raises N2VCBadArgumentsException: credentials are missing or incomplete
    :raises N2VCException: the machine could not be provisioned
    """

    if not self._authenticated:
        await self._juju_login()

    self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))

    if credentials is None:
        raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])

    if not credentials.get('hostname'):
        raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])
    hostname = credentials['hostname']

    if not credentials.get('username'):
        raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])
    username = credentials['username']

    # if not passed as argument, use generated private key path
    private_key_path = (
        credentials['private_key_path']
        if 'private_key_path' in credentials
        else self.private_key_path
    )

    # model name is the ns_id component of the namespace
    _, model_name, _, _, _ = self._get_namespace_components(namespace=namespace)
    # application name
    application_name = self._get_application_name(namespace=namespace)

    # register machine on juju
    try:
        machine_id = await self._juju_provision_machine(
            model_name=model_name,
            hostname=hostname,
            username=username,
            private_key_path=private_key_path,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        self.error('Error registering machine: {}'.format(e))
        raise N2VCException(message='Error registering machine on juju: {}'.format(e))

    self.info('Machine registered: {}'.format(machine_id))

    # id for the execution environment
    ee_id = N2VCJujuConnector._build_ee_id(
        model_name=model_name,
        application_name=application_name,
        machine_id=str(machine_id)
    )

    self.info('Execution environment registered. ee_id: {}'.format(ee_id))

    return ee_id
317
async def install_configuration_sw(
    self,
    ee_id: str,
    artifact_path: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
):
    """
    Deploy the charm found at artifact_path into an execution environment

    :param ee_id: execution environment id (model.application.machine)
    :param artifact_path: charm directory, checked through self.fs and
        resolved against self.fs.path
    :param db_dict: passed through to _juju_deploy_charm
    :param progress_timeout: passed through to _juju_deploy_charm
    :param total_timeout: passed through to _juju_deploy_charm
    :raises N2VCBadArgumentsException: an argument is missing, ee_id cannot be
        split into its components, or the artifact path does not exist
    :raises N2VCException: the charm deployment failed
    """

    self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
              .format(ee_id, artifact_path, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if artifact_path is None or len(artifact_path) == 0:
        raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception as e:
        # keep the root cause attached for troubleshooting
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        ) from e

    # remove // in charm path
    while '//' in artifact_path:
        artifact_path = artifact_path.replace('//', '/')

    # check charm path
    if not self.fs.file_exists(artifact_path, mode="dir"):
        msg = 'artifact path does not exist: {}'.format(artifact_path)
        raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])

    if artifact_path.startswith('/'):
        full_path = self.fs.path + artifact_path
    else:
        full_path = self.fs.path + '/' + artifact_path

    try:
        # the (application, retries) result is not needed here
        await self._juju_deploy_charm(
            model_name=model_name,
            application_name=application_name,
            charm_path=full_path,
            machine_id=machine_id,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error deploying charm into ee={} : {}'.format(ee_id, e)) from e

    self.info('Configuration sw installed')
378
async def get_ee_ssh_public__key(
    self,
    ee_id: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Run the ssh-layer actions of a charm and return the resulting public key

    'generate-ssh-key' is attempted first on a best-effort basis (failures are
    only logged); 'get-ssh-public-key' must succeed.

    :param ee_id: execution environment id (model.application.machine)
    :param db_dict: passed through to _juju_execute_action
    :param progress_timeout: passed through to _juju_execute_action
    :param total_timeout: passed through to _juju_execute_action
    :return: the 'pubkey' entry of the action output when the output is a
        mapping holding that key, otherwise the action output unchanged
    :raises N2VCBadArgumentsException: an argument is missing or ee_id cannot
        be split into its components
    """

    self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if db_dict is None:
        raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])

    try:
        model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)
        self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
    except Exception as e:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        ) from e

    # try to execute ssh layer primitives (if exist):
    #       generate-ssh-key
    #       get-ssh-public-key

    # execute action: generate-ssh-key (best effort, its result is not used)
    try:
        await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name='generate-ssh-key',
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))

    # execute action: get-ssh-public-key
    try:
        output, _ = await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name='get-ssh-public-key',
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout
        )
    except Exception as e:
        msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
        self.info(msg)
        raise

    # return public key if exists.
    # A plain `"pubkey" in output` is a substring test for str output and
    # fails on None, so the lookup is attempted and the raw output is the
    # fallback whenever it is not a mapping holding the key.
    try:
        return output["pubkey"]
    except (KeyError, TypeError, IndexError):
        return output
443
async def add_relation(
    self,
    ee_id_1: str,
    ee_id_2: str,
    endpoint_1: str,
    endpoint_2: str
):
    """
    Add a juju relation between the applications of two execution environments

    :param ee_id_1: first execution environment id
    :param ee_id_2: second execution environment id
    :param endpoint_1: endpoint name on the first application
    :param endpoint_2: endpoint name on the second application
    :raises N2VCBadArgumentsException: the two ids belong to different models
    :raises N2VCException: the relation could not be added
    """

    self.debug('adding new relation between {} and {}, endpoints: {}, {}'
               .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))

    if not self._authenticated:
        await self._juju_login()

    # get model, application and machines
    model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1)
    model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2)

    # model must be the same
    if model_1 != model_2:
        message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
        self.error(message)
        raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])

    # add juju relations between two applications
    try:
        # NOTE(review): the helper is called without arguments and is not
        # awaited, so model/apps/endpoints never reach it. Its signature is
        # not visible from here -- confirm and pass them through if needed.
        self._juju_add_relation()
    except Exception as e:
        # include the root cause, as the other public methods do
        message = 'Error adding relation between {} and {}: {}'.format(ee_id_1, ee_id_2, e)
        self.error(message)
        raise N2VCException(message=message) from e
475
async def remove_relation(
    self
):
    """
    Remove a relation (not implemented yet)

    Logs in to juju if needed and then always fails.

    :raises NotImplementedError: always
    """
    if not self._authenticated:
        await self._juju_login()
    # TODO
    self.info('Method not implemented yet')
    # NotImplemented is a non-callable constant; calling it raised TypeError.
    raise NotImplementedError()
484
async def deregister_execution_environments(
    self
):
    """
    Deregister execution environments (not implemented yet)

    Logs in to juju if needed and then always fails.

    :raises NotImplementedError: always
    """
    if not self._authenticated:
        await self._juju_login()
    # TODO
    self.info('Method not implemented yet')
    # NotImplemented is a non-callable constant; calling it raised TypeError.
    raise NotImplementedError()
493
async def delete_namespace(
    self,
    namespace: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """
    Delete a namespace by destroying the juju model named after its ns_id

    :param namespace: namespace to delete; it must carry an ns_id
    :param db_dict: not used by this method
    :param total_timeout: passed through to _juju_destroy_model
    :raises N2VCBadArgumentsException: namespace is None or has no ns_id
    :raises N2VCException: the model could not be destroyed
    """
    self.info('Deleting namespace={}'.format(namespace))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if namespace is None:
        raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])

    _, ns_id, _, _, _ = self._get_namespace_components(namespace=namespace)
    if ns_id is None:
        raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])

    self.debug('Deleting model {}'.format(ns_id))
    try:
        await self._juju_destroy_model(
            model_name=ns_id,
            total_timeout=total_timeout
        )
    except Exception as e:
        raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))

    self.info('Namespace {} deleted'.format(namespace))
523
async def delete_execution_environment(
    self,
    ee_id: str,
    db_dict: dict = None,
    total_timeout: float = None
):
    """
    Delete an execution environment: first its application, then its machine

    :param ee_id: execution environment id (model.application.machine)
    :param db_dict: not used by this method
    :param total_timeout: passed through to _juju_destroy_machine
    :raises N2VCBadArgumentsException: ee_id is None
    :raises N2VCException: the application or the machine could not be destroyed
    """
    self.info('Deleting execution environment ee_id={}'.format(ee_id))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])

    model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)

    # destroy the application
    try:
        await self._juju_destroy_application(model_name=model_name, application_name=application_name)
    except Exception as e:
        app_error = 'Error deleting execution environment {} (application {}) : {}'.format(
            ee_id, application_name, e)
        raise N2VCException(message=app_error)

    # destroy the machine
    try:
        await self._juju_destroy_machine(
            model_name=model_name,
            machine_id=machine_id,
            total_timeout=total_timeout
        )
    except Exception as e:
        machine_error = 'Error deleting execution environment {} (machine {}) : {}'.format(
            ee_id, machine_id, e)
        raise N2VCException(message=machine_error)

    self.info('Execution environment {} deleted'.format(ee_id))
560
async def exec_primitive(
    self,
    ee_id: str,
    primitive_name: str,
    params_dict: dict,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Execute a primitive on the application of an execution environment

    The primitive named 'config' is special: it configures the application
    instead of running a juju action.

    :param ee_id: execution environment id (model.application.machine)
    :param primitive_name: 'config' or the name of the juju action to run
    :param params_dict: application config or action parameters (None means none)
    :param db_dict: passed through to the juju helpers
    :param progress_timeout: passed through to the juju helpers
    :param total_timeout: passed through to the juju helpers
    :return: 'CONFIG OK' for the config primitive, otherwise the action output
    :raises N2VCBadArgumentsException: an argument is missing or ee_id cannot
        be split into its components
    :raises N2VCExecutionException: configuring failed, the action failed or
        its final status is not 'completed'
    """

    self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))

    if not self._authenticated:
        await self._juju_login()

    # check arguments
    if ee_id is None or len(ee_id) == 0:
        raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
    if primitive_name is None or len(primitive_name) == 0:
        raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
    if params_dict is None:
        params_dict = {}

    try:
        model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
    except Exception:
        raise N2VCBadArgumentsException(
            message='ee_id={} is not a valid execution environment id'.format(ee_id),
            bad_args=['ee_id']
        )

    if primitive_name == 'config':
        # Special case: config primitive
        try:
            await self._juju_configure_application(
                model_name=model_name,
                application_name=application_name,
                config=params_dict,
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
        except Exception as e:
            self.error('Error configuring juju application: {}'.format(e))
            raise N2VCExecutionException(
                message='Error configuring application into ee={} : {}'.format(ee_id, e),
                primitive_name=primitive_name
            )
        return 'CONFIG OK'

    # any other primitive is run as a juju action
    try:
        output, status = await self._juju_execute_action(
            model_name=model_name,
            application_name=application_name,
            action_name=primitive_name,
            db_dict=db_dict,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            **params_dict
        )
        # raised inside the try on purpose: the handler below wraps it too
        if status != 'completed':
            raise Exception('status is not completed: {}'.format(status))
        return output
    except Exception as e:
        self.error('Error executing primitive {}: {}'.format(primitive_name, e))
        raise N2VCExecutionException(
            message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
            primitive_name=primitive_name
        )
631
async def disconnect(self):
    """
    Close the connector: log the shutdown and log out from juju
    """
    self.info('closing juju N2VC...')
    await self._juju_logout()
635
636 """
637 ##################################################################################################
638 ########################################## P R I V A T E #########################################
639 ##################################################################################################
640 """
641
def _write_ee_id_db(
    self,
    db_dict: dict,
    ee_id: str
):
    """
    Store the execution environment id in the database (best effort)

    Any failure is logged through self.error and otherwise ignored.

    :param db_dict: holds the 'collection', 'filter' and 'path' of the record to update
    :param ee_id: execution environment id written under '<path>.ee_id'
    """

    # write ee_id to database: _admin.deployed.VCA.x
    try:
        collection = db_dict['collection']
        record_filter = db_dict['filter']
        key_prefix = db_dict['path']
        # make sure the prefix ends with a dot before appending the field name
        if key_prefix[-1] != '.':
            key_prefix += '.'
        changes = {key_prefix + 'ee_id': ee_id}
        self.debug('Writing ee_id to database: {}'.format(key_prefix))
        self.db.set_one(
            table=collection,
            q_filter=record_filter,
            update_dict=changes,
            fail_on_empty=True
        )
    except Exception as e:
        self.error('Error writing ee_id to database: {}'.format(e))
665
@staticmethod
def _build_ee_id(
    model_name: str,
    application_name: str,
    machine_id: str
):
    """
    Compose an execution environment id out of its three components
    :param model_name: first component
    :param application_name: second component
    :param machine_id: third component
    :return: '<model_name>.<application_name>.<machine_id>'
    """
    # id for the execution environment
    return '{model}.{application}.{machine}'.format(
        model=model_name,
        application=application_name,
        machine=machine_id
    )
681
@staticmethod
def _get_ee_id_components(
    ee_id: str
) -> (str, str, str):
    """
    Split an execution environment id into model, application and machine
    :param ee_id: id formatted as '<model>.<application>.<machine>', or None
    :return: model_name, application_name, machine_id (three None values when ee_id is None)
    """

    if ee_id is None:
        return None, None, None

    # only the first three dot-separated pieces are used
    pieces = ee_id.split('.')
    return pieces[0], pieces[1], pieces[2]
701
def _get_application_name(self, namespace: str) -> str:
    """
    Derive the juju application name from a namespace
    :param namespace: namespace holding the vnf id, vdu id and vdu count
    :return: 'app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>' (empty components
        are left out) after being passed through _format_app_name
    """

    # TODO: Enforce the Juju 50-character application limit

    # split namespace components
    _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)

    # Shorten the vnf_id to its last twelve characters
    vnf_part = 'vnf-' + vnf_id[-12:] if vnf_id is not None and len(vnf_id) > 0 else ''

    # Shorten the vdu_id to its last twelve characters
    vdu_part = '-vdu-' + vdu_id[-12:] if vdu_id is not None and len(vdu_id) > 0 else ''

    count_part = '-cnt-' + vdu_count if vdu_count is not None and len(vdu_count) > 0 else ''

    raw_name = 'app-{}{}{}'.format(vnf_part, vdu_part, count_part)

    return N2VCJujuConnector._format_app_name(raw_name)
734
async def _juju_create_machine(
    self,
    model_name: str,
    application_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> Machine:
    """
    Return a ready machine of the model, creating it when it cannot be reused

    :param model_name: juju model (created by _juju_get_model if needed)
    :param application_name: only used to build the ee_id written to the database
    :param machine_id: machine to reuse; a new one is added when it is None or
        not present in the model
    :param db_dict: handed to the observer and used to write the ee_id
    :param progress_timeout: passed through to the observer wait
    :param total_timeout: passed through to the observer wait
    :return: the juju machine, after the observer wait has finished
    """

    self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))

    # get juju model and observer (create model if needed)
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # find machine id in model
    machine = None
    if machine_id is not None:
        self.debug('Finding existing machine id {} in model'.format(machine_id))
        # get juju existing machines in the model
        known_machines = await model.get_machines()
        if machine_id in known_machines:
            self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
            machine = model.machines[machine_id]

    must_create = machine is None
    if must_create:
        self.debug('Creating a new machine in juju...')
        # machine does not exist, create it and wait for it
        machine = await model.add_machine(
            spec=None,
            constraints=None,
            disks=None,
            series='xenial'
        )
    else:
        self.debug('Reusing old machine pending')

    # register machine with observer
    observer.register_machine(machine=machine, db_dict=db_dict)

    if must_create:
        # write the id of the new execution environment in database
        self._write_ee_id_db(
            db_dict=db_dict,
            ee_id=N2VCJujuConnector._build_ee_id(
                model_name=model_name,
                application_name=application_name,
                machine_id=str(machine.entity_id)
            )
        )
        awaited_id = str(machine.entity_id)
    else:
        # machine does exist, but it is in creation process (pending), wait for create finalisation
        awaited_id = machine.entity_id

    # wait for machine creation
    await observer.wait_for_machine(
        machine_id=awaited_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    self.debug("Machine ready at " + str(machine.dns_name))
    return machine
809
async def _juju_provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None
) -> str:
    """
    Provision an existing host over SSH and add it to a juju model

    :param model_name: juju model receiving the machine
    :param hostname: host handed to SSHProvisioner
    :param username: user handed to SSHProvisioner
    :param private_key_path: private key path handed to SSHProvisioner
    :param db_dict: handed to the observer when registering the machine
    :param progress_timeout: passed through to the observer wait
    :param total_timeout: passed through to the observer wait
    :return: id of the machine added to the model
    :raises N2VCException: api_proxy is not configured or provisioning failed
    :raises ValueError: juju reported an error while adding the machine
    :raises Exception: the machine did not show up in the model in time
    """

    # getattr: the attribute may be absent when api_proxy was never configured
    if not getattr(self, 'api_proxy', None):
        msg = 'Cannot provision machine: api_proxy is not defined'
        self.error(msg=msg)
        raise N2VCException(message=msg)

    self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))

    if not self._authenticated:
        await self._juju_login()

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    # TODO check if machine is already provisioned
    machine_list = await model.get_machines()

    provisioner = SSHProvisioner(
        host=hostname,
        user=username,
        private_key_path=private_key_path,
        log=self.log
    )

    params = None
    try:
        params = provisioner.provision_machine()
    except Exception as ex:
        msg = "Exception provisioning machine: {}".format(ex)
        self.log.error(msg)
        raise N2VCException(message=msg)

    params.jobs = ['JobHostUnits']

    connection = model.connection()

    # Submit the request.
    self.debug("Adding machine to model")
    client_facade = client.ClientFacade.from_connection(connection)
    results = await client_facade.AddMachines(params=[params])
    error = results.machines[0].error
    if error:
        # the former '{}}' template made str.format raise ValueError
        # ("Single '}' encountered"), hiding the real juju error
        msg = "Error adding machine: {}".format(error.message)
        self.error(msg=msg)
        raise ValueError(msg)

    machine_id = results.machines[0].machine

    # Need to run this after AddMachines has been called,
    # as we need the machine_id
    self.debug("Installing Juju agent into machine {}".format(machine_id))
    # scheduled in the background; it is not awaited here
    asyncio.ensure_future(provisioner.install_agent(
        connection=connection,
        nonce=params.nonce,
        machine_id=machine_id,
        api=self.api_proxy,
    ))

    # wait for machine in model (now, machine is not yet in model, so we must wait for it)
    # up to 10 polls, 2 seconds apart
    machine = None
    for _ in range(10):
        machine_list = await model.get_machines()
        if machine_id in machine_list:
            self.debug('Machine {} found in model!'.format(machine_id))
            machine = model.machines.get(machine_id)
            break
        await asyncio.sleep(2)

    if machine is None:
        msg = 'Machine {} not found in model'.format(machine_id)
        self.error(msg=msg)
        raise Exception(msg)

    # register machine with observer
    observer.register_machine(machine=machine, db_dict=db_dict)

    # wait for machine creation
    self.debug('waiting for provision finishes... {}'.format(machine_id))
    await observer.wait_for_machine(
        machine_id=machine_id,
        progress_timeout=progress_timeout,
        total_timeout=total_timeout
    )

    self.debug("Machine provisioned {}".format(machine_id))

    return machine_id
908
909 async def _juju_deploy_charm(
910 self,
911 model_name: str,
912 application_name: str,
913 charm_path: str,
914 machine_id: str,
915 db_dict: dict,
916 progress_timeout: float = None,
917 total_timeout: float = None
918 ) -> (Application, int):
919
920 # get juju model and observer
921 model = await self._juju_get_model(model_name=model_name)
922 observer = self.juju_observers[model_name]
923
924 # check if application already exists
925 application = None
926 if application_name in model.applications:
927 application = model.applications[application_name]
928
929 if application is None:
930
931 # application does not exist, create it and wait for it
932 self.debug('deploying application {} to machine {}, model {}'
933 .format(application_name, machine_id, model_name))
934 self.debug('charm: {}'.format(charm_path))
935 series = 'xenial'
936 # series = None
937 application = await model.deploy(
938 entity_url=charm_path,
939 application_name=application_name,
940 channel='stable',
941 num_units=1,
942 series=series,
943 to=machine_id
944 )
945
946 # register application with observer
947 observer.register_application(application=application, db_dict=db_dict)
948
949 self.debug('waiting for application deployed... {}'.format(application.entity_id))
950 retries = await observer.wait_for_application(
951 application_id=application.entity_id,
952 progress_timeout=progress_timeout,
953 total_timeout=total_timeout)
954 self.debug('application deployed')
955
956 else:
957
958 # register application with observer
959 observer.register_application(application=application, db_dict=db_dict)
960
961 # application already exists, but not finalised
962 self.debug('application already exists, waiting for deployed...')
963 retries = await observer.wait_for_application(
964 application_id=application.entity_id,
965 progress_timeout=progress_timeout,
966 total_timeout=total_timeout)
967 self.debug('application deployed')
968
969 return application, retries
970
async def _juju_execute_action(
    self,
    model_name: str,
    application_name: str,
    action_name: str,
    db_dict: dict,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
) -> (object, str):
    """
    Run an action on the first unit of an application and wait for its result

    :param model_name: juju model holding the application
    :param application_name: application whose first unit runs the action
    :param action_name: name of the action; it must be listed by the application
    :param db_dict: handed to the observer when registering the action
    :param progress_timeout: passed through to the observer wait
    :param total_timeout: passed through to the observer wait
    :param kwargs: parameters forwarded to the action
    :return: (output, status); status is 'failed' when the model reports no
        status for the action
    :raises N2VCExecutionException: the application has no unit or does not
        offer the action
    """

    # get juju model and observer
    model = await self._juju_get_model(model_name=model_name)
    observer = self.juju_observers[model_name]

    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('trying to execute action {}'.format(action_name))
    # an application without units must end in N2VCExecutionException,
    # not in an IndexError raised by units[0]
    units = application.units
    unit = units[0] if units else None
    if unit is not None:
        actions = await application.get_actions()
        if action_name in actions:
            self.debug('executing action {} with params {}'.format(action_name, kwargs))
            action = await unit.run_action(action_name, **kwargs)

            # register action with observer
            observer.register_action(action=action, db_dict=db_dict)

            self.debug('    waiting for action completed or error...')
            await observer.wait_for_action(
                action_id=action.entity_id,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout)
            self.debug('action completed with status: {}'.format(action.status))
            output = await model.get_action_output(action_uuid=action.entity_id)
            status = await model.get_action_status(uuid_or_prefix=action.entity_id)
            if action.entity_id in status:
                status = status[action.entity_id]
            else:
                status = 'failed'
            return output, status

    raise N2VCExecutionException(
        message='Cannot execute action on charm',
        primitive_name=action_name
    )
1017
async def _juju_configure_application(
        self,
        model_name: str,
        application_name: str,
        config: dict,
        db_dict: dict,
        progress_timeout: float = None,
        total_timeout: float = None
):
    """Apply a configuration to an application and verify its ssh credentials.

    The configuration is written, read back and compared key by key. Then the
    'verify-ssh-credentials' action is executed, retrying on any error.

    :param str model_name: name of the model that holds the application
    :param str application_name: name of the application to configure
    :param dict config: configuration keys and values to set
    :param dict db_dict: forwarded to _juju_execute_action
    :param float progress_timeout: forwarded to _juju_execute_action
    :param float total_timeout: forwarded to _juju_execute_action
    :returns: True when 'verify-ssh-credentials' ran without raising; False when
        the application does not define that action or every retry failed
    :raises N2VCException: if a value read back differs from the value set
    """

    # get the application
    application = await self._juju_get_application(model_name=model_name, application_name=application_name)

    self.debug('configuring the application {} -> {}'.format(application_name, config))
    res = await application.set_config(config)
    self.debug('application {} configured. res={}'.format(application_name, res))

    # Verify the config is set
    new_conf = await application.get_config()
    for key in config:
        value = new_conf[key]['value']
        self.debug(' {} = {}'.format(key, value))
        if config[key] != value:
            raise N2VCException(
                message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
            )

    # check if 'verify-ssh-credentials' action exists
    actions = await application.get_actions()
    if 'verify-ssh-credentials' not in actions:
        msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
        self.debug(msg=msg)
        return False

    # execute verify-credentials
    num_retries = 20
    retry_timeout = 15.0
    for attempt in range(1, num_retries + 1):
        try:
            self.debug('Executing action verify-ssh-credentials...')
            output, ok = await self._juju_execute_action(
                model_name=model_name,
                application_name=application_name,
                action_name='verify-ssh-credentials',
                db_dict=db_dict,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout
            )
            self.debug('Result: {}, output: {}'.format(ok, output))
            return True
        except asyncio.CancelledError:
            # Before Python 3.8 CancelledError derives from Exception; a
            # cancellation must stop the task, not trigger another retry.
            raise
        except Exception as e:
            self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
            # no point in waiting once the last attempt has failed
            if attempt < num_retries:
                await asyncio.sleep(retry_timeout)

    self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
    return False
1075
async def _juju_get_application(
        self,
        model_name: str,
        application_name: str
):
    """Look up a deployed application in a model.

    :param str model_name: name of the model to search
    :param str application_name: application name; it is normalised with
        _format_app_name before the lookup
    :returns: the application object stored in the model
    :raises N2VCException: if the model has no application with that name
    """

    model = await self._juju_get_model(model_name=model_name)
    application_name = N2VCJujuConnector._format_app_name(application_name)

    deployed = model.applications
    if not deployed or application_name not in deployed:
        raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))

    return deployed[application_name]
1091
async def _juju_get_model(self, model_name: str) -> Model:
    """ Get a model object from juju controller

    Models already obtained are served from self.juju_models. Otherwise the
    model is fetched from the controller, or created there when the controller
    does not list it, and stored together with a new JujuModelObserver.

    :param str model_name: name of the model
    :returns Model: model obtained from juju controller or Exception
    :raises N2VCException: if the model can neither be fetched nor created
    """

    # format model name
    model_name = N2VCJujuConnector._format_model_name(model_name)

    # Other methods of this class overwrite a cache entry with None once the
    # model has been disconnected; such an entry must not be returned as a model.
    if model_name in self.juju_models and self.juju_models[model_name] is not None:
        return self.juju_models[model_name]

    if self._creating_model:
        self.debug('Another coroutine is creating a model. Wait...')
    while self._creating_model:
        # another coroutine is creating a model, wait
        await asyncio.sleep(0.1)
        # retry (perhaps another coroutine has created the model meanwhile)
        if model_name in self.juju_models and self.juju_models[model_name] is not None:
            return self.juju_models[model_name]

    try:
        self._creating_model = True

        # get juju model names from juju
        model_list = await self.controller.list_models()

        if model_name not in model_list:
            self.info('Model {} does not exist. Creating new model...'.format(model_name))
            model = await self.controller.add_model(
                model_name=model_name,
                config={'authorized-keys': self.public_key}
            )
            self.info('New model created, name={}'.format(model_name))
        else:
            self.debug('Model already exists in juju. Getting model {}'.format(model_name))
            model = await self.controller.get_model(model_name)
            self.debug('Existing model in juju, name={}'.format(model_name))

        self.juju_models[model_name] = model
        self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
        return model

    except Exception as e:
        msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
        self.error(msg)
        raise N2VCException(msg)
    finally:
        # always release the flag so waiting coroutines can proceed
        self._creating_model = False
1142
async def _juju_add_relation(
        self,
        model_name: str,
        application_name_1: str,
        application_name_2: str,
        relation_1: str,
        relation_2: str
):
    """Relate two applications of a model.

    Each side is passed to the model as '<application name>:<relation name>'.

    :param str model_name: name of the model that holds both applications
    :param str application_name_1: first application
    :param str application_name_2: second application
    :param str relation_1: relation name on the first application
    :param str relation_2: relation name on the second application
    """

    self.debug('adding relation')

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    await model.add_relation(
        relation1='{}:{}'.format(application_name_1, relation_1),
        relation2='{}:{}'.format(application_name_2, relation_2)
    )
1160
async def _juju_destroy_application(
        self,
        model_name: str,
        application_name: str
):
    """Destroy an application of a model, if the model knows it.

    An unknown application is only reported in the debug log.

    :param str model_name: name of the model that holds the application
    :param str application_name: name of the application, used as given for
        the lookup in the model
    """

    self.debug('Destroying application {} in model {}'.format(application_name, model_name))

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    target = model.applications.get(application_name)
    if not target:
        self.debug('Application not found: {}'.format(application_name))
        return

    await target.destroy()
1177
async def _juju_destroy_machine(
        self,
        model_name: str,
        machine_id: str,
        total_timeout: float = None
):
    """Force the destruction of a machine and wait until the model drops it.

    The model is polled every 0.5 seconds. When the machine is still listed
    after total_timeout the method logs an error and returns; it does not raise.

    :param str model_name: name of the model that holds the machine
    :param str machine_id: id of the machine to destroy
    :param float total_timeout: maximum seconds to wait for the removal
        (3600 when None)
    """

    self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))

    if total_timeout is None:
        total_timeout = 3600

    # get juju model
    model = await self._juju_get_model(model_name=model_name)

    machines = await model.get_machines()
    if machine_id not in machines:
        self.debug('Machine not found: {}'.format(machine_id))
        return

    machine = model.machines[machine_id]
    await machine.destroy(force=True)

    # monotonic clock: the deadline must not move if the wall clock is adjusted
    deadline = time.monotonic() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.monotonic() < deadline:
        self.debug('Waiting for machine {} is destroyed'.format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    # the loop also ends on timeout: only report success if the machine is gone
    if machine_id in machines:
        self.error('Timeout waiting for machine {} to be destroyed'.format(machine_id))
    else:
        self.debug('Machine destroyed: {}'.format(machine_id))
1208
async def _juju_destroy_model(
        self,
        model_name: str,
        total_timeout: float = None
):
    """Disconnect a model, destroy it in the controller and wait until it is gone.

    The model counts as destroyed as soon as the controller can no longer
    return it. When that does not happen within total_timeout the method logs
    an error and returns; it does not raise.

    :param str model_name: name of the model to destroy
    :param float total_timeout: maximum seconds to wait for the destruction
        (3600 when None)
    """

    self.debug('Destroying model {}'.format(model_name))

    if total_timeout is None:
        total_timeout = 3600

    model = await self._juju_get_model(model_name=model_name)
    uuid = model.info.uuid

    self.debug('disconnecting model {}...'.format(model_name))
    await self._juju_disconnect_model(model_name=model_name)

    # Forget the model completely: a None left behind in the cache would later
    # be handed out by _juju_get_model as if it were a model object.
    if model_name in self.juju_models:
        del self.juju_models[model_name]
    if model_name in self.juju_observers:
        del self.juju_observers[model_name]

    self.debug('destroying model {}...'.format(model_name))
    await self.controller.destroy_model(uuid)

    # wait for model is completely destroyed
    deadline = time.monotonic() + total_timeout
    while time.monotonic() < deadline:
        self.debug('waiting for model is destroyed...')
        try:
            probe = await self.controller.get_model(uuid)
        except asyncio.CancelledError:
            # a cancellation is not evidence that the model is gone
            raise
        except Exception:
            self.debug('model destroyed')
            return

        # The model still exists and get_model handed back a model object
        # (presumably with its own open connection - see libjuju docs); release
        # it so that polling does not pile up one connection per iteration.
        try:
            await probe.disconnect()
        except asyncio.CancelledError:
            raise
        except Exception as e:
            self.debug('Error disconnecting probe of model {}: {}'.format(model_name, e))

        await asyncio.sleep(1.0)

    self.error('Timeout waiting for model {} to be destroyed'.format(model_name))
1241
async def _juju_login(self):
    """Connect to juju controller

    Does nothing when already authenticated. While another task is connecting,
    waits for it to finish and reuses its result when it succeeded.

    :raises N2VCConnectionException: if the connection to the controller fails
    """

    # if already authenticated, exit function
    if self._authenticated:
        return

    # if connecting, wait for finish
    # another task could be trying to connect in parallel
    while self._connecting:
        await asyncio.sleep(0.1)

    # double check after other task has finished
    if self._authenticated:
        return

    try:
        self._connecting = True
        # the password is masked on purpose: credentials must never reach the log
        self.info(
            'connecting to juju controller: {} {}:{} ca_cert: {}'
            .format(self.url, self.username, '********', '\n'+self.ca_cert if self.ca_cert else 'None'))

        # Create controller object
        self.controller = Controller(loop=self.loop)
        # Connect to controller
        await self.controller.connect(
            endpoint=self.url,
            username=self.username,
            password=self.secret,
            cacert=self.ca_cert
        )
        self._authenticated = True
        self.info('juju controller connected')
    except Exception as e:
        message = 'Exception connecting to juju: {}'.format(e)
        self.error(message)
        raise N2VCConnectionException(
            message=message,
            url=self.url
        )
    finally:
        # release the flag whatever happened, so waiting tasks can go on
        self._connecting = False
1286
async def _juju_logout(self):
    """Disconnect every known model and then the juju controller.

    :returns: False when there is no authenticated session; None otherwise
    :raises N2VCConnectionException: if disconnecting the controller fails
    """
    if not self._authenticated:
        return False

    # disconnect all models; a failure on one model must not stop the others
    for name in self.juju_models:
        try:
            await self._juju_disconnect_model(name)
        except Exception as exc:
            self.error('Error disconnecting model {} : {}'.format(name, exc))

    self.info("Disconnecting controller")
    try:
        await self.controller.disconnect()
    except Exception as exc:
        raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(exc), url=self.url)
    else:
        # session state is reset only after a clean controller disconnect
        self._authenticated = False
        self.controller = None
        self.info('disconnected')
1309
async def _juju_disconnect_model(
        self,
        model_name: str
):
    """Disconnect a cached model and mark its cache entries as disconnected.

    Both the model and its observer entry are set to None afterwards. Models
    that are not in the cache are ignored.

    :param str model_name: key of the model in self.juju_models
    """
    self.debug("Disconnecting model {}".format(model_name))
    if model_name in self.juju_models:
        model = self.juju_models[model_name]
        # the entry is already None when the model was disconnected before;
        # there is nothing left to close in that case
        if model is not None:
            await model.disconnect()
        self.juju_models[model_name] = None
        self.juju_observers[model_name] = None
1319
def _create_juju_public_key(self):
    """Recreate the Juju public key on lcm container, if needed
    Certain libjuju commands expect to be run from the same machine as Juju
    is bootstrapped to. This method will write the public key to disk in
    that location: ~/.local/share/juju/ssh/juju_id_rsa.pub

    When self.public_key is empty it is taken from the OSMLCM_VCA_PUBKEY
    environment variable; without any key nothing is written. An existing key
    file is left untouched.
    """

    # Make sure that we have a public key before writing to disk
    if not self.public_key:
        if 'OSMLCM_VCA_PUBKEY' not in os.environ:
            return
        self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
        if not self.public_key:
            return

    pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
    file_path = "{}/juju_id_rsa.pub".format(pk_path)
    self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))

    # The directory may already be there while the key file is not, so the
    # decision to write must look at the file itself, not at its directory.
    os.makedirs(pk_path, exist_ok=True)
    if os.path.exists(file_path):
        self.debug('juju public key file already exists: {}'.format(file_path))
        return

    with open(file_path, 'w') as f:
        self.debug('Creating juju public key file: {}'.format(file_path))
        f.write(self.public_key)
1347
@staticmethod
def _format_model_name(name: str) -> str:
    """Turn a name into the form used for juju model names.

    Model names may only contain lowercase letters, digits and hyphens:
    underscores and blanks become hyphens and the result is lowercased.
    Other characters are passed through unchanged.
    """

    hyphenated = name.translate(str.maketrans('_ ', '--'))
    return hyphenated.lower()
1356
@staticmethod
def _format_app_name(name: str) -> str:
    """Build a valid juju application name out of an arbitrary name.

    Application names have restrictions (run juju deploy --help):
        - contains lowercase letters 'a'-'z'
        - contains numbers '0'-'9'
        - contains hyphens '-'
        - starts with a lowercase letter
        - not two or more consecutive hyphens
        - after a hyphen, not a group with all numbers

    Underscores and blanks become hyphens, the name is lowercased, runs of
    hyphens are collapsed, and every hyphen-separated group made only of
    digits (or empty) is prefixed with 'z', as is a name starting with a digit.
    """

    normalized = name.translate(str.maketrans('_ ', '--')).lower()

    # collapse any run of hyphens into a single one
    while '--' in normalized:
        normalized = normalized.replace('--', '-')

    # all() is also True for an empty group, so a leading or trailing hyphen
    # yields a bare 'z' group
    groups = [
        'z' + group if all(ch.isdigit() for ch in group) else group
        for group in normalized.split('-')
    ]
    app_name = '-'.join(groups)

    # a mixed first group such as '1abc' still starts with a digit
    if app_name[0].isdigit():
        app_name = 'z' + app_name

    return app_name