1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuLeaderUnitNotFound
,
34 JujuModelAlreadyExists
,
35 JujuControllerFailedConnecting
,
36 JujuApplicationExists
,
38 from n2vc
.utils
import DB_DATA
39 from osm_common
.dbbase
import DbException
50 loop
: asyncio
.AbstractEventLoop
= None,
51 log
: logging
.Logger
= None,
53 n2vc
: N2VCConnector
= None,
54 apt_mirror
: str = None,
55 enable_os_upgrade
: bool = True,
60 :param: endpoint: Endpoint of the juju controller (host:port)
61 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
62 :param: username: Juju username
63 :param: password: Juju password
64 :param: cacert: Juju CA Certificate
65 :param: loop: Asyncio loop
68 :param: n2vc: N2VC object
69 :param: apt_mirror: APT Mirror
70 :param: enable_os_upgrade: Enable OS Upgrade
73 self
.log
= log
or logging
.getLogger("Libjuju")
75 db_endpoints
= self
._get
_api
_endpoints
_db
()
76 self
.endpoints
= db_endpoints
or [endpoint
]
77 if db_endpoints
is None:
78 self
._update
_api
_endpoints
_db
(self
.endpoints
)
79 self
.api_proxy
= api_proxy
80 self
.username
= username
81 self
.password
= password
83 self
.loop
= loop
or asyncio
.get_event_loop()
86 # Generate config for models
87 self
.model_config
= {}
89 self
.model_config
["apt-mirror"] = apt_mirror
90 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
91 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
93 self
.loop
.set_exception_handler(self
.handle_exception
)
94 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
97 self
.log
.debug("Libjuju initialized!")
99 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
101 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
105 :param: timeout: Time in seconds to wait for controller to connect
109 controller
= Controller(loop
=self
.loop
)
110 await asyncio
.wait_for(
112 endpoint
=self
.endpoints
,
113 username
=self
.username
,
114 password
=self
.password
,
119 endpoints
= await controller
.api_endpoints
120 if self
.endpoints
!= endpoints
:
121 self
.endpoints
= endpoints
122 self
._update
_api
_endpoints
_db
(self
.endpoints
)
124 except asyncio
.CancelledError
as e
:
126 except Exception as e
:
128 "Failed connecting to controller: {}...".format(self
.endpoints
)
131 await self
.disconnect_controller(controller
)
132 raise JujuControllerFailedConnecting(e
)
async def disconnect(self):
    """
    Disconnect

    Cancels the health check task and logs that Libjuju was disconnected.
    """
    # Cancel health check task
    health_check = self.health_check_task
    health_check.cancel()

    self.log.debug("Libjuju disconnected!")
async def disconnect_model(self, model: Model):
    """
    Disconnect model

    :param: model: Model that will be disconnected
    """
    closing = model.disconnect()
    await closing
async def disconnect_controller(self, controller: Controller):
    """
    Disconnect controller

    :param: controller: Controller that will be disconnected
    """
    closing = controller.disconnect()
    await closing
156 async def add_model(self
, model_name
: str, cloud_name
: str):
160 :param: model_name: Model name
161 :param: cloud_name: Cloud name
165 controller
= await self
.get_controller()
168 # Raise exception if model already exists
169 if await self
.model_exists(model_name
, controller
=controller
):
170 raise JujuModelAlreadyExists(
171 "Model {} already exists.".format(model_name
)
174 # Block until other workers have finished model creation
175 while self
.creating_model
.locked():
176 await asyncio
.sleep(0.1)
178 # If the model exists, return it from the controller
179 if model_name
in self
.models
:
183 async with self
.creating_model
:
184 self
.log
.debug("Creating model {}".format(model_name
))
185 model
= await controller
.add_model(
187 config
=self
.model_config
,
188 cloud_name
=cloud_name
,
189 credential_name
=cloud_name
,
191 self
.models
.add(model_name
)
194 await self
.disconnect_model(model
)
195 await self
.disconnect_controller(controller
)
198 self
, controller
: Controller
, model_name
: str, id=None
201 Get model from controller
203 :param: controller: Controller
204 :param: model_name: Model name
206 :return: Model: The created Juju model object
208 return await controller
.get_model(model_name
)
async def model_exists(
    self, model_name: str, controller: Controller = None
) -> bool:
    """
    Check if model exists

    :param: model_name: Model name
    :param: controller: Controller. When it is not passed, one is obtained
                        with get_controller() and disconnected before
                        returning

    :return: True if model_name is in the list of models of the controller
    """
    need_to_disconnect = False

    # Get controller if not passed
    if controller is None:
        controller = await self.get_controller()
        need_to_disconnect = True

    # Check if model exists
    try:
        return model_name in await controller.list_models()
    finally:
        # Only the connection opened by this method is closed; a controller
        # handed in by the caller is left connected.
        if need_to_disconnect:
            await self.disconnect_controller(controller)
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    :param: model_name: Model name

    :return: Full status object
    """
    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        try:
            return await model.get_status()
        finally:
            # Runs on both the return path and on errors from get_status()
            await self.disconnect_model(model)
    finally:
        # The controller is released even if get_model() fails
        await self.disconnect_controller(controller)
251 async def create_machine(
254 machine_id
: str = None,
255 db_dict
: dict = None,
256 progress_timeout
: float = None,
257 total_timeout
: float = None,
258 series
: str = "xenial",
260 ) -> (Machine
, bool):
264 :param: model_name: Model name
265 :param: machine_id: Machine id
266 :param: db_dict: Dictionary with data of the DB to write the updates
267 :param: progress_timeout: Maximum time between two updates in the model
268 :param: total_timeout: Timeout for the entity to be active
269 :param: series: Series of the machine (xenial, bionic, focal, ...)
270 :param: wait: Wait until machine is ready
272 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
273 if the machine is new or it already existed
279 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
283 controller
= await self
.get_controller()
286 model
= await self
.get_model(controller
, model_name
)
288 if machine_id
is not None:
290 "Searching machine (id={}) in model {}".format(
291 machine_id
, model_name
295 # Get machines from model and get the machine with machine_id if exists
296 machines
= await model
.get_machines()
297 if machine_id
in machines
:
299 "Machine (id={}) found in model {}".format(
300 machine_id
, model_name
303 machine
= machines
[machine_id
]
305 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
308 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
311 machine
= await model
.add_machine(
312 spec
=None, constraints
=None, disks
=None, series
=series
316 # Wait until the machine is ready
318 "Wait until machine {} is ready in model {}".format(
319 machine
.entity_id
, model_name
323 await JujuModelWatcher
.wait_for(
326 progress_timeout
=progress_timeout
,
327 total_timeout
=total_timeout
,
332 await self
.disconnect_model(model
)
333 await self
.disconnect_controller(controller
)
336 "Machine {} ready at {} in model {}".format(
337 machine
.entity_id
, machine
.dns_name
, model_name
342 async def provision_machine(
347 private_key_path
: str,
348 db_dict
: dict = None,
349 progress_timeout
: float = None,
350 total_timeout
: float = None,
353 Manually provisioning of a machine
355 :param: model_name: Model name
356 :param: hostname: IP to access the machine
357 :param: username: Username to login to the machine
358 :param: private_key_path: Local path for the private key
359 :param: db_dict: Dictionary with data of the DB to write the updates
360 :param: progress_timeout: Maximum time between two updates in the model
361 :param: total_timeout: Timeout for the entity to be active
363 :return: (Entity): Machine id
366 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
367 model_name
, hostname
, username
372 controller
= await self
.get_controller()
375 model
= await self
.get_model(controller
, model_name
)
379 provisioner
= AsyncSSHProvisioner(
382 private_key_path
=private_key_path
,
387 params
= await provisioner
.provision_machine()
389 params
.jobs
= ["JobHostUnits"]
391 self
.log
.debug("Adding machine to model")
392 connection
= model
.connection()
393 client_facade
= client
.ClientFacade
.from_connection(connection
)
395 results
= await client_facade
.AddMachines(params
=[params
])
396 error
= results
.machines
[0].error
399 msg
= "Error adding machine: {}".format(error
.message
)
400 self
.log
.error(msg
=msg
)
401 raise ValueError(msg
)
403 machine_id
= results
.machines
[0].machine
405 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
406 asyncio
.ensure_future(
407 provisioner
.install_agent(
408 connection
=connection
,
410 machine_id
=machine_id
,
411 proxy
=self
.api_proxy
,
417 machine_list
= await model
.get_machines()
418 if machine_id
in machine_list
:
419 self
.log
.debug("Machine {} found in model!".format(machine_id
))
420 machine
= model
.machines
.get(machine_id
)
422 await asyncio
.sleep(2)
425 msg
= "Machine {} not found in model".format(machine_id
)
426 self
.log
.error(msg
=msg
)
427 raise JujuMachineNotFound(msg
)
430 "Wait until machine {} is ready in model {}".format(
431 machine
.entity_id
, model_name
434 await JujuModelWatcher
.wait_for(
437 progress_timeout
=progress_timeout
,
438 total_timeout
=total_timeout
,
442 except Exception as e
:
445 await self
.disconnect_model(model
)
446 await self
.disconnect_controller(controller
)
449 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
454 async def deploy_charm(
456 application_name
: str,
460 db_dict
: dict = None,
461 progress_timeout
: float = None,
462 total_timeout
: float = None,
469 :param: application_name: Application name
470 :param: path: Local path to the charm
471 :param: model_name: Model name
472 :param: machine_id ID of the machine
473 :param: db_dict: Dictionary with data of the DB to write the updates
474 :param: progress_timeout: Maximum time between two updates in the model
475 :param: total_timeout: Timeout for the entity to be active
476 :param: config: Config for the charm
477 :param: series: Series of the charm
478 :param: num_units: Number of units
480 :return: (juju.application.Application): Juju application
483 "Deploying charm {} to machine {} in model ~{}".format(
484 application_name
, machine_id
, model_name
487 self
.log
.debug("charm: {}".format(path
))
490 controller
= await self
.get_controller()
493 model
= await self
.get_model(controller
, model_name
)
497 if application_name
not in model
.applications
:
499 if machine_id
is not None:
500 if machine_id
not in model
.machines
:
501 msg
= "Machine {} not found in model".format(machine_id
)
502 self
.log
.error(msg
=msg
)
503 raise JujuMachineNotFound(msg
)
504 machine
= model
.machines
[machine_id
]
505 series
= machine
.series
507 application
= await model
.deploy(
509 application_name
=application_name
,
518 "Wait until application {} is ready in model {}".format(
519 application_name
, model_name
523 for _
in range(num_units
- 1):
524 m
, _
= await self
.create_machine(model_name
, wait
=False)
525 await application
.add_unit(to
=m
.entity_id
)
527 await JujuModelWatcher
.wait_for(
530 progress_timeout
=progress_timeout
,
531 total_timeout
=total_timeout
,
536 "Application {} is ready in model {}".format(
537 application_name
, model_name
541 raise JujuApplicationExists(
542 "Application {} exists".format(application_name
)
545 await self
.disconnect_model(model
)
546 await self
.disconnect_controller(controller
)
550 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
553 :param: model: Model object
554 :param: application_name: Application name
556 :return: juju.application.Application (or None if it doesn't exist)
558 if model
.applications
and application_name
in model
.applications
:
559 return model
.applications
[application_name
]
561 async def execute_action(
563 application_name
: str,
566 db_dict
: dict = None,
567 progress_timeout
: float = None,
568 total_timeout
: float = None,
573 :param: application_name: Application name
574 :param: model_name: Model name
575 :param: action_name: Name of the action
576 :param: db_dict: Dictionary with data of the DB to write the updates
577 :param: progress_timeout: Maximum time between two updates in the model
578 :param: total_timeout: Timeout for the entity to be active
580 :return: (str, str): (output and status)
583 "Executing action {} using params {}".format(action_name
, kwargs
)
586 controller
= await self
.get_controller()
589 model
= await self
.get_model(controller
, model_name
)
593 application
= self
._get
_application
(
594 model
, application_name
=application_name
,
596 if application
is None:
597 raise JujuApplicationNotFound("Cannot execute action")
601 for u
in application
.units
:
602 if await u
.is_leader_from_status():
605 raise JujuLeaderUnitNotFound("Cannot execute action: leader unit not found")
607 actions
= await application
.get_actions()
609 if action_name
not in actions
:
610 raise JujuActionNotFound(
611 "Action {} not in available actions".format(action_name
)
614 action
= await unit
.run_action(action_name
, **kwargs
)
617 "Wait until action {} is completed in application {} (model={})".format(
618 action_name
, application_name
, model_name
621 await JujuModelWatcher
.wait_for(
624 progress_timeout
=progress_timeout
,
625 total_timeout
=total_timeout
,
630 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
631 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
633 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
637 "Action {} completed with status {} in application {} (model={})".format(
638 action_name
, action
.status
, application_name
, model_name
642 await self
.disconnect_model(model
)
643 await self
.disconnect_controller(controller
)
645 return output
, status
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }

    :raises: JujuApplicationNotFound: If the application is not in the model
    """
    self.log.debug(
        "Getting list of actions for application {}".format(application_name)
    )

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model
        model = await self.get_model(controller, model_name)
        try:
            # Get application
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                # Same exception execute_action() raises for a missing
                # application, instead of an AttributeError on None
                raise JujuApplicationNotFound(
                    "Cannot get actions: application {} not found".format(
                        application_name
                    )
                )

            # Return list of actions
            return await application.get_actions()
        finally:
            # Disconnect from model and controller
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
async def add_relation(
    self,
    model_name: str,
    application_name_1: str,
    application_name_2: str,
    relation_1: str,
    relation_2: str,
):
    """
    Add relation

    :param: model_name: Model name
    :param: application_name_1 First application name
    :param: application_name_2: Second application name
    :param: relation_1: First relation name
    :param: relation_2: Second relation name

    :raises: JujuAPIError: If adding the relation fails for a reason other
                           than "not found" or "already exists"
    """
    self.log.debug("Adding relation: {} -> {}".format(relation_1, relation_2))

    # Get controller
    controller = await self.get_controller()
    try:
        # Get model
        model = await self.get_model(controller, model_name)
        try:
            # Build relation strings
            r1 = "{}:{}".format(application_name_1, relation_1)
            r2 = "{}:{}".format(application_name_2, relation_2)

            # Add relation
            try:
                await model.add_relation(relation1=r1, relation2=r2)
            except JujuAPIError as e:
                if "not found" in e.message:
                    self.log.warning("Relation not found: {}".format(e.message))
                elif "already exists" in e.message:
                    self.log.warning(
                        "Relation already exists: {}".format(e.message)
                    )
                else:
                    # another exception, raise it
                    raise
        finally:
            await self.disconnect_model(model)
    finally:
        await self.disconnect_controller(controller)
728 async def destroy_model(self
, model_name
: str, total_timeout
: float):
732 :param: model_name: Model name
733 :param: total_timeout: Timeout
736 controller
= await self
.get_controller()
737 model
= await self
.get_model(controller
, model_name
)
739 self
.log
.debug("Destroying model {}".format(model_name
))
740 uuid
= model
.info
.uuid
743 machines
= await model
.get_machines()
744 for machine_id
in machines
:
746 await self
.destroy_machine(
747 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
749 except asyncio
.CancelledError
:
755 await self
.disconnect_model(model
)
758 if model_name
in self
.models
:
759 self
.models
.remove(model_name
)
761 await controller
.destroy_model(uuid
)
763 # Wait until model is destroyed
764 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
767 if total_timeout
is None:
769 end
= time
.time() + total_timeout
770 while time
.time() < end
:
772 models
= await controller
.list_models()
773 if model_name
not in models
:
775 "The model {} ({}) was destroyed".format(model_name
, uuid
)
778 except asyncio
.CancelledError
:
780 except Exception as e
:
782 await asyncio
.sleep(5)
784 "Timeout waiting for model {} to be destroyed {}".format(
785 model_name
, last_exception
789 await self
.disconnect_controller(controller
)
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroy application

    A missing application is not an error: a warning is logged and nothing
    is destroyed.

    :param: model: Model object
    :param: application_name: Application name
    """
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model.info.name
        )
    )
    application = model.applications.get(application_name)
    if application is None:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await application.destroy()
async def destroy_machine(
    self, model: Model, machine_id: str, total_timeout: float = 3600
):
    """
    Destroy machine

    Forces the destruction of the machine and polls the model every 0.5
    seconds until the machine is gone or total_timeout has elapsed. A timeout
    does not raise: it is reported with a warning.

    :param: model: Model object
    :param: machine_id: Machine id
    :param: total_timeout: Timeout in seconds
    """
    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    machine = machines[machine_id]
    await machine.destroy(force=True)

    # max timeout
    end = time.time() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    # The loop also ends on timeout, so only report success when the machine
    # is really gone from the model.
    if machine_id in machines:
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Does nothing (beyond logging) when config is None or empty.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises: JujuApplicationNotFound: If the application is not in the model
    """
    self.log.debug("Configuring application {}".format(application_name))

    # Nothing to apply: do not open any connection
    if not config:
        return

    controller = await self.get_controller()
    try:
        model = await self.get_model(controller, model_name)
        try:
            application = self._get_application(
                model, application_name=application_name,
            )
            if application is None:
                raise JujuApplicationNotFound(
                    "Cannot configure application: {} not found".format(
                        application_name
                    )
                )
            await application.set_config(config)
        finally:
            await self.disconnect_model(model)
    finally:
        # Released even when get_model() fails
        await self.disconnect_controller(controller)
861 def _get_api_endpoints_db(self
) -> [str]:
863 Get API Endpoints from DB
865 :return: List of API endpoints
867 self
.log
.debug("Getting endpoints from database")
869 juju_info
= self
.db
.get_one(
870 DB_DATA
.api_endpoints
.table
,
871 q_filter
=DB_DATA
.api_endpoints
.filter,
874 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
875 return juju_info
[DB_DATA
.api_endpoints
.key
]
877 def _update_api_endpoints_db(self
, endpoints
: [str]):
879 Update API endpoints in Database
881 :param: List of endpoints
883 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
885 juju_info
= self
.db
.get_one(
886 DB_DATA
.api_endpoints
.table
,
887 q_filter
=DB_DATA
.api_endpoints
.filter,
890 # If it doesn't, then create it
894 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
896 except DbException
as e
:
897 # Racing condition: check if another N2VC worker has created it
898 juju_info
= self
.db
.get_one(
899 DB_DATA
.api_endpoints
.table
,
900 q_filter
=DB_DATA
.api_endpoints
.filter,
906 DB_DATA
.api_endpoints
.table
,
907 DB_DATA
.api_endpoints
.filter,
908 {DB_DATA
.api_endpoints
.key
: endpoints
},
def handle_exception(self, loop, context):
    """
    Exception handler for the event loop

    Deliberately does nothing with the reported context.
    NOTE(review): only the comment below is visible as the body of this
    method; confirm that it is meant to be a no-op.

    :param: loop: Event loop
    :param: context: Context passed by the loop to the handler
    """
    # All unhandled exceptions by libjuju are handled here.
    return None
async def health_check(self, interval: float = 300.0):
    """
    Health check to make sure controller and controller_model connections are OK

    Runs until the task is cancelled: on every round a controller connection
    is obtained and closed again, and failures are logged as errors.

    :param: interval: Time in seconds between checks
    """
    while True:
        # Reset on every round so a failed connection never leads to
        # disconnecting None or the controller of a previous round.
        controller = None
        try:
            controller = await self.get_controller()
        except Exception as e:
            self.log.error("Health check to VCA failed: {}".format(e))
        finally:
            if controller is not None:
                await self.disconnect_controller(controller)
        await asyncio.sleep(interval)
async def list_models(self, contains: str = None) -> [str]:
    """List models with certain names

    :param: contains: String that is contained in model name. When it is
                      None or empty, all the models are returned

    :return: [models] Returns list of model names
    """
    controller = await self.get_controller()
    try:
        models = await controller.list_models()
        if contains:
            models = [model for model in models if contains in model]
        return models
    finally:
        await self.disconnect_controller(controller)