Changes in NS and operation status
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
33 from n2vc.exceptions \
34 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
35 N2VCExecutionException, N2VCInvalidCertificate
36 from n2vc.juju_observer import JujuModelObserver
37
38 from juju.controller import Controller
39 from juju.model import Model
40 from juju.application import Application
41 from juju.action import Action
42 from juju.machine import Machine
43 from juju.client import client
44
45 from n2vc.provisioner import SSHProvisioner
46
47
48 class N2VCJujuConnector(N2VCConnector):
49
50 """
51 ##################################################################################################
52 ########################################## P U B L I C ###########################################
53 ##################################################################################################
54 """
55
56 def __init__(
57 self,
58 db: object,
59 fs: object,
60 log: object = None,
61 loop: object = None,
62 url: str = '127.0.0.1:17070',
63 username: str = 'admin',
64 vca_config: dict = None,
65 on_update_db=None
66 ):
67 """Initialize juju N2VC connector
68 """
69
70 # parent class constructor
71 N2VCConnector.__init__(
72 self,
73 db=db,
74 fs=fs,
75 log=log,
76 loop=loop,
77 url=url,
78 username=username,
79 vca_config=vca_config,
80 on_update_db=on_update_db
81 )
82
83 # silence websocket traffic log
84 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
85 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
86 logging.getLogger('model').setLevel(logging.WARN)
87
88 self.info('Initializing N2VC juju connector...')
89
90 """
91 ##############################################################
92 # check arguments
93 ##############################################################
94 """
95
96 # juju URL
97 if url is None:
98 raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
99 url_parts = url.split(':')
100 if len(url_parts) != 2:
101 raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
102 self.hostname = url_parts[0]
103 try:
104 self.port = int(url_parts[1])
105 except ValueError:
106 raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])
107
108 # juju USERNAME
109 if username is None:
110 raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])
111
112 # juju CONFIGURATION
113 if vca_config is None:
114 raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])
115
116 if 'secret' in vca_config:
117 self.secret = vca_config['secret']
118 else:
119 raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])
120
121 # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
122 # if exists, it will be written in lcm container: _create_juju_public_key()
123 if 'public_key' in vca_config:
124 self.public_key = vca_config['public_key']
125 else:
126 self.public_key = None
127
128 # TODO: Verify ca_cert is valid before using. VCA will crash
129 # if the ca_cert isn't formatted correctly.
130 def base64_to_cacert(b64string):
131 """Convert the base64-encoded string containing the VCA CACERT.
132
133 The input string....
134
135 """
136 try:
137 cacert = base64.b64decode(b64string).decode("utf-8")
138
139 cacert = re.sub(
140 r'\\n',
141 r'\n',
142 cacert,
143 )
144 except binascii.Error as e:
145 self.debug("Caught binascii.Error: {}".format(e))
146 raise N2VCInvalidCertificate(message="Invalid CA Certificate")
147
148 return cacert
149
150 self.ca_cert = vca_config.get('ca_cert')
151 if self.ca_cert:
152 self.ca_cert = base64_to_cacert(vca_config['ca_cert'])
153
154 if 'api_proxy' in vca_config:
155 self.api_proxy = vca_config['api_proxy']
156 self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
157 else:
158 self.warning('api_proxy is not configured. Support for native charms is disabled')
159
160 self.debug('Arguments have been checked')
161
162 # juju data
163 self.controller = None # it will be filled when connect to juju
164 self.juju_models = {} # model objects for every model_name
165 self.juju_observers = {} # model observers for every model_name
166 self._connecting = False # while connecting to juju (to avoid duplicate connections)
167 self._authenticated = False # it will be True when juju connection be stablished
168 self._creating_model = False # True during model creation
169
170 # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
171 self._create_juju_public_key()
172
173 self.info('N2VC juju connector initialized')
174
175 async def get_status(self, namespace: str, yaml_format: bool = True):
176
177 # self.info('Getting NS status. namespace: {}'.format(namespace))
178
179 if not self._authenticated:
180 await self._juju_login()
181
182 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
183 # model name is ns_id
184 model_name = ns_id
185 if model_name is None:
186 msg = 'Namespace {} not valid'.format(namespace)
187 self.error(msg)
188 raise N2VCBadArgumentsException(msg, ['namespace'])
189
190 # get juju model (create model if needed)
191 model = await self._juju_get_model(model_name=model_name)
192
193 status = await model.get_status()
194
195 if yaml_format:
196 return obj_to_yaml(status)
197 else:
198 return obj_to_dict(status)
199
200 async def create_execution_environment(
201 self,
202 namespace: str,
203 db_dict: dict,
204 reuse_ee_id: str = None,
205 progress_timeout: float = None,
206 total_timeout: float = None
207 ) -> (str, dict):
208
209 self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))
210
211 if not self._authenticated:
212 await self._juju_login()
213
214 machine_id = None
215 if reuse_ee_id:
216 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
217 else:
218 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
219 # model name is ns_id
220 model_name = ns_id
221 # application name
222 application_name = self._get_application_name(namespace=namespace)
223
224 self.debug('model name: {}, application name: {}, machine_id: {}'
225 .format(model_name, application_name, machine_id))
226
227 # create or reuse a new juju machine
228 try:
229 machine = await self._juju_create_machine(
230 model_name=model_name,
231 application_name=application_name,
232 machine_id=machine_id,
233 db_dict=db_dict,
234 progress_timeout=progress_timeout,
235 total_timeout=total_timeout
236 )
237 except Exception as e:
238 message = 'Error creating machine on juju: {}'.format(e)
239 self.error(message)
240 raise N2VCException(message=message)
241
242 # id for the execution environment
243 ee_id = N2VCJujuConnector._build_ee_id(
244 model_name=model_name,
245 application_name=application_name,
246 machine_id=str(machine.entity_id)
247 )
248 self.debug('ee_id: {}'.format(ee_id))
249
250 # new machine credentials
251 credentials = dict()
252 credentials['hostname'] = machine.dns_name
253
254 self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))
255
256 return ee_id, credentials
257
258 async def register_execution_environment(
259 self,
260 namespace: str,
261 credentials: dict,
262 db_dict: dict,
263 progress_timeout: float = None,
264 total_timeout: float = None
265 ) -> str:
266
267 if not self._authenticated:
268 await self._juju_login()
269
270 self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))
271
272 if credentials is None:
273 raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])
274 if credentials.get('hostname'):
275 hostname = credentials['hostname']
276 else:
277 raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])
278 if credentials.get('username'):
279 username = credentials['username']
280 else:
281 raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])
282 if 'private_key_path' in credentials:
283 private_key_path = credentials['private_key_path']
284 else:
285 # if not passed as argument, use generated private key path
286 private_key_path = self.private_key_path
287
288 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
289
290 # model name
291 model_name = ns_id
292 # application name
293 application_name = self._get_application_name(namespace=namespace)
294
295 # register machine on juju
296 try:
297 machine_id = await self._juju_provision_machine(
298 model_name=model_name,
299 hostname=hostname,
300 username=username,
301 private_key_path=private_key_path,
302 db_dict=db_dict,
303 progress_timeout=progress_timeout,
304 total_timeout=total_timeout
305 )
306 except Exception as e:
307 self.error('Error registering machine: {}'.format(e))
308 raise N2VCException(message='Error registering machine on juju: {}'.format(e))
309
310 self.info('Machine registered: {}'.format(machine_id))
311
312 # id for the execution environment
313 ee_id = N2VCJujuConnector._build_ee_id(
314 model_name=model_name,
315 application_name=application_name,
316 machine_id=str(machine_id)
317 )
318
319 self.info('Execution environment registered. ee_id: {}'.format(ee_id))
320
321 return ee_id
322
323 async def install_configuration_sw(
324 self,
325 ee_id: str,
326 artifact_path: str,
327 db_dict: dict,
328 progress_timeout: float = None,
329 total_timeout: float = None
330 ):
331
332 self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
333 .format(ee_id, artifact_path, db_dict))
334
335 if not self._authenticated:
336 await self._juju_login()
337
338 # check arguments
339 if ee_id is None or len(ee_id) == 0:
340 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
341 if artifact_path is None or len(artifact_path) == 0:
342 raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
343 if db_dict is None:
344 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
345
346 try:
347 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
348 self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
349 except Exception as e:
350 raise N2VCBadArgumentsException(
351 message='ee_id={} is not a valid execution environment id'.format(ee_id),
352 bad_args=['ee_id']
353 )
354
355 # remove // in charm path
356 while artifact_path.find('//') >= 0:
357 artifact_path = artifact_path.replace('//', '/')
358
359 # check charm path
360 if not self.fs.file_exists(artifact_path, mode="dir"):
361 msg = 'artifact path does not exist: {}'.format(artifact_path)
362 raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])
363
364 if artifact_path.startswith('/'):
365 full_path = self.fs.path + artifact_path
366 else:
367 full_path = self.fs.path + '/' + artifact_path
368
369 try:
370 application, retries = await self._juju_deploy_charm(
371 model_name=model_name,
372 application_name=application_name,
373 charm_path=full_path,
374 machine_id=machine_id,
375 db_dict=db_dict,
376 progress_timeout=progress_timeout,
377 total_timeout=total_timeout
378 )
379 except Exception as e:
380 raise N2VCException(message='Error desploying charm into ee={} : {}'.format(ee_id, e))
381
382 self.info('Configuration sw installed')
383
384 async def get_ee_ssh_public__key(
385 self,
386 ee_id: str,
387 db_dict: dict,
388 progress_timeout: float = None,
389 total_timeout: float = None
390 ) -> str:
391
392 self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))
393
394 if not self._authenticated:
395 await self._juju_login()
396
397 # check arguments
398 if ee_id is None or len(ee_id) == 0:
399 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
400 if db_dict is None:
401 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
402
403 try:
404 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
405 self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
406 except Exception as e:
407 raise N2VCBadArgumentsException(
408 message='ee_id={} is not a valid execution environment id'.format(ee_id),
409 bad_args=['ee_id']
410 )
411
412 # try to execute ssh layer primitives (if exist):
413 # generate-ssh-key
414 # get-ssh-public-key
415
416 output = None
417
418 # execute action: generate-ssh-key
419 try:
420 output, status = await self._juju_execute_action(
421 model_name=model_name,
422 application_name=application_name,
423 action_name='generate-ssh-key',
424 db_dict=db_dict,
425 progress_timeout=progress_timeout,
426 total_timeout=total_timeout
427 )
428 except Exception as e:
429 self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))
430
431 # execute action: get-ssh-public-key
432 try:
433 output, status = await self._juju_execute_action(
434 model_name=model_name,
435 application_name=application_name,
436 action_name='get-ssh-public-key',
437 db_dict=db_dict,
438 progress_timeout=progress_timeout,
439 total_timeout=total_timeout
440 )
441 except Exception as e:
442 msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
443 self.info(msg)
444 raise e
445
446 # return public key if exists
447 return output["pubkey"] if "pubkey" in output else output
448
449 async def add_relation(
450 self,
451 ee_id_1: str,
452 ee_id_2: str,
453 endpoint_1: str,
454 endpoint_2: str
455 ):
456
457 self.debug('adding new relation between {} and {}, endpoints: {}, {}'
458 .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))
459
460 if not self._authenticated:
461 await self._juju_login()
462
463 # get model, application and machines
464 model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1)
465 model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2)
466
467 # model must be the same
468 if model_1 != model_2:
469 message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
470 self.error(message)
471 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])
472
473 # add juju relations between two applications
474 try:
475 self._juju_add_relation()
476 except Exception as e:
477 message = 'Error adding relation between {} and {}'.format(ee_id_1, ee_id_2)
478 self.error(message)
479 raise N2VCException(message=message)
480
481 async def remove_relation(
482 self
483 ):
484 if not self._authenticated:
485 await self._juju_login()
486 # TODO
487 self.info('Method not implemented yet')
488 raise NotImplemented()
489
490 async def deregister_execution_environments(
491 self
492 ):
493 if not self._authenticated:
494 await self._juju_login()
495 # TODO
496 self.info('Method not implemented yet')
497 raise NotImplemented()
498
499 async def delete_namespace(
500 self,
501 namespace: str,
502 db_dict: dict = None,
503 total_timeout: float = None
504 ):
505 self.info('Deleting namespace={}'.format(namespace))
506
507 if not self._authenticated:
508 await self._juju_login()
509
510 # check arguments
511 if namespace is None:
512 raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])
513
514 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
515 if ns_id is not None:
516 try:
517 await self._juju_destroy_model(
518 model_name=ns_id,
519 total_timeout=total_timeout
520 )
521 except Exception as e:
522 raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))
523 else:
524 raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])
525
526 self.info('Namespace {} deleted'.format(namespace))
527
528 async def delete_execution_environment(
529 self,
530 ee_id: str,
531 db_dict: dict = None,
532 total_timeout: float = None
533 ):
534 self.info('Deleting execution environment ee_id={}'.format(ee_id))
535
536 if not self._authenticated:
537 await self._juju_login()
538
539 # check arguments
540 if ee_id is None:
541 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
542
543 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)
544
545 # destroy the application
546 try:
547 await self._juju_destroy_application(model_name=model_name, application_name=application_name)
548 except Exception as e:
549 raise N2VCException(message='Error deleting execution environment {} (application {}) : {}'
550 .format(ee_id, application_name, e))
551
552 # destroy the machine
553 try:
554 await self._juju_destroy_machine(
555 model_name=model_name,
556 machine_id=machine_id,
557 total_timeout=total_timeout
558 )
559 except Exception as e:
560 raise N2VCException(message='Error deleting execution environment {} (machine {}) : {}'
561 .format(ee_id, machine_id, e))
562
563 self.info('Execution environment {} deleted'.format(ee_id))
564
565 async def exec_primitive(
566 self,
567 ee_id: str,
568 primitive_name: str,
569 params_dict: dict,
570 db_dict: dict = None,
571 progress_timeout: float = None,
572 total_timeout: float = None
573 ) -> str:
574
575 self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))
576
577 if not self._authenticated:
578 await self._juju_login()
579
580 # check arguments
581 if ee_id is None or len(ee_id) == 0:
582 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
583 if primitive_name is None or len(primitive_name) == 0:
584 raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
585 if params_dict is None:
586 params_dict = dict()
587
588 try:
589 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
590 except Exception:
591 raise N2VCBadArgumentsException(
592 message='ee_id={} is not a valid execution environment id'.format(ee_id),
593 bad_args=['ee_id']
594 )
595
596 if primitive_name == 'config':
597 # Special case: config primitive
598 try:
599 await self._juju_configure_application(
600 model_name=model_name,
601 application_name=application_name,
602 config=params_dict,
603 db_dict=db_dict,
604 progress_timeout=progress_timeout,
605 total_timeout=total_timeout
606 )
607 except Exception as e:
608 self.error('Error configuring juju application: {}'.format(e))
609 raise N2VCExecutionException(
610 message='Error configuring application into ee={} : {}'.format(ee_id, e),
611 primitive_name=primitive_name
612 )
613 return 'CONFIG OK'
614 else:
615 try:
616 output, status = await self._juju_execute_action(
617 model_name=model_name,
618 application_name=application_name,
619 action_name=primitive_name,
620 db_dict=db_dict,
621 progress_timeout=progress_timeout,
622 total_timeout=total_timeout,
623 **params_dict
624 )
625 if status == 'completed':
626 return output
627 else:
628 raise Exception('status is not completed: {}'.format(status))
629 except Exception as e:
630 self.error('Error executing primitive {}: {}'.format(primitive_name, e))
631 raise N2VCExecutionException(
632 message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
633 primitive_name=primitive_name
634 )
635
636 async def disconnect(self):
637 self.info('closing juju N2VC...')
638 await self._juju_logout()
639
640 """
641 ##################################################################################################
642 ########################################## P R I V A T E #########################################
643 ##################################################################################################
644 """
645
646 def _write_ee_id_db(
647 self,
648 db_dict: dict,
649 ee_id: str
650 ):
651
652 # write ee_id to database: _admin.deployed.VCA.x
653 try:
654 the_table = db_dict['collection']
655 the_filter = db_dict['filter']
656 the_path = db_dict['path']
657 if not the_path[-1] == '.':
658 the_path = the_path + '.'
659 update_dict = {the_path + 'ee_id': ee_id}
660 # self.debug('Writing ee_id to database: {}'.format(the_path))
661 self.db.set_one(
662 table=the_table,
663 q_filter=the_filter,
664 update_dict=update_dict,
665 fail_on_empty=True
666 )
667 except Exception as e:
668 self.error('Error writing ee_id to database: {}'.format(e))
669
670 @staticmethod
671 def _build_ee_id(
672 model_name: str,
673 application_name: str,
674 machine_id: str
675 ):
676 """
677 Build an execution environment id form model, application and machine
678 :param model_name:
679 :param application_name:
680 :param machine_id:
681 :return:
682 """
683 # id for the execution environment
684 return '{}.{}.{}'.format(model_name, application_name, machine_id)
685
686 @staticmethod
687 def _get_ee_id_components(
688 ee_id: str
689 ) -> (str, str, str):
690 """
691 Get model, application and machine components from an execution environment id
692 :param ee_id:
693 :return: model_name, application_name, machine_id
694 """
695
696 if ee_id is None:
697 return None, None, None
698
699 # split components of id
700 parts = ee_id.split('.')
701 model_name = parts[0]
702 application_name = parts[1]
703 machine_id = parts[2]
704 return model_name, application_name, machine_id
705
706 def _get_application_name(self, namespace: str) -> str:
707 """
708 Build application name from namespace
709 :param namespace:
710 :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>
711 """
712
713 # TODO: Enforce the Juju 50-character application limit
714
715 # split namespace components
716 _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
717
718 if vnf_id is None or len(vnf_id) == 0:
719 vnf_id = ''
720 else:
721 # Shorten the vnf_id to its last twelve characters
722 vnf_id = 'vnf-' + vnf_id[-12:]
723
724 if vdu_id is None or len(vdu_id) == 0:
725 vdu_id = ''
726 else:
727 # Shorten the vdu_id to its last twelve characters
728 vdu_id = '-vdu-' + vdu_id[-12:]
729
730 if vdu_count is None or len(vdu_count) == 0:
731 vdu_count = ''
732 else:
733 vdu_count = '-cnt-' + vdu_count
734
735 application_name = 'app-{}{}{}'.format(vnf_id, vdu_id, vdu_count)
736
737 return N2VCJujuConnector._format_app_name(application_name)
738
739 async def _juju_create_machine(
740 self,
741 model_name: str,
742 application_name: str,
743 machine_id: str = None,
744 db_dict: dict = None,
745 progress_timeout: float = None,
746 total_timeout: float = None
747 ) -> Machine:
748
749 self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))
750
751 # get juju model and observer (create model if needed)
752 model = await self._juju_get_model(model_name=model_name)
753 observer = self.juju_observers[model_name]
754
755 # find machine id in model
756 machine = None
757 if machine_id is not None:
758 self.debug('Finding existing machine id {} in model'.format(machine_id))
759 # get juju existing machines in the model
760 existing_machines = await model.get_machines()
761 if machine_id in existing_machines:
762 self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
763 machine = model.machines[machine_id]
764
765 if machine is None:
766 self.debug('Creating a new machine in juju...')
767 # machine does not exist, create it and wait for it
768 machine = await model.add_machine(
769 spec=None,
770 constraints=None,
771 disks=None,
772 series='xenial'
773 )
774
775 # register machine with observer
776 observer.register_machine(machine=machine, db_dict=db_dict)
777
778 # id for the execution environment
779 ee_id = N2VCJujuConnector._build_ee_id(
780 model_name=model_name,
781 application_name=application_name,
782 machine_id=str(machine.entity_id)
783 )
784
785 # write ee_id in database
786 self._write_ee_id_db(
787 db_dict=db_dict,
788 ee_id=ee_id
789 )
790
791 # wait for machine creation
792 await observer.wait_for_machine(
793 machine_id=str(machine.entity_id),
794 progress_timeout=progress_timeout,
795 total_timeout=total_timeout
796 )
797
798 else:
799
800 self.debug('Reusing old machine pending')
801
802 # register machine with observer
803 observer.register_machine(machine=machine, db_dict=db_dict)
804
805 # machine does exist, but it is in creation process (pending), wait for create finalisation
806 await observer.wait_for_machine(
807 machine_id=machine.entity_id,
808 progress_timeout=progress_timeout,
809 total_timeout=total_timeout)
810
811 self.debug("Machine ready at " + str(machine.dns_name))
812 return machine
813
814 async def _juju_provision_machine(
815 self,
816 model_name: str,
817 hostname: str,
818 username: str,
819 private_key_path: str,
820 db_dict: dict = None,
821 progress_timeout: float = None,
822 total_timeout: float = None
823 ) -> str:
824
825 if not self.api_proxy:
826 msg = 'Cannot provision machine: api_proxy is not defined'
827 self.error(msg=msg)
828 raise N2VCException(message=msg)
829
830 self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))
831
832 if not self._authenticated:
833 await self._juju_login()
834
835 # get juju model and observer
836 model = await self._juju_get_model(model_name=model_name)
837 observer = self.juju_observers[model_name]
838
839 # TODO check if machine is already provisioned
840 machine_list = await model.get_machines()
841
842 provisioner = SSHProvisioner(
843 host=hostname,
844 user=username,
845 private_key_path=private_key_path,
846 log=self.log
847 )
848
849 params = None
850 try:
851 params = provisioner.provision_machine()
852 except Exception as ex:
853 msg = "Exception provisioning machine: {}".format(ex)
854 self.log.error(msg)
855 raise N2VCException(message=msg)
856
857 params.jobs = ['JobHostUnits']
858
859 connection = model.connection()
860
861 # Submit the request.
862 self.debug("Adding machine to model")
863 client_facade = client.ClientFacade.from_connection(connection)
864 results = await client_facade.AddMachines(params=[params])
865 error = results.machines[0].error
866 if error:
867 msg = "Error adding machine: {}}".format(error.message)
868 self.error(msg=msg)
869 raise ValueError(msg)
870
871 machine_id = results.machines[0].machine
872
873 # Need to run this after AddMachines has been called,
874 # as we need the machine_id
875 self.debug("Installing Juju agent into machine {}".format(machine_id))
876 asyncio.ensure_future(provisioner.install_agent(
877 connection=connection,
878 nonce=params.nonce,
879 machine_id=machine_id,
880 api=self.api_proxy,
881 ))
882
883 # wait for machine in model (now, machine is not yet in model, so we must wait for it)
884 machine = None
885 for i in range(10):
886 machine_list = await model.get_machines()
887 if machine_id in machine_list:
888 self.debug('Machine {} found in model!'.format(machine_id))
889 machine = model.machines.get(machine_id)
890 break
891 await asyncio.sleep(2)
892
893 if machine is None:
894 msg = 'Machine {} not found in model'.format(machine_id)
895 self.error(msg=msg)
896 raise Exception(msg)
897
898 # register machine with observer
899 observer.register_machine(machine=machine, db_dict=db_dict)
900
901 # wait for machine creation
902 self.debug('waiting for provision finishes... {}'.format(machine_id))
903 await observer.wait_for_machine(
904 machine_id=machine_id,
905 progress_timeout=progress_timeout,
906 total_timeout=total_timeout
907 )
908
909 self.debug("Machine provisioned {}".format(machine_id))
910
911 return machine_id
912
913 async def _juju_deploy_charm(
914 self,
915 model_name: str,
916 application_name: str,
917 charm_path: str,
918 machine_id: str,
919 db_dict: dict,
920 progress_timeout: float = None,
921 total_timeout: float = None
922 ) -> (Application, int):
923
924 # get juju model and observer
925 model = await self._juju_get_model(model_name=model_name)
926 observer = self.juju_observers[model_name]
927
928 # check if application already exists
929 application = None
930 if application_name in model.applications:
931 application = model.applications[application_name]
932
933 if application is None:
934
935 # application does not exist, create it and wait for it
936 self.debug('deploying application {} to machine {}, model {}'
937 .format(application_name, machine_id, model_name))
938 self.debug('charm: {}'.format(charm_path))
939 series = 'xenial'
940 # series = None
941 application = await model.deploy(
942 entity_url=charm_path,
943 application_name=application_name,
944 channel='stable',
945 num_units=1,
946 series=series,
947 to=machine_id
948 )
949
950 # register application with observer
951 observer.register_application(application=application, db_dict=db_dict)
952
953 self.debug('waiting for application deployed... {}'.format(application.entity_id))
954 retries = await observer.wait_for_application(
955 application_id=application.entity_id,
956 progress_timeout=progress_timeout,
957 total_timeout=total_timeout)
958 self.debug('application deployed')
959
960 else:
961
962 # register application with observer
963 observer.register_application(application=application, db_dict=db_dict)
964
965 # application already exists, but not finalised
966 self.debug('application already exists, waiting for deployed...')
967 retries = await observer.wait_for_application(
968 application_id=application.entity_id,
969 progress_timeout=progress_timeout,
970 total_timeout=total_timeout)
971 self.debug('application deployed')
972
973 return application, retries
974
975 async def _juju_execute_action(
976 self,
977 model_name: str,
978 application_name: str,
979 action_name: str,
980 db_dict: dict,
981 progress_timeout: float = None,
982 total_timeout: float = None,
983 **kwargs
984 ) -> Action:
985
986 # get juju model and observer
987 model = await self._juju_get_model(model_name=model_name)
988 observer = self.juju_observers[model_name]
989
990 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
991
992 unit = application.units[0]
993 if unit is not None:
994 actions = await application.get_actions()
995 if action_name in actions:
996 self.debug('executing action "{}" using params: {}'.format(action_name, kwargs))
997 action = await unit.run_action(action_name, **kwargs)
998
999 # register action with observer
1000 observer.register_action(action=action, db_dict=db_dict)
1001
1002 await observer.wait_for_action(
1003 action_id=action.entity_id,
1004 progress_timeout=progress_timeout,
1005 total_timeout=total_timeout)
1006 self.debug('action completed with status: {}'.format(action.status))
1007 output = await model.get_action_output(action_uuid=action.entity_id)
1008 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1009 if action.entity_id in status:
1010 status = status[action.entity_id]
1011 else:
1012 status = 'failed'
1013 return output, status
1014
1015 raise N2VCExecutionException(
1016 message='Cannot execute action on charm',
1017 primitive_name=action_name
1018 )
1019
1020 async def _juju_configure_application(
1021 self,
1022 model_name: str,
1023 application_name: str,
1024 config: dict,
1025 db_dict: dict,
1026 progress_timeout: float = None,
1027 total_timeout: float = None
1028 ):
1029
1030 # get the application
1031 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
1032
1033 self.debug('configuring the application {} -> {}'.format(application_name, config))
1034 res = await application.set_config(config)
1035 self.debug('application {} configured. res={}'.format(application_name, res))
1036
1037 # Verify the config is set
1038 new_conf = await application.get_config()
1039 for key in config:
1040 value = new_conf[key]['value']
1041 self.debug(' {} = {}'.format(key, value))
1042 if config[key] != value:
1043 raise N2VCException(
1044 message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
1045 )
1046
1047 # check if 'verify-ssh-credentials' action exists
1048 # unit = application.units[0]
1049 actions = await application.get_actions()
1050 if 'verify-ssh-credentials' not in actions:
1051 msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
1052 self.debug(msg=msg)
1053 return False
1054
1055 # execute verify-credentials
1056 num_retries = 20
1057 retry_timeout = 15.0
1058 for i in range(num_retries):
1059 try:
1060 self.debug('Executing action verify-ssh-credentials...')
1061 output, ok = await self._juju_execute_action(
1062 model_name=model_name,
1063 application_name=application_name,
1064 action_name='verify-ssh-credentials',
1065 db_dict=db_dict,
1066 progress_timeout=progress_timeout,
1067 total_timeout=total_timeout
1068 )
1069 self.debug('Result: {}, output: {}'.format(ok, output))
1070 return True
1071 except Exception as e:
1072 self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
1073 await asyncio.sleep(retry_timeout)
1074 else:
1075 self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
1076 return False
1077
1078 async def _juju_get_application(
1079 self,
1080 model_name: str,
1081 application_name: str
1082 ):
1083 """Get the deployed application."""
1084
1085 model = await self._juju_get_model(model_name=model_name)
1086
1087 application_name = N2VCJujuConnector._format_app_name(application_name)
1088
1089 if model.applications and application_name in model.applications:
1090 return model.applications[application_name]
1091 else:
1092 raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))
1093
1094 async def _juju_get_model(self, model_name: str) -> Model:
1095 """ Get a model object from juju controller
1096
1097 :param str model_name: name of the model
1098 :returns Model: model obtained from juju controller or Exception
1099 """
1100
1101 # format model name
1102 model_name = N2VCJujuConnector._format_model_name(model_name)
1103
1104 if model_name in self.juju_models:
1105 return self.juju_models[model_name]
1106
1107 if self._creating_model:
1108 self.debug('Another coroutine is creating a model. Wait...')
1109 while self._creating_model:
1110 # another coroutine is creating a model, wait
1111 await asyncio.sleep(0.1)
1112 # retry (perhaps another coroutine has created the model meanwhile)
1113 if model_name in self.juju_models:
1114 return self.juju_models[model_name]
1115
1116 try:
1117 self._creating_model = True
1118
1119 # get juju model names from juju
1120 model_list = await self.controller.list_models()
1121
1122 if model_name not in model_list:
1123 self.info('Model {} does not exist. Creating new model...'.format(model_name))
1124 model = await self.controller.add_model(
1125 model_name=model_name,
1126 config={'authorized-keys': self.public_key}
1127 )
1128 self.info('New model created, name={}'.format(model_name))
1129 else:
1130 self.debug('Model already exists in juju. Getting model {}'.format(model_name))
1131 model = await self.controller.get_model(model_name)
1132 self.debug('Existing model in juju, name={}'.format(model_name))
1133
1134 self.juju_models[model_name] = model
1135 self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
1136 return model
1137
1138 except Exception as e:
1139 msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
1140 self.error(msg)
1141 raise N2VCException(msg)
1142 finally:
1143 self._creating_model = False
1144
1145 async def _juju_add_relation(
1146 self,
1147 model_name: str,
1148 application_name_1: str,
1149 application_name_2: str,
1150 relation_1: str,
1151 relation_2: str
1152 ):
1153
1154 self.debug('adding relation')
1155
1156 # get juju model and observer
1157 model = await self._juju_get_model(model_name=model_name)
1158
1159 r1 = '{}:{}'.format(application_name_1, relation_1)
1160 r2 = '{}:{}'.format(application_name_2, relation_2)
1161 await model.add_relation(relation1=r1, relation2=r2)
1162
1163 async def _juju_destroy_application(
1164 self,
1165 model_name: str,
1166 application_name: str
1167 ):
1168
1169 self.debug('Destroying application {} in model {}'.format(application_name, model_name))
1170
1171 # get juju model and observer
1172 model = await self._juju_get_model(model_name=model_name)
1173
1174 application = model.applications.get(application_name)
1175 if application:
1176 await application.destroy()
1177 else:
1178 self.debug('Application not found: {}'.format(application_name))
1179
1180 async def _juju_destroy_machine(
1181 self,
1182 model_name: str,
1183 machine_id: str,
1184 total_timeout: float = None
1185 ):
1186
1187 self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))
1188
1189 if total_timeout is None:
1190 total_timeout = 3600
1191
1192 # get juju model and observer
1193 model = await self._juju_get_model(model_name=model_name)
1194
1195 machines = await model.get_machines()
1196 if machine_id in machines:
1197 machine = model.machines[machine_id]
1198 await machine.destroy(force=True)
1199 # max timeout
1200 end = time.time() + total_timeout
1201 # wait for machine removal
1202 machines = await model.get_machines()
1203 while machine_id in machines and time.time() < end:
1204 self.debug('Waiting for machine {} is destroyed'.format(machine_id))
1205 await asyncio.sleep(0.5)
1206 machines = await model.get_machines()
1207 self.debug('Machine destroyed: {}'.format(machine_id))
1208 else:
1209 self.debug('Machine not found: {}'.format(machine_id))
1210
1211 async def _juju_destroy_model(
1212 self,
1213 model_name: str,
1214 total_timeout: float = None
1215 ):
1216
1217 self.debug('Destroying model {}'.format(model_name))
1218
1219 if total_timeout is None:
1220 total_timeout = 3600
1221
1222 model = await self._juju_get_model(model_name=model_name)
1223 uuid = model.info.uuid
1224
1225 await self._juju_disconnect_model(model_name=model_name)
1226 self.juju_models[model_name] = None
1227 self.juju_observers[model_name] = None
1228
1229 self.debug('destroying model {}...'.format(model_name))
1230 await self.controller.destroy_model(uuid)
1231 self.debug('model destroy requested {}'.format(model_name))
1232
1233 # wait for model is completely destroyed
1234 end = time.time() + total_timeout
1235 while time.time() < end:
1236 self.debug('Waiting for model is destroyed...')
1237 try:
1238 # await self.controller.get_model(uuid)
1239 models = await self.controller.list_models()
1240 if model_name not in models:
1241 self.debug('The model {} ({}) was destroyed'.format(model_name, uuid))
1242 return
1243 except Exception as e:
1244 pass
1245 await asyncio.sleep(1.0)
1246
1247 async def _juju_login(self):
1248 """Connect to juju controller
1249
1250 """
1251
1252 # if already authenticated, exit function
1253 if self._authenticated:
1254 return
1255
1256 # if connecting, wait for finish
1257 # another task could be trying to connect in parallel
1258 while self._connecting:
1259 await asyncio.sleep(0.1)
1260
1261 # double check after other task has finished
1262 if self._authenticated:
1263 return
1264
1265 try:
1266 self._connecting = True
1267 self.info(
1268 'connecting to juju controller: {} {}:{} ca_cert: {}'
1269 .format(self.url, self.username, self.secret, '\n'+self.ca_cert if self.ca_cert else 'None'))
1270
1271 # Create controller object
1272 self.controller = Controller(loop=self.loop)
1273 # Connect to controller
1274 await self.controller.connect(
1275 endpoint=self.url,
1276 username=self.username,
1277 password=self.secret,
1278 cacert=self.ca_cert
1279 )
1280 self._authenticated = True
1281 self.info('juju controller connected')
1282 except Exception as e:
1283 message = 'Exception connecting to juju: {}'.format(e)
1284 self.error(message)
1285 raise N2VCConnectionException(
1286 message=message,
1287 url=self.url
1288 )
1289 finally:
1290 self._connecting = False
1291
1292 async def _juju_logout(self):
1293 """Logout of the Juju controller."""
1294 if not self._authenticated:
1295 return False
1296
1297 # disconnect all models
1298 for model_name in self.juju_models:
1299 try:
1300 await self._juju_disconnect_model(model_name)
1301 except Exception as e:
1302 self.error('Error disconnecting model {} : {}'.format(model_name, e))
1303 # continue with next model...
1304
1305 self.info("Disconnecting controller")
1306 try:
1307 await self.controller.disconnect()
1308 except Exception as e:
1309 raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(e), url=self.url)
1310
1311 self.controller = None
1312 self._authenticated = False
1313 self.info('disconnected')
1314
1315 async def _juju_disconnect_model(
1316 self,
1317 model_name: str
1318 ):
1319 self.debug("Disconnecting model {}".format(model_name))
1320 if model_name in self.juju_models:
1321 await self.juju_models[model_name].disconnect()
1322 self.juju_models[model_name] = None
1323 self.juju_observers[model_name] = None
1324 else:
1325 self.warning('Cannot disconnect model: {}'.format(model_name))
1326
1327 def _create_juju_public_key(self):
1328 """Recreate the Juju public key on lcm container, if needed
1329 Certain libjuju commands expect to be run from the same machine as Juju
1330 is bootstrapped to. This method will write the public key to disk in
1331 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1332 """
1333
1334 # Make sure that we have a public key before writing to disk
1335 if self.public_key is None or len(self.public_key) == 0:
1336 if 'OSMLCM_VCA_PUBKEY' in os.environ:
1337 self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
1338 if len(self.public_key) == 0:
1339 return
1340 else:
1341 return
1342
1343 pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
1344 file_path = "{}/juju_id_rsa.pub".format(pk_path)
1345 self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))
1346 if not os.path.exists(pk_path):
1347 # create path and write file
1348 os.makedirs(pk_path)
1349 with open(file_path, 'w') as f:
1350 self.debug('Creating juju public key file: {}'.format(file_path))
1351 f.write(self.public_key)
1352 else:
1353 self.debug('juju public key file already exists: {}'.format(file_path))
1354
1355 @staticmethod
1356 def _format_model_name(name: str) -> str:
1357 """Format the name of the model.
1358
1359 Model names may only contain lowercase letters, digits and hyphens
1360 """
1361
1362 return name.replace('_', '-').replace(' ', '-').lower()
1363
1364 @staticmethod
1365 def _format_app_name(name: str) -> str:
1366 """Format the name of the application (in order to assure valid application name).
1367
1368 Application names have restrictions (run juju deploy --help):
1369 - contains lowercase letters 'a'-'z'
1370 - contains numbers '0'-'9'
1371 - contains hyphens '-'
1372 - starts with a lowercase letter
1373 - not two or more consecutive hyphens
1374 - after a hyphen, not a group with all numbers
1375 """
1376
1377 def all_numbers(s: str) -> bool:
1378 for c in s:
1379 if not c.isdigit():
1380 return False
1381 return True
1382
1383 new_name = name.replace('_', '-')
1384 new_name = new_name.replace(' ', '-')
1385 new_name = new_name.lower()
1386 while new_name.find('--') >= 0:
1387 new_name = new_name.replace('--', '-')
1388 groups = new_name.split('-')
1389
1390 # find 'all numbers' groups and prefix them with a letter
1391 app_name = ''
1392 for i in range(len(groups)):
1393 group = groups[i]
1394 if all_numbers(group):
1395 group = 'z' + group
1396 if i > 0:
1397 app_name += '-'
1398 app_name += group
1399
1400 if app_name[0].isdigit():
1401 app_name = 'z' + app_name
1402
1403 return app_name