1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
, QueryApplicationOffersResults
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuLeaderUnitNotFound
,
34 JujuModelAlreadyExists
,
35 JujuControllerFailedConnecting
,
36 JujuApplicationExists
,
38 from n2vc
.utils
import DB_DATA
39 from osm_common
.dbbase
import DbException
50 loop
: asyncio
.AbstractEventLoop
= None,
51 log
: logging
.Logger
= None,
53 n2vc
: N2VCConnector
= None,
54 apt_mirror
: str = None,
55 enable_os_upgrade
: bool = True,
60 :param: endpoint: Endpoint of the juju controller (host:port)
61 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
62 :param: username: Juju username
63 :param: password: Juju password
64 :param: cacert: Juju CA Certificate
65 :param: loop: Asyncio loop
68 :param: n2vc: N2VC object
69 :param: apt_mirror: APT Mirror
70 :param: enable_os_upgrade: Enable OS Upgrade
73 self
.log
= log
or logging
.getLogger("Libjuju")
75 db_endpoints
= self
._get
_api
_endpoints
_db
()
76 self
.endpoints
= db_endpoints
or [endpoint
]
77 if db_endpoints
is None:
78 self
._update
_api
_endpoints
_db
(self
.endpoints
)
79 self
.api_proxy
= api_proxy
80 self
.username
= username
81 self
.password
= password
83 self
.loop
= loop
or asyncio
.get_event_loop()
86 # Generate config for models
87 self
.model_config
= {}
89 self
.model_config
["apt-mirror"] = apt_mirror
90 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
91 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
93 self
.loop
.set_exception_handler(self
.handle_exception
)
94 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
97 self
.log
.debug("Libjuju initialized!")
99 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
101 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
105 :param: timeout: Time in seconds to wait for controller to connect
109 controller
= Controller(loop
=self
.loop
)
110 await asyncio
.wait_for(
112 endpoint
=self
.endpoints
,
113 username
=self
.username
,
114 password
=self
.password
,
119 endpoints
= await controller
.api_endpoints
120 if self
.endpoints
!= endpoints
:
121 self
.endpoints
= endpoints
122 self
._update
_api
_endpoints
_db
(self
.endpoints
)
124 except asyncio
.CancelledError
as e
:
126 except Exception as e
:
128 "Failed connecting to controller: {}...".format(self
.endpoints
)
131 await self
.disconnect_controller(controller
)
132 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """
    Disconnect

    Stops the periodic health check by cancelling the task kept in
    ``self.health_check_task`` and then logs that Libjuju is disconnected.
    """
    health_task = self.health_check_task
    # Cancel health check task
    health_task.cancel()
    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    Safe to call from cleanup (``finally``) paths: when no model was ever
    obtained and ``None`` is passed, the call is a no-op instead of failing
    with an AttributeError that would hide the original error.

    :param: model: Model that will be disconnected (None is ignored)
    """
    if model is None:
        return
    await model.disconnect()
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    Safe to call from cleanup (``finally``) paths: when the connection was
    never established and ``None`` is passed, the call is a no-op instead of
    failing with an AttributeError that would hide the original error.

    :param: controller: Controller that will be disconnected (None is ignored)
    """
    if controller is None:
        return
    await controller.disconnect()
156 async def add_model(self
, model_name
: str, cloud_name
: str):
160 :param: model_name: Model name
161 :param: cloud_name: Cloud name
165 controller
= await self
.get_controller()
168 # Raise exception if model already exists
169 if await self
.model_exists(model_name
, controller
=controller
):
170 raise JujuModelAlreadyExists(
171 "Model {} already exists.".format(model_name
)
174 # Block until other workers have finished model creation
175 while self
.creating_model
.locked():
176 await asyncio
.sleep(0.1)
178 # If the model exists, return it from the controller
179 if model_name
in self
.models
:
183 async with self
.creating_model
:
184 self
.log
.debug("Creating model {}".format(model_name
))
185 model
= await controller
.add_model(
187 config
=self
.model_config
,
188 cloud_name
=cloud_name
,
189 credential_name
=cloud_name
,
191 self
.models
.add(model_name
)
194 await self
.disconnect_model(model
)
195 await self
.disconnect_controller(controller
)
198 self
, controller
: Controller
, model_name
: str, id=None
201 Get model from controller
203 :param: controller: Controller
204 :param: model_name: Model name
206 :return: Model: The created Juju model object
208 return await controller
.get_model(model_name
)
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    When no controller is handed in, a temporary connection is opened for
    the lookup and closed again before returning; a controller supplied by
    the caller is left connected.

    :param: controller: Controller
    :param: model_name: Model name

    :return bool: True if the controller lists a model with that name
    """
    # Only a connection opened here has to be closed here
    owns_controller = not controller
    if owns_controller:
        controller = await self.get_controller()

    try:
        known_models = await controller.list_models()
        return model_name in known_models
    finally:
        if owns_controller:
            await self.disconnect_controller(controller)
235 async def models_exist(self
, model_names
: [str]) -> (bool, list):
237 Check if models exists
239 :param: model_names: List of strings with model names
241 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
245 "model_names must be a non-empty array. Given value: {}".format(
249 non_existing_models
= []
250 models
= await self
.list_models()
251 existing_models
= list(set(models
).intersection(model_names
))
252 non_existing_models
= list(set(model_names
) - set(existing_models
))
255 len(non_existing_models
) == 0,
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    Opens a controller connection, fetches the model and returns its status.
    Both connections are closed before returning, model first and then
    controller. The controller is also released when the model cannot be
    obtained, so a failed lookup does not leak the connection.

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    try:
        # Inside the try so that a failing lookup still frees the controller
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
275 async def create_machine(
278 machine_id
: str = None,
279 db_dict
: dict = None,
280 progress_timeout
: float = None,
281 total_timeout
: float = None,
282 series
: str = "xenial",
284 ) -> (Machine
, bool):
288 :param: model_name: Model name
289 :param: machine_id: Machine id
290 :param: db_dict: Dictionary with data of the DB to write the updates
291 :param: progress_timeout: Maximum time between two updates in the model
292 :param: total_timeout: Timeout for the entity to be active
293 :param: series: Series of the machine (xenial, bionic, focal, ...)
294 :param: wait: Wait until machine is ready
296 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
297 if the machine is new or it already existed
303 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
307 controller
= await self
.get_controller()
310 model
= await self
.get_model(controller
, model_name
)
312 if machine_id
is not None:
314 "Searching machine (id={}) in model {}".format(
315 machine_id
, model_name
319 # Get machines from model and get the machine with machine_id if exists
320 machines
= await model
.get_machines()
321 if machine_id
in machines
:
323 "Machine (id={}) found in model {}".format(
324 machine_id
, model_name
327 machine
= machines
[machine_id
]
329 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
332 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
335 machine
= await model
.add_machine(
336 spec
=None, constraints
=None, disks
=None, series
=series
340 # Wait until the machine is ready
342 "Wait until machine {} is ready in model {}".format(
343 machine
.entity_id
, model_name
347 await JujuModelWatcher
.wait_for(
350 progress_timeout
=progress_timeout
,
351 total_timeout
=total_timeout
,
356 await self
.disconnect_model(model
)
357 await self
.disconnect_controller(controller
)
360 "Machine {} ready at {} in model {}".format(
361 machine
.entity_id
, machine
.dns_name
, model_name
366 async def provision_machine(
371 private_key_path
: str,
372 db_dict
: dict = None,
373 progress_timeout
: float = None,
374 total_timeout
: float = None,
377 Manually provisioning of a machine
379 :param: model_name: Model name
380 :param: hostname: IP to access the machine
381 :param: username: Username to login to the machine
382 :param: private_key_path: Local path for the private key
383 :param: db_dict: Dictionary with data of the DB to write the updates
384 :param: progress_timeout: Maximum time between two updates in the model
385 :param: total_timeout: Timeout for the entity to be active
387 :return: (Entity): Machine id
390 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
391 model_name
, hostname
, username
396 controller
= await self
.get_controller()
399 model
= await self
.get_model(controller
, model_name
)
403 provisioner
= AsyncSSHProvisioner(
406 private_key_path
=private_key_path
,
411 params
= await provisioner
.provision_machine()
413 params
.jobs
= ["JobHostUnits"]
415 self
.log
.debug("Adding machine to model")
416 connection
= model
.connection()
417 client_facade
= client
.ClientFacade
.from_connection(connection
)
419 results
= await client_facade
.AddMachines(params
=[params
])
420 error
= results
.machines
[0].error
423 msg
= "Error adding machine: {}".format(error
.message
)
424 self
.log
.error(msg
=msg
)
425 raise ValueError(msg
)
427 machine_id
= results
.machines
[0].machine
429 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
430 asyncio
.ensure_future(
431 provisioner
.install_agent(
432 connection
=connection
,
434 machine_id
=machine_id
,
435 proxy
=self
.api_proxy
,
441 machine_list
= await model
.get_machines()
442 if machine_id
in machine_list
:
443 self
.log
.debug("Machine {} found in model!".format(machine_id
))
444 machine
= model
.machines
.get(machine_id
)
446 await asyncio
.sleep(2)
449 msg
= "Machine {} not found in model".format(machine_id
)
450 self
.log
.error(msg
=msg
)
451 raise JujuMachineNotFound(msg
)
454 "Wait until machine {} is ready in model {}".format(
455 machine
.entity_id
, model_name
458 await JujuModelWatcher
.wait_for(
461 progress_timeout
=progress_timeout
,
462 total_timeout
=total_timeout
,
466 except Exception as e
:
469 await self
.disconnect_model(model
)
470 await self
.disconnect_controller(controller
)
473 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
478 async def deploy_charm(
480 application_name
: str,
484 db_dict
: dict = None,
485 progress_timeout
: float = None,
486 total_timeout
: float = None,
493 :param: application_name: Application name
494 :param: path: Local path to the charm
495 :param: model_name: Model name
496 :param: machine_id ID of the machine
497 :param: db_dict: Dictionary with data of the DB to write the updates
498 :param: progress_timeout: Maximum time between two updates in the model
499 :param: total_timeout: Timeout for the entity to be active
500 :param: config: Config for the charm
501 :param: series: Series of the charm
502 :param: num_units: Number of units
504 :return: (juju.application.Application): Juju application
507 "Deploying charm {} to machine {} in model ~{}".format(
508 application_name
, machine_id
, model_name
511 self
.log
.debug("charm: {}".format(path
))
514 controller
= await self
.get_controller()
517 model
= await self
.get_model(controller
, model_name
)
521 if application_name
not in model
.applications
:
523 if machine_id
is not None:
524 if machine_id
not in model
.machines
:
525 msg
= "Machine {} not found in model".format(machine_id
)
526 self
.log
.error(msg
=msg
)
527 raise JujuMachineNotFound(msg
)
528 machine
= model
.machines
[machine_id
]
529 series
= machine
.series
531 application
= await model
.deploy(
533 application_name
=application_name
,
542 "Wait until application {} is ready in model {}".format(
543 application_name
, model_name
547 for _
in range(num_units
- 1):
548 m
, _
= await self
.create_machine(model_name
, wait
=False)
549 await application
.add_unit(to
=m
.entity_id
)
551 await JujuModelWatcher
.wait_for(
554 progress_timeout
=progress_timeout
,
555 total_timeout
=total_timeout
,
560 "Application {} is ready in model {}".format(
561 application_name
, model_name
565 raise JujuApplicationExists(
566 "Application {} exists".format(application_name
)
569 await self
.disconnect_model(model
)
570 await self
.disconnect_controller(controller
)
def _get_application(self, model: Model, application_name: str) -> Application:
    """Get application

    Looks the name up in ``model.applications``.

    :param: model: Model object
    :param: application_name: Application name

    :return: juju.application.Application (or None if it doesn't exist)
    """
    deployed = model.applications
    # Nothing deployed, or no application with that name
    if not deployed or application_name not in deployed:
        return None
    return deployed[application_name]
585 async def execute_action(
587 application_name
: str,
590 db_dict
: dict = None,
591 progress_timeout
: float = None,
592 total_timeout
: float = None,
597 :param: application_name: Application name
598 :param: model_name: Model name
599 :param: action_name: Name of the action
600 :param: db_dict: Dictionary with data of the DB to write the updates
601 :param: progress_timeout: Maximum time between two updates in the model
602 :param: total_timeout: Timeout for the entity to be active
604 :return: (str, str): (output and status)
607 "Executing action {} using params {}".format(action_name
, kwargs
)
610 controller
= await self
.get_controller()
613 model
= await self
.get_model(controller
, model_name
)
617 application
= self
._get
_application
(
618 model
, application_name
=application_name
,
620 if application
is None:
621 raise JujuApplicationNotFound("Cannot execute action")
625 for u
in application
.units
:
626 if await u
.is_leader_from_status():
629 raise JujuLeaderUnitNotFound(
630 "Cannot execute action: leader unit not found"
633 actions
= await application
.get_actions()
635 if action_name
not in actions
:
636 raise JujuActionNotFound(
637 "Action {} not in available actions".format(action_name
)
640 action
= await unit
.run_action(action_name
, **kwargs
)
643 "Wait until action {} is completed in application {} (model={})".format(
644 action_name
, application_name
, model_name
647 await JujuModelWatcher
.wait_for(
650 progress_timeout
=progress_timeout
,
651 total_timeout
=total_timeout
,
656 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
657 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
659 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
663 "Action {} completed with status {} in application {} (model={})".format(
664 action_name
, action
.status
, application_name
, model_name
668 await self
.disconnect_model(model
)
669 await self
.disconnect_controller(controller
)
671 return output
, status
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    Connects to the controller and the model, asks the application for its
    actions and closes both connections again (model first, then
    controller). The controller is also released when the model cannot be
    obtained.

    :param: application_name: Application name
    :param: model_name: Model name

    :raises JujuApplicationNotFound: if the application is not in the model

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model (inside the try so a failing lookup frees the controller)
        model = await self.get_model(controller, model_name)
        try:
            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                # Same error execute_action reports for an unknown
                # application, instead of an AttributeError on None
                raise JujuApplicationNotFound(
                    "Cannot get actions: application {} not found".format(
                        application_name
                    )
                )

            # Return list of actions
            return await application.get_actions()
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
async def add_relation(
    self, model_name: str, endpoint_1: str, endpoint_2: str,
):
    """Add relation

    A JujuAPIError whose message contains "not found" or "already exists"
    is logged as a warning and swallowed; any other JujuAPIError is
    re-raised. Model and controller connections are closed on every path,
    including when the model itself cannot be obtained.

    :param: model_name: Model name
    :param: endpoint_1 First endpoint name
                       ("app:endpoint" format or directly the saas name)
    :param: endpoint_2: Second endpoint name (^ same format)
    """

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model (inside the try so a failing lookup frees the controller)
        model = await self.get_model(controller, model_name)
        try:
            # Add relation
            await model.add_relation(endpoint_1, endpoint_2)
        except JujuAPIError as e:
            if "not found" in e.message:
                self.log.warning("Relation not found: {}".format(e.message))
                return
            if "already exists" in e.message:
                self.log.warning("Relation already exists: {}".format(e.message))
                return
            # another exception, raise it (bare raise keeps the traceback)
            raise
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
745 self
, offer_url
: str, model_name
: str,
748 Adds a remote offer to the model. Relations can be created later using "juju relate".
750 :param: offer_url: Offer Url
751 :param: model_name: Model name
753 :raises ParseError if there's a problem parsing the offer_url
754 :raises JujuError if remote offer includes and endpoint
755 :raises JujuAPIError if the operation is not successful
757 controller
= await self
.get_controller()
758 model
= await controller
.get_model(model_name
)
761 await model
.consume(offer_url
)
763 await self
.disconnect_model(model
)
764 await self
.disconnect_controller(controller
)
766 async def destroy_model(self
, model_name
: str, total_timeout
: float):
770 :param: model_name: Model name
771 :param: total_timeout: Timeout
774 controller
= await self
.get_controller()
775 model
= await self
.get_model(controller
, model_name
)
777 self
.log
.debug("Destroying model {}".format(model_name
))
778 uuid
= model
.info
.uuid
781 machines
= await model
.get_machines()
782 for machine_id
in machines
:
784 await self
.destroy_machine(
785 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
787 except asyncio
.CancelledError
:
793 await self
.disconnect_model(model
)
796 if model_name
in self
.models
:
797 self
.models
.remove(model_name
)
799 await controller
.destroy_model(uuid
)
801 # Wait until model is destroyed
802 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
805 if total_timeout
is None:
807 end
= time
.time() + total_timeout
808 while time
.time() < end
:
810 models
= await controller
.list_models()
811 if model_name
not in models
:
813 "The model {} ({}) was destroyed".format(model_name
, uuid
)
816 except asyncio
.CancelledError
:
818 except Exception as e
:
820 await asyncio
.sleep(5)
822 "Timeout waiting for model {} to be destroyed {}".format(
823 model_name
, last_exception
827 await self
.disconnect_controller(controller
)
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroys application

    Looks the application up in ``model.applications`` and destroys it; when
    the lookup finds nothing, only a warning is logged.

    :param: model: Model object
    :param: application_name: Application name
    """
    model_label = model.info.name
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model_label
        )
    )

    target = model.applications.get(application_name)
    if not target:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await target.destroy()
async def destroy_machine(
    self, model: Model, machine_id: str, total_timeout: float = 3600
):
    """
    Destroy machine

    Force-destroys the machine and then polls the model every 0.5 seconds
    until the machine is gone or the timeout expires. A timeout is reported
    with a warning; it does not raise.

    :param: model: Model object
    :param: machine_id: Machine id
    :param: total_timeout: Timeout in seconds. None falls back to the
                           default of 3600, since destroy_model forwards
                           its own total_timeout, which may be None.
    """
    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    await machines[machine_id].destroy(force=True)

    # max timeout
    if total_timeout is None:
        total_timeout = 3600
    end = time.time() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    if machine_id in machines:
        # The loop ended on the deadline: do not claim the machine is gone
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))
874 async def configure_application(
875 self
, model_name
: str, application_name
: str, config
: dict = None
877 """Configure application
879 :param: model_name: Model name
880 :param: application_name: Application name
881 :param: config: Config to apply to the charm
883 self
.log
.debug("Configuring application {}".format(application_name
))
887 controller
= await self
.get_controller()
888 model
= await self
.get_model(controller
, model_name
)
889 application
= self
._get
_application
(
890 model
, application_name
=application_name
,
892 await application
.set_config(config
)
894 await self
.disconnect_model(model
)
895 await self
.disconnect_controller(controller
)
897 def _get_api_endpoints_db(self
) -> [str]:
899 Get API Endpoints from DB
901 :return: List of API endpoints
903 self
.log
.debug("Getting endpoints from database")
905 juju_info
= self
.db
.get_one(
906 DB_DATA
.api_endpoints
.table
,
907 q_filter
=DB_DATA
.api_endpoints
.filter,
910 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
911 return juju_info
[DB_DATA
.api_endpoints
.key
]
913 def _update_api_endpoints_db(self
, endpoints
: [str]):
915 Update API endpoints in Database
917 :param: List of endpoints
919 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
921 juju_info
= self
.db
.get_one(
922 DB_DATA
.api_endpoints
.table
,
923 q_filter
=DB_DATA
.api_endpoints
.filter,
926 # If it doesn't, then create it
930 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
932 except DbException
as e
:
933 # Racing condition: check if another N2VC worker has created it
934 juju_info
= self
.db
.get_one(
935 DB_DATA
.api_endpoints
.table
,
936 q_filter
=DB_DATA
.api_endpoints
.filter,
942 DB_DATA
.api_endpoints
.table
,
943 DB_DATA
.api_endpoints
.filter,
944 {DB_DATA
.api_endpoints
.key
: endpoints
},
def handle_exception(self, loop, context):
    """
    Exception handler installed on the event loop

    This method is passed to ``self.loop.set_exception_handler`` when the
    object is set up, so the loop calls it for exceptions nothing else
    caught.

    NOTE(review): no handling statements are visible in this view of the
    file; as shown the handler takes no action. Confirm that dropping these
    errors silently (rather than logging them) is intended.

    :param: loop: Event loop reporting the problem
    :param: context: Information about the unhandled exception, as supplied
                     by the loop
    """
    # All unhandled exceptions by libjuju are handled here.
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs until the task is cancelled. Each round opens a controller
    connection and closes it again; a failed connection is logged as an
    error and the loop carries on with the next round.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset every round: a failed connect must not leave the name
        # unbound (first round) or pointing at a controller that an earlier
        # round already disconnected.
        controller = None
        try:
            controller = await self.get_controller()
            # self.log.debug("VCA is alive")
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List models with certain names

    Uses a temporary controller connection that is closed before returning.

    :param: contains: String that is contained in model name

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        names = await controller.list_models()
        if not contains:
            # No filter requested: hand back everything the controller lists
            return names
        return [name for name in names if contains in name]
    finally:
        await self.disconnect_controller(controller)
async def list_offers(self, model_name: str) -> QueryApplicationOffersResults:
    """List the offers of a model

    Uses a temporary controller connection that is closed before returning.

    :param: model_name: Model name

    :return: Returns list of offers
    """
    controller = await self.get_controller()
    try:
        offers = await controller.list_offers(model_name)
        return offers
    finally:
        await self.disconnect_controller(controller)