2 # Copyright 2019 Telefonica Investigacion y Desarrollo, S.A.U.
3 # This file is part of OSM
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
19 # For those usages not covered by the Apache License, Version 2.0 please
20 # contact with: nfvlabs@tid.es
31 from juju
.action
import Action
32 from juju
.application
import Application
33 from juju
.client
import client
34 from juju
.controller
import Controller
35 from juju
.errors
import JujuAPIError
36 from juju
.machine
import Machine
37 from juju
.model
import Model
38 from n2vc
.exceptions
import (
39 N2VCBadArgumentsException
,
41 N2VCConnectionException
,
42 N2VCExecutionException
,
43 N2VCInvalidCertificate
,
47 from n2vc
.juju_observer
import JujuModelObserver
48 from n2vc
.n2vc_conn
import N2VCConnector
49 from n2vc
.n2vc_conn
import obj_to_dict
, obj_to_yaml
50 from n2vc
.provisioner
import AsyncSSHProvisioner
51 from n2vc
.libjuju
import Libjuju
54 class N2VCJujuConnector(N2VCConnector
):
57 ####################################################################################
58 ################################### P U B L I C ####################################
59 ####################################################################################
62 BUILT_IN_CLOUDS
= ["localhost", "microk8s"]
70 url
: str = "127.0.0.1:17070",
71 username
: str = "admin",
72 vca_config
: dict = None,
75 """Initialize juju N2VC connector
78 # parent class constructor
79 N2VCConnector
.__init
__(
87 vca_config
=vca_config
,
88 on_update_db
=on_update_db
,
91 # silence websocket traffic log
92 logging
.getLogger("websockets.protocol").setLevel(logging
.INFO
)
93 logging
.getLogger("juju.client.connection").setLevel(logging
.WARN
)
94 logging
.getLogger("model").setLevel(logging
.WARN
)
96 self
.log
.info("Initializing N2VC juju connector...")
99 ##############################################################
101 ##############################################################
106 raise N2VCBadArgumentsException("Argument url is mandatory", ["url"])
107 url_parts
= url
.split(":")
108 if len(url_parts
) != 2:
109 raise N2VCBadArgumentsException(
110 "Argument url: bad format (localhost:port) -> {}".format(url
), ["url"]
112 self
.hostname
= url_parts
[0]
114 self
.port
= int(url_parts
[1])
116 raise N2VCBadArgumentsException(
117 "url port must be a number -> {}".format(url
), ["url"]
122 raise N2VCBadArgumentsException(
123 "Argument username is mandatory", ["username"]
127 if vca_config
is None:
128 raise N2VCBadArgumentsException(
129 "Argument vca_config is mandatory", ["vca_config"]
132 if "secret" in vca_config
:
133 self
.secret
= vca_config
["secret"]
135 raise N2VCBadArgumentsException(
136 "Argument vca_config.secret is mandatory", ["vca_config.secret"]
139 # pubkey of juju client in osm machine: ~/.local/share/juju/ssh/juju_id_rsa.pub
140 # if exists, it will be written in lcm container: _create_juju_public_key()
141 if "public_key" in vca_config
:
142 self
.public_key
= vca_config
["public_key"]
144 self
.public_key
= None
146 # TODO: Verify ca_cert is valid before using. VCA will crash
147 # if the ca_cert isn't formatted correctly.
148 def base64_to_cacert(b64string
):
149 """Convert the base64-encoded string containing the VCA CACERT.
155 cacert
= base64
.b64decode(b64string
).decode("utf-8")
157 cacert
= re
.sub(r
"\\n", r
"\n", cacert
,)
158 except binascii
.Error
as e
:
159 self
.log
.debug("Caught binascii.Error: {}".format(e
))
160 raise N2VCInvalidCertificate(message
="Invalid CA Certificate")
164 self
.ca_cert
= vca_config
.get("ca_cert")
166 self
.ca_cert
= base64_to_cacert(vca_config
["ca_cert"])
168 if "api_proxy" in vca_config
:
169 self
.api_proxy
= vca_config
["api_proxy"]
171 "api_proxy for native charms configured: {}".format(self
.api_proxy
)
175 "api_proxy is not configured. Support for native charms is disabled"
178 if "enable_os_upgrade" in vca_config
:
179 self
.enable_os_upgrade
= vca_config
["enable_os_upgrade"]
181 self
.enable_os_upgrade
= True
183 if "apt_mirror" in vca_config
:
184 self
.apt_mirror
= vca_config
["apt_mirror"]
186 self
.apt_mirror
= None
188 self
.cloud
= vca_config
.get("cloud")
189 # self.log.debug('Arguments have been checked')
192 self
.controller
= None # it will be filled when connect to juju
193 self
.juju_models
= {} # model objects for every model_name
194 self
.juju_observers
= {} # model observers for every model_name
196 False # while connecting to juju (to avoid duplicate connections)
198 self
._authenticated
= (
199 False # it will be True when juju connection be stablished
201 self
._creating
_model
= False # True during model creation
202 self
.libjuju
= Libjuju(
204 api_proxy
=self
.api_proxy
,
205 enable_os_upgrade
=self
.enable_os_upgrade
,
206 apt_mirror
=self
.apt_mirror
,
207 username
=self
.username
,
208 password
=self
.secret
,
216 # create juju pub key file in lcm container at
217 # ./local/share/juju/ssh/juju_id_rsa.pub
218 self
._create
_juju
_public
_key
()
220 self
.log
.info("N2VC juju connector initialized")
222 async def get_status(self
, namespace
: str, yaml_format
: bool = True):
224 # self.log.info('Getting NS status. namespace: {}'.format(namespace))
226 _nsi_id
, ns_id
, _vnf_id
, _vdu_id
, _vdu_count
= self
._get
_namespace
_components
(
229 # model name is ns_id
231 if model_name
is None:
232 msg
= "Namespace {} not valid".format(namespace
)
234 raise N2VCBadArgumentsException(msg
, ["namespace"])
236 status
= await self
.libjuju
.get_model_status(model_name
)
239 return obj_to_yaml(status
)
241 return obj_to_dict(status
)
243 async def create_execution_environment(
247 reuse_ee_id
: str = None,
248 progress_timeout
: float = None,
249 total_timeout
: float = None,
253 "Creating execution environment. namespace: {}, reuse_ee_id: {}".format(
254 namespace
, reuse_ee_id
260 model_name
, application_name
, machine_id
= self
._get
_ee
_id
_components
(
270 ) = self
._get
_namespace
_components
(namespace
=namespace
)
271 # model name is ns_id
274 application_name
= self
._get
_application
_name
(namespace
=namespace
)
277 "model name: {}, application name: {}, machine_id: {}".format(
278 model_name
, application_name
, machine_id
282 # create or reuse a new juju machine
284 if not await self
.libjuju
.model_exists(model_name
):
285 await self
.libjuju
.add_model(model_name
, cloud_name
=self
.cloud
)
286 machine
, new
= await self
.libjuju
.create_machine(
287 model_name
=model_name
,
288 machine_id
=machine_id
,
290 progress_timeout
=progress_timeout
,
291 total_timeout
=total_timeout
,
293 # id for the execution environment
294 ee_id
= N2VCJujuConnector
._build
_ee
_id
(
295 model_name
=model_name
,
296 application_name
=application_name
,
297 machine_id
=str(machine
.entity_id
),
299 self
.log
.debug("ee_id: {}".format(ee_id
))
302 # write ee_id in database
303 self
._write
_ee
_id
_db
(db_dict
=db_dict
, ee_id
=ee_id
)
305 except Exception as e
:
306 message
= "Error creating machine on juju: {}".format(e
)
307 self
.log
.error(message
)
308 raise N2VCException(message
=message
)
310 # new machine credentials
312 "hostname": machine
.dns_name
,
316 "Execution environment created. ee_id: {}, credentials: {}".format(
321 return ee_id
, credentials
323 async def register_execution_environment(
328 progress_timeout
: float = None,
329 total_timeout
: float = None,
333 "Registering execution environment. namespace={}, credentials={}".format(
334 namespace
, credentials
338 if credentials
is None:
339 raise N2VCBadArgumentsException(
340 message
="credentials are mandatory", bad_args
=["credentials"]
342 if credentials
.get("hostname"):
343 hostname
= credentials
["hostname"]
345 raise N2VCBadArgumentsException(
346 message
="hostname is mandatory", bad_args
=["credentials.hostname"]
348 if credentials
.get("username"):
349 username
= credentials
["username"]
351 raise N2VCBadArgumentsException(
352 message
="username is mandatory", bad_args
=["credentials.username"]
354 if "private_key_path" in credentials
:
355 private_key_path
= credentials
["private_key_path"]
357 # if not passed as argument, use generated private key path
358 private_key_path
= self
.private_key_path
360 _nsi_id
, ns_id
, _vnf_id
, _vdu_id
, _vdu_count
= self
._get
_namespace
_components
(
367 application_name
= self
._get
_application
_name
(namespace
=namespace
)
369 # register machine on juju
371 if not self
.api_proxy
:
372 msg
= "Cannot provision machine: api_proxy is not defined"
373 self
.log
.error(msg
=msg
)
374 raise N2VCException(message
=msg
)
375 if not await self
.libjuju
.model_exists(model_name
):
376 await self
.libjuju
.add_model(model_name
, cloud_name
=self
.cloud
)
377 machine_id
= await self
.libjuju
.provision_machine(
378 model_name
=model_name
,
381 private_key_path
=private_key_path
,
383 progress_timeout
=progress_timeout
,
384 total_timeout
=total_timeout
,
386 except Exception as e
:
387 self
.log
.error("Error registering machine: {}".format(e
))
389 message
="Error registering machine on juju: {}".format(e
)
392 self
.log
.info("Machine registered: {}".format(machine_id
))
394 # id for the execution environment
395 ee_id
= N2VCJujuConnector
._build
_ee
_id
(
396 model_name
=model_name
,
397 application_name
=application_name
,
398 machine_id
=str(machine_id
),
401 self
.log
.info("Execution environment registered. ee_id: {}".format(ee_id
))
405 async def install_configuration_sw(
410 progress_timeout
: float = None,
411 total_timeout
: float = None,
418 "Installing configuration sw on ee_id: {}, "
419 "artifact path: {}, db_dict: {}"
420 ).format(ee_id
, artifact_path
, db_dict
)
424 if ee_id
is None or len(ee_id
) == 0:
425 raise N2VCBadArgumentsException(
426 message
="ee_id is mandatory", bad_args
=["ee_id"]
428 if artifact_path
is None or len(artifact_path
) == 0:
429 raise N2VCBadArgumentsException(
430 message
="artifact_path is mandatory", bad_args
=["artifact_path"]
433 raise N2VCBadArgumentsException(
434 message
="db_dict is mandatory", bad_args
=["db_dict"]
442 ) = N2VCJujuConnector
._get
_ee
_id
_components
(ee_id
=ee_id
)
444 "model: {}, application: {}, machine: {}".format(
445 model_name
, application_name
, machine_id
449 raise N2VCBadArgumentsException(
450 message
="ee_id={} is not a valid execution environment id".format(
456 # remove // in charm path
457 while artifact_path
.find("//") >= 0:
458 artifact_path
= artifact_path
.replace("//", "/")
461 if not self
.fs
.file_exists(artifact_path
, mode
="dir"):
462 msg
= "artifact path does not exist: {}".format(artifact_path
)
463 raise N2VCBadArgumentsException(message
=msg
, bad_args
=["artifact_path"])
465 if artifact_path
.startswith("/"):
466 full_path
= self
.fs
.path
+ artifact_path
468 full_path
= self
.fs
.path
+ "/" + artifact_path
471 await self
.libjuju
.deploy_charm(
472 model_name
=model_name
,
473 application_name
=application_name
,
475 machine_id
=machine_id
,
477 progress_timeout
=progress_timeout
,
478 total_timeout
=total_timeout
,
482 except Exception as e
:
484 message
="Error desploying charm into ee={} : {}".format(ee_id
, e
)
487 self
.log
.info("Configuration sw installed")
489 async def get_ee_ssh_public__key(
493 progress_timeout
: float = None,
494 total_timeout
: float = None,
499 "Generating priv/pub key pair and get pub key on ee_id: {}, db_dict: {}"
500 ).format(ee_id
, db_dict
)
504 if ee_id
is None or len(ee_id
) == 0:
505 raise N2VCBadArgumentsException(
506 message
="ee_id is mandatory", bad_args
=["ee_id"]
509 raise N2VCBadArgumentsException(
510 message
="db_dict is mandatory", bad_args
=["db_dict"]
518 ) = N2VCJujuConnector
._get
_ee
_id
_components
(ee_id
=ee_id
)
520 "model: {}, application: {}, machine: {}".format(
521 model_name
, application_name
, machine_id
525 raise N2VCBadArgumentsException(
526 message
="ee_id={} is not a valid execution environment id".format(
532 # try to execute ssh layer primitives (if exist):
538 application_name
= N2VCJujuConnector
._format
_app
_name
(application_name
)
540 # execute action: generate-ssh-key
542 output
, _status
= await self
.libjuju
.execute_action(
543 model_name
=model_name
,
544 application_name
=application_name
,
545 action_name
="generate-ssh-key",
547 progress_timeout
=progress_timeout
,
548 total_timeout
=total_timeout
,
550 except Exception as e
:
552 "Skipping exception while executing action generate-ssh-key: {}".format(
557 # execute action: get-ssh-public-key
559 output
, _status
= await self
.libjuju
.execute_action(
560 model_name
=model_name
,
561 application_name
=application_name
,
562 action_name
="get-ssh-public-key",
564 progress_timeout
=progress_timeout
,
565 total_timeout
=total_timeout
,
567 except Exception as e
:
568 msg
= "Cannot execute action get-ssh-public-key: {}\n".format(e
)
570 raise N2VCExecutionException(e
, primitive_name
="get-ssh-public-key")
572 # return public key if exists
573 return output
["pubkey"] if "pubkey" in output
else output
575 async def add_relation(
576 self
, ee_id_1
: str, ee_id_2
: str, endpoint_1
: str, endpoint_2
: str
580 "adding new relation between {} and {}, endpoints: {}, {}".format(
581 ee_id_1
, ee_id_2
, endpoint_1
, endpoint_2
587 message
= "EE 1 is mandatory"
588 self
.log
.error(message
)
589 raise N2VCBadArgumentsException(message
=message
, bad_args
=["ee_id_1"])
591 message
= "EE 2 is mandatory"
592 self
.log
.error(message
)
593 raise N2VCBadArgumentsException(message
=message
, bad_args
=["ee_id_2"])
595 message
= "endpoint 1 is mandatory"
596 self
.log
.error(message
)
597 raise N2VCBadArgumentsException(message
=message
, bad_args
=["endpoint_1"])
599 message
= "endpoint 2 is mandatory"
600 self
.log
.error(message
)
601 raise N2VCBadArgumentsException(message
=message
, bad_args
=["endpoint_2"])
603 # get the model, the applications and the machines from the ee_id's
604 model_1
, app_1
, _machine_1
= self
._get
_ee
_id
_components
(ee_id_1
)
605 model_2
, app_2
, _machine_2
= self
._get
_ee
_id
_components
(ee_id_2
)
607 # model must be the same
608 if model_1
!= model_2
:
609 message
= "EE models are not the same: {} vs {}".format(ee_id_1
, ee_id_2
)
610 self
.log
.error(message
)
611 raise N2VCBadArgumentsException(
612 message
=message
, bad_args
=["ee_id_1", "ee_id_2"]
615 # add juju relations between two applications
617 await self
.libjuju
.add_relation(
619 application_name_1
=app_1
,
620 application_name_2
=app_2
,
621 relation_1
=endpoint_1
,
622 relation_2
=endpoint_2
,
624 except Exception as e
:
625 message
= "Error adding relation between {} and {}: {}".format(
628 self
.log
.error(message
)
629 raise N2VCException(message
=message
)
631 async def remove_relation(self
):
633 self
.log
.info("Method not implemented yet")
634 raise MethodNotImplemented()
636 async def deregister_execution_environments(self
):
637 self
.log
.info("Method not implemented yet")
638 raise MethodNotImplemented()
640 async def delete_namespace(
641 self
, namespace
: str, db_dict
: dict = None, total_timeout
: float = None
643 self
.log
.info("Deleting namespace={}".format(namespace
))
646 if namespace
is None:
647 raise N2VCBadArgumentsException(
648 message
="namespace is mandatory", bad_args
=["namespace"]
651 _nsi_id
, ns_id
, _vnf_id
, _vdu_id
, _vdu_count
= self
._get
_namespace
_components
(
654 if ns_id
is not None:
656 if not await self
.libjuju
.model_exists(ns_id
):
657 raise N2VCNotFound(message
="Model {} does not exist".format(ns_id
))
658 await self
.libjuju
.destroy_model(
659 model_name
=ns_id
, total_timeout
=total_timeout
663 except Exception as e
:
665 message
="Error deleting namespace {} : {}".format(namespace
, e
)
668 raise N2VCBadArgumentsException(
669 message
="only ns_id is permitted to delete yet", bad_args
=["namespace"]
672 self
.log
.info("Namespace {} deleted".format(namespace
))
674 async def delete_execution_environment(
675 self
, ee_id
: str, db_dict
: dict = None, total_timeout
: float = None
677 self
.log
.info("Deleting execution environment ee_id={}".format(ee_id
))
681 raise N2VCBadArgumentsException(
682 message
="ee_id is mandatory", bad_args
=["ee_id"]
685 model_name
, application_name
, _machine_id
= self
._get
_ee
_id
_components
(
689 # destroy the application
691 await self
.libjuju
.destroy_model(
692 model_name
=model_name
, total_timeout
=total_timeout
694 except Exception as e
:
697 "Error deleting execution environment {} (application {}) : {}"
698 ).format(ee_id
, application_name
, e
)
701 # destroy the machine
703 # await self._juju_destroy_machine(
704 # model_name=model_name,
705 # machine_id=machine_id,
706 # total_timeout=total_timeout
708 # except Exception as e:
709 # raise N2VCException(
710 # message='Error deleting execution environment {} (machine {}) : {}'
711 # .format(ee_id, machine_id, e))
713 self
.log
.info("Execution environment {} deleted".format(ee_id
))
715 async def exec_primitive(
720 db_dict
: dict = None,
721 progress_timeout
: float = None,
722 total_timeout
: float = None,
726 "Executing primitive: {} on ee: {}, params: {}".format(
727 primitive_name
, ee_id
, params_dict
732 if ee_id
is None or len(ee_id
) == 0:
733 raise N2VCBadArgumentsException(
734 message
="ee_id is mandatory", bad_args
=["ee_id"]
736 if primitive_name
is None or len(primitive_name
) == 0:
737 raise N2VCBadArgumentsException(
738 message
="action_name is mandatory", bad_args
=["action_name"]
740 if params_dict
is None:
748 ) = N2VCJujuConnector
._get
_ee
_id
_components
(ee_id
=ee_id
)
750 raise N2VCBadArgumentsException(
751 message
="ee_id={} is not a valid execution environment id".format(
757 if primitive_name
== "config":
758 # Special case: config primitive
760 await self
.libjuju
.configure_application(
761 model_name
=model_name
,
762 application_name
=application_name
,
765 actions
= await self
.libjuju
.get_actions(
766 application_name
=application_name
, model_name
=model_name
,
769 "Application {} has these actions: {}".format(
770 application_name
, actions
773 if "verify-ssh-credentials" in actions
:
774 # execute verify-credentials
777 for _
in range(num_retries
):
779 self
.log
.debug("Executing action verify-ssh-credentials...")
780 output
, ok
= await self
.libjuju
.execute_action(
781 model_name
=model_name
,
782 application_name
=application_name
,
783 action_name
="verify-ssh-credentials",
785 progress_timeout
=progress_timeout
,
786 total_timeout
=total_timeout
,
788 self
.log
.debug("Result: {}, output: {}".format(ok
, output
))
790 except asyncio
.CancelledError
:
792 except Exception as e
:
794 "Error executing verify-ssh-credentials: {}. Retrying...".format(
798 await asyncio
.sleep(retry_timeout
)
801 "Error executing verify-ssh-credentials after {} retries. ".format(
806 msg
= "Action verify-ssh-credentials does not exist in application {}".format(
809 self
.log
.debug(msg
=msg
)
810 except Exception as e
:
811 self
.log
.error("Error configuring juju application: {}".format(e
))
812 raise N2VCExecutionException(
813 message
="Error configuring application into ee={} : {}".format(
816 primitive_name
=primitive_name
,
821 output
, status
= await self
.libjuju
.execute_action(
822 model_name
=model_name
,
823 application_name
=application_name
,
824 action_name
=primitive_name
,
826 progress_timeout
=progress_timeout
,
827 total_timeout
=total_timeout
,
830 if status
== "completed":
833 raise Exception("status is not completed: {}".format(status
))
834 except Exception as e
:
836 "Error executing primitive {}: {}".format(primitive_name
, e
)
838 raise N2VCExecutionException(
839 message
="Error executing primitive {} into ee={} : {}".format(
840 primitive_name
, ee_id
, e
842 primitive_name
=primitive_name
,
845 async def disconnect(self
):
846 self
.log
.info("closing juju N2VC...")
848 await self
.libjuju
.disconnect()
849 except Exception as e
:
850 raise N2VCConnectionException(
851 message
="Error disconnecting controller: {}".format(e
), url
=self
.url
855 ####################################################################################
856 ################################### P R I V A T E ##################################
857 ####################################################################################
860 def _write_ee_id_db(self
, db_dict
: dict, ee_id
: str):
862 # write ee_id to database: _admin.deployed.VCA.x
864 the_table
= db_dict
["collection"]
865 the_filter
= db_dict
["filter"]
866 the_path
= db_dict
["path"]
867 if not the_path
[-1] == ".":
868 the_path
= the_path
+ "."
869 update_dict
= {the_path
+ "ee_id": ee_id
}
870 # self.log.debug('Writing ee_id to database: {}'.format(the_path))
874 update_dict
=update_dict
,
877 except asyncio
.CancelledError
:
879 except Exception as e
:
880 self
.log
.error("Error writing ee_id to database: {}".format(e
))
883 def _build_ee_id(model_name
: str, application_name
: str, machine_id
: str):
885 Build an execution environment id form model, application and machine
887 :param application_name:
891 # id for the execution environment
892 return "{}.{}.{}".format(model_name
, application_name
, machine_id
)
895 def _get_ee_id_components(ee_id
: str) -> (str, str, str):
897 Get model, application and machine components from an execution environment id
899 :return: model_name, application_name, machine_id
903 return None, None, None
905 # split components of id
906 parts
= ee_id
.split(".")
907 model_name
= parts
[0]
908 application_name
= parts
[1]
909 machine_id
= parts
[2]
910 return model_name
, application_name
, machine_id
912 def _get_application_name(self
, namespace
: str) -> str:
914 Build application name from namespace
916 :return: app-vnf-<vnf id>-vdu-<vdu-id>-cnt-<vdu-count>
919 # TODO: Enforce the Juju 50-character application limit
921 # split namespace components
922 _
, _
, vnf_id
, vdu_id
, vdu_count
= self
._get
_namespace
_components
(
926 if vnf_id
is None or len(vnf_id
) == 0:
929 # Shorten the vnf_id to its last twelve characters
930 vnf_id
= "vnf-" + vnf_id
[-12:]
932 if vdu_id
is None or len(vdu_id
) == 0:
935 # Shorten the vdu_id to its last twelve characters
936 vdu_id
= "-vdu-" + vdu_id
[-12:]
938 if vdu_count
is None or len(vdu_count
) == 0:
941 vdu_count
= "-cnt-" + vdu_count
943 application_name
= "app-{}{}{}".format(vnf_id
, vdu_id
, vdu_count
)
945 return N2VCJujuConnector
._format
_app
_name
(application_name
)
947 async def _juju_create_machine(
950 application_name
: str,
951 machine_id
: str = None,
952 db_dict
: dict = None,
953 progress_timeout
: float = None,
954 total_timeout
: float = None,
958 "creating machine in model: {}, existing machine id: {}".format(
959 model_name
, machine_id
963 # get juju model and observer (create model if needed)
964 model
= await self
._juju
_get
_model
(model_name
=model_name
)
965 observer
= self
.juju_observers
[model_name
]
967 # find machine id in model
969 if machine_id
is not None:
970 self
.log
.debug("Finding existing machine id {} in model".format(machine_id
))
971 # get juju existing machines in the model
972 existing_machines
= await model
.get_machines()
973 if machine_id
in existing_machines
:
975 "Machine id {} found in model (reusing it)".format(machine_id
)
977 machine
= model
.machines
[machine_id
]
980 self
.log
.debug("Creating a new machine in juju...")
981 # machine does not exist, create it and wait for it
982 machine
= await model
.add_machine(
983 spec
=None, constraints
=None, disks
=None, series
="xenial"
986 # register machine with observer
987 observer
.register_machine(machine
=machine
, db_dict
=db_dict
)
989 # id for the execution environment
990 ee_id
= N2VCJujuConnector
._build
_ee
_id
(
991 model_name
=model_name
,
992 application_name
=application_name
,
993 machine_id
=str(machine
.entity_id
),
996 # write ee_id in database
997 self
._write
_ee
_id
_db
(db_dict
=db_dict
, ee_id
=ee_id
)
999 # wait for machine creation
1000 await observer
.wait_for_machine(
1001 machine_id
=str(machine
.entity_id
),
1002 progress_timeout
=progress_timeout
,
1003 total_timeout
=total_timeout
,
1008 self
.log
.debug("Reusing old machine pending")
1010 # register machine with observer
1011 observer
.register_machine(machine
=machine
, db_dict
=db_dict
)
1013 # machine does exist, but it is in creation process (pending), wait for
1014 # create finalisation
1015 await observer
.wait_for_machine(
1016 machine_id
=machine
.entity_id
,
1017 progress_timeout
=progress_timeout
,
1018 total_timeout
=total_timeout
,
1021 self
.log
.debug("Machine ready at " + str(machine
.dns_name
))
1024 async def _juju_provision_machine(
1029 private_key_path
: str,
1030 db_dict
: dict = None,
1031 progress_timeout
: float = None,
1032 total_timeout
: float = None,
1035 if not self
.api_proxy
:
1036 msg
= "Cannot provision machine: api_proxy is not defined"
1037 self
.log
.error(msg
=msg
)
1038 raise N2VCException(message
=msg
)
1041 "provisioning machine. model: {}, hostname: {}, username: {}".format(
1042 model_name
, hostname
, username
1046 if not self
._authenticated
:
1047 await self
._juju
_login
()
1049 # get juju model and observer
1050 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1051 observer
= self
.juju_observers
[model_name
]
1053 # TODO check if machine is already provisioned
1054 machine_list
= await model
.get_machines()
1056 provisioner
= AsyncSSHProvisioner(
1059 private_key_path
=private_key_path
,
1065 params
= await provisioner
.provision_machine()
1066 except Exception as ex
:
1067 msg
= "Exception provisioning machine: {}".format(ex
)
1069 raise N2VCException(message
=msg
)
1071 params
.jobs
= ["JobHostUnits"]
1073 connection
= model
.connection()
1075 # Submit the request.
1076 self
.log
.debug("Adding machine to model")
1077 client_facade
= client
.ClientFacade
.from_connection(connection
)
1078 results
= await client_facade
.AddMachines(params
=[params
])
1079 error
= results
.machines
[0].error
1081 msg
= "Error adding machine: {}".format(error
.message
)
1082 self
.log
.error(msg
=msg
)
1083 raise ValueError(msg
)
1085 machine_id
= results
.machines
[0].machine
1087 # Need to run this after AddMachines has been called,
1088 # as we need the machine_id
1089 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
1090 asyncio
.ensure_future(
1091 provisioner
.install_agent(
1092 connection
=connection
,
1094 machine_id
=machine_id
,
1099 # wait for machine in model (now, machine is not yet in model, so we must
1103 machine_list
= await model
.get_machines()
1104 if machine_id
in machine_list
:
1105 self
.log
.debug("Machine {} found in model!".format(machine_id
))
1106 machine
= model
.machines
.get(machine_id
)
1108 await asyncio
.sleep(2)
1111 msg
= "Machine {} not found in model".format(machine_id
)
1112 self
.log
.error(msg
=msg
)
1113 raise Exception(msg
)
1115 # register machine with observer
1116 observer
.register_machine(machine
=machine
, db_dict
=db_dict
)
1118 # wait for machine creation
1119 self
.log
.debug("waiting for provision finishes... {}".format(machine_id
))
1120 await observer
.wait_for_machine(
1121 machine_id
=machine_id
,
1122 progress_timeout
=progress_timeout
,
1123 total_timeout
=total_timeout
,
1126 self
.log
.debug("Machine provisioned {}".format(machine_id
))
1130 async def _juju_deploy_charm(
1133 application_name
: str,
1137 progress_timeout
: float = None,
1138 total_timeout
: float = None,
1139 config
: dict = None,
1140 ) -> (Application
, int):
1142 # get juju model and observer
1143 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1144 observer
= self
.juju_observers
[model_name
]
1146 # check if application already exists
1148 if application_name
in model
.applications
:
1149 application
= model
.applications
[application_name
]
1151 if application
is None:
1153 # application does not exist, create it and wait for it
1155 "deploying application {} to machine {}, model {}".format(
1156 application_name
, machine_id
, model_name
1159 self
.log
.debug("charm: {}".format(charm_path
))
1160 machine
= model
.machines
[machine_id
]
1162 application
= await model
.deploy(
1163 entity_url
=charm_path
,
1164 application_name
=application_name
,
1167 series
=machine
.series
,
1172 # register application with observer
1173 observer
.register_application(application
=application
, db_dict
=db_dict
)
1176 "waiting for application deployed... {}".format(application
.entity_id
)
1178 retries
= await observer
.wait_for_application(
1179 application_id
=application
.entity_id
,
1180 progress_timeout
=progress_timeout
,
1181 total_timeout
=total_timeout
,
1183 self
.log
.debug("application deployed")
1187 # register application with observer
1188 observer
.register_application(application
=application
, db_dict
=db_dict
)
1190 # application already exists, but not finalised
1191 self
.log
.debug("application already exists, waiting for deployed...")
1192 retries
= await observer
.wait_for_application(
1193 application_id
=application
.entity_id
,
1194 progress_timeout
=progress_timeout
,
1195 total_timeout
=total_timeout
,
1197 self
.log
.debug("application deployed")
1199 return application
, retries
1201 async def _juju_execute_action(
1204 application_name
: str,
1207 progress_timeout
: float = None,
1208 total_timeout
: float = None,
1212 # get juju model and observer
1213 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1214 observer
= self
.juju_observers
[model_name
]
1216 application
= await self
._juju
_get
_application
(
1217 model_name
=model_name
, application_name
=application_name
1221 for u
in application
.units
:
1222 if await u
.is_leader_from_status():
1224 if unit
is not None:
1225 actions
= await application
.get_actions()
1226 if action_name
in actions
:
1228 'executing action "{}" using params: {}'.format(action_name
, kwargs
)
1230 action
= await unit
.run_action(action_name
, **kwargs
)
1232 # register action with observer
1233 observer
.register_action(action
=action
, db_dict
=db_dict
)
1235 await observer
.wait_for_action(
1236 action_id
=action
.entity_id
,
1237 progress_timeout
=progress_timeout
,
1238 total_timeout
=total_timeout
,
1240 self
.log
.debug("action completed with status: {}".format(action
.status
))
1241 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
1242 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
1243 if action
.entity_id
in status
:
1244 status
= status
[action
.entity_id
]
1247 return output
, status
1249 raise N2VCExecutionException(
1250 message
="Cannot execute action on charm", primitive_name
=action_name
1253 async def _juju_configure_application(
1256 application_name
: str,
1259 progress_timeout
: float = None,
1260 total_timeout
: float = None,
1263 # get the application
1264 application
= await self
._juju
_get
_application
(
1265 model_name
=model_name
, application_name
=application_name
1269 "configuring the application {} -> {}".format(application_name
, config
)
1271 res
= await application
.set_config(config
)
1273 "application {} configured. res={}".format(application_name
, res
)
1276 # Verify the config is set
1277 new_conf
= await application
.get_config()
1279 value
= new_conf
[key
]["value"]
1280 self
.log
.debug(" {} = {}".format(key
, value
))
1281 if config
[key
] != value
:
1282 raise N2VCException(
1283 message
="key {} is not configured correctly {} != {}".format(
1284 key
, config
[key
], new_conf
[key
]
1288 # check if 'verify-ssh-credentials' action exists
1289 # unit = application.units[0]
1290 actions
= await application
.get_actions()
1291 if "verify-ssh-credentials" not in actions
:
1293 "Action verify-ssh-credentials does not exist in application {}"
1294 ).format(application_name
)
1295 self
.log
.debug(msg
=msg
)
1298 # execute verify-credentials
1300 retry_timeout
= 15.0
1301 for _
in range(num_retries
):
1303 self
.log
.debug("Executing action verify-ssh-credentials...")
1304 output
, ok
= await self
._juju
_execute
_action
(
1305 model_name
=model_name
,
1306 application_name
=application_name
,
1307 action_name
="verify-ssh-credentials",
1309 progress_timeout
=progress_timeout
,
1310 total_timeout
=total_timeout
,
1312 self
.log
.debug("Result: {}, output: {}".format(ok
, output
))
1314 except asyncio
.CancelledError
:
1316 except Exception as e
:
1318 "Error executing verify-ssh-credentials: {}. Retrying...".format(e
)
1320 await asyncio
.sleep(retry_timeout
)
1323 "Error executing verify-ssh-credentials after {} retries. ".format(
1329 async def _juju_get_application(self
, model_name
: str, application_name
: str):
1330 """Get the deployed application."""
1332 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1334 application_name
= N2VCJujuConnector
._format
_app
_name
(application_name
)
1336 if model
.applications
and application_name
in model
.applications
:
1337 return model
.applications
[application_name
]
1339 raise N2VCException(
1340 message
="Cannot get application {} from model {}".format(
1341 application_name
, model_name
1345 async def _juju_get_model(self
, model_name
: str) -> Model
:
1346 """ Get a model object from juju controller
1347 If the model does not exits, it creates it.
1349 :param str model_name: name of the model
1350 :returns Model: model obtained from juju controller or Exception
1354 model_name
= N2VCJujuConnector
._format
_model
_name
(model_name
)
1356 if model_name
in self
.juju_models
:
1357 return self
.juju_models
[model_name
]
1359 if self
._creating
_model
:
1360 self
.log
.debug("Another coroutine is creating a model. Wait...")
1361 while self
._creating
_model
:
1362 # another coroutine is creating a model, wait
1363 await asyncio
.sleep(0.1)
1364 # retry (perhaps another coroutine has created the model meanwhile)
1365 if model_name
in self
.juju_models
:
1366 return self
.juju_models
[model_name
]
1369 self
._creating
_model
= True
1371 # get juju model names from juju
1372 model_list
= await self
.controller
.list_models()
1374 if model_name
not in model_list
:
1376 "Model {} does not exist. Creating new model...".format(model_name
)
1378 config_dict
= {"authorized-keys": self
.public_key
}
1380 config_dict
["apt-mirror"] = self
.apt_mirror
1381 if not self
.enable_os_upgrade
:
1382 config_dict
["enable-os-refresh-update"] = False
1383 config_dict
["enable-os-upgrade"] = False
1384 if self
.cloud
in self
.BUILT_IN_CLOUDS
:
1385 model
= await self
.controller
.add_model(
1386 model_name
=model_name
,
1388 cloud_name
=self
.cloud
,
1391 model
= await self
.controller
.add_model(
1392 model_name
=model_name
,
1394 cloud_name
=self
.cloud
,
1395 credential_name
=self
.cloud
,
1397 self
.log
.info("New model created, name={}".format(model_name
))
1400 "Model already exists in juju. Getting model {}".format(model_name
)
1402 model
= await self
.controller
.get_model(model_name
)
1403 self
.log
.debug("Existing model in juju, name={}".format(model_name
))
1405 self
.juju_models
[model_name
] = model
1406 self
.juju_observers
[model_name
] = JujuModelObserver(n2vc
=self
, model
=model
)
1409 except Exception as e
:
1410 msg
= "Cannot get model {}. Exception: {}".format(model_name
, e
)
1412 raise N2VCException(msg
)
1414 self
._creating
_model
= False
1416 async def _juju_add_relation(
1419 application_name_1
: str,
1420 application_name_2
: str,
1425 # get juju model and observer
1426 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1428 r1
= "{}:{}".format(application_name_1
, relation_1
)
1429 r2
= "{}:{}".format(application_name_2
, relation_2
)
1431 self
.log
.debug("adding relation: {} -> {}".format(r1
, r2
))
1433 await model
.add_relation(relation1
=r1
, relation2
=r2
)
1434 except JujuAPIError
as e
:
1435 # If one of the applications in the relationship doesn't exist, or the
1436 # relation has already been added,
1437 # let the operation fail silently.
1438 if "not found" in e
.message
:
1440 if "already exists" in e
.message
:
1442 # another execption, raise it
1445 async def _juju_destroy_application(self
, model_name
: str, application_name
: str):
1448 "Destroying application {} in model {}".format(application_name
, model_name
)
1451 # get juju model and observer
1452 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1453 observer
= self
.juju_observers
[model_name
]
1455 application
= model
.applications
.get(application_name
)
1457 observer
.unregister_application(application_name
)
1458 await application
.destroy()
1460 self
.log
.debug("Application not found: {}".format(application_name
))
1462 async def _juju_destroy_machine(
1463 self
, model_name
: str, machine_id
: str, total_timeout
: float = None
1467 "Destroying machine {} in model {}".format(machine_id
, model_name
)
1470 if total_timeout
is None:
1471 total_timeout
= 3600
1473 # get juju model and observer
1474 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1475 observer
= self
.juju_observers
[model_name
]
1477 machines
= await model
.get_machines()
1478 if machine_id
in machines
:
1479 machine
= model
.machines
[machine_id
]
1480 observer
.unregister_machine(machine_id
)
1481 # TODO: change this by machine.is_manual when this is upstreamed:
1482 # https://github.com/juju/python-libjuju/pull/396
1483 if "instance-id" in machine
.safe_data
and machine
.safe_data
[
1485 ].startswith("manual:"):
1486 self
.log
.debug("machine.destroy(force=True) started.")
1487 await machine
.destroy(force
=True)
1488 self
.log
.debug("machine.destroy(force=True) passed.")
1490 end
= time
.time() + total_timeout
1491 # wait for machine removal
1492 machines
= await model
.get_machines()
1493 while machine_id
in machines
and time
.time() < end
:
1495 "Waiting for machine {} is destroyed".format(machine_id
)
1497 await asyncio
.sleep(0.5)
1498 machines
= await model
.get_machines()
1499 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
1501 self
.log
.debug("Machine not found: {}".format(machine_id
))
1503 async def _juju_destroy_model(self
, model_name
: str, total_timeout
: float = None):
1505 self
.log
.debug("Destroying model {}".format(model_name
))
1507 if total_timeout
is None:
1508 total_timeout
= 3600
1509 end
= time
.time() + total_timeout
1511 model
= await self
._juju
_get
_model
(model_name
=model_name
)
1514 raise N2VCNotFound(message
="Model {} does not exist".format(model_name
))
1516 uuid
= model
.info
.uuid
1518 # destroy applications
1519 for application_name
in model
.applications
:
1521 await self
._juju
_destroy
_application
(
1522 model_name
=model_name
, application_name
=application_name
1524 except Exception as e
:
1526 "Error destroying application {} in model {}: {}".format(
1527 application_name
, model_name
, e
1532 machines
= await model
.get_machines()
1533 for machine_id
in machines
:
1535 await self
._juju
_destroy
_machine
(
1536 model_name
=model_name
, machine_id
=machine_id
1538 except asyncio
.CancelledError
:
1541 # ignore exceptions destroying machine
1544 await self
._juju
_disconnect
_model
(model_name
=model_name
)
1546 self
.log
.debug("destroying model {}...".format(model_name
))
1547 await self
.controller
.destroy_model(uuid
)
1548 # self.log.debug('model destroy requested {}'.format(model_name))
1550 # wait for model is completely destroyed
1551 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
1553 while time
.time() < end
:
1555 # await self.controller.get_model(uuid)
1556 models
= await self
.controller
.list_models()
1557 if model_name
not in models
:
1559 "The model {} ({}) was destroyed".format(model_name
, uuid
)
1562 except asyncio
.CancelledError
:
1564 except Exception as e
:
1566 await asyncio
.sleep(5)
1567 raise N2VCException(
1568 "Timeout waiting for model {} to be destroyed {}".format(
1569 model_name
, last_exception
1573 async def _juju_login(self
):
1574 """Connect to juju controller
1578 # if already authenticated, exit function
1579 if self
._authenticated
:
1582 # if connecting, wait for finish
1583 # another task could be trying to connect in parallel
1584 while self
._connecting
:
1585 await asyncio
.sleep(0.1)
1587 # double check after other task has finished
1588 if self
._authenticated
:
1592 self
._connecting
= True
1594 "connecting to juju controller: {} {}:{}{}".format(
1597 self
.secret
[:8] + "...",
1598 " with ca_cert" if self
.ca_cert
else "",
1602 # Create controller object
1603 self
.controller
= Controller(loop
=self
.loop
)
1604 # Connect to controller
1605 await self
.controller
.connect(
1607 username
=self
.username
,
1608 password
=self
.secret
,
1609 cacert
=self
.ca_cert
,
1611 self
._authenticated
= True
1612 self
.log
.info("juju controller connected")
1613 except Exception as e
:
1614 message
= "Exception connecting to juju: {}".format(e
)
1615 self
.log
.error(message
)
1616 raise N2VCConnectionException(message
=message
, url
=self
.url
)
1618 self
._connecting
= False
1620 async def _juju_logout(self
):
1621 """Logout of the Juju controller."""
1622 if not self
._authenticated
:
1625 # disconnect all models
1626 for model_name
in self
.juju_models
:
1628 await self
._juju
_disconnect
_model
(model_name
)
1629 except Exception as e
:
1631 "Error disconnecting model {} : {}".format(model_name
, e
)
1633 # continue with next model...
1635 self
.log
.info("Disconnecting controller")
1637 await self
.controller
.disconnect()
1638 except Exception as e
:
1639 raise N2VCConnectionException(
1640 message
="Error disconnecting controller: {}".format(e
), url
=self
.url
1643 self
.controller
= None
1644 self
._authenticated
= False
1645 self
.log
.info("disconnected")
1647 async def _juju_disconnect_model(self
, model_name
: str):
1648 self
.log
.debug("Disconnecting model {}".format(model_name
))
1649 if model_name
in self
.juju_models
:
1650 await self
.juju_models
[model_name
].disconnect()
1651 self
.juju_models
[model_name
] = None
1652 self
.juju_observers
[model_name
] = None
1654 self
.warning("Cannot disconnect model: {}".format(model_name
))
1656 def _create_juju_public_key(self
):
1657 """Recreate the Juju public key on lcm container, if needed
1658 Certain libjuju commands expect to be run from the same machine as Juju
1659 is bootstrapped to. This method will write the public key to disk in
1660 that location: ~/.local/share/juju/ssh/juju_id_rsa.pub
1663 # Make sure that we have a public key before writing to disk
1664 if self
.public_key
is None or len(self
.public_key
) == 0:
1665 if "OSMLCM_VCA_PUBKEY" in os
.environ
:
1666 self
.public_key
= os
.getenv("OSMLCM_VCA_PUBKEY", "")
1667 if len(self
.public_key
) == 0:
1672 pk_path
= "{}/.local/share/juju/ssh".format(os
.path
.expanduser("~"))
1673 file_path
= "{}/juju_id_rsa.pub".format(pk_path
)
1675 "writing juju public key to file:\n{}\npublic key: {}".format(
1676 file_path
, self
.public_key
1679 if not os
.path
.exists(pk_path
):
1680 # create path and write file
1681 os
.makedirs(pk_path
)
1682 with
open(file_path
, "w") as f
:
1683 self
.log
.debug("Creating juju public key file: {}".format(file_path
))
1684 f
.write(self
.public_key
)
1686 self
.log
.debug("juju public key file already exists: {}".format(file_path
))
1689 def _format_model_name(name
: str) -> str:
1690 """Format the name of the model.
1692 Model names may only contain lowercase letters, digits and hyphens
1695 return name
.replace("_", "-").replace(" ", "-").lower()
1698 def _format_app_name(name
: str) -> str:
1699 """Format the name of the application (in order to assure valid application name).
1701 Application names have restrictions (run juju deploy --help):
1702 - contains lowercase letters 'a'-'z'
1703 - contains numbers '0'-'9'
1704 - contains hyphens '-'
1705 - starts with a lowercase letter
1706 - not two or more consecutive hyphens
1707 - after a hyphen, not a group with all numbers
1710 def all_numbers(s
: str) -> bool:
1716 new_name
= name
.replace("_", "-")
1717 new_name
= new_name
.replace(" ", "-")
1718 new_name
= new_name
.lower()
1719 while new_name
.find("--") >= 0:
1720 new_name
= new_name
.replace("--", "-")
1721 groups
= new_name
.split("-")
1723 # find 'all numbers' groups and prefix them with a letter
1725 for i
in range(len(groups
)):
1727 if all_numbers(group
):
1733 if app_name
[0].isdigit():
1734 app_name
= "z" + app_name