Fix 1015. Remove non used paramiko dependencies at k8s_helm_conn
[osm/N2VC.git] / n2vc / n2vc_juju_conn.py
1 ##
2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
4 # All Rights Reserved.
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 #
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
21 ##
22
23 import logging
24 import os
25 import asyncio
26 import time
27 import base64
28 import binascii
29 import re
30
31 from n2vc.n2vc_conn import N2VCConnector
32 from n2vc.n2vc_conn import obj_to_dict, obj_to_yaml
33 from n2vc.exceptions \
34 import N2VCBadArgumentsException, N2VCException, N2VCConnectionException, \
35 N2VCExecutionException, N2VCInvalidCertificate
36 from n2vc.juju_observer import JujuModelObserver
37
38 from juju.controller import Controller
39 from juju.model import Model
40 from juju.application import Application
41 from juju.action import Action
42 from juju.machine import Machine
43 from juju.client import client
44 from juju.errors import JujuAPIError
45
46 from n2vc.provisioner import SSHProvisioner
47
48
49 class N2VCJujuConnector(N2VCConnector):
50
51 """
52 ##################################################################################################
53 ########################################## P U B L I C ###########################################
54 ##################################################################################################
55 """
56
57 def __init__(
58 self,
59 db: object,
60 fs: object,
61 log: object = None,
62 loop: object = None,
63 url: str = '127.0.0.1:17070',
64 username: str = 'admin',
65 vca_config: dict = None,
66 on_update_db=None
67 ):
68 """Initialize juju N2VC connector
69 """
70
71 # parent class constructor
72 N2VCConnector.__init__(
73 self,
74 db=db,
75 fs=fs,
76 log=log,
77 loop=loop,
78 url=url,
79 username=username,
80 vca_config=vca_config,
81 on_update_db=on_update_db
82 )
83
84 # silence websocket traffic log
85 logging.getLogger('websockets.protocol').setLevel(logging.INFO)
86 logging.getLogger('juju.client.connection').setLevel(logging.WARN)
87 logging.getLogger('model').setLevel(logging.WARN)
88
89 self.info('Initializing N2VC juju connector...')
90
91 """
92 ##############################################################
93 # check arguments
94 ##############################################################
95 """
96
97 # juju URL
98 if url is None:
99 raise N2VCBadArgumentsException('Argument url is mandatory', ['url'])
100 url_parts = url.split(':')
101 if len(url_parts) != 2:
102 raise N2VCBadArgumentsException('Argument url: bad format (localhost:port) -> {}'.format(url), ['url'])
103 self.hostname = url_parts[0]
104 try:
105 self.port = int(url_parts[1])
106 except ValueError:
107 raise N2VCBadArgumentsException('url port must be a number -> {}'.format(url), ['url'])
108
109 # juju USERNAME
110 if username is None:
111 raise N2VCBadArgumentsException('Argument username is mandatory', ['username'])
112
113 # juju CONFIGURATION
114 if vca_config is None:
115 raise N2VCBadArgumentsException('Argument vca_config is mandatory', ['vca_config'])
116
117 if 'secret' in vca_config:
118 self.secret = vca_config['secret']
119 else:
120 raise N2VCBadArgumentsException('Argument vca_config.secret is mandatory', ['vca_config.secret'])
121
122 # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
123 # if exists, it will be written in lcm container: _create_juju_public_key()
124 if 'public_key' in vca_config:
125 self.public_key = vca_config['public_key']
126 else:
127 self.public_key = None
128
129 # TODO: Verify ca_cert is valid before using. VCA will crash
130 # if the ca_cert isn't formatted correctly.
131 def base64_to_cacert(b64string):
132 """Convert the base64-encoded string containing the VCA CACERT.
133
134 The input string....
135
136 """
137 try:
138 cacert = base64.b64decode(b64string).decode("utf-8")
139
140 cacert = re.sub(
141 r'\\n',
142 r'\n',
143 cacert,
144 )
145 except binascii.Error as e:
146 self.debug("Caught binascii.Error: {}".format(e))
147 raise N2VCInvalidCertificate(message="Invalid CA Certificate")
148
149 return cacert
150
151 self.ca_cert = vca_config.get('ca_cert')
152 if self.ca_cert:
153 self.ca_cert = base64_to_cacert(vca_config['ca_cert'])
154
155 if 'api_proxy' in vca_config:
156 self.api_proxy = vca_config['api_proxy']
157 self.debug('api_proxy for native charms configured: {}'.format(self.api_proxy))
158 else:
159 self.warning('api_proxy is not configured. Support for native charms is disabled')
160
161 if 'enable_os_upgrade' in vca_config:
162 self.enable_os_upgrade = vca_config['enable_os_upgrade']
163 else:
164 self.enable_os_upgrade = True
165
166 if 'apt_mirror' in vca_config:
167 self.apt_mirror = vca_config['apt_mirror']
168 else:
169 self.apt_mirror = None
170
171 self.debug('Arguments have been checked')
172
173 # juju data
174 self.controller = None # it will be filled when connect to juju
175 self.juju_models = {} # model objects for every model_name
176 self.juju_observers = {} # model observers for every model_name
177 self._connecting = False # while connecting to juju (to avoid duplicate connections)
178 self._authenticated = False # it will be True when juju connection be stablished
179 self._creating_model = False # True during model creation
180
181 # create juju pub key file in lcm container at ./local/share/juju/ssh/juju_id_rsa.pub
182 self._create_juju_public_key()
183
184 self.info('N2VC juju connector initialized')
185
186 async def get_status(self, namespace: str, yaml_format: bool = True):
187
188 # self.info('Getting NS status. namespace: {}'.format(namespace))
189
190 if not self._authenticated:
191 await self._juju_login()
192
193 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
194 # model name is ns_id
195 model_name = ns_id
196 if model_name is None:
197 msg = 'Namespace {} not valid'.format(namespace)
198 self.error(msg)
199 raise N2VCBadArgumentsException(msg, ['namespace'])
200
201 # get juju model (create model if needed)
202 model = await self._juju_get_model(model_name=model_name)
203
204 status = await model.get_status()
205
206 if yaml_format:
207 return obj_to_yaml(status)
208 else:
209 return obj_to_dict(status)
210
211 async def create_execution_environment(
212 self,
213 namespace: str,
214 db_dict: dict,
215 reuse_ee_id: str = None,
216 progress_timeout: float = None,
217 total_timeout: float = None
218 ) -> (str, dict):
219
220 self.info('Creating execution environment. namespace: {}, reuse_ee_id: {}'.format(namespace, reuse_ee_id))
221
222 if not self._authenticated:
223 await self._juju_login()
224
225 machine_id = None
226 if reuse_ee_id:
227 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=reuse_ee_id)
228 else:
229 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
230 # model name is ns_id
231 model_name = ns_id
232 # application name
233 application_name = self._get_application_name(namespace=namespace)
234
235 self.debug('model name: {}, application name: {}, machine_id: {}'
236 .format(model_name, application_name, machine_id))
237
238 # create or reuse a new juju machine
239 try:
240 machine = await self._juju_create_machine(
241 model_name=model_name,
242 application_name=application_name,
243 machine_id=machine_id,
244 db_dict=db_dict,
245 progress_timeout=progress_timeout,
246 total_timeout=total_timeout
247 )
248 except Exception as e:
249 message = 'Error creating machine on juju: {}'.format(e)
250 self.error(message)
251 raise N2VCException(message=message)
252
253 # id for the execution environment
254 ee_id = N2VCJujuConnector._build_ee_id(
255 model_name=model_name,
256 application_name=application_name,
257 machine_id=str(machine.entity_id)
258 )
259 self.debug('ee_id: {}'.format(ee_id))
260
261 # new machine credentials
262 credentials = dict()
263 credentials['hostname'] = machine.dns_name
264
265 self.info('Execution environment created. ee_id: {}, credentials: {}'.format(ee_id, credentials))
266
267 return ee_id, credentials
268
269 async def register_execution_environment(
270 self,
271 namespace: str,
272 credentials: dict,
273 db_dict: dict,
274 progress_timeout: float = None,
275 total_timeout: float = None
276 ) -> str:
277
278 if not self._authenticated:
279 await self._juju_login()
280
281 self.info('Registering execution environment. namespace={}, credentials={}'.format(namespace, credentials))
282
283 if credentials is None:
284 raise N2VCBadArgumentsException(message='credentials are mandatory', bad_args=['credentials'])
285 if credentials.get('hostname'):
286 hostname = credentials['hostname']
287 else:
288 raise N2VCBadArgumentsException(message='hostname is mandatory', bad_args=['credentials.hostname'])
289 if credentials.get('username'):
290 username = credentials['username']
291 else:
292 raise N2VCBadArgumentsException(message='username is mandatory', bad_args=['credentials.username'])
293 if 'private_key_path' in credentials:
294 private_key_path = credentials['private_key_path']
295 else:
296 # if not passed as argument, use generated private key path
297 private_key_path = self.private_key_path
298
299 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
300
301 # model name
302 model_name = ns_id
303 # application name
304 application_name = self._get_application_name(namespace=namespace)
305
306 # register machine on juju
307 try:
308 machine_id = await self._juju_provision_machine(
309 model_name=model_name,
310 hostname=hostname,
311 username=username,
312 private_key_path=private_key_path,
313 db_dict=db_dict,
314 progress_timeout=progress_timeout,
315 total_timeout=total_timeout
316 )
317 except Exception as e:
318 self.error('Error registering machine: {}'.format(e))
319 raise N2VCException(message='Error registering machine on juju: {}'.format(e))
320
321 self.info('Machine registered: {}'.format(machine_id))
322
323 # id for the execution environment
324 ee_id = N2VCJujuConnector._build_ee_id(
325 model_name=model_name,
326 application_name=application_name,
327 machine_id=str(machine_id)
328 )
329
330 self.info('Execution environment registered. ee_id: {}'.format(ee_id))
331
332 return ee_id
333
334 async def install_configuration_sw(
335 self,
336 ee_id: str,
337 artifact_path: str,
338 db_dict: dict,
339 progress_timeout: float = None,
340 total_timeout: float = None
341 ):
342
343 self.info('Installing configuration sw on ee_id: {}, artifact path: {}, db_dict: {}'
344 .format(ee_id, artifact_path, db_dict))
345
346 if not self._authenticated:
347 await self._juju_login()
348
349 # check arguments
350 if ee_id is None or len(ee_id) == 0:
351 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
352 if artifact_path is None or len(artifact_path) == 0:
353 raise N2VCBadArgumentsException(message='artifact_path is mandatory', bad_args=['artifact_path'])
354 if db_dict is None:
355 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
356
357 try:
358 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
359 self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
360 except Exception as e:
361 raise N2VCBadArgumentsException(
362 message='ee_id={} is not a valid execution environment id'.format(ee_id),
363 bad_args=['ee_id']
364 )
365
366 # remove // in charm path
367 while artifact_path.find('//') >= 0:
368 artifact_path = artifact_path.replace('//', '/')
369
370 # check charm path
371 if not self.fs.file_exists(artifact_path, mode="dir"):
372 msg = 'artifact path does not exist: {}'.format(artifact_path)
373 raise N2VCBadArgumentsException(message=msg, bad_args=['artifact_path'])
374
375 if artifact_path.startswith('/'):
376 full_path = self.fs.path + artifact_path
377 else:
378 full_path = self.fs.path + '/' + artifact_path
379
380 try:
381 application, retries = await self._juju_deploy_charm(
382 model_name=model_name,
383 application_name=application_name,
384 charm_path=full_path,
385 machine_id=machine_id,
386 db_dict=db_dict,
387 progress_timeout=progress_timeout,
388 total_timeout=total_timeout
389 )
390 except Exception as e:
391 raise N2VCException(message='Error desploying charm into ee={} : {}'.format(ee_id, e))
392
393 self.info('Configuration sw installed')
394
395 async def get_ee_ssh_public__key(
396 self,
397 ee_id: str,
398 db_dict: dict,
399 progress_timeout: float = None,
400 total_timeout: float = None
401 ) -> str:
402
403 self.info('Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}'.format(ee_id, db_dict))
404
405 if not self._authenticated:
406 await self._juju_login()
407
408 # check arguments
409 if ee_id is None or len(ee_id) == 0:
410 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
411 if db_dict is None:
412 raise N2VCBadArgumentsException(message='db_dict is mandatory', bad_args=['db_dict'])
413
414 try:
415 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
416 self.debug('model: {}, application: {}, machine: {}'.format(model_name, application_name, machine_id))
417 except Exception as e:
418 raise N2VCBadArgumentsException(
419 message='ee_id={} is not a valid execution environment id'.format(ee_id),
420 bad_args=['ee_id']
421 )
422
423 # try to execute ssh layer primitives (if exist):
424 # generate-ssh-key
425 # get-ssh-public-key
426
427 output = None
428
429 # execute action: generate-ssh-key
430 try:
431 output, status = await self._juju_execute_action(
432 model_name=model_name,
433 application_name=application_name,
434 action_name='generate-ssh-key',
435 db_dict=db_dict,
436 progress_timeout=progress_timeout,
437 total_timeout=total_timeout
438 )
439 except Exception as e:
440 self.info('Cannot execute action generate-ssh-key: {}\nContinuing...'.format(e))
441
442 # execute action: get-ssh-public-key
443 try:
444 output, status = await self._juju_execute_action(
445 model_name=model_name,
446 application_name=application_name,
447 action_name='get-ssh-public-key',
448 db_dict=db_dict,
449 progress_timeout=progress_timeout,
450 total_timeout=total_timeout
451 )
452 except Exception as e:
453 msg = 'Cannot execute action get-ssh-public-key: {}\n'.format(e)
454 self.info(msg)
455 raise e
456
457 # return public key if exists
458 return output["pubkey"] if "pubkey" in output else output
459
460 async def add_relation(
461 self,
462 ee_id_1: str,
463 ee_id_2: str,
464 endpoint_1: str,
465 endpoint_2: str
466 ):
467
468 self.debug('adding new relation between {} and {}, endpoints: {}, {}'
469 .format(ee_id_1, ee_id_2, endpoint_1, endpoint_2))
470
471 # check arguments
472 if not ee_id_1:
473 message = 'EE 1 is mandatory'
474 self.error(message)
475 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1'])
476 if not ee_id_2:
477 message = 'EE 2 is mandatory'
478 self.error(message)
479 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_2'])
480 if not endpoint_1:
481 message = 'endpoint 1 is mandatory'
482 self.error(message)
483 raise N2VCBadArgumentsException(message=message, bad_args=['endpoint_1'])
484 if not endpoint_2:
485 message = 'endpoint 2 is mandatory'
486 self.error(message)
487 raise N2VCBadArgumentsException(message=message, bad_args=['endpoint_2'])
488
489 if not self._authenticated:
490 await self._juju_login()
491
492 # get the model, the applications and the machines from the ee_id's
493 model_1, app_1, machine_1 = self._get_ee_id_components(ee_id_1)
494 model_2, app_2, machine_2 = self._get_ee_id_components(ee_id_2)
495
496 # model must be the same
497 if model_1 != model_2:
498 message = 'EE models are not the same: {} vs {}'.format(ee_id_1, ee_id_2)
499 self.error(message)
500 raise N2VCBadArgumentsException(message=message, bad_args=['ee_id_1', 'ee_id_2'])
501
502 # add juju relations between two applications
503 try:
504 await self._juju_add_relation(
505 model_name=model_1,
506 application_name_1=app_1,
507 application_name_2=app_2,
508 relation_1=endpoint_1,
509 relation_2=endpoint_2
510 )
511 except Exception as e:
512 message = 'Error adding relation between {} and {}'.format(ee_id_1, ee_id_2)
513 self.error(message)
514 raise N2VCException(message=message)
515
516 async def remove_relation(
517 self
518 ):
519 if not self._authenticated:
520 await self._juju_login()
521 # TODO
522 self.info('Method not implemented yet')
523 raise NotImplemented()
524
525 async def deregister_execution_environments(
526 self
527 ):
528 if not self._authenticated:
529 await self._juju_login()
530 # TODO
531 self.info('Method not implemented yet')
532 raise NotImplemented()
533
534 async def delete_namespace(
535 self,
536 namespace: str,
537 db_dict: dict = None,
538 total_timeout: float = None
539 ):
540 self.info('Deleting namespace={}'.format(namespace))
541
542 if not self._authenticated:
543 await self._juju_login()
544
545 # check arguments
546 if namespace is None:
547 raise N2VCBadArgumentsException(message='namespace is mandatory', bad_args=['namespace'])
548
549 nsi_id, ns_id, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
550 if ns_id is not None:
551 try:
552 await self._juju_destroy_model(
553 model_name=ns_id,
554 total_timeout=total_timeout
555 )
556 except Exception as e:
557 raise N2VCException(message='Error deleting namespace {} : {}'.format(namespace, e))
558 else:
559 raise N2VCBadArgumentsException(message='only ns_id is permitted to delete yet', bad_args=['namespace'])
560
561 self.info('Namespace {} deleted'.format(namespace))
562
563 async def delete_execution_environment(
564 self,
565 ee_id: str,
566 db_dict: dict = None,
567 total_timeout: float = None
568 ):
569 self.info('Deleting execution environment ee_id={}'.format(ee_id))
570
571 if not self._authenticated:
572 await self._juju_login()
573
574 # check arguments
575 if ee_id is None:
576 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
577
578 model_name, application_name, machine_id = self._get_ee_id_components(ee_id=ee_id)
579
580 # destroy the application
581 try:
582 await self._juju_destroy_application(model_name=model_name, application_name=application_name)
583 except Exception as e:
584 raise N2VCException(message='Error deleting execution environment {} (application {}) : {}'
585 .format(ee_id, application_name, e))
586
587 # destroy the machine
588 try:
589 await self._juju_destroy_machine(
590 model_name=model_name,
591 machine_id=machine_id,
592 total_timeout=total_timeout
593 )
594 except Exception as e:
595 raise N2VCException(message='Error deleting execution environment {} (machine {}) : {}'
596 .format(ee_id, machine_id, e))
597
598 self.info('Execution environment {} deleted'.format(ee_id))
599
600 async def exec_primitive(
601 self,
602 ee_id: str,
603 primitive_name: str,
604 params_dict: dict,
605 db_dict: dict = None,
606 progress_timeout: float = None,
607 total_timeout: float = None
608 ) -> str:
609
610 self.info('Executing primitive: {} on ee: {}, params: {}'.format(primitive_name, ee_id, params_dict))
611
612 if not self._authenticated:
613 await self._juju_login()
614
615 # check arguments
616 if ee_id is None or len(ee_id) == 0:
617 raise N2VCBadArgumentsException(message='ee_id is mandatory', bad_args=['ee_id'])
618 if primitive_name is None or len(primitive_name) == 0:
619 raise N2VCBadArgumentsException(message='action_name is mandatory', bad_args=['action_name'])
620 if params_dict is None:
621 params_dict = dict()
622
623 try:
624 model_name, application_name, machine_id = N2VCJujuConnector._get_ee_id_components(ee_id=ee_id)
625 except Exception:
626 raise N2VCBadArgumentsException(
627 message='ee_id={} is not a valid execution environment id'.format(ee_id),
628 bad_args=['ee_id']
629 )
630
631 if primitive_name == 'config':
632 # Special case: config primitive
633 try:
634 await self._juju_configure_application(
635 model_name=model_name,
636 application_name=application_name,
637 config=params_dict,
638 db_dict=db_dict,
639 progress_timeout=progress_timeout,
640 total_timeout=total_timeout
641 )
642 except Exception as e:
643 self.error('Error configuring juju application: {}'.format(e))
644 raise N2VCExecutionException(
645 message='Error configuring application into ee={} : {}'.format(ee_id, e),
646 primitive_name=primitive_name
647 )
648 return 'CONFIG OK'
649 else:
650 try:
651 output, status = await self._juju_execute_action(
652 model_name=model_name,
653 application_name=application_name,
654 action_name=primitive_name,
655 db_dict=db_dict,
656 progress_timeout=progress_timeout,
657 total_timeout=total_timeout,
658 **params_dict
659 )
660 if status == 'completed':
661 return output
662 else:
663 raise Exception('status is not completed: {}'.format(status))
664 except Exception as e:
665 self.error('Error executing primitive {}: {}'.format(primitive_name, e))
666 raise N2VCExecutionException(
667 message='Error executing primitive {} into ee={} : {}'.format(primitive_name, ee_id, e),
668 primitive_name=primitive_name
669 )
670
671 async def disconnect(self):
672 self.info('closing juju N2VC...')
673 await self._juju_logout()
674
675 """
676 ##################################################################################################
677 ########################################## P R I V A T E #########################################
678 ##################################################################################################
679 """
680
681 def _write_ee_id_db(
682 self,
683 db_dict: dict,
684 ee_id: str
685 ):
686
687 # write ee_id to database: _admin.deployed.VCA.x
688 try:
689 the_table = db_dict['collection']
690 the_filter = db_dict['filter']
691 the_path = db_dict['path']
692 if not the_path[-1] == '.':
693 the_path = the_path + '.'
694 update_dict = {the_path + 'ee_id': ee_id}
695 # self.debug('Writing ee_id to database: {}'.format(the_path))
696 self.db.set_one(
697 table=the_table,
698 q_filter=the_filter,
699 update_dict=update_dict,
700 fail_on_empty=True
701 )
702 except Exception as e:
703 self.error('Error writing ee_id to database: {}'.format(e))
704
705 @staticmethod
706 def _build_ee_id(
707 model_name: str,
708 application_name: str,
709 machine_id: str
710 ):
711 """
712 Build an execution environment id form model, application and machine
713 :param model_name:
714 :param application_name:
715 :param machine_id:
716 :return:
717 """
718 # id for the execution environment
719 return '{}.{}.{}'.format(model_name, application_name, machine_id)
720
721 @staticmethod
722 def _get_ee_id_components(
723 ee_id: str
724 ) -> (str, str, str):
725 """
726 Get model, application and machine components from an execution environment id
727 :param ee_id:
728 :return: model_name, application_name, machine_id
729 """
730
731 if ee_id is None:
732 return None, None, None
733
734 # split components of id
735 parts = ee_id.split('.')
736 model_name = parts[0]
737 application_name = parts[1]
738 machine_id = parts[2]
739 return model_name, application_name, machine_id
740
741 def _get_application_name(self, namespace: str) -> str:
742 """
743 Build application name from namespace
744 :param namespace:
745 :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>
746 """
747
748 # TODO: Enforce the Juju 50-character application limit
749
750 # split namespace components
751 _, _, vnf_id, vdu_id, vdu_count = self._get_namespace_components(namespace=namespace)
752
753 if vnf_id is None or len(vnf_id) == 0:
754 vnf_id = ''
755 else:
756 # Shorten the vnf_id to its last twelve characters
757 vnf_id = 'vnf-' + vnf_id[-12:]
758
759 if vdu_id is None or len(vdu_id) == 0:
760 vdu_id = ''
761 else:
762 # Shorten the vdu_id to its last twelve characters
763 vdu_id = '-vdu-' + vdu_id[-12:]
764
765 if vdu_count is None or len(vdu_count) == 0:
766 vdu_count = ''
767 else:
768 vdu_count = '-cnt-' + vdu_count
769
770 application_name = 'app-{}{}{}'.format(vnf_id, vdu_id, vdu_count)
771
772 return N2VCJujuConnector._format_app_name(application_name)
773
774 async def _juju_create_machine(
775 self,
776 model_name: str,
777 application_name: str,
778 machine_id: str = None,
779 db_dict: dict = None,
780 progress_timeout: float = None,
781 total_timeout: float = None
782 ) -> Machine:
783
784 self.debug('creating machine in model: {}, existing machine id: {}'.format(model_name, machine_id))
785
786 # get juju model and observer (create model if needed)
787 model = await self._juju_get_model(model_name=model_name)
788 observer = self.juju_observers[model_name]
789
790 # find machine id in model
791 machine = None
792 if machine_id is not None:
793 self.debug('Finding existing machine id {} in model'.format(machine_id))
794 # get juju existing machines in the model
795 existing_machines = await model.get_machines()
796 if machine_id in existing_machines:
797 self.debug('Machine id {} found in model (reusing it)'.format(machine_id))
798 machine = model.machines[machine_id]
799
800 if machine is None:
801 self.debug('Creating a new machine in juju...')
802 # machine does not exist, create it and wait for it
803 machine = await model.add_machine(
804 spec=None,
805 constraints=None,
806 disks=None,
807 series='xenial'
808 )
809
810 # register machine with observer
811 observer.register_machine(machine=machine, db_dict=db_dict)
812
813 # id for the execution environment
814 ee_id = N2VCJujuConnector._build_ee_id(
815 model_name=model_name,
816 application_name=application_name,
817 machine_id=str(machine.entity_id)
818 )
819
820 # write ee_id in database
821 self._write_ee_id_db(
822 db_dict=db_dict,
823 ee_id=ee_id
824 )
825
826 # wait for machine creation
827 await observer.wait_for_machine(
828 machine_id=str(machine.entity_id),
829 progress_timeout=progress_timeout,
830 total_timeout=total_timeout
831 )
832
833 else:
834
835 self.debug('Reusing old machine pending')
836
837 # register machine with observer
838 observer.register_machine(machine=machine, db_dict=db_dict)
839
840 # machine does exist, but it is in creation process (pending), wait for create finalisation
841 await observer.wait_for_machine(
842 machine_id=machine.entity_id,
843 progress_timeout=progress_timeout,
844 total_timeout=total_timeout)
845
846 self.debug("Machine ready at " + str(machine.dns_name))
847 return machine
848
849 async def _juju_provision_machine(
850 self,
851 model_name: str,
852 hostname: str,
853 username: str,
854 private_key_path: str,
855 db_dict: dict = None,
856 progress_timeout: float = None,
857 total_timeout: float = None
858 ) -> str:
859
860 if not self.api_proxy:
861 msg = 'Cannot provision machine: api_proxy is not defined'
862 self.error(msg=msg)
863 raise N2VCException(message=msg)
864
865 self.debug('provisioning machine. model: {}, hostname: {}, username: {}'.format(model_name, hostname, username))
866
867 if not self._authenticated:
868 await self._juju_login()
869
870 # get juju model and observer
871 model = await self._juju_get_model(model_name=model_name)
872 observer = self.juju_observers[model_name]
873
874 # TODO check if machine is already provisioned
875 machine_list = await model.get_machines()
876
877 provisioner = SSHProvisioner(
878 host=hostname,
879 user=username,
880 private_key_path=private_key_path,
881 log=self.log
882 )
883
884 params = None
885 try:
886 params = provisioner.provision_machine()
887 except Exception as ex:
888 msg = "Exception provisioning machine: {}".format(ex)
889 self.log.error(msg)
890 raise N2VCException(message=msg)
891
892 params.jobs = ['JobHostUnits']
893
894 connection = model.connection()
895
896 # Submit the request.
897 self.debug("Adding machine to model")
898 client_facade = client.ClientFacade.from_connection(connection)
899 results = await client_facade.AddMachines(params=[params])
900 error = results.machines[0].error
901 if error:
902 msg = "Error adding machine: {}}".format(error.message)
903 self.error(msg=msg)
904 raise ValueError(msg)
905
906 machine_id = results.machines[0].machine
907
908 # Need to run this after AddMachines has been called,
909 # as we need the machine_id
910 self.debug("Installing Juju agent into machine {}".format(machine_id))
911 asyncio.ensure_future(provisioner.install_agent(
912 connection=connection,
913 nonce=params.nonce,
914 machine_id=machine_id,
915 api=self.api_proxy,
916 ))
917
918 # wait for machine in model (now, machine is not yet in model, so we must wait for it)
919 machine = None
920 for i in range(10):
921 machine_list = await model.get_machines()
922 if machine_id in machine_list:
923 self.debug('Machine {} found in model!'.format(machine_id))
924 machine = model.machines.get(machine_id)
925 break
926 await asyncio.sleep(2)
927
928 if machine is None:
929 msg = 'Machine {} not found in model'.format(machine_id)
930 self.error(msg=msg)
931 raise Exception(msg)
932
933 # register machine with observer
934 observer.register_machine(machine=machine, db_dict=db_dict)
935
936 # wait for machine creation
937 self.debug('waiting for provision finishes... {}'.format(machine_id))
938 await observer.wait_for_machine(
939 machine_id=machine_id,
940 progress_timeout=progress_timeout,
941 total_timeout=total_timeout
942 )
943
944 self.debug("Machine provisioned {}".format(machine_id))
945
946 return machine_id
947
948 async def _juju_deploy_charm(
949 self,
950 model_name: str,
951 application_name: str,
952 charm_path: str,
953 machine_id: str,
954 db_dict: dict,
955 progress_timeout: float = None,
956 total_timeout: float = None
957 ) -> (Application, int):
958
959 # get juju model and observer
960 model = await self._juju_get_model(model_name=model_name)
961 observer = self.juju_observers[model_name]
962
963 # check if application already exists
964 application = None
965 if application_name in model.applications:
966 application = model.applications[application_name]
967
968 if application is None:
969
970 # application does not exist, create it and wait for it
971 self.debug('deploying application {} to machine {}, model {}'
972 .format(application_name, machine_id, model_name))
973 self.debug('charm: {}'.format(charm_path))
974 series = 'xenial'
975 # series = None
976 application = await model.deploy(
977 entity_url=charm_path,
978 application_name=application_name,
979 channel='stable',
980 num_units=1,
981 series=series,
982 to=machine_id
983 )
984
985 # register application with observer
986 observer.register_application(application=application, db_dict=db_dict)
987
988 self.debug('waiting for application deployed... {}'.format(application.entity_id))
989 retries = await observer.wait_for_application(
990 application_id=application.entity_id,
991 progress_timeout=progress_timeout,
992 total_timeout=total_timeout)
993 self.debug('application deployed')
994
995 else:
996
997 # register application with observer
998 observer.register_application(application=application, db_dict=db_dict)
999
1000 # application already exists, but not finalised
1001 self.debug('application already exists, waiting for deployed...')
1002 retries = await observer.wait_for_application(
1003 application_id=application.entity_id,
1004 progress_timeout=progress_timeout,
1005 total_timeout=total_timeout)
1006 self.debug('application deployed')
1007
1008 return application, retries
1009
1010 async def _juju_execute_action(
1011 self,
1012 model_name: str,
1013 application_name: str,
1014 action_name: str,
1015 db_dict: dict,
1016 progress_timeout: float = None,
1017 total_timeout: float = None,
1018 **kwargs
1019 ) -> Action:
1020
1021 # get juju model and observer
1022 model = await self._juju_get_model(model_name=model_name)
1023 observer = self.juju_observers[model_name]
1024
1025 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
1026
1027 unit = application.units[0]
1028 if unit is not None:
1029 actions = await application.get_actions()
1030 if action_name in actions:
1031 self.debug('executing action "{}" using params: {}'.format(action_name, kwargs))
1032 action = await unit.run_action(action_name, **kwargs)
1033
1034 # register action with observer
1035 observer.register_action(action=action, db_dict=db_dict)
1036
1037 await observer.wait_for_action(
1038 action_id=action.entity_id,
1039 progress_timeout=progress_timeout,
1040 total_timeout=total_timeout)
1041 self.debug('action completed with status: {}'.format(action.status))
1042 output = await model.get_action_output(action_uuid=action.entity_id)
1043 status = await model.get_action_status(uuid_or_prefix=action.entity_id)
1044 if action.entity_id in status:
1045 status = status[action.entity_id]
1046 else:
1047 status = 'failed'
1048 return output, status
1049
1050 raise N2VCExecutionException(
1051 message='Cannot execute action on charm',
1052 primitive_name=action_name
1053 )
1054
1055 async def _juju_configure_application(
1056 self,
1057 model_name: str,
1058 application_name: str,
1059 config: dict,
1060 db_dict: dict,
1061 progress_timeout: float = None,
1062 total_timeout: float = None
1063 ):
1064
1065 # get the application
1066 application = await self._juju_get_application(model_name=model_name, application_name=application_name)
1067
1068 self.debug('configuring the application {} -> {}'.format(application_name, config))
1069 res = await application.set_config(config)
1070 self.debug('application {} configured. res={}'.format(application_name, res))
1071
1072 # Verify the config is set
1073 new_conf = await application.get_config()
1074 for key in config:
1075 value = new_conf[key]['value']
1076 self.debug(' {} = {}'.format(key, value))
1077 if config[key] != value:
1078 raise N2VCException(
1079 message='key {} is not configured correctly {} != {}'.format(key, config[key], new_conf[key])
1080 )
1081
1082 # check if 'verify-ssh-credentials' action exists
1083 # unit = application.units[0]
1084 actions = await application.get_actions()
1085 if 'verify-ssh-credentials' not in actions:
1086 msg = 'Action verify-ssh-credentials does not exist in application {}'.format(application_name)
1087 self.debug(msg=msg)
1088 return False
1089
1090 # execute verify-credentials
1091 num_retries = 20
1092 retry_timeout = 15.0
1093 for i in range(num_retries):
1094 try:
1095 self.debug('Executing action verify-ssh-credentials...')
1096 output, ok = await self._juju_execute_action(
1097 model_name=model_name,
1098 application_name=application_name,
1099 action_name='verify-ssh-credentials',
1100 db_dict=db_dict,
1101 progress_timeout=progress_timeout,
1102 total_timeout=total_timeout
1103 )
1104 self.debug('Result: {}, output: {}'.format(ok, output))
1105 return True
1106 except Exception as e:
1107 self.debug('Error executing verify-ssh-credentials: {}. Retrying...'.format(e))
1108 await asyncio.sleep(retry_timeout)
1109 else:
1110 self.error('Error executing verify-ssh-credentials after {} retries. '.format(num_retries))
1111 return False
1112
1113 async def _juju_get_application(
1114 self,
1115 model_name: str,
1116 application_name: str
1117 ):
1118 """Get the deployed application."""
1119
1120 model = await self._juju_get_model(model_name=model_name)
1121
1122 application_name = N2VCJujuConnector._format_app_name(application_name)
1123
1124 if model.applications and application_name in model.applications:
1125 return model.applications[application_name]
1126 else:
1127 raise N2VCException(message='Cannot get application {} from model {}'.format(application_name, model_name))
1128
1129 async def _juju_get_model(self, model_name: str) -> Model:
1130 """ Get a model object from juju controller
1131 If the model does not exits, it creates it.
1132
1133 :param str model_name: name of the model
1134 :returns Model: model obtained from juju controller or Exception
1135 """
1136
1137 # format model name
1138 model_name = N2VCJujuConnector._format_model_name(model_name)
1139
1140 if model_name in self.juju_models:
1141 return self.juju_models[model_name]
1142
1143 if self._creating_model:
1144 self.debug('Another coroutine is creating a model. Wait...')
1145 while self._creating_model:
1146 # another coroutine is creating a model, wait
1147 await asyncio.sleep(0.1)
1148 # retry (perhaps another coroutine has created the model meanwhile)
1149 if model_name in self.juju_models:
1150 return self.juju_models[model_name]
1151
1152 try:
1153 self._creating_model = True
1154
1155 # get juju model names from juju
1156 model_list = await self.controller.list_models()
1157
1158 if model_name not in model_list:
1159 self.info('Model {} does not exist. Creating new model...'.format(model_name))
1160 config_dict = {'authorized-keys': self.public_key}
1161 if self.apt_mirror:
1162 config_dict['apt-mirror'] = self.apt_mirror
1163 if not self.enable_os_upgrade:
1164 config_dict['enable-os-refresh-update'] = False
1165 config_dict['enable-os-upgrade'] = False
1166
1167 model = await self.controller.add_model(
1168 model_name=model_name,
1169 config=config_dict
1170 )
1171 self.info('New model created, name={}'.format(model_name))
1172 else:
1173 self.debug('Model already exists in juju. Getting model {}'.format(model_name))
1174 model = await self.controller.get_model(model_name)
1175 self.debug('Existing model in juju, name={}'.format(model_name))
1176
1177 self.juju_models[model_name] = model
1178 self.juju_observers[model_name] = JujuModelObserver(n2vc=self, model=model)
1179 return model
1180
1181 except Exception as e:
1182 msg = 'Cannot get model {}. Exception: {}'.format(model_name, e)
1183 self.error(msg)
1184 raise N2VCException(msg)
1185 finally:
1186 self._creating_model = False
1187
1188 async def _juju_add_relation(
1189 self,
1190 model_name: str,
1191 application_name_1: str,
1192 application_name_2: str,
1193 relation_1: str,
1194 relation_2: str
1195 ):
1196
1197 # get juju model and observer
1198 model = await self._juju_get_model(model_name=model_name)
1199
1200 r1 = '{}:{}'.format(application_name_1, relation_1)
1201 r2 = '{}:{}'.format(application_name_2, relation_2)
1202
1203 self.debug('adding relation: {} -> {}'.format(r1, r2))
1204 try:
1205 await model.add_relation(relation1=r1, relation2=r2)
1206 except JujuAPIError as e:
1207 # If one of the applications in the relationship doesn't exist, or the relation has already been added,
1208 # let the operation fail silently.
1209 if 'not found' in e.message:
1210 return
1211 if 'already exists' in e.message:
1212 return
1213 # another execption, raise it
1214 raise e
1215
1216 async def _juju_destroy_application(
1217 self,
1218 model_name: str,
1219 application_name: str
1220 ):
1221
1222 self.debug('Destroying application {} in model {}'.format(application_name, model_name))
1223
1224 # get juju model and observer
1225 model = await self._juju_get_model(model_name=model_name)
1226
1227 application = model.applications.get(application_name)
1228 if application:
1229 await application.destroy()
1230 else:
1231 self.debug('Application not found: {}'.format(application_name))
1232
1233 async def _juju_destroy_machine(
1234 self,
1235 model_name: str,
1236 machine_id: str,
1237 total_timeout: float = None
1238 ):
1239
1240 self.debug('Destroying machine {} in model {}'.format(machine_id, model_name))
1241
1242 if total_timeout is None:
1243 total_timeout = 3600
1244
1245 # get juju model and observer
1246 model = await self._juju_get_model(model_name=model_name)
1247
1248 machines = await model.get_machines()
1249 if machine_id in machines:
1250 machine = model.machines[machine_id]
1251 await machine.destroy(force=True)
1252 # max timeout
1253 end = time.time() + total_timeout
1254 # wait for machine removal
1255 machines = await model.get_machines()
1256 while machine_id in machines and time.time() < end:
1257 self.debug('Waiting for machine {} is destroyed'.format(machine_id))
1258 await asyncio.sleep(0.5)
1259 machines = await model.get_machines()
1260 self.debug('Machine destroyed: {}'.format(machine_id))
1261 else:
1262 self.debug('Machine not found: {}'.format(machine_id))
1263
1264 async def _juju_destroy_model(
1265 self,
1266 model_name: str,
1267 total_timeout: float = None
1268 ):
1269
1270 self.debug('Destroying model {}'.format(model_name))
1271
1272 if total_timeout is None:
1273 total_timeout = 3600
1274
1275 model = await self._juju_get_model(model_name=model_name)
1276 uuid = model.info.uuid
1277
1278 # destroy machines
1279 machines = await model.get_machines()
1280 for machine_id in machines:
1281 try:
1282 await self._juju_destroy_machine(model_name=model_name, machine_id=machine_id)
1283 except Exception as e:
1284 # ignore exceptions destroying machine
1285 pass
1286
1287 await self._juju_disconnect_model(model_name=model_name)
1288 self.juju_models[model_name] = None
1289 self.juju_observers[model_name] = None
1290
1291 self.debug('destroying model {}...'.format(model_name))
1292 await self.controller.destroy_model(uuid)
1293 self.debug('model destroy requested {}'.format(model_name))
1294
1295 # wait for model is completely destroyed
1296 end = time.time() + total_timeout
1297 while time.time() < end:
1298 self.debug('Waiting for model is destroyed...')
1299 try:
1300 # await self.controller.get_model(uuid)
1301 models = await self.controller.list_models()
1302 if model_name not in models:
1303 self.debug('The model {} ({}) was destroyed'.format(model_name, uuid))
1304 return
1305 except Exception as e:
1306 pass
1307 await asyncio.sleep(1.0)
1308
1309 async def _juju_login(self):
1310 """Connect to juju controller
1311
1312 """
1313
1314 # if already authenticated, exit function
1315 if self._authenticated:
1316 return
1317
1318 # if connecting, wait for finish
1319 # another task could be trying to connect in parallel
1320 while self._connecting:
1321 await asyncio.sleep(0.1)
1322
1323 # double check after other task has finished
1324 if self._authenticated:
1325 return
1326
1327 try:
1328 self._connecting = True
1329 self.info(
1330 'connecting to juju controller: {} {}:{} ca_cert: {}'
1331 .format(self.url, self.username, self.secret, '\n'+self.ca_cert if self.ca_cert else 'None'))
1332
1333 # Create controller object
1334 self.controller = Controller(loop=self.loop)
1335 # Connect to controller
1336 await self.controller.connect(
1337 endpoint=self.url,
1338 username=self.username,
1339 password=self.secret,
1340 cacert=self.ca_cert
1341 )
1342 self._authenticated = True
1343 self.info('juju controller connected')
1344 except Exception as e:
1345 message = 'Exception connecting to juju: {}'.format(e)
1346 self.error(message)
1347 raise N2VCConnectionException(
1348 message=message,
1349 url=self.url
1350 )
1351 finally:
1352 self._connecting = False
1353
1354 async def _juju_logout(self):
1355 """Logout of the Juju controller."""
1356 if not self._authenticated:
1357 return False
1358
1359 # disconnect all models
1360 for model_name in self.juju_models:
1361 try:
1362 await self._juju_disconnect_model(model_name)
1363 except Exception as e:
1364 self.error('Error disconnecting model {} : {}'.format(model_name, e))
1365 # continue with next model...
1366
1367 self.info("Disconnecting controller")
1368 try:
1369 await self.controller.disconnect()
1370 except Exception as e:
1371 raise N2VCConnectionException(message='Error disconnecting controller: {}'.format(e), url=self.url)
1372
1373 self.controller = None
1374 self._authenticated = False
1375 self.info('disconnected')
1376
1377 async def _juju_disconnect_model(
1378 self,
1379 model_name: str
1380 ):
1381 self.debug("Disconnecting model {}".format(model_name))
1382 if model_name in self.juju_models:
1383 await self.juju_models[model_name].disconnect()
1384 self.juju_models[model_name] = None
1385 self.juju_observers[model_name] = None
1386 else:
1387 self.warning('Cannot disconnect model: {}'.format(model_name))
1388
1389 def _create_juju_public_key(self):
1390 """Recreate the Juju public key on lcm container, if needed
1391 Certain libjuju commands expect to be run from the same machine as Juju
1392 is bootstrapped to. This method will write the public key to disk in
1393 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1394 """
1395
1396 # Make sure that we have a public key before writing to disk
1397 if self.public_key is None or len(self.public_key) == 0:
1398 if 'OSMLCM_VCA_PUBKEY' in os.environ:
1399 self.public_key = os.getenv('OSMLCM_VCA_PUBKEY', '')
1400 if len(self.public_key) == 0:
1401 return
1402 else:
1403 return
1404
1405 pk_path = "{}/.local/share/juju/ssh".format(os.path.expanduser('~'))
1406 file_path = "{}/juju_id_rsa.pub".format(pk_path)
1407 self.debug('writing juju public key to file:\n{}\npublic key: {}'.format(file_path, self.public_key))
1408 if not os.path.exists(pk_path):
1409 # create path and write file
1410 os.makedirs(pk_path)
1411 with open(file_path, 'w') as f:
1412 self.debug('Creating juju public key file: {}'.format(file_path))
1413 f.write(self.public_key)
1414 else:
1415 self.debug('juju public key file already exists: {}'.format(file_path))
1416
1417 @staticmethod
1418 def _format_model_name(name: str) -> str:
1419 """Format the name of the model.
1420
1421 Model names may only contain lowercase letters, digits and hyphens
1422 """
1423
1424 return name.replace('_', '-').replace(' ', '-').lower()
1425
1426 @staticmethod
1427 def _format_app_name(name: str) -> str:
1428 """Format the name of the application (in order to assure valid application name).
1429
1430 Application names have restrictions (run juju deploy --help):
1431 - contains lowercase letters 'a'-'z'
1432 - contains numbers '0'-'9'
1433 - contains hyphens '-'
1434 - starts with a lowercase letter
1435 - not two or more consecutive hyphens
1436 - after a hyphen, not a group with all numbers
1437 """
1438
1439 def all_numbers(s: str) -> bool:
1440 for c in s:
1441 if not c.isdigit():
1442 return False
1443 return True
1444
1445 new_name = name.replace('_', '-')
1446 new_name = new_name.replace(' ', '-')
1447 new_name = new_name.lower()
1448 while new_name.find('--') >= 0:
1449 new_name = new_name.replace('--', '-')
1450 groups = new_name.split('-')
1451
1452 # find 'all numbers' groups and prefix them with a letter
1453 app_name = ''
1454 for i in range(len(groups)):
1455 group = groups[i]
1456 if all_numbers(group):
1457 group = 'z' + group
1458 if i > 0:
1459 app_name += '-'
1460 app_name += group
1461
1462 if app_name[0].isdigit():
1463 app_name = 'z' + app_name
1464
1465 return app_name