1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuModelAlreadyExists
,
33 JujuControllerFailedConnecting
,
34 JujuApplicationExists
,
36 from n2vc
.utils
import DB_DATA
37 from osm_common
.dbbase
import DbException
48 loop
: asyncio
.AbstractEventLoop
= None,
49 log
: logging
.Logger
= None,
51 n2vc
: N2VCConnector
= None,
52 apt_mirror
: str = None,
53 enable_os_upgrade
: bool = True,
58 :param: endpoint: Endpoint of the juju controller (host:port)
59 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
60 :param: username: Juju username
61 :param: password: Juju password
62 :param: cacert: Juju CA Certificate
63 :param: loop: Asyncio loop
66 :param: n2vc: N2VC object
67 :param: apt_mirror: APT Mirror
68 :param: enable_os_upgrade: Enable OS Upgrade
71 self
.log
= log
or logging
.getLogger("Libjuju")
73 self
.endpoints
= self
._get
_api
_endpoints
_db
() or [endpoint
]
74 self
.api_proxy
= api_proxy
75 self
.username
= username
76 self
.password
= password
78 self
.loop
= loop
or asyncio
.get_event_loop()
81 # Generate config for models
82 self
.model_config
= {}
84 self
.model_config
["apt-mirror"] = apt_mirror
85 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
86 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
88 self
.loop
.set_exception_handler(self
.handle_exception
)
89 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
92 self
.log
.debug("Libjuju initialized!")
94 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
96 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
100 :param: timeout: Time in seconds to wait for controller to connect
104 controller
= Controller(loop
=self
.loop
)
105 await asyncio
.wait_for(
107 endpoint
=self
.endpoints
,
108 username
=self
.username
,
109 password
=self
.password
,
114 endpoints
= await controller
.api_endpoints
115 if self
.endpoints
!= endpoints
:
116 self
.endpoints
= endpoints
117 self
._update
_api
_endpoints
_db
(self
.endpoints
)
119 except asyncio
.CancelledError
as e
:
121 except Exception as e
:
123 "Failed connecting to controller: {}...".format(self
.endpoints
)
126 await self
.disconnect_controller(controller
)
127 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """
    Disconnect

    Cancels the periodic health check task and logs that Libjuju has been
    disconnected.
    """
    # The health check was scheduled as a task on the event loop at
    # construction time; cancel it so it stops running.
    health_task = self.health_check_task
    health_task.cancel()

    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    Awaits the model's own ``disconnect()`` coroutine; nothing is returned.

    :param: model: Model that will be disconnected
    """
    await model.disconnect()
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    Awaits the controller's own ``disconnect()`` coroutine; nothing is
    returned.

    :param: controller: Controller that will be disconnected
    """
    await controller.disconnect()
151 async def add_model(self
, model_name
: str, cloud_name
: str):
155 :param: model_name: Model name
156 :param: cloud_name: Cloud name
160 controller
= await self
.get_controller()
163 # Raise exception if model already exists
164 if await self
.model_exists(model_name
, controller
=controller
):
165 raise JujuModelAlreadyExists(
166 "Model {} already exists.".format(model_name
)
169 # Block until other workers have finished model creation
170 while self
.creating_model
.locked():
171 await asyncio
.sleep(0.1)
173 # If the model exists, return it from the controller
174 if model_name
in self
.models
:
178 async with self
.creating_model
:
179 self
.log
.debug("Creating model {}".format(model_name
))
180 model
= await controller
.add_model(
182 config
=self
.model_config
,
183 cloud_name
=cloud_name
,
184 credential_name
=cloud_name
,
186 self
.models
.add(model_name
)
189 await self
.disconnect_model(model
)
190 await self
.disconnect_controller(controller
)
193 self
, controller
: Controller
, model_name
: str, id=None
196 Get model from controller
198 :param: controller: Controller
199 :param: model_name: Model name
201 :return: Model: The created Juju model object
203 return await controller
.get_model(model_name
)
205 async def model_exists(
206 self
, model_name
: str, controller
: Controller
= None
209 Check if model exists
211 :param: controller: Controller
212 :param: model_name: Model name
216 need_to_disconnect
= False
218 # Get controller if not passed
220 controller
= await self
.get_controller()
221 need_to_disconnect
= True
223 # Check if model exists
225 return model_name
in await controller
.list_models()
227 if need_to_disconnect
:
228 await self
.disconnect_controller(controller
)
230 async def get_model_status(self
, model_name
: str) -> FullStatus
:
234 :param: model_name: Model name
236 :return: Full status object
238 controller
= await self
.get_controller()
239 model
= await self
.get_model(controller
, model_name
)
241 return await model
.get_status()
243 await self
.disconnect_model(model
)
244 await self
.disconnect_controller(controller
)
246 async def create_machine(
249 machine_id
: str = None,
250 db_dict
: dict = None,
251 progress_timeout
: float = None,
252 total_timeout
: float = None,
253 series
: str = "xenial",
255 ) -> (Machine
, bool):
259 :param: model_name: Model name
260 :param: machine_id: Machine id
261 :param: db_dict: Dictionary with data of the DB to write the updates
262 :param: progress_timeout: Maximum time between two updates in the model
263 :param: total_timeout: Timeout for the entity to be active
264 :param: series: Series of the machine (xenial, bionic, focal, ...)
265 :param: wait: Wait until machine is ready
267 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
268 if the machine is new or it already existed
274 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
278 controller
= await self
.get_controller()
281 model
= await self
.get_model(controller
, model_name
)
283 if machine_id
is not None:
285 "Searching machine (id={}) in model {}".format(
286 machine_id
, model_name
290 # Get machines from model and get the machine with machine_id if exists
291 machines
= await model
.get_machines()
292 if machine_id
in machines
:
294 "Machine (id={}) found in model {}".format(
295 machine_id
, model_name
298 machine
= model
.machines
[machine_id
]
300 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
303 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
306 machine
= await model
.add_machine(
307 spec
=None, constraints
=None, disks
=None, series
=series
311 # Wait until the machine is ready
313 "Wait until machine {} is ready in model {}".format(
314 machine
.entity_id
, model_name
318 await JujuModelWatcher
.wait_for(
321 progress_timeout
=progress_timeout
,
322 total_timeout
=total_timeout
,
327 await self
.disconnect_model(model
)
328 await self
.disconnect_controller(controller
)
331 "Machine {} ready at {} in model {}".format(
332 machine
.entity_id
, machine
.dns_name
, model_name
337 async def provision_machine(
342 private_key_path
: str,
343 db_dict
: dict = None,
344 progress_timeout
: float = None,
345 total_timeout
: float = None,
348 Manually provisioning of a machine
350 :param: model_name: Model name
351 :param: hostname: IP to access the machine
352 :param: username: Username to login to the machine
353 :param: private_key_path: Local path for the private key
354 :param: db_dict: Dictionary with data of the DB to write the updates
355 :param: progress_timeout: Maximum time between two updates in the model
356 :param: total_timeout: Timeout for the entity to be active
358 :return: (Entity): Machine id
361 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
362 model_name
, hostname
, username
367 controller
= await self
.get_controller()
370 model
= await self
.get_model(controller
, model_name
)
374 provisioner
= AsyncSSHProvisioner(
377 private_key_path
=private_key_path
,
382 params
= await provisioner
.provision_machine()
384 params
.jobs
= ["JobHostUnits"]
386 self
.log
.debug("Adding machine to model")
387 connection
= model
.connection()
388 client_facade
= client
.ClientFacade
.from_connection(connection
)
390 results
= await client_facade
.AddMachines(params
=[params
])
391 error
= results
.machines
[0].error
394 msg
= "Error adding machine: {}".format(error
.message
)
395 self
.log
.error(msg
=msg
)
396 raise ValueError(msg
)
398 machine_id
= results
.machines
[0].machine
400 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
401 asyncio
.ensure_future(
402 provisioner
.install_agent(
403 connection
=connection
,
405 machine_id
=machine_id
,
412 machine_list
= await model
.get_machines()
413 if machine_id
in machine_list
:
414 self
.log
.debug("Machine {} found in model!".format(machine_id
))
415 machine
= model
.machines
.get(machine_id
)
417 await asyncio
.sleep(2)
420 msg
= "Machine {} not found in model".format(machine_id
)
421 self
.log
.error(msg
=msg
)
422 raise JujuMachineNotFound(msg
)
425 "Wait until machine {} is ready in model {}".format(
426 machine
.entity_id
, model_name
429 await JujuModelWatcher
.wait_for(
432 progress_timeout
=progress_timeout
,
433 total_timeout
=total_timeout
,
437 except Exception as e
:
440 await self
.disconnect_model(model
)
441 await self
.disconnect_controller(controller
)
444 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
449 async def deploy_charm(
451 application_name
: str,
455 db_dict
: dict = None,
456 progress_timeout
: float = None,
457 total_timeout
: float = None,
464 :param: application_name: Application name
465 :param: path: Local path to the charm
466 :param: model_name: Model name
467 :param: machine_id ID of the machine
468 :param: db_dict: Dictionary with data of the DB to write the updates
469 :param: progress_timeout: Maximum time between two updates in the model
470 :param: total_timeout: Timeout for the entity to be active
471 :param: config: Config for the charm
472 :param: series: Series of the charm
473 :param: num_units: Number of units
475 :return: (juju.application.Application): Juju application
478 "Deploying charm {} to machine {} in model ~{}".format(
479 application_name
, machine_id
, model_name
482 self
.log
.debug("charm: {}".format(path
))
485 controller
= await self
.get_controller()
488 model
= await self
.get_model(controller
, model_name
)
492 if application_name
not in model
.applications
:
494 if machine_id
is not None:
495 if machine_id
not in model
.machines
:
496 msg
= "Machine {} not found in model".format(machine_id
)
497 self
.log
.error(msg
=msg
)
498 raise JujuMachineNotFound(msg
)
499 machine
= model
.machines
[machine_id
]
500 series
= machine
.series
502 application
= await model
.deploy(
504 application_name
=application_name
,
513 "Wait until application {} is ready in model {}".format(
514 application_name
, model_name
518 for _
in range(num_units
- 1):
519 m
, _
= await self
.create_machine(model_name
, wait
=False)
520 await application
.add_unit(to
=m
.entity_id
)
522 await JujuModelWatcher
.wait_for(
525 progress_timeout
=progress_timeout
,
526 total_timeout
=total_timeout
,
531 "Application {} is ready in model {}".format(
532 application_name
, model_name
536 raise JujuApplicationExists(
537 "Application {} exists".format(application_name
)
540 await self
.disconnect_model(model
)
541 await self
.disconnect_controller(controller
)
def _get_application(self, model: Model, application_name: str) -> Application:
    """
    Get application

    Looks the application up by name in ``model.applications``.

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    applications = model.applications

    # Guard clause: no applications at all, or none registered under this name.
    if not applications or application_name not in applications:
        return None

    return applications[application_name]
556 async def execute_action(
558 application_name
: str,
561 db_dict
: dict = None,
562 progress_timeout
: float = None,
563 total_timeout
: float = None,
568 :param: application_name: Application name
569 :param: model_name: Model name
570 :param: cloud_name: Cloud name
571 :param: action_name: Name of the action
572 :param: db_dict: Dictionary with data of the DB to write the updates
573 :param: progress_timeout: Maximum time between two updates in the model
574 :param: total_timeout: Timeout for the entity to be active
576 :return: (str, str): (output and status)
579 "Executing action {} using params {}".format(action_name
, kwargs
)
582 controller
= await self
.get_controller()
585 model
= await self
.get_model(controller
, model_name
)
589 application
= self
._get
_application
(
590 model
, application_name
=application_name
,
592 if application
is None:
593 raise JujuApplicationNotFound("Cannot execute action")
597 for u
in application
.units
:
598 if await u
.is_leader_from_status():
601 raise Exception("Cannot execute action: leader unit not found")
603 actions
= await application
.get_actions()
605 if action_name
not in actions
:
607 "Action {} not in available actions".format(action_name
)
610 action
= await unit
.run_action(action_name
, **kwargs
)
613 "Wait until action {} is completed in application {} (model={})".format(
614 action_name
, application_name
, model_name
617 await JujuModelWatcher
.wait_for(
620 progress_timeout
=progress_timeout
,
621 total_timeout
=total_timeout
,
626 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
627 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
629 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
633 "Action {} completed with status {} in application {} (model={})".format(
634 action_name
, action
.status
, application_name
, model_name
637 except Exception as e
:
640 await self
.disconnect_model(model
)
641 await self
.disconnect_controller(controller
)
643 return output
, status
645 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
646 """Get list of actions
648 :param: application_name: Application name
649 :param: model_name: Model name
651 :return: Dict with this format
653 "action_name": "Description of the action",
658 "Getting list of actions for application {}".format(application_name
)
662 controller
= await self
.get_controller()
665 model
= await self
.get_model(controller
, model_name
)
669 application
= self
._get
_application
(
670 model
, application_name
=application_name
,
673 # Return list of actions
674 return await application
.get_actions()
677 # Disconnect from model and controller
678 await self
.disconnect_model(model
)
679 await self
.disconnect_controller(controller
)
681 async def add_relation(
684 application_name_1
: str,
685 application_name_2
: str,
691 :param: model_name: Model name
692 :param: application_name_1 First application name
693 :param: application_name_2: Second application name
694 :param: relation_1: First relation name
695 :param: relation_2: Second relation name
698 self
.log
.debug("Adding relation: {} -> {}".format(relation_1
, relation_2
))
701 controller
= await self
.get_controller()
704 model
= await self
.get_model(controller
, model_name
)
706 # Build relation strings
707 r1
= "{}:{}".format(application_name_1
, relation_1
)
708 r2
= "{}:{}".format(application_name_2
, relation_2
)
712 await model
.add_relation(relation1
=r1
, relation2
=r2
)
713 except JujuAPIError
as e
:
714 if "not found" in e
.message
:
715 self
.log
.warning("Relation not found: {}".format(e
.message
))
717 if "already exists" in e
.message
:
718 self
.log
.warning("Relation already exists: {}".format(e
.message
))
720 # another exception, raise it
723 await self
.disconnect_model(model
)
724 await self
.disconnect_controller(controller
)
726 async def destroy_model(self
, model_name
: str, total_timeout
: float):
730 :param: model_name: Model name
731 :param: total_timeout: Timeout
734 controller
= await self
.get_controller()
735 model
= await self
.get_model(controller
, model_name
)
737 self
.log
.debug("Destroying model {}".format(model_name
))
738 uuid
= model
.info
.uuid
740 # Destroy applications
741 for application_name
in model
.applications
:
743 await self
.destroy_application(
744 model
, application_name
=application_name
,
746 except Exception as e
:
748 "Error destroying application {} in model {}: {}".format(
749 application_name
, model_name
, e
754 machines
= await model
.get_machines()
755 for machine_id
in machines
:
757 await self
.destroy_machine(
758 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
760 except asyncio
.CancelledError
:
766 await self
.disconnect_model(model
)
769 if model_name
in self
.models
:
770 self
.models
.remove(model_name
)
772 await controller
.destroy_model(uuid
)
774 # Wait until model is destroyed
775 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
778 if total_timeout
is None:
780 end
= time
.time() + total_timeout
781 while time
.time() < end
:
783 models
= await controller
.list_models()
784 if model_name
not in models
:
786 "The model {} ({}) was destroyed".format(model_name
, uuid
)
789 except asyncio
.CancelledError
:
791 except Exception as e
:
793 await asyncio
.sleep(5)
795 "Timeout waiting for model {} to be destroyed {}".format(
796 model_name
, last_exception
800 await self
.disconnect_controller(controller
)
802 async def destroy_application(self
, model
: Model
, application_name
: str):
806 :param: model: Model object
807 :param: application_name: Application name
810 "Destroying application {} in model {}".format(
811 application_name
, model
.info
.name
814 application
= model
.applications
.get(application_name
)
816 await application
.destroy()
818 self
.log
.warning("Application not found: {}".format(application_name
))
820 async def destroy_machine(
821 self
, model
: Model
, machine_id
: str, total_timeout
: float = 3600
826 :param: model: Model object
827 :param: machine_id: Machine id
828 :param: total_timeout: Timeout in seconds
830 machines
= await model
.get_machines()
831 if machine_id
in machines
:
832 machine
= model
.machines
[machine_id
]
833 # TODO: change this by machine.is_manual when this is upstreamed:
834 # https://github.com/juju/python-libjuju/pull/396
835 if "instance-id" in machine
.safe_data
and machine
.safe_data
[
837 ].startswith("manual:"):
838 await machine
.destroy(force
=True)
841 end
= time
.time() + total_timeout
843 # wait for machine removal
844 machines
= await model
.get_machines()
845 while machine_id
in machines
and time
.time() < end
:
847 "Waiting for machine {} is destroyed".format(machine_id
)
849 await asyncio
.sleep(0.5)
850 machines
= await model
.get_machines()
851 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
853 self
.log
.debug("Machine not found: {}".format(machine_id
))
855 async def configure_application(
856 self
, model_name
: str, application_name
: str, config
: dict = None
858 """Configure application
860 :param: model_name: Model name
861 :param: application_name: Application name
862 :param: config: Config to apply to the charm
864 self
.log
.debug("Configuring application {}".format(application_name
))
868 controller
= await self
.get_controller()
869 model
= await self
.get_model(controller
, model_name
)
870 application
= self
._get
_application
(
871 model
, application_name
=application_name
,
873 await application
.set_config(config
)
875 await self
.disconnect_model(model
)
876 await self
.disconnect_controller(controller
)
878 def _get_api_endpoints_db(self
) -> [str]:
880 Get API Endpoints from DB
882 :return: List of API endpoints
884 self
.log
.debug("Getting endpoints from database")
886 juju_info
= self
.db
.get_one(
887 DB_DATA
.api_endpoints
.table
,
888 q_filter
=DB_DATA
.api_endpoints
.filter,
891 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
892 return juju_info
[DB_DATA
.api_endpoints
.key
]
894 def _update_api_endpoints_db(self
, endpoints
: [str]):
896 Update API endpoints in Database
898 :param: List of endpoints
900 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
902 juju_info
= self
.db
.get_one(
903 DB_DATA
.api_endpoints
.table
,
904 q_filter
=DB_DATA
.api_endpoints
.filter,
907 # If it doesn't, then create it
911 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
913 except DbException
as e
:
914 # Racing condition: check if another N2VC worker has created it
915 juju_info
= self
.db
.get_one(
916 DB_DATA
.api_endpoints
.table
,
917 q_filter
=DB_DATA
.api_endpoints
.filter,
923 DB_DATA
.api_endpoints
.table
,
924 DB_DATA
.api_endpoints
.filter,
925 {DB_DATA
.api_endpoints
.key
: endpoints
},
928 def handle_exception(self
, loop
, context
):
929 # All unhandled exceptions by libjuju are handled here.
932 async def health_check(self
, interval
: float = 300.0):
934 Health check to make sure controller and controller_model connections are OK
936 :param: interval: Time in seconds between checks
940 controller
= await self
.get_controller()
941 # self.log.debug("VCA is alive")
942 except Exception as e
:
943 self
.log
.error("Health check to VCA failed: {}".format(e
))
945 await self
.disconnect_controller(controller
)
946 await asyncio
.sleep(interval
)
948 async def list_models(self
, contains
: str = None) -> [str]:
949 """List models with certain names
951 :param: contains: String that is contained in model name
953 :retur: [models] Returns list of model names
956 controller
= await self
.get_controller()
958 models
= await controller
.list_models()
960 models
= [model
for model
in models
if contains
in model
]
963 await self
.disconnect_controller(controller
)