1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuModelAlreadyExists
,
33 JujuControllerFailedConnecting
,
34 JujuApplicationExists
,
36 from n2vc
.utils
import DB_DATA
37 from osm_common
.dbbase
import DbException
48 loop
: asyncio
.AbstractEventLoop
= None,
49 log
: logging
.Logger
= None,
51 n2vc
: N2VCConnector
= None,
52 apt_mirror
: str = None,
53 enable_os_upgrade
: bool = True,
58 :param: endpoint: Endpoint of the juju controller (host:port)
59 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
60 :param: username: Juju username
61 :param: password: Juju password
62 :param: cacert: Juju CA Certificate
63 :param: loop: Asyncio loop
66 :param: n2vc: N2VC object
67 :param: apt_mirror: APT Mirror
68 :param: enable_os_upgrade: Enable OS Upgrade
71 self
.log
= log
or logging
.getLogger("Libjuju")
73 self
.endpoints
= self
._get
_api
_endpoints
_db
() or [endpoint
]
74 self
.api_proxy
= api_proxy
75 self
.username
= username
76 self
.password
= password
78 self
.loop
= loop
or asyncio
.get_event_loop()
81 # Generate config for models
82 self
.model_config
= {}
84 self
.model_config
["apt-mirror"] = apt_mirror
85 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
86 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
88 self
.loop
.set_exception_handler(self
.handle_exception
)
89 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
92 self
.log
.debug("Libjuju initialized!")
94 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
96 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
100 :param: timeout: Time in seconds to wait for controller to connect
104 controller
= Controller(loop
=self
.loop
)
105 await asyncio
.wait_for(
107 endpoint
=self
.endpoints
,
108 username
=self
.username
,
109 password
=self
.password
,
114 endpoints
= await controller
.api_endpoints
115 if self
.endpoints
!= endpoints
:
116 self
.endpoints
= endpoints
117 self
._update
_api
_endpoints
_db
(self
.endpoints
)
119 except asyncio
.CancelledError
as e
:
121 except Exception as e
:
123 "Failed connecting to controller: {}...".format(self
.endpoints
)
126 await self
.disconnect_controller(controller
)
127 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """
    Disconnect

    Cancels the task stored in ``self.health_check_task`` and logs that the
    object has been disconnected.
    """
    # Cancel health check task
    health_task = self.health_check_task
    health_task.cancel()

    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    :param: model: Model that will be disconnected. ``None`` is accepted and
                   ignored, so the method is safe to call from cleanup code
                   that may run before a model was obtained.
    """
    if model is not None:
        await model.disconnect()
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    :param: controller: Controller that will be disconnected. ``None`` is
                        accepted and ignored, so the method is safe to call
                        from cleanup code that may run before a controller
                        was obtained.
    """
    if controller is not None:
        await controller.disconnect()
async def add_model(self, model_name: str, cloud_name: str):
    """
    Create model

    Raises JujuModelAlreadyExists when the controller already lists the
    model. If another worker of this object created the model while this
    call was waiting, nothing is created and the call simply returns.

    :param: model_name: Model name
    :param: cloud_name: Cloud name (also used as the credential name)
    """

    # Get controller
    controller = await self.get_controller()
    created_model = None
    try:
        # Raise exception if model already exists
        already_there = await self.model_exists(model_name, controller=controller)
        if already_there:
            raise JujuModelAlreadyExists(
                "Model {} already exists.".format(model_name)
            )

        # Block until other workers have finished model creation
        creation_lock = self.creating_model
        while creation_lock.locked():
            await asyncio.sleep(0.1)

        # If the model exists, return it from the controller
        if model_name in self.models:
            return

        # Create the model
        async with creation_lock:
            self.log.debug("Creating model {}".format(model_name))
            created_model = await controller.add_model(
                model_name,
                config=self.model_config,
                cloud_name=cloud_name,
                credential_name=cloud_name,
            )
            self.models.add(model_name)
    finally:
        if created_model:
            await self.disconnect_model(created_model)
        await self.disconnect_controller(controller)
193 self
, controller
: Controller
, model_name
: str, id=None
196 Get model from controller
198 :param: controller: Controller
199 :param: model_name: Model name
201 :return: Model: The created Juju model object
203 return await controller
.get_model(model_name
)
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    When no controller is passed, one is obtained with ``get_controller``
    and disconnected again before returning; a controller supplied by the
    caller is left connected.

    :param: controller: Controller
    :param: model_name: Model name

    :return bool
    """
    # Get controller if not passed
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    # Check if model exists
    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    The controller and model connections opened for the query are always
    released, including when the model cannot be obtained.

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    try:
        # Fetch the model inside the controller's try block so a failure
        # here does not leak the controller connection.
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
246 async def create_machine(
249 machine_id
: str = None,
250 db_dict
: dict = None,
251 progress_timeout
: float = None,
252 total_timeout
: float = None,
253 series
: str = "xenial",
254 ) -> (Machine
, bool):
258 :param: model_name: Model name
259 :param: machine_id: Machine id
260 :param: db_dict: Dictionary with data of the DB to write the updates
261 :param: progress_timeout: Maximum time between two updates in the model
262 :param: total_timeout: Timeout for the entity to be active
264 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
265 if the machine is new or it already existed
271 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
275 controller
= await self
.get_controller()
278 model
= await self
.get_model(controller
, model_name
)
280 if machine_id
is not None:
282 "Searching machine (id={}) in model {}".format(
283 machine_id
, model_name
287 # Get machines from model and get the machine with machine_id if exists
288 machines
= await model
.get_machines()
289 if machine_id
in machines
:
291 "Machine (id={}) found in model {}".format(
292 machine_id
, model_name
295 machine
= model
.machines
[machine_id
]
297 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
300 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
303 machine
= await model
.add_machine(
304 spec
=None, constraints
=None, disks
=None, series
=series
308 # Wait until the machine is ready
310 "Wait until machine {} is ready in model {}".format(
311 machine
.entity_id
, model_name
314 await JujuModelWatcher
.wait_for(
317 progress_timeout
=progress_timeout
,
318 total_timeout
=total_timeout
,
323 await self
.disconnect_model(model
)
324 await self
.disconnect_controller(controller
)
327 "Machine {} ready at {} in model {}".format(
328 machine
.entity_id
, machine
.dns_name
, model_name
333 async def provision_machine(
338 private_key_path
: str,
339 db_dict
: dict = None,
340 progress_timeout
: float = None,
341 total_timeout
: float = None,
344 Manually provisioning of a machine
346 :param: model_name: Model name
347 :param: hostname: IP to access the machine
348 :param: username: Username to login to the machine
349 :param: private_key_path: Local path for the private key
350 :param: db_dict: Dictionary with data of the DB to write the updates
351 :param: progress_timeout: Maximum time between two updates in the model
352 :param: total_timeout: Timeout for the entity to be active
354 :return: (Entity): Machine id
357 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
358 model_name
, hostname
, username
363 controller
= await self
.get_controller()
366 model
= await self
.get_model(controller
, model_name
)
370 provisioner
= AsyncSSHProvisioner(
373 private_key_path
=private_key_path
,
378 params
= await provisioner
.provision_machine()
380 params
.jobs
= ["JobHostUnits"]
382 self
.log
.debug("Adding machine to model")
383 connection
= model
.connection()
384 client_facade
= client
.ClientFacade
.from_connection(connection
)
386 results
= await client_facade
.AddMachines(params
=[params
])
387 error
= results
.machines
[0].error
390 msg
= "Error adding machine: {}".format(error
.message
)
391 self
.log
.error(msg
=msg
)
392 raise ValueError(msg
)
394 machine_id
= results
.machines
[0].machine
396 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
397 asyncio
.ensure_future(
398 provisioner
.install_agent(
399 connection
=connection
,
401 machine_id
=machine_id
,
408 machine_list
= await model
.get_machines()
409 if machine_id
in machine_list
:
410 self
.log
.debug("Machine {} found in model!".format(machine_id
))
411 machine
= model
.machines
.get(machine_id
)
413 await asyncio
.sleep(2)
416 msg
= "Machine {} not found in model".format(machine_id
)
417 self
.log
.error(msg
=msg
)
418 raise JujuMachineNotFound(msg
)
421 "Wait until machine {} is ready in model {}".format(
422 machine
.entity_id
, model_name
425 await JujuModelWatcher
.wait_for(
428 progress_timeout
=progress_timeout
,
429 total_timeout
=total_timeout
,
433 except Exception as e
:
436 await self
.disconnect_model(model
)
437 await self
.disconnect_controller(controller
)
440 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
445 async def deploy_charm(
447 application_name
: str,
451 db_dict
: dict = None,
452 progress_timeout
: float = None,
453 total_timeout
: float = None,
459 :param: application_name: Application name
460 :param: path: Local path to the charm
461 :param: model_name: Model name
462 :param: machine_id ID of the machine
463 :param: db_dict: Dictionary with data of the DB to write the updates
464 :param: progress_timeout: Maximum time between two updates in the model
465 :param: total_timeout: Timeout for the entity to be active
466 :param: config: Config for the charm
467 :param: series: Series of the charm
469 :return: (juju.application.Application): Juju application
472 "Deploying charm {} to machine {} in model ~{}".format(
473 application_name
, machine_id
, model_name
476 self
.log
.debug("charm: {}".format(path
))
479 controller
= await self
.get_controller()
482 model
= await self
.get_model(controller
, model_name
)
486 if application_name
not in model
.applications
:
488 if machine_id
is not None:
489 if machine_id
not in model
.machines
:
490 msg
= "Machine {} not found in model".format(machine_id
)
491 self
.log
.error(msg
=msg
)
492 raise JujuMachineNotFound(msg
)
493 machine
= model
.machines
[machine_id
]
494 series
= machine
.series
496 application
= await model
.deploy(
498 application_name
=application_name
,
507 "Wait until application {} is ready in model {}".format(
508 application_name
, model_name
511 await JujuModelWatcher
.wait_for(
514 progress_timeout
=progress_timeout
,
515 total_timeout
=total_timeout
,
520 "Application {} is ready in model {}".format(
521 application_name
, model_name
525 raise JujuApplicationExists(
526 "Application {} exists".format(application_name
)
529 await self
.disconnect_model(model
)
530 await self
.disconnect_controller(controller
)
def _get_application(self, model: Model, application_name: str) -> Application:
    """
    Get application

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications
    if not applications or application_name not in applications:
        return None
    return applications[application_name]
545 async def execute_action(
547 application_name
: str,
550 db_dict
: dict = None,
551 progress_timeout
: float = None,
552 total_timeout
: float = None,
557 :param: application_name: Application name
558 :param: model_name: Model name
559 :param: cloud_name: Cloud name
560 :param: action_name: Name of the action
561 :param: db_dict: Dictionary with data of the DB to write the updates
562 :param: progress_timeout: Maximum time between two updates in the model
563 :param: total_timeout: Timeout for the entity to be active
565 :return: (str, str): (output and status)
568 "Executing action {} using params {}".format(action_name
, kwargs
)
571 controller
= await self
.get_controller()
574 model
= await self
.get_model(controller
, model_name
)
578 application
= self
._get
_application
(
579 model
, application_name
=application_name
,
581 if application
is None:
582 raise JujuApplicationNotFound("Cannot execute action")
586 for u
in application
.units
:
587 if await u
.is_leader_from_status():
590 raise Exception("Cannot execute action: leader unit not found")
592 actions
= await application
.get_actions()
594 if action_name
not in actions
:
596 "Action {} not in available actions".format(action_name
)
599 action
= await unit
.run_action(action_name
, **kwargs
)
602 "Wait until action {} is completed in application {} (model={})".format(
603 action_name
, application_name
, model_name
606 await JujuModelWatcher
.wait_for(
609 progress_timeout
=progress_timeout
,
610 total_timeout
=total_timeout
,
615 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
616 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
618 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
622 "Action {} completed with status {} in application {} (model={})".format(
623 action_name
, action
.status
, application_name
, model_name
626 except Exception as e
:
629 await self
.disconnect_model(model
)
630 await self
.disconnect_controller(controller
)
632 return output
, status
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }

    :raises: JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model (inside the try so the controller is released on failure)
        model = await self.get_model(controller, model_name)
        try:
            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                # Same exception execute_action raises for a missing
                # application, instead of an AttributeError on None.
                raise JujuApplicationNotFound(
                    "Application {} not found in model {}".format(
                        application_name, model_name
                    )
                )

            # Return list of actions
            return await application.get_actions()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
async def add_relation(
    self,
    model_name: str,
    application_name_1: str,
    application_name_2: str,
    relation_1: str,
    relation_2: str,
):
    """Add relation

    API errors whose message contains "not found" or "already exists" are
    logged as warnings and swallowed; any other JujuAPIError is re-raised.

    :param: model_name: Model name
    :param: application_name_1 First application name
    :param: application_name_2: Second application name
    :param: relation_1: First relation name
    :param: relation_2: Second relation name
    """
    self.log.debug("Adding relation: {} -> {}".format(relation_1, relation_2))

    # Build relation strings
    r1 = "{}:{}".format(application_name_1, relation_1)
    r2 = "{}:{}".format(application_name_2, relation_2)

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model (inside the try so the controller is released on failure)
        model = await self.get_model(controller, model_name)

        # Add relation
        try:
            await model.add_relation(relation1=r1, relation2=r2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it
            raise
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
715 async def destroy_model(
716 self
, model_name
: str, total_timeout
: float,
721 :param: model_name: Model name
722 :param: total_timeout: Timeout
725 controller
= await self
.get_controller()
726 model
= await self
.get_model(controller
, model_name
)
728 self
.log
.debug("Destroying model {}".format(model_name
))
729 uuid
= model
.info
.uuid
731 # Destroy applications
732 for application_name
in model
.applications
:
734 await self
.destroy_application(
735 model
, application_name
=application_name
,
737 except Exception as e
:
739 "Error destroying application {} in model {}: {}".format(
740 application_name
, model_name
, e
745 machines
= await model
.get_machines()
746 for machine_id
in machines
:
748 await self
.destroy_machine(
749 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
751 except asyncio
.CancelledError
:
757 await self
.disconnect_model(model
)
760 if model_name
in self
.models
:
761 self
.models
.remove(model_name
)
763 await controller
.destroy_model(uuid
)
765 # Wait until model is destroyed
766 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
769 if total_timeout
is None:
771 end
= time
.time() + total_timeout
772 while time
.time() < end
:
774 models
= await controller
.list_models()
775 if model_name
not in models
:
777 "The model {} ({}) was destroyed".format(model_name
, uuid
)
780 except asyncio
.CancelledError
:
782 except Exception as e
:
784 await asyncio
.sleep(5)
786 "Timeout waiting for model {} to be destroyed {}".format(
787 model_name
, last_exception
791 await self
.disconnect_controller(controller
)
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroy application

    Logs a warning and does nothing else when the application is not
    present in the model.

    :param: model: Model object
    :param: application_name: Application name
    """
    model_label = model.info.name
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model_label
        )
    )

    target = model.applications.get(application_name)
    if not target:
        self.log.warning("Application not found: {}".format(application_name))
        return

    await target.destroy()
async def destroy_machine(
    self, model: Model, machine_id: str, total_timeout: float = 3600
):
    """
    Destroy machine

    Only machines whose "instance-id" starts with "manual:" are destroyed
    here; any other machine found in the model is left untouched. After
    requesting the destruction, the model is polled until the machine is
    gone or ``total_timeout`` seconds have elapsed.

    :param: model: Model object
    :param: machine_id: Machine id
    :param: total_timeout: Timeout in seconds
    """
    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    machine = model.machines[machine_id]
    # TODO: change this by machine.is_manual when this is upstreamed:
    # https://github.com/juju/python-libjuju/pull/396
    safe_data = machine.safe_data
    is_manual = "instance-id" in safe_data and safe_data["instance-id"].startswith(
        "manual:"
    )
    if not is_manual:
        return

    await machine.destroy(force=True)

    # max timeout; a monotonic clock keeps the deadline immune to
    # wall-clock adjustments
    deadline = time.monotonic() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.monotonic() < deadline:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    if machine_id in machines:
        # Do not report success when the wait merely timed out
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Nothing is sent to the controller when ``config`` is empty or None.
    The controller and model connections opened for the update are always
    released, including when one of them cannot be obtained.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises: JujuApplicationNotFound: if the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    if not config:
        return

    # Acquire each connection before entering the try block that releases
    # it, so cleanup never touches a name that was never assigned.
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        try:
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Application {} not found in model {}".format(
                        application_name, model_name
                    )
                )
            await application.set_config(config)
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
869 def _get_api_endpoints_db(self
) -> [str]:
871 Get API Endpoints from DB
873 :return: List of API endpoints
875 self
.log
.debug("Getting endpoints from database")
877 juju_info
= self
.db
.get_one(
878 DB_DATA
.api_endpoints
.table
,
879 q_filter
=DB_DATA
.api_endpoints
.filter,
882 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
883 return juju_info
[DB_DATA
.api_endpoints
.key
]
885 def _update_api_endpoints_db(self
, endpoints
: [str]):
887 Update API endpoints in Database
889 :param: List of endpoints
891 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
893 juju_info
= self
.db
.get_one(
894 DB_DATA
.api_endpoints
.table
,
895 q_filter
=DB_DATA
.api_endpoints
.filter,
898 # If it doesn't, then create it
902 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
904 except DbException
as e
:
905 # Racing condition: check if another N2VC worker has created it
906 juju_info
= self
.db
.get_one(
907 DB_DATA
.api_endpoints
.table
,
908 q_filter
=DB_DATA
.api_endpoints
.filter,
914 DB_DATA
.api_endpoints
.table
,
915 DB_DATA
.api_endpoints
.filter,
916 {DB_DATA
.api_endpoints
.key
: endpoints
},
def handle_exception(self, loop, context):
    """
    Event loop exception handler

    All unhandled exceptions by libjuju are handled here. Nothing is done
    with them: the handler returns without using ``loop`` or ``context``.
    """
    return None
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs forever: on each iteration a controller connection is opened and
    closed again, failures are logged, and the coroutine sleeps for
    ``interval`` seconds before the next attempt.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every iteration so that a failed connection attempt
        # never leaves the cleanup below with an unbound or stale controller.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)