1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuModelAlreadyExists
,
33 JujuControllerFailedConnecting
,
34 JujuApplicationExists
,
36 from n2vc
.utils
import DB_DATA
37 from osm_common
.dbbase
import DbException
48 loop
: asyncio
.AbstractEventLoop
= None,
49 log
: logging
.Logger
= None,
51 n2vc
: N2VCConnector
= None,
52 apt_mirror
: str = None,
53 enable_os_upgrade
: bool = True,
58 :param: endpoint: Endpoint of the juju controller (host:port)
59 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
60 :param: username: Juju username
61 :param: password: Juju password
62 :param: cacert: Juju CA Certificate
63 :param: loop: Asyncio loop
66 :param: n2vc: N2VC object
67 :param: apt_mirror: APT Mirror
68 :param: enable_os_upgrade: Enable OS Upgrade
71 self
.log
= log
or logging
.getLogger("Libjuju")
73 db_endpoints
= self
._get
_api
_endpoints
_db
()
74 self
.endpoints
= db_endpoints
or [endpoint
]
75 if db_endpoints
is None:
76 self
._update
_api
_endpoints
_db
(self
.endpoints
)
77 self
.api_proxy
= api_proxy
78 self
.username
= username
79 self
.password
= password
81 self
.loop
= loop
or asyncio
.get_event_loop()
84 # Generate config for models
85 self
.model_config
= {}
87 self
.model_config
["apt-mirror"] = apt_mirror
88 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
89 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
91 self
.loop
.set_exception_handler(self
.handle_exception
)
92 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
95 self
.log
.debug("Libjuju initialized!")
97 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
99 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
103 :param: timeout: Time in seconds to wait for controller to connect
107 controller
= Controller(loop
=self
.loop
)
108 await asyncio
.wait_for(
110 endpoint
=self
.endpoints
,
111 username
=self
.username
,
112 password
=self
.password
,
117 endpoints
= await controller
.api_endpoints
118 if self
.endpoints
!= endpoints
:
119 self
.endpoints
= endpoints
120 self
._update
_api
_endpoints
_db
(self
.endpoints
)
122 except asyncio
.CancelledError
as e
:
124 except Exception as e
:
126 "Failed connecting to controller: {}...".format(self
.endpoints
)
129 await self
.disconnect_controller(controller
)
130 raise JujuControllerFailedConnecting(e
)
132 async def disconnect(self
):
134 # Cancel health check task
135 self
.health_check_task
.cancel()
136 self
.log
.debug("Libjuju disconnected!")
138 async def disconnect_model(self
, model
: Model
):
142 :param: model: Model that will be disconnected
144 await model
.disconnect()
146 async def disconnect_controller(self
, controller
: Controller
):
148 Disconnect controller
150 :param: controller: Controller that will be disconnected
152 await controller
.disconnect()
154 async def add_model(self
, model_name
: str, cloud_name
: str):
158 :param: model_name: Model name
159 :param: cloud_name: Cloud name
163 controller
= await self
.get_controller()
166 # Raise exception if model already exists
167 if await self
.model_exists(model_name
, controller
=controller
):
168 raise JujuModelAlreadyExists(
169 "Model {} already exists.".format(model_name
)
172 # Block until other workers have finished model creation
173 while self
.creating_model
.locked():
174 await asyncio
.sleep(0.1)
176 # If the model exists, return it from the controller
177 if model_name
in self
.models
:
181 async with self
.creating_model
:
182 self
.log
.debug("Creating model {}".format(model_name
))
183 model
= await controller
.add_model(
185 config
=self
.model_config
,
186 cloud_name
=cloud_name
,
187 credential_name
=cloud_name
,
189 self
.models
.add(model_name
)
192 await self
.disconnect_model(model
)
193 await self
.disconnect_controller(controller
)
196 self
, controller
: Controller
, model_name
: str, id=None
199 Get model from controller
201 :param: controller: Controller
202 :param: model_name: Model name
204 :return: Model: The created Juju model object
206 return await controller
.get_model(model_name
)
208 async def model_exists(
209 self
, model_name
: str, controller
: Controller
= None
212 Check if model exists
214 :param: controller: Controller
215 :param: model_name: Model name
219 need_to_disconnect
= False
221 # Get controller if not passed
223 controller
= await self
.get_controller()
224 need_to_disconnect
= True
226 # Check if model exists
228 return model_name
in await controller
.list_models()
230 if need_to_disconnect
:
231 await self
.disconnect_controller(controller
)
233 async def get_model_status(self
, model_name
: str) -> FullStatus
:
237 :param: model_name: Model name
239 :return: Full status object
241 controller
= await self
.get_controller()
242 model
= await self
.get_model(controller
, model_name
)
244 return await model
.get_status()
246 await self
.disconnect_model(model
)
247 await self
.disconnect_controller(controller
)
249 async def create_machine(
252 machine_id
: str = None,
253 db_dict
: dict = None,
254 progress_timeout
: float = None,
255 total_timeout
: float = None,
256 series
: str = "xenial",
258 ) -> (Machine
, bool):
262 :param: model_name: Model name
263 :param: machine_id: Machine id
264 :param: db_dict: Dictionary with data of the DB to write the updates
265 :param: progress_timeout: Maximum time between two updates in the model
266 :param: total_timeout: Timeout for the entity to be active
267 :param: series: Series of the machine (xenial, bionic, focal, ...)
268 :param: wait: Wait until machine is ready
270 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
271 if the machine is new or it already existed
277 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
281 controller
= await self
.get_controller()
284 model
= await self
.get_model(controller
, model_name
)
286 if machine_id
is not None:
288 "Searching machine (id={}) in model {}".format(
289 machine_id
, model_name
293 # Get machines from model and get the machine with machine_id if exists
294 machines
= await model
.get_machines()
295 if machine_id
in machines
:
297 "Machine (id={}) found in model {}".format(
298 machine_id
, model_name
301 machine
= model
.machines
[machine_id
]
303 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
306 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
309 machine
= await model
.add_machine(
310 spec
=None, constraints
=None, disks
=None, series
=series
314 # Wait until the machine is ready
316 "Wait until machine {} is ready in model {}".format(
317 machine
.entity_id
, model_name
321 await JujuModelWatcher
.wait_for(
324 progress_timeout
=progress_timeout
,
325 total_timeout
=total_timeout
,
330 await self
.disconnect_model(model
)
331 await self
.disconnect_controller(controller
)
334 "Machine {} ready at {} in model {}".format(
335 machine
.entity_id
, machine
.dns_name
, model_name
340 async def provision_machine(
345 private_key_path
: str,
346 db_dict
: dict = None,
347 progress_timeout
: float = None,
348 total_timeout
: float = None,
351 Manually provisioning of a machine
353 :param: model_name: Model name
354 :param: hostname: IP to access the machine
355 :param: username: Username to login to the machine
356 :param: private_key_path: Local path for the private key
357 :param: db_dict: Dictionary with data of the DB to write the updates
358 :param: progress_timeout: Maximum time between two updates in the model
359 :param: total_timeout: Timeout for the entity to be active
361 :return: (Entity): Machine id
364 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
365 model_name
, hostname
, username
370 controller
= await self
.get_controller()
373 model
= await self
.get_model(controller
, model_name
)
377 provisioner
= AsyncSSHProvisioner(
380 private_key_path
=private_key_path
,
385 params
= await provisioner
.provision_machine()
387 params
.jobs
= ["JobHostUnits"]
389 self
.log
.debug("Adding machine to model")
390 connection
= model
.connection()
391 client_facade
= client
.ClientFacade
.from_connection(connection
)
393 results
= await client_facade
.AddMachines(params
=[params
])
394 error
= results
.machines
[0].error
397 msg
= "Error adding machine: {}".format(error
.message
)
398 self
.log
.error(msg
=msg
)
399 raise ValueError(msg
)
401 machine_id
= results
.machines
[0].machine
403 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
404 asyncio
.ensure_future(
405 provisioner
.install_agent(
406 connection
=connection
,
408 machine_id
=machine_id
,
415 machine_list
= await model
.get_machines()
416 if machine_id
in machine_list
:
417 self
.log
.debug("Machine {} found in model!".format(machine_id
))
418 machine
= model
.machines
.get(machine_id
)
420 await asyncio
.sleep(2)
423 msg
= "Machine {} not found in model".format(machine_id
)
424 self
.log
.error(msg
=msg
)
425 raise JujuMachineNotFound(msg
)
428 "Wait until machine {} is ready in model {}".format(
429 machine
.entity_id
, model_name
432 await JujuModelWatcher
.wait_for(
435 progress_timeout
=progress_timeout
,
436 total_timeout
=total_timeout
,
440 except Exception as e
:
443 await self
.disconnect_model(model
)
444 await self
.disconnect_controller(controller
)
447 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
452 async def deploy_charm(
454 application_name
: str,
458 db_dict
: dict = None,
459 progress_timeout
: float = None,
460 total_timeout
: float = None,
467 :param: application_name: Application name
468 :param: path: Local path to the charm
469 :param: model_name: Model name
470 :param: machine_id ID of the machine
471 :param: db_dict: Dictionary with data of the DB to write the updates
472 :param: progress_timeout: Maximum time between two updates in the model
473 :param: total_timeout: Timeout for the entity to be active
474 :param: config: Config for the charm
475 :param: series: Series of the charm
476 :param: num_units: Number of units
478 :return: (juju.application.Application): Juju application
481 "Deploying charm {} to machine {} in model ~{}".format(
482 application_name
, machine_id
, model_name
485 self
.log
.debug("charm: {}".format(path
))
488 controller
= await self
.get_controller()
491 model
= await self
.get_model(controller
, model_name
)
495 if application_name
not in model
.applications
:
497 if machine_id
is not None:
498 if machine_id
not in model
.machines
:
499 msg
= "Machine {} not found in model".format(machine_id
)
500 self
.log
.error(msg
=msg
)
501 raise JujuMachineNotFound(msg
)
502 machine
= model
.machines
[machine_id
]
503 series
= machine
.series
505 application
= await model
.deploy(
507 application_name
=application_name
,
516 "Wait until application {} is ready in model {}".format(
517 application_name
, model_name
521 for _
in range(num_units
- 1):
522 m
, _
= await self
.create_machine(model_name
, wait
=False)
523 await application
.add_unit(to
=m
.entity_id
)
525 await JujuModelWatcher
.wait_for(
528 progress_timeout
=progress_timeout
,
529 total_timeout
=total_timeout
,
534 "Application {} is ready in model {}".format(
535 application_name
, model_name
539 raise JujuApplicationExists(
540 "Application {} exists".format(application_name
)
543 await self
.disconnect_model(model
)
544 await self
.disconnect_controller(controller
)
548 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
551 :param: model: Model object
552 :param: application_name: Application name
554 :return: juju.application.Application (or None if it doesn't exist)
556 if model
.applications
and application_name
in model
.applications
:
557 return model
.applications
[application_name
]
559 async def execute_action(
561 application_name
: str,
564 db_dict
: dict = None,
565 progress_timeout
: float = None,
566 total_timeout
: float = None,
571 :param: application_name: Application name
572 :param: model_name: Model name
573 :param: cloud_name: Cloud name
574 :param: action_name: Name of the action
575 :param: db_dict: Dictionary with data of the DB to write the updates
576 :param: progress_timeout: Maximum time between two updates in the model
577 :param: total_timeout: Timeout for the entity to be active
579 :return: (str, str): (output and status)
582 "Executing action {} using params {}".format(action_name
, kwargs
)
585 controller
= await self
.get_controller()
588 model
= await self
.get_model(controller
, model_name
)
592 application
= self
._get
_application
(
593 model
, application_name
=application_name
,
595 if application
is None:
596 raise JujuApplicationNotFound("Cannot execute action")
600 for u
in application
.units
:
601 if await u
.is_leader_from_status():
604 raise Exception("Cannot execute action: leader unit not found")
606 actions
= await application
.get_actions()
608 if action_name
not in actions
:
610 "Action {} not in available actions".format(action_name
)
613 action
= await unit
.run_action(action_name
, **kwargs
)
616 "Wait until action {} is completed in application {} (model={})".format(
617 action_name
, application_name
, model_name
620 await JujuModelWatcher
.wait_for(
623 progress_timeout
=progress_timeout
,
624 total_timeout
=total_timeout
,
629 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
630 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
632 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
636 "Action {} completed with status {} in application {} (model={})".format(
637 action_name
, action
.status
, application_name
, model_name
640 except Exception as e
:
643 await self
.disconnect_model(model
)
644 await self
.disconnect_controller(controller
)
646 return output
, status
648 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
649 """Get list of actions
651 :param: application_name: Application name
652 :param: model_name: Model name
654 :return: Dict with this format
656 "action_name": "Description of the action",
661 "Getting list of actions for application {}".format(application_name
)
665 controller
= await self
.get_controller()
668 model
= await self
.get_model(controller
, model_name
)
672 application
= self
._get
_application
(
673 model
, application_name
=application_name
,
676 # Return list of actions
677 return await application
.get_actions()
680 # Disconnect from model and controller
681 await self
.disconnect_model(model
)
682 await self
.disconnect_controller(controller
)
684 async def add_relation(
687 application_name_1
: str,
688 application_name_2
: str,
694 :param: model_name: Model name
695 :param: application_name_1 First application name
696 :param: application_name_2: Second application name
697 :param: relation_1: First relation name
698 :param: relation_2: Second relation name
701 self
.log
.debug("Adding relation: {} -> {}".format(relation_1
, relation_2
))
704 controller
= await self
.get_controller()
707 model
= await self
.get_model(controller
, model_name
)
709 # Build relation strings
710 r1
= "{}:{}".format(application_name_1
, relation_1
)
711 r2
= "{}:{}".format(application_name_2
, relation_2
)
715 await model
.add_relation(relation1
=r1
, relation2
=r2
)
716 except JujuAPIError
as e
:
717 if "not found" in e
.message
:
718 self
.log
.warning("Relation not found: {}".format(e
.message
))
720 if "already exists" in e
.message
:
721 self
.log
.warning("Relation already exists: {}".format(e
.message
))
723 # another exception, raise it
726 await self
.disconnect_model(model
)
727 await self
.disconnect_controller(controller
)
729 async def destroy_model(self
, model_name
: str, total_timeout
: float):
733 :param: model_name: Model name
734 :param: total_timeout: Timeout
737 controller
= await self
.get_controller()
738 model
= await self
.get_model(controller
, model_name
)
740 self
.log
.debug("Destroying model {}".format(model_name
))
741 uuid
= model
.info
.uuid
744 machines
= await model
.get_machines()
745 for machine_id
in machines
:
747 await self
.destroy_machine(
748 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
750 except asyncio
.CancelledError
:
756 await self
.disconnect_model(model
)
759 if model_name
in self
.models
:
760 self
.models
.remove(model_name
)
762 await controller
.destroy_model(uuid
)
764 # Wait until model is destroyed
765 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
768 if total_timeout
is None:
770 end
= time
.time() + total_timeout
771 while time
.time() < end
:
773 models
= await controller
.list_models()
774 if model_name
not in models
:
776 "The model {} ({}) was destroyed".format(model_name
, uuid
)
779 except asyncio
.CancelledError
:
781 except Exception as e
:
783 await asyncio
.sleep(5)
785 "Timeout waiting for model {} to be destroyed {}".format(
786 model_name
, last_exception
790 await self
.disconnect_controller(controller
)
792 async def destroy_application(self
, model
: Model
, application_name
: str):
796 :param: model: Model object
797 :param: application_name: Application name
800 "Destroying application {} in model {}".format(
801 application_name
, model
.info
.name
804 application
= model
.applications
.get(application_name
)
806 await application
.destroy()
808 self
.log
.warning("Application not found: {}".format(application_name
))
810 async def destroy_machine(
811 self
, model
: Model
, machine_id
: str, total_timeout
: float = 3600
816 :param: model: Model object
817 :param: machine_id: Machine id
818 :param: total_timeout: Timeout in seconds
820 machines
= await model
.get_machines()
821 if machine_id
in machines
:
822 machine
= model
.machines
[machine_id
]
823 await machine
.destroy(force
=True)
825 end
= time
.time() + total_timeout
827 # wait for machine removal
828 machines
= await model
.get_machines()
829 while machine_id
in machines
and time
.time() < end
:
831 "Waiting for machine {} is destroyed".format(machine_id
)
833 await asyncio
.sleep(0.5)
834 machines
= await model
.get_machines()
835 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
837 self
.log
.debug("Machine not found: {}".format(machine_id
))
839 async def configure_application(
840 self
, model_name
: str, application_name
: str, config
: dict = None
842 """Configure application
844 :param: model_name: Model name
845 :param: application_name: Application name
846 :param: config: Config to apply to the charm
848 self
.log
.debug("Configuring application {}".format(application_name
))
852 controller
= await self
.get_controller()
853 model
= await self
.get_model(controller
, model_name
)
854 application
= self
._get
_application
(
855 model
, application_name
=application_name
,
857 await application
.set_config(config
)
859 await self
.disconnect_model(model
)
860 await self
.disconnect_controller(controller
)
862 def _get_api_endpoints_db(self
) -> [str]:
864 Get API Endpoints from DB
866 :return: List of API endpoints
868 self
.log
.debug("Getting endpoints from database")
870 juju_info
= self
.db
.get_one(
871 DB_DATA
.api_endpoints
.table
,
872 q_filter
=DB_DATA
.api_endpoints
.filter,
875 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
876 return juju_info
[DB_DATA
.api_endpoints
.key
]
878 def _update_api_endpoints_db(self
, endpoints
: [str]):
880 Update API endpoints in Database
882 :param: List of endpoints
884 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
886 juju_info
= self
.db
.get_one(
887 DB_DATA
.api_endpoints
.table
,
888 q_filter
=DB_DATA
.api_endpoints
.filter,
891 # If it doesn't, then create it
895 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
897 except DbException
as e
:
898 # Racing condition: check if another N2VC worker has created it
899 juju_info
= self
.db
.get_one(
900 DB_DATA
.api_endpoints
.table
,
901 q_filter
=DB_DATA
.api_endpoints
.filter,
907 DB_DATA
.api_endpoints
.table
,
908 DB_DATA
.api_endpoints
.filter,
909 {DB_DATA
.api_endpoints
.key
: endpoints
},
912 def handle_exception(self
, loop
, context
):
913 # All unhandled exceptions by libjuju are handled here.
916 async def health_check(self
, interval
: float = 300.0):
918 Health check to make sure controller and controller_model connections are OK
920 :param: interval: Time in seconds between checks
924 controller
= await self
.get_controller()
925 # self.log.debug("VCA is alive")
926 except Exception as e
:
927 self
.log
.error("Health check to VCA failed: {}".format(e
))
929 await self
.disconnect_controller(controller
)
930 await asyncio
.sleep(interval
)
932 async def list_models(self
, contains
: str = None) -> [str]:
933 """List models with certain names
935 :param: contains: String that is contained in model name
937 :retur: [models] Returns list of model names
940 controller
= await self
.get_controller()
942 models
= await controller
.list_models()
944 models
= [model
for model
in models
if contains
in model
]
947 await self
.disconnect_controller(controller
)