1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuLeaderUnitNotFound
,
34 JujuModelAlreadyExists
,
35 JujuControllerFailedConnecting
,
36 JujuApplicationExists
,
38 from n2vc
.utils
import DB_DATA
39 from osm_common
.dbbase
import DbException
50 loop
: asyncio
.AbstractEventLoop
= None,
51 log
: logging
.Logger
= None,
53 n2vc
: N2VCConnector
= None,
54 apt_mirror
: str = None,
55 enable_os_upgrade
: bool = True,
60 :param: endpoint: Endpoint of the juju controller (host:port)
61 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
62 :param: username: Juju username
63 :param: password: Juju password
64 :param: cacert: Juju CA Certificate
65 :param: loop: Asyncio loop
68 :param: n2vc: N2VC object
69 :param: apt_mirror: APT Mirror
70 :param: enable_os_upgrade: Enable OS Upgrade
73 self
.log
= log
or logging
.getLogger("Libjuju")
75 db_endpoints
= self
._get
_api
_endpoints
_db
()
76 self
.endpoints
= db_endpoints
or [endpoint
]
77 if db_endpoints
is None:
78 self
._update
_api
_endpoints
_db
(self
.endpoints
)
79 self
.api_proxy
= api_proxy
80 self
.username
= username
81 self
.password
= password
83 self
.loop
= loop
or asyncio
.get_event_loop()
86 # Generate config for models
87 self
.model_config
= {}
89 self
.model_config
["apt-mirror"] = apt_mirror
90 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
91 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
93 self
.loop
.set_exception_handler(self
.handle_exception
)
94 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
97 self
.log
.debug("Libjuju initialized!")
99 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
101 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
105 :param: timeout: Time in seconds to wait for controller to connect
109 controller
= Controller(loop
=self
.loop
)
110 await asyncio
.wait_for(
112 endpoint
=self
.endpoints
,
113 username
=self
.username
,
114 password
=self
.password
,
119 endpoints
= await controller
.api_endpoints
120 if self
.endpoints
!= endpoints
:
121 self
.endpoints
= endpoints
122 self
._update
_api
_endpoints
_db
(self
.endpoints
)
124 except asyncio
.CancelledError
as e
:
126 except Exception as e
:
128 "Failed connecting to controller: {}...".format(self
.endpoints
)
131 await self
.disconnect_controller(controller
)
132 raise JujuControllerFailedConnecting(e
)
134 async def disconnect(self
):
136 # Cancel health check task
137 self
.health_check_task
.cancel()
138 self
.log
.debug("Libjuju disconnected!")
140 async def disconnect_model(self
, model
: Model
):
144 :param: model: Model that will be disconnected
146 await model
.disconnect()
148 async def disconnect_controller(self
, controller
: Controller
):
150 Disconnect controller
152 :param: controller: Controller that will be disconnected
154 await controller
.disconnect()
156 async def add_model(self
, model_name
: str, cloud_name
: str):
160 :param: model_name: Model name
161 :param: cloud_name: Cloud name
165 controller
= await self
.get_controller()
168 # Raise exception if model already exists
169 if await self
.model_exists(model_name
, controller
=controller
):
170 raise JujuModelAlreadyExists(
171 "Model {} already exists.".format(model_name
)
174 # Block until other workers have finished model creation
175 while self
.creating_model
.locked():
176 await asyncio
.sleep(0.1)
178 # If the model exists, return it from the controller
179 if model_name
in self
.models
:
183 async with self
.creating_model
:
184 self
.log
.debug("Creating model {}".format(model_name
))
185 model
= await controller
.add_model(
187 config
=self
.model_config
,
188 cloud_name
=cloud_name
,
189 credential_name
=cloud_name
,
191 self
.models
.add(model_name
)
194 await self
.disconnect_model(model
)
195 await self
.disconnect_controller(controller
)
198 self
, controller
: Controller
, model_name
: str, id=None
201 Get model from controller
203 :param: controller: Controller
204 :param: model_name: Model name
206 :return: Model: The created Juju model object
208 return await controller
.get_model(model_name
)
210 async def model_exists(
211 self
, model_name
: str, controller
: Controller
= None
214 Check if model exists
216 :param: controller: Controller
217 :param: model_name: Model name
221 need_to_disconnect
= False
223 # Get controller if not passed
225 controller
= await self
.get_controller()
226 need_to_disconnect
= True
228 # Check if model exists
230 return model_name
in await controller
.list_models()
232 if need_to_disconnect
:
233 await self
.disconnect_controller(controller
)
235 async def models_exist(self
, model_names
: [str]) -> (bool, list):
237 Check if models exists
239 :param: model_names: List of strings with model names
241 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
245 "model_names must be a non-empty array. Given value: {}".format(
249 non_existing_models
= []
250 models
= await self
.list_models()
251 existing_models
= list(set(models
).intersection(model_names
))
252 non_existing_models
= list(set(model_names
) - set(existing_models
))
255 len(non_existing_models
) == 0,
259 async def get_model_status(self
, model_name
: str) -> FullStatus
:
263 :param: model_name: Model name
265 :return: Full status object
267 controller
= await self
.get_controller()
268 model
= await self
.get_model(controller
, model_name
)
270 return await model
.get_status()
272 await self
.disconnect_model(model
)
273 await self
.disconnect_controller(controller
)
275 async def create_machine(
278 machine_id
: str = None,
279 db_dict
: dict = None,
280 progress_timeout
: float = None,
281 total_timeout
: float = None,
282 series
: str = "xenial",
284 ) -> (Machine
, bool):
288 :param: model_name: Model name
289 :param: machine_id: Machine id
290 :param: db_dict: Dictionary with data of the DB to write the updates
291 :param: progress_timeout: Maximum time between two updates in the model
292 :param: total_timeout: Timeout for the entity to be active
293 :param: series: Series of the machine (xenial, bionic, focal, ...)
294 :param: wait: Wait until machine is ready
296 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
297 if the machine is new or it already existed
303 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
307 controller
= await self
.get_controller()
310 model
= await self
.get_model(controller
, model_name
)
312 if machine_id
is not None:
314 "Searching machine (id={}) in model {}".format(
315 machine_id
, model_name
319 # Get machines from model and get the machine with machine_id if exists
320 machines
= await model
.get_machines()
321 if machine_id
in machines
:
323 "Machine (id={}) found in model {}".format(
324 machine_id
, model_name
327 machine
= machines
[machine_id
]
329 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
332 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
335 machine
= await model
.add_machine(
336 spec
=None, constraints
=None, disks
=None, series
=series
340 # Wait until the machine is ready
342 "Wait until machine {} is ready in model {}".format(
343 machine
.entity_id
, model_name
347 await JujuModelWatcher
.wait_for(
350 progress_timeout
=progress_timeout
,
351 total_timeout
=total_timeout
,
356 await self
.disconnect_model(model
)
357 await self
.disconnect_controller(controller
)
360 "Machine {} ready at {} in model {}".format(
361 machine
.entity_id
, machine
.dns_name
, model_name
366 async def provision_machine(
371 private_key_path
: str,
372 db_dict
: dict = None,
373 progress_timeout
: float = None,
374 total_timeout
: float = None,
377 Manually provisioning of a machine
379 :param: model_name: Model name
380 :param: hostname: IP to access the machine
381 :param: username: Username to login to the machine
382 :param: private_key_path: Local path for the private key
383 :param: db_dict: Dictionary with data of the DB to write the updates
384 :param: progress_timeout: Maximum time between two updates in the model
385 :param: total_timeout: Timeout for the entity to be active
387 :return: (Entity): Machine id
390 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
391 model_name
, hostname
, username
396 controller
= await self
.get_controller()
399 model
= await self
.get_model(controller
, model_name
)
403 provisioner
= AsyncSSHProvisioner(
406 private_key_path
=private_key_path
,
411 params
= await provisioner
.provision_machine()
413 params
.jobs
= ["JobHostUnits"]
415 self
.log
.debug("Adding machine to model")
416 connection
= model
.connection()
417 client_facade
= client
.ClientFacade
.from_connection(connection
)
419 results
= await client_facade
.AddMachines(params
=[params
])
420 error
= results
.machines
[0].error
423 msg
= "Error adding machine: {}".format(error
.message
)
424 self
.log
.error(msg
=msg
)
425 raise ValueError(msg
)
427 machine_id
= results
.machines
[0].machine
429 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
430 asyncio
.ensure_future(
431 provisioner
.install_agent(
432 connection
=connection
,
434 machine_id
=machine_id
,
435 proxy
=self
.api_proxy
,
441 machine_list
= await model
.get_machines()
442 if machine_id
in machine_list
:
443 self
.log
.debug("Machine {} found in model!".format(machine_id
))
444 machine
= model
.machines
.get(machine_id
)
446 await asyncio
.sleep(2)
449 msg
= "Machine {} not found in model".format(machine_id
)
450 self
.log
.error(msg
=msg
)
451 raise JujuMachineNotFound(msg
)
454 "Wait until machine {} is ready in model {}".format(
455 machine
.entity_id
, model_name
458 await JujuModelWatcher
.wait_for(
461 progress_timeout
=progress_timeout
,
462 total_timeout
=total_timeout
,
466 except Exception as e
:
469 await self
.disconnect_model(model
)
470 await self
.disconnect_controller(controller
)
473 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
478 async def deploy_charm(
480 application_name
: str,
484 db_dict
: dict = None,
485 progress_timeout
: float = None,
486 total_timeout
: float = None,
493 :param: application_name: Application name
494 :param: path: Local path to the charm
495 :param: model_name: Model name
496 :param: machine_id ID of the machine
497 :param: db_dict: Dictionary with data of the DB to write the updates
498 :param: progress_timeout: Maximum time between two updates in the model
499 :param: total_timeout: Timeout for the entity to be active
500 :param: config: Config for the charm
501 :param: series: Series of the charm
502 :param: num_units: Number of units
504 :return: (juju.application.Application): Juju application
507 "Deploying charm {} to machine {} in model ~{}".format(
508 application_name
, machine_id
, model_name
511 self
.log
.debug("charm: {}".format(path
))
514 controller
= await self
.get_controller()
517 model
= await self
.get_model(controller
, model_name
)
521 if application_name
not in model
.applications
:
523 if machine_id
is not None:
524 if machine_id
not in model
.machines
:
525 msg
= "Machine {} not found in model".format(machine_id
)
526 self
.log
.error(msg
=msg
)
527 raise JujuMachineNotFound(msg
)
528 machine
= model
.machines
[machine_id
]
529 series
= machine
.series
531 application
= await model
.deploy(
533 application_name
=application_name
,
542 "Wait until application {} is ready in model {}".format(
543 application_name
, model_name
547 for _
in range(num_units
- 1):
548 m
, _
= await self
.create_machine(model_name
, wait
=False)
549 await application
.add_unit(to
=m
.entity_id
)
551 await JujuModelWatcher
.wait_for(
554 progress_timeout
=progress_timeout
,
555 total_timeout
=total_timeout
,
560 "Application {} is ready in model {}".format(
561 application_name
, model_name
565 raise JujuApplicationExists(
566 "Application {} exists".format(application_name
)
569 await self
.disconnect_model(model
)
570 await self
.disconnect_controller(controller
)
574 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
577 :param: model: Model object
578 :param: application_name: Application name
580 :return: juju.application.Application (or None if it doesn't exist)
582 if model
.applications
and application_name
in model
.applications
:
583 return model
.applications
[application_name
]
585 async def execute_action(
587 application_name
: str,
590 db_dict
: dict = None,
591 progress_timeout
: float = None,
592 total_timeout
: float = None,
597 :param: application_name: Application name
598 :param: model_name: Model name
599 :param: action_name: Name of the action
600 :param: db_dict: Dictionary with data of the DB to write the updates
601 :param: progress_timeout: Maximum time between two updates in the model
602 :param: total_timeout: Timeout for the entity to be active
604 :return: (str, str): (output and status)
607 "Executing action {} using params {}".format(action_name
, kwargs
)
610 controller
= await self
.get_controller()
613 model
= await self
.get_model(controller
, model_name
)
617 application
= self
._get
_application
(
618 model
, application_name
=application_name
,
620 if application
is None:
621 raise JujuApplicationNotFound("Cannot execute action")
625 for u
in application
.units
:
626 if await u
.is_leader_from_status():
629 raise JujuLeaderUnitNotFound(
630 "Cannot execute action: leader unit not found"
633 actions
= await application
.get_actions()
635 if action_name
not in actions
:
636 raise JujuActionNotFound(
637 "Action {} not in available actions".format(action_name
)
640 action
= await unit
.run_action(action_name
, **kwargs
)
643 "Wait until action {} is completed in application {} (model={})".format(
644 action_name
, application_name
, model_name
647 await JujuModelWatcher
.wait_for(
650 progress_timeout
=progress_timeout
,
651 total_timeout
=total_timeout
,
656 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
657 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
659 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
663 "Action {} completed with status {} in application {} (model={})".format(
664 action_name
, action
.status
, application_name
, model_name
668 await self
.disconnect_model(model
)
669 await self
.disconnect_controller(controller
)
671 return output
, status
673 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
674 """Get list of actions
676 :param: application_name: Application name
677 :param: model_name: Model name
679 :return: Dict with this format
681 "action_name": "Description of the action",
686 "Getting list of actions for application {}".format(application_name
)
690 controller
= await self
.get_controller()
693 model
= await self
.get_model(controller
, model_name
)
697 application
= self
._get
_application
(
698 model
, application_name
=application_name
,
701 # Return list of actions
702 return await application
.get_actions()
705 # Disconnect from model and controller
706 await self
.disconnect_model(model
)
707 await self
.disconnect_controller(controller
)
709 async def add_relation(
712 application_name_1
: str,
713 application_name_2
: str,
719 :param: model_name: Model name
720 :param: application_name_1 First application name
721 :param: application_name_2: Second application name
722 :param: relation_1: First relation name
723 :param: relation_2: Second relation name
726 self
.log
.debug("Adding relation: {} -> {}".format(relation_1
, relation_2
))
729 controller
= await self
.get_controller()
732 model
= await self
.get_model(controller
, model_name
)
734 # Build relation strings
735 r1
= "{}:{}".format(application_name_1
, relation_1
)
736 r2
= "{}:{}".format(application_name_2
, relation_2
)
740 await model
.add_relation(relation1
=r1
, relation2
=r2
)
741 except JujuAPIError
as e
:
742 if "not found" in e
.message
:
743 self
.log
.warning("Relation not found: {}".format(e
.message
))
745 if "already exists" in e
.message
:
746 self
.log
.warning("Relation already exists: {}".format(e
.message
))
748 # another exception, raise it
751 await self
.disconnect_model(model
)
752 await self
.disconnect_controller(controller
)
754 async def destroy_model(self
, model_name
: str, total_timeout
: float):
758 :param: model_name: Model name
759 :param: total_timeout: Timeout
762 controller
= await self
.get_controller()
763 model
= await self
.get_model(controller
, model_name
)
765 self
.log
.debug("Destroying model {}".format(model_name
))
766 uuid
= model
.info
.uuid
769 await self
.disconnect_model(model
)
772 if model_name
in self
.models
:
773 self
.models
.remove(model_name
)
775 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
777 # Wait until model is destroyed
778 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
780 if total_timeout
is None:
782 end
= time
.time() + total_timeout
783 while time
.time() < end
:
784 models
= await controller
.list_models()
785 if model_name
not in models
:
787 "The model {} ({}) was destroyed".format(model_name
, uuid
)
790 await asyncio
.sleep(5)
792 "Timeout waiting for model {} to be destroyed".format(model_name
)
795 await self
.disconnect_controller(controller
)
797 async def destroy_application(self
, model
: Model
, application_name
: str):
801 :param: model: Model object
802 :param: application_name: Application name
805 "Destroying application {} in model {}".format(
806 application_name
, model
.info
.name
809 application
= model
.applications
.get(application_name
)
811 await application
.destroy()
813 self
.log
.warning("Application not found: {}".format(application_name
))
815 # async def destroy_machine(
816 # self, model: Model, machine_id: str, total_timeout: float = 3600
821 # :param: model: Model object
822 # :param: machine_id: Machine id
823 # :param: total_timeout: Timeout in seconds
825 # machines = await model.get_machines()
826 # if machine_id in machines:
827 # machine = machines[machine_id]
828 # await machine.destroy(force=True)
830 # end = time.time() + total_timeout
832 # # wait for machine removal
833 # machines = await model.get_machines()
834 # while machine_id in machines and time.time() < end:
835 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
836 # await asyncio.sleep(0.5)
837 # machines = await model.get_machines()
838 # self.log.debug("Machine destroyed: {}".format(machine_id))
840 # self.log.debug("Machine not found: {}".format(machine_id))
842 async def configure_application(
843 self
, model_name
: str, application_name
: str, config
: dict = None
845 """Configure application
847 :param: model_name: Model name
848 :param: application_name: Application name
849 :param: config: Config to apply to the charm
851 self
.log
.debug("Configuring application {}".format(application_name
))
855 controller
= await self
.get_controller()
856 model
= await self
.get_model(controller
, model_name
)
857 application
= self
._get
_application
(
858 model
, application_name
=application_name
,
860 await application
.set_config(config
)
862 await self
.disconnect_model(model
)
863 await self
.disconnect_controller(controller
)
865 def _get_api_endpoints_db(self
) -> [str]:
867 Get API Endpoints from DB
869 :return: List of API endpoints
871 self
.log
.debug("Getting endpoints from database")
873 juju_info
= self
.db
.get_one(
874 DB_DATA
.api_endpoints
.table
,
875 q_filter
=DB_DATA
.api_endpoints
.filter,
878 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
879 return juju_info
[DB_DATA
.api_endpoints
.key
]
881 def _update_api_endpoints_db(self
, endpoints
: [str]):
883 Update API endpoints in Database
885 :param: List of endpoints
887 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
889 juju_info
= self
.db
.get_one(
890 DB_DATA
.api_endpoints
.table
,
891 q_filter
=DB_DATA
.api_endpoints
.filter,
894 # If it doesn't, then create it
898 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
900 except DbException
as e
:
901 # Racing condition: check if another N2VC worker has created it
902 juju_info
= self
.db
.get_one(
903 DB_DATA
.api_endpoints
.table
,
904 q_filter
=DB_DATA
.api_endpoints
.filter,
910 DB_DATA
.api_endpoints
.table
,
911 DB_DATA
.api_endpoints
.filter,
912 {DB_DATA
.api_endpoints
.key
: endpoints
},
915 def handle_exception(self
, loop
, context
):
916 # All unhandled exceptions by libjuju are handled here.
919 async def health_check(self
, interval
: float = 300.0):
921 Health check to make sure controller and controller_model connections are OK
923 :param: interval: Time in seconds between checks
927 controller
= await self
.get_controller()
928 # self.log.debug("VCA is alive")
929 except Exception as e
:
930 self
.log
.error("Health check to VCA failed: {}".format(e
))
932 await self
.disconnect_controller(controller
)
933 await asyncio
.sleep(interval
)
935 async def list_models(self
, contains
: str = None) -> [str]:
936 """List models with certain names
938 :param: contains: String that is contained in model name
940 :retur: [models] Returns list of model names
943 controller
= await self
.get_controller()
945 models
= await controller
.list_models()
947 models
= [model
for model
in models
if contains
in model
]
950 await self
.disconnect_controller(controller
)