1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuLeaderUnitNotFound
,
34 JujuModelAlreadyExists
,
35 JujuControllerFailedConnecting
,
36 JujuApplicationExists
,
39 from n2vc
.utils
import DB_DATA
40 from osm_common
.dbbase
import DbException
51 loop
: asyncio
.AbstractEventLoop
= None,
52 log
: logging
.Logger
= None,
54 n2vc
: N2VCConnector
= None,
55 apt_mirror
: str = None,
56 enable_os_upgrade
: bool = True,
61 :param: endpoint: Endpoint of the juju controller (host:port)
62 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
63 :param: username: Juju username
64 :param: password: Juju password
65 :param: cacert: Juju CA Certificate
66 :param: loop: Asyncio loop
69 :param: n2vc: N2VC object
70 :param: apt_mirror: APT Mirror
71 :param: enable_os_upgrade: Enable OS Upgrade
74 self
.log
= log
or logging
.getLogger("Libjuju")
76 db_endpoints
= self
._get
_api
_endpoints
_db
()
77 self
.endpoints
= db_endpoints
or [endpoint
]
78 if db_endpoints
is None:
79 self
._update
_api
_endpoints
_db
(self
.endpoints
)
80 self
.api_proxy
= api_proxy
81 self
.username
= username
82 self
.password
= password
84 self
.loop
= loop
or asyncio
.get_event_loop()
87 # Generate config for models
88 self
.model_config
= {}
90 self
.model_config
["apt-mirror"] = apt_mirror
91 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
92 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
94 self
.loop
.set_exception_handler(self
.handle_exception
)
95 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
98 self
.log
.debug("Libjuju initialized!")
100 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
102 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
106 :param: timeout: Time in seconds to wait for controller to connect
110 controller
= Controller(loop
=self
.loop
)
111 await asyncio
.wait_for(
113 endpoint
=self
.endpoints
,
114 username
=self
.username
,
115 password
=self
.password
,
120 endpoints
= await controller
.api_endpoints
121 if self
.endpoints
!= endpoints
:
122 self
.endpoints
= endpoints
123 self
._update
_api
_endpoints
_db
(self
.endpoints
)
125 except asyncio
.CancelledError
as e
:
127 except Exception as e
:
129 "Failed connecting to controller: {}...".format(self
.endpoints
)
132 await self
.disconnect_controller(controller
)
133 raise JujuControllerFailedConnecting(e
)
135 async def disconnect(self
):
137 # Cancel health check task
138 self
.health_check_task
.cancel()
139 self
.log
.debug("Libjuju disconnected!")
141 async def disconnect_model(self
, model
: Model
):
145 :param: model: Model that will be disconnected
147 await model
.disconnect()
149 async def disconnect_controller(self
, controller
: Controller
):
151 Disconnect controller
153 :param: controller: Controller that will be disconnected
155 await controller
.disconnect()
157 async def add_model(self
, model_name
: str, cloud_name
: str):
161 :param: model_name: Model name
162 :param: cloud_name: Cloud name
166 controller
= await self
.get_controller()
169 # Raise exception if model already exists
170 if await self
.model_exists(model_name
, controller
=controller
):
171 raise JujuModelAlreadyExists(
172 "Model {} already exists.".format(model_name
)
175 # Block until other workers have finished model creation
176 while self
.creating_model
.locked():
177 await asyncio
.sleep(0.1)
179 # If the model exists, return it from the controller
180 if model_name
in self
.models
:
184 async with self
.creating_model
:
185 self
.log
.debug("Creating model {}".format(model_name
))
186 model
= await controller
.add_model(
188 config
=self
.model_config
,
189 cloud_name
=cloud_name
,
190 credential_name
=cloud_name
,
192 self
.models
.add(model_name
)
195 await self
.disconnect_model(model
)
196 await self
.disconnect_controller(controller
)
198 async def get_executed_actions(self
, model_name
: str) -> list:
200 Get executed/history of actions for a model.
202 :param: model_name: Model name, str.
203 :return: List of executed actions for a model.
206 executed_actions
= []
207 controller
= await self
.get_controller()
209 model
= await self
.get_model(controller
, model_name
)
210 # Get all unique action names
212 for application
in model
.applications
:
213 application_actions
= await self
.get_actions(application
, model_name
)
214 actions
.update(application_actions
)
215 # Get status of all actions
216 for application_action
in actions
:
217 app_action_status_list
= await model
.get_action_status(name
=application_action
)
218 for action_id
, action_status
in app_action_status_list
.items():
219 executed_action
= {"id": action_id
, "action": application_action
,
220 "status": action_status
}
221 # Get action output by id
222 action_status
= await model
.get_action_output(executed_action
["id"])
223 for k
, v
in action_status
.items():
224 executed_action
[k
] = v
225 executed_actions
.append(executed_action
)
226 except Exception as e
:
227 raise JujuError("Error in getting executed actions for model: {}. Error: {}"
228 .format(model_name
, str(e
)))
231 await self
.disconnect_model(model
)
232 await self
.disconnect_controller(controller
)
233 return executed_actions
235 async def get_application_configs(self
, model_name
: str, application_name
: str) -> dict:
237 Get available configs for an application.
239 :param: model_name: Model name, str.
240 :param: application_name: Application name, str.
242 :return: A dict which has key - action name, value - action description
245 application_configs
= {}
246 controller
= await self
.get_controller()
248 model
= await self
.get_model(controller
, model_name
)
249 application
= self
._get
_application
(model
, application_name
=application_name
)
250 application_configs
= await application
.get_config()
251 except Exception as e
:
252 raise JujuError("Error in getting configs for application: {} in model: {}. Error: {}"
253 .format(application_name
, model_name
, str(e
)))
256 await self
.disconnect_model(model
)
257 await self
.disconnect_controller(controller
)
258 return application_configs
261 self
, controller
: Controller
, model_name
: str, id=None
264 Get model from controller
266 :param: controller: Controller
267 :param: model_name: Model name
269 :return: Model: The created Juju model object
271 return await controller
.get_model(model_name
)
273 async def model_exists(
274 self
, model_name
: str, controller
: Controller
= None
277 Check if model exists
279 :param: controller: Controller
280 :param: model_name: Model name
284 need_to_disconnect
= False
286 # Get controller if not passed
288 controller
= await self
.get_controller()
289 need_to_disconnect
= True
291 # Check if model exists
293 return model_name
in await controller
.list_models()
295 if need_to_disconnect
:
296 await self
.disconnect_controller(controller
)
298 async def models_exist(self
, model_names
: [str]) -> (bool, list):
300 Check if models exists
302 :param: model_names: List of strings with model names
304 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
308 "model_names must be a non-empty array. Given value: {}".format(
312 non_existing_models
= []
313 models
= await self
.list_models()
314 existing_models
= list(set(models
).intersection(model_names
))
315 non_existing_models
= list(set(model_names
) - set(existing_models
))
318 len(non_existing_models
) == 0,
322 async def get_model_status(self
, model_name
: str) -> FullStatus
:
326 :param: model_name: Model name
328 :return: Full status object
330 controller
= await self
.get_controller()
331 model
= await self
.get_model(controller
, model_name
)
333 return await model
.get_status()
335 await self
.disconnect_model(model
)
336 await self
.disconnect_controller(controller
)
338 async def create_machine(
341 machine_id
: str = None,
342 db_dict
: dict = None,
343 progress_timeout
: float = None,
344 total_timeout
: float = None,
345 series
: str = "xenial",
347 ) -> (Machine
, bool):
351 :param: model_name: Model name
352 :param: machine_id: Machine id
353 :param: db_dict: Dictionary with data of the DB to write the updates
354 :param: progress_timeout: Maximum time between two updates in the model
355 :param: total_timeout: Timeout for the entity to be active
356 :param: series: Series of the machine (xenial, bionic, focal, ...)
357 :param: wait: Wait until machine is ready
359 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
360 if the machine is new or it already existed
366 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
370 controller
= await self
.get_controller()
373 model
= await self
.get_model(controller
, model_name
)
375 if machine_id
is not None:
377 "Searching machine (id={}) in model {}".format(
378 machine_id
, model_name
382 # Get machines from model and get the machine with machine_id if exists
383 machines
= await model
.get_machines()
384 if machine_id
in machines
:
386 "Machine (id={}) found in model {}".format(
387 machine_id
, model_name
390 machine
= machines
[machine_id
]
392 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
395 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
398 machine
= await model
.add_machine(
399 spec
=None, constraints
=None, disks
=None, series
=series
403 # Wait until the machine is ready
405 "Wait until machine {} is ready in model {}".format(
406 machine
.entity_id
, model_name
410 await JujuModelWatcher
.wait_for(
413 progress_timeout
=progress_timeout
,
414 total_timeout
=total_timeout
,
419 await self
.disconnect_model(model
)
420 await self
.disconnect_controller(controller
)
423 "Machine {} ready at {} in model {}".format(
424 machine
.entity_id
, machine
.dns_name
, model_name
429 async def provision_machine(
434 private_key_path
: str,
435 db_dict
: dict = None,
436 progress_timeout
: float = None,
437 total_timeout
: float = None,
440 Manually provisioning of a machine
442 :param: model_name: Model name
443 :param: hostname: IP to access the machine
444 :param: username: Username to login to the machine
445 :param: private_key_path: Local path for the private key
446 :param: db_dict: Dictionary with data of the DB to write the updates
447 :param: progress_timeout: Maximum time between two updates in the model
448 :param: total_timeout: Timeout for the entity to be active
450 :return: (Entity): Machine id
453 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
454 model_name
, hostname
, username
459 controller
= await self
.get_controller()
462 model
= await self
.get_model(controller
, model_name
)
466 provisioner
= AsyncSSHProvisioner(
469 private_key_path
=private_key_path
,
474 params
= await provisioner
.provision_machine()
476 params
.jobs
= ["JobHostUnits"]
478 self
.log
.debug("Adding machine to model")
479 connection
= model
.connection()
480 client_facade
= client
.ClientFacade
.from_connection(connection
)
482 results
= await client_facade
.AddMachines(params
=[params
])
483 error
= results
.machines
[0].error
486 msg
= "Error adding machine: {}".format(error
.message
)
487 self
.log
.error(msg
=msg
)
488 raise ValueError(msg
)
490 machine_id
= results
.machines
[0].machine
492 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
493 asyncio
.ensure_future(
494 provisioner
.install_agent(
495 connection
=connection
,
497 machine_id
=machine_id
,
498 proxy
=self
.api_proxy
,
504 machine_list
= await model
.get_machines()
505 if machine_id
in machine_list
:
506 self
.log
.debug("Machine {} found in model!".format(machine_id
))
507 machine
= model
.machines
.get(machine_id
)
509 await asyncio
.sleep(2)
512 msg
= "Machine {} not found in model".format(machine_id
)
513 self
.log
.error(msg
=msg
)
514 raise JujuMachineNotFound(msg
)
517 "Wait until machine {} is ready in model {}".format(
518 machine
.entity_id
, model_name
521 await JujuModelWatcher
.wait_for(
524 progress_timeout
=progress_timeout
,
525 total_timeout
=total_timeout
,
529 except Exception as e
:
532 await self
.disconnect_model(model
)
533 await self
.disconnect_controller(controller
)
536 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
541 async def deploy_charm(
543 application_name
: str,
547 db_dict
: dict = None,
548 progress_timeout
: float = None,
549 total_timeout
: float = None,
556 :param: application_name: Application name
557 :param: path: Local path to the charm
558 :param: model_name: Model name
559 :param: machine_id ID of the machine
560 :param: db_dict: Dictionary with data of the DB to write the updates
561 :param: progress_timeout: Maximum time between two updates in the model
562 :param: total_timeout: Timeout for the entity to be active
563 :param: config: Config for the charm
564 :param: series: Series of the charm
565 :param: num_units: Number of units
567 :return: (juju.application.Application): Juju application
570 "Deploying charm {} to machine {} in model ~{}".format(
571 application_name
, machine_id
, model_name
574 self
.log
.debug("charm: {}".format(path
))
577 controller
= await self
.get_controller()
580 model
= await self
.get_model(controller
, model_name
)
584 if application_name
not in model
.applications
:
586 if machine_id
is not None:
587 if machine_id
not in model
.machines
:
588 msg
= "Machine {} not found in model".format(machine_id
)
589 self
.log
.error(msg
=msg
)
590 raise JujuMachineNotFound(msg
)
591 machine
= model
.machines
[machine_id
]
592 series
= machine
.series
594 application
= await model
.deploy(
596 application_name
=application_name
,
605 "Wait until application {} is ready in model {}".format(
606 application_name
, model_name
610 for _
in range(num_units
- 1):
611 m
, _
= await self
.create_machine(model_name
, wait
=False)
612 await application
.add_unit(to
=m
.entity_id
)
614 await JujuModelWatcher
.wait_for(
617 progress_timeout
=progress_timeout
,
618 total_timeout
=total_timeout
,
623 "Application {} is ready in model {}".format(
624 application_name
, model_name
628 raise JujuApplicationExists(
629 "Application {} exists".format(application_name
)
632 await self
.disconnect_model(model
)
633 await self
.disconnect_controller(controller
)
637 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
640 :param: model: Model object
641 :param: application_name: Application name
643 :return: juju.application.Application (or None if it doesn't exist)
645 if model
.applications
and application_name
in model
.applications
:
646 return model
.applications
[application_name
]
648 async def execute_action(
650 application_name
: str,
653 db_dict
: dict = None,
654 progress_timeout
: float = None,
655 total_timeout
: float = None,
660 :param: application_name: Application name
661 :param: model_name: Model name
662 :param: action_name: Name of the action
663 :param: db_dict: Dictionary with data of the DB to write the updates
664 :param: progress_timeout: Maximum time between two updates in the model
665 :param: total_timeout: Timeout for the entity to be active
667 :return: (str, str): (output and status)
670 "Executing action {} using params {}".format(action_name
, kwargs
)
673 controller
= await self
.get_controller()
676 model
= await self
.get_model(controller
, model_name
)
680 application
= self
._get
_application
(
681 model
, application_name
=application_name
,
683 if application
is None:
684 raise JujuApplicationNotFound("Cannot execute action")
688 for u
in application
.units
:
689 if await u
.is_leader_from_status():
692 raise JujuLeaderUnitNotFound(
693 "Cannot execute action: leader unit not found"
696 actions
= await application
.get_actions()
698 if action_name
not in actions
:
699 raise JujuActionNotFound(
700 "Action {} not in available actions".format(action_name
)
703 action
= await unit
.run_action(action_name
, **kwargs
)
706 "Wait until action {} is completed in application {} (model={})".format(
707 action_name
, application_name
, model_name
710 await JujuModelWatcher
.wait_for(
713 progress_timeout
=progress_timeout
,
714 total_timeout
=total_timeout
,
719 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
720 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
722 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
726 "Action {} completed with status {} in application {} (model={})".format(
727 action_name
, action
.status
, application_name
, model_name
731 await self
.disconnect_model(model
)
732 await self
.disconnect_controller(controller
)
734 return output
, status
736 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
737 """Get list of actions
739 :param: application_name: Application name
740 :param: model_name: Model name
742 :return: Dict with this format
744 "action_name": "Description of the action",
749 "Getting list of actions for application {}".format(application_name
)
753 controller
= await self
.get_controller()
756 model
= await self
.get_model(controller
, model_name
)
760 application
= self
._get
_application
(
761 model
, application_name
=application_name
,
764 # Return list of actions
765 return await application
.get_actions()
768 # Disconnect from model and controller
769 await self
.disconnect_model(model
)
770 await self
.disconnect_controller(controller
)
772 async def add_relation(
775 application_name_1
: str,
776 application_name_2
: str,
782 :param: model_name: Model name
783 :param: application_name_1 First application name
784 :param: application_name_2: Second application name
785 :param: relation_1: First relation name
786 :param: relation_2: Second relation name
789 self
.log
.debug("Adding relation: {} -> {}".format(relation_1
, relation_2
))
792 controller
= await self
.get_controller()
795 model
= await self
.get_model(controller
, model_name
)
797 # Build relation strings
798 r1
= "{}:{}".format(application_name_1
, relation_1
)
799 r2
= "{}:{}".format(application_name_2
, relation_2
)
803 await model
.add_relation(relation1
=r1
, relation2
=r2
)
804 except JujuAPIError
as e
:
805 if "not found" in e
.message
:
806 self
.log
.warning("Relation not found: {}".format(e
.message
))
808 if "already exists" in e
.message
:
809 self
.log
.warning("Relation already exists: {}".format(e
.message
))
811 # another exception, raise it
814 await self
.disconnect_model(model
)
815 await self
.disconnect_controller(controller
)
817 async def destroy_model(self
, model_name
: str, total_timeout
: float):
821 :param: model_name: Model name
822 :param: total_timeout: Timeout
825 controller
= await self
.get_controller()
826 model
= await self
.get_model(controller
, model_name
)
828 self
.log
.debug("Destroying model {}".format(model_name
))
829 uuid
= model
.info
.uuid
832 await self
.disconnect_model(model
)
835 if model_name
in self
.models
:
836 self
.models
.remove(model_name
)
838 await controller
.destroy_model(uuid
, force
=True, max_wait
=0)
840 # Wait until model is destroyed
841 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
843 if total_timeout
is None:
845 end
= time
.time() + total_timeout
846 while time
.time() < end
:
847 models
= await controller
.list_models()
848 if model_name
not in models
:
850 "The model {} ({}) was destroyed".format(model_name
, uuid
)
853 await asyncio
.sleep(5)
855 "Timeout waiting for model {} to be destroyed".format(model_name
)
858 await self
.disconnect_controller(controller
)
860 async def destroy_application(self
, model
: Model
, application_name
: str):
864 :param: model: Model object
865 :param: application_name: Application name
868 "Destroying application {} in model {}".format(
869 application_name
, model
.info
.name
872 application
= model
.applications
.get(application_name
)
874 await application
.destroy()
876 self
.log
.warning("Application not found: {}".format(application_name
))
878 # async def destroy_machine(
879 # self, model: Model, machine_id: str, total_timeout: float = 3600
884 # :param: model: Model object
885 # :param: machine_id: Machine id
886 # :param: total_timeout: Timeout in seconds
888 # machines = await model.get_machines()
889 # if machine_id in machines:
890 # machine = machines[machine_id]
891 # await machine.destroy(force=True)
893 # end = time.time() + total_timeout
895 # # wait for machine removal
896 # machines = await model.get_machines()
897 # while machine_id in machines and time.time() < end:
898 # self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
899 # await asyncio.sleep(0.5)
900 # machines = await model.get_machines()
901 # self.log.debug("Machine destroyed: {}".format(machine_id))
903 # self.log.debug("Machine not found: {}".format(machine_id))
905 async def configure_application(
906 self
, model_name
: str, application_name
: str, config
: dict = None
908 """Configure application
910 :param: model_name: Model name
911 :param: application_name: Application name
912 :param: config: Config to apply to the charm
914 self
.log
.debug("Configuring application {}".format(application_name
))
918 controller
= await self
.get_controller()
919 model
= await self
.get_model(controller
, model_name
)
920 application
= self
._get
_application
(
921 model
, application_name
=application_name
,
923 await application
.set_config(config
)
925 await self
.disconnect_model(model
)
926 await self
.disconnect_controller(controller
)
928 def _get_api_endpoints_db(self
) -> [str]:
930 Get API Endpoints from DB
932 :return: List of API endpoints
934 self
.log
.debug("Getting endpoints from database")
936 juju_info
= self
.db
.get_one(
937 DB_DATA
.api_endpoints
.table
,
938 q_filter
=DB_DATA
.api_endpoints
.filter,
941 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
942 return juju_info
[DB_DATA
.api_endpoints
.key
]
944 def _update_api_endpoints_db(self
, endpoints
: [str]):
946 Update API endpoints in Database
948 :param: List of endpoints
950 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
952 juju_info
= self
.db
.get_one(
953 DB_DATA
.api_endpoints
.table
,
954 q_filter
=DB_DATA
.api_endpoints
.filter,
957 # If it doesn't, then create it
961 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
963 except DbException
as e
:
964 # Racing condition: check if another N2VC worker has created it
965 juju_info
= self
.db
.get_one(
966 DB_DATA
.api_endpoints
.table
,
967 q_filter
=DB_DATA
.api_endpoints
.filter,
973 DB_DATA
.api_endpoints
.table
,
974 DB_DATA
.api_endpoints
.filter,
975 {DB_DATA
.api_endpoints
.key
: endpoints
},
978 def handle_exception(self
, loop
, context
):
979 # All unhandled exceptions by libjuju are handled here.
982 async def health_check(self
, interval
: float = 300.0):
984 Health check to make sure controller and controller_model connections are OK
986 :param: interval: Time in seconds between checks
990 controller
= await self
.get_controller()
991 # self.log.debug("VCA is alive")
992 except Exception as e
:
993 self
.log
.error("Health check to VCA failed: {}".format(e
))
995 await self
.disconnect_controller(controller
)
996 await asyncio
.sleep(interval
)
998 async def list_models(self
, contains
: str = None) -> [str]:
999 """List models with certain names
1001 :param: contains: String that is contained in model name
1003 :retur: [models] Returns list of model names
1006 controller
= await self
.get_controller()
1008 models
= await controller
.list_models()
1010 models
= [model
for model
in models
if contains
in model
]
1013 await self
.disconnect_controller(controller
)