1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
, QueryApplicationOffersResults
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuLeaderUnitNotFound
,
34 JujuModelAlreadyExists
,
35 JujuControllerFailedConnecting
,
36 JujuApplicationExists
,
38 from n2vc
.utils
import DB_DATA
39 from osm_common
.dbbase
import DbException
50 loop
: asyncio
.AbstractEventLoop
= None,
51 log
: logging
.Logger
= None,
53 n2vc
: N2VCConnector
= None,
54 apt_mirror
: str = None,
55 enable_os_upgrade
: bool = True,
60 :param: endpoint: Endpoint of the juju controller (host:port)
61 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
62 :param: username: Juju username
63 :param: password: Juju password
64 :param: cacert: Juju CA Certificate
65 :param: loop: Asyncio loop
68 :param: n2vc: N2VC object
69 :param: apt_mirror: APT Mirror
70 :param: enable_os_upgrade: Enable OS Upgrade
73 self
.log
= log
or logging
.getLogger("Libjuju")
75 db_endpoints
= self
._get
_api
_endpoints
_db
()
76 self
.endpoints
= db_endpoints
or [endpoint
]
77 if db_endpoints
is None:
78 self
._update
_api
_endpoints
_db
(self
.endpoints
)
79 self
.api_proxy
= api_proxy
80 self
.username
= username
81 self
.password
= password
83 self
.loop
= loop
or asyncio
.get_event_loop()
86 # Generate config for models
87 self
.model_config
= {}
89 self
.model_config
["apt-mirror"] = apt_mirror
90 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
91 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
93 self
.loop
.set_exception_handler(self
.handle_exception
)
94 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
97 self
.log
.debug("Libjuju initialized!")
99 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
101 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
105 :param: timeout: Time in seconds to wait for controller to connect
109 controller
= Controller(loop
=self
.loop
)
110 await asyncio
.wait_for(
112 endpoint
=self
.endpoints
,
113 username
=self
.username
,
114 password
=self
.password
,
119 endpoints
= await controller
.api_endpoints
120 if self
.endpoints
!= endpoints
:
121 self
.endpoints
= endpoints
122 self
._update
_api
_endpoints
_db
(self
.endpoints
)
124 except asyncio
.CancelledError
as e
:
126 except Exception as e
:
128 "Failed connecting to controller: {}...".format(self
.endpoints
)
131 await self
.disconnect_controller(controller
)
132 raise JujuControllerFailedConnecting(e
)
134 async def disconnect(self
):
136 # Cancel health check task
137 self
.health_check_task
.cancel()
138 self
.log
.debug("Libjuju disconnected!")
140 async def disconnect_model(self
, model
: Model
):
144 :param: model: Model that will be disconnected
146 await model
.disconnect()
148 async def disconnect_controller(self
, controller
: Controller
):
150 Disconnect controller
152 :param: controller: Controller that will be disconnected
154 await controller
.disconnect()
156 async def add_model(self
, model_name
: str, cloud_name
: str):
160 :param: model_name: Model name
161 :param: cloud_name: Cloud name
165 controller
= await self
.get_controller()
168 # Raise exception if model already exists
169 if await self
.model_exists(model_name
, controller
=controller
):
170 raise JujuModelAlreadyExists(
171 "Model {} already exists.".format(model_name
)
174 # Block until other workers have finished model creation
175 while self
.creating_model
.locked():
176 await asyncio
.sleep(0.1)
178 # If the model exists, return it from the controller
179 if model_name
in self
.models
:
183 async with self
.creating_model
:
184 self
.log
.debug("Creating model {}".format(model_name
))
185 model
= await controller
.add_model(
187 config
=self
.model_config
,
188 cloud_name
=cloud_name
,
189 credential_name
=cloud_name
,
191 self
.models
.add(model_name
)
194 await self
.disconnect_model(model
)
195 await self
.disconnect_controller(controller
)
198 self
, controller
: Controller
, model_name
: str, id=None
201 Get model from controller
203 :param: controller: Controller
204 :param: model_name: Model name
206 :return: Model: The created Juju model object
208 return await controller
.get_model(model_name
)
210 async def model_exists(
211 self
, model_name
: str, controller
: Controller
= None
214 Check if model exists
216 :param: controller: Controller
217 :param: model_name: Model name
221 need_to_disconnect
= False
223 # Get controller if not passed
225 controller
= await self
.get_controller()
226 need_to_disconnect
= True
228 # Check if model exists
230 return model_name
in await controller
.list_models()
232 if need_to_disconnect
:
233 await self
.disconnect_controller(controller
)
235 async def models_exist(self
, model_names
: [str]) -> (bool, list):
237 Check if models exists
239 :param: model_names: List of strings with model names
241 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
245 "model_names must be a non-empty array. Given value: {}".format(model_names
)
247 non_existing_models
= []
248 models
= await self
.list_models()
249 existing_models
= list(set(models
).intersection(model_names
))
250 non_existing_models
= list(set(model_names
) - set(existing_models
))
253 len(non_existing_models
) == 0,
257 async def get_model_status(self
, model_name
: str) -> FullStatus
:
261 :param: model_name: Model name
263 :return: Full status object
265 controller
= await self
.get_controller()
266 model
= await self
.get_model(controller
, model_name
)
268 return await model
.get_status()
270 await self
.disconnect_model(model
)
271 await self
.disconnect_controller(controller
)
273 async def create_machine(
276 machine_id
: str = None,
277 db_dict
: dict = None,
278 progress_timeout
: float = None,
279 total_timeout
: float = None,
280 series
: str = "xenial",
282 ) -> (Machine
, bool):
286 :param: model_name: Model name
287 :param: machine_id: Machine id
288 :param: db_dict: Dictionary with data of the DB to write the updates
289 :param: progress_timeout: Maximum time between two updates in the model
290 :param: total_timeout: Timeout for the entity to be active
291 :param: series: Series of the machine (xenial, bionic, focal, ...)
292 :param: wait: Wait until machine is ready
294 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
295 if the machine is new or it already existed
301 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
305 controller
= await self
.get_controller()
308 model
= await self
.get_model(controller
, model_name
)
310 if machine_id
is not None:
312 "Searching machine (id={}) in model {}".format(
313 machine_id
, model_name
317 # Get machines from model and get the machine with machine_id if exists
318 machines
= await model
.get_machines()
319 if machine_id
in machines
:
321 "Machine (id={}) found in model {}".format(
322 machine_id
, model_name
325 machine
= machines
[machine_id
]
327 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
330 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
333 machine
= await model
.add_machine(
334 spec
=None, constraints
=None, disks
=None, series
=series
338 # Wait until the machine is ready
340 "Wait until machine {} is ready in model {}".format(
341 machine
.entity_id
, model_name
345 await JujuModelWatcher
.wait_for(
348 progress_timeout
=progress_timeout
,
349 total_timeout
=total_timeout
,
354 await self
.disconnect_model(model
)
355 await self
.disconnect_controller(controller
)
358 "Machine {} ready at {} in model {}".format(
359 machine
.entity_id
, machine
.dns_name
, model_name
364 async def provision_machine(
369 private_key_path
: str,
370 db_dict
: dict = None,
371 progress_timeout
: float = None,
372 total_timeout
: float = None,
375 Manually provisioning of a machine
377 :param: model_name: Model name
378 :param: hostname: IP to access the machine
379 :param: username: Username to login to the machine
380 :param: private_key_path: Local path for the private key
381 :param: db_dict: Dictionary with data of the DB to write the updates
382 :param: progress_timeout: Maximum time between two updates in the model
383 :param: total_timeout: Timeout for the entity to be active
385 :return: (Entity): Machine id
388 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
389 model_name
, hostname
, username
394 controller
= await self
.get_controller()
397 model
= await self
.get_model(controller
, model_name
)
401 provisioner
= AsyncSSHProvisioner(
404 private_key_path
=private_key_path
,
409 params
= await provisioner
.provision_machine()
411 params
.jobs
= ["JobHostUnits"]
413 self
.log
.debug("Adding machine to model")
414 connection
= model
.connection()
415 client_facade
= client
.ClientFacade
.from_connection(connection
)
417 results
= await client_facade
.AddMachines(params
=[params
])
418 error
= results
.machines
[0].error
421 msg
= "Error adding machine: {}".format(error
.message
)
422 self
.log
.error(msg
=msg
)
423 raise ValueError(msg
)
425 machine_id
= results
.machines
[0].machine
427 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
428 asyncio
.ensure_future(
429 provisioner
.install_agent(
430 connection
=connection
,
432 machine_id
=machine_id
,
433 proxy
=self
.api_proxy
,
439 machine_list
= await model
.get_machines()
440 if machine_id
in machine_list
:
441 self
.log
.debug("Machine {} found in model!".format(machine_id
))
442 machine
= model
.machines
.get(machine_id
)
444 await asyncio
.sleep(2)
447 msg
= "Machine {} not found in model".format(machine_id
)
448 self
.log
.error(msg
=msg
)
449 raise JujuMachineNotFound(msg
)
452 "Wait until machine {} is ready in model {}".format(
453 machine
.entity_id
, model_name
456 await JujuModelWatcher
.wait_for(
459 progress_timeout
=progress_timeout
,
460 total_timeout
=total_timeout
,
464 except Exception as e
:
467 await self
.disconnect_model(model
)
468 await self
.disconnect_controller(controller
)
471 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
476 async def deploy_charm(
478 application_name
: str,
482 db_dict
: dict = None,
483 progress_timeout
: float = None,
484 total_timeout
: float = None,
491 :param: application_name: Application name
492 :param: path: Local path to the charm
493 :param: model_name: Model name
494 :param: machine_id ID of the machine
495 :param: db_dict: Dictionary with data of the DB to write the updates
496 :param: progress_timeout: Maximum time between two updates in the model
497 :param: total_timeout: Timeout for the entity to be active
498 :param: config: Config for the charm
499 :param: series: Series of the charm
500 :param: num_units: Number of units
502 :return: (juju.application.Application): Juju application
505 "Deploying charm {} to machine {} in model ~{}".format(
506 application_name
, machine_id
, model_name
509 self
.log
.debug("charm: {}".format(path
))
512 controller
= await self
.get_controller()
515 model
= await self
.get_model(controller
, model_name
)
519 if application_name
not in model
.applications
:
521 if machine_id
is not None:
522 if machine_id
not in model
.machines
:
523 msg
= "Machine {} not found in model".format(machine_id
)
524 self
.log
.error(msg
=msg
)
525 raise JujuMachineNotFound(msg
)
526 machine
= model
.machines
[machine_id
]
527 series
= machine
.series
529 application
= await model
.deploy(
531 application_name
=application_name
,
540 "Wait until application {} is ready in model {}".format(
541 application_name
, model_name
545 for _
in range(num_units
- 1):
546 m
, _
= await self
.create_machine(model_name
, wait
=False)
547 await application
.add_unit(to
=m
.entity_id
)
549 await JujuModelWatcher
.wait_for(
552 progress_timeout
=progress_timeout
,
553 total_timeout
=total_timeout
,
558 "Application {} is ready in model {}".format(
559 application_name
, model_name
563 raise JujuApplicationExists(
564 "Application {} exists".format(application_name
)
567 await self
.disconnect_model(model
)
568 await self
.disconnect_controller(controller
)
572 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
575 :param: model: Model object
576 :param: application_name: Application name
578 :return: juju.application.Application (or None if it doesn't exist)
580 if model
.applications
and application_name
in model
.applications
:
581 return model
.applications
[application_name
]
583 async def execute_action(
585 application_name
: str,
588 db_dict
: dict = None,
589 progress_timeout
: float = None,
590 total_timeout
: float = None,
595 :param: application_name: Application name
596 :param: model_name: Model name
597 :param: action_name: Name of the action
598 :param: db_dict: Dictionary with data of the DB to write the updates
599 :param: progress_timeout: Maximum time between two updates in the model
600 :param: total_timeout: Timeout for the entity to be active
602 :return: (str, str): (output and status)
605 "Executing action {} using params {}".format(action_name
, kwargs
)
608 controller
= await self
.get_controller()
611 model
= await self
.get_model(controller
, model_name
)
615 application
= self
._get
_application
(
616 model
, application_name
=application_name
,
618 if application
is None:
619 raise JujuApplicationNotFound("Cannot execute action")
623 for u
in application
.units
:
624 if await u
.is_leader_from_status():
627 raise JujuLeaderUnitNotFound("Cannot execute action: leader unit not found")
629 actions
= await application
.get_actions()
631 if action_name
not in actions
:
632 raise JujuActionNotFound(
633 "Action {} not in available actions".format(action_name
)
636 action
= await unit
.run_action(action_name
, **kwargs
)
639 "Wait until action {} is completed in application {} (model={})".format(
640 action_name
, application_name
, model_name
643 await JujuModelWatcher
.wait_for(
646 progress_timeout
=progress_timeout
,
647 total_timeout
=total_timeout
,
652 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
653 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
655 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
659 "Action {} completed with status {} in application {} (model={})".format(
660 action_name
, action
.status
, application_name
, model_name
664 await self
.disconnect_model(model
)
665 await self
.disconnect_controller(controller
)
667 return output
, status
669 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
670 """Get list of actions
672 :param: application_name: Application name
673 :param: model_name: Model name
675 :return: Dict with this format
677 "action_name": "Description of the action",
682 "Getting list of actions for application {}".format(application_name
)
686 controller
= await self
.get_controller()
689 model
= await self
.get_model(controller
, model_name
)
693 application
= self
._get
_application
(
694 model
, application_name
=application_name
,
697 # Return list of actions
698 return await application
.get_actions()
701 # Disconnect from model and controller
702 await self
.disconnect_model(model
)
703 await self
.disconnect_controller(controller
)
705 async def add_relation(
713 :param: model_name: Model name
714 :param: endpoint_1 First endpoint name
715 ("app:endpoint" format or directly the saas name)
716 :param: endpoint_2: Second endpoint name (^ same format)
719 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
722 controller
= await self
.get_controller()
725 model
= await self
.get_model(controller
, model_name
)
729 await model
.add_relation(endpoint_1
, endpoint_2
)
730 except JujuAPIError
as e
:
731 if "not found" in e
.message
:
732 self
.log
.warning("Relation not found: {}".format(e
.message
))
734 if "already exists" in e
.message
:
735 self
.log
.warning("Relation already exists: {}".format(e
.message
))
737 # another exception, raise it
740 await self
.disconnect_model(model
)
741 await self
.disconnect_controller(controller
)
743 async def destroy_model(self
, model_name
: str, total_timeout
: float):
747 :param: model_name: Model name
748 :param: total_timeout: Timeout
751 controller
= await self
.get_controller()
752 model
= await self
.get_model(controller
, model_name
)
754 self
.log
.debug("Destroying model {}".format(model_name
))
755 uuid
= model
.info
.uuid
758 machines
= await model
.get_machines()
759 for machine_id
in machines
:
761 await self
.destroy_machine(
762 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
764 except asyncio
.CancelledError
:
770 await self
.disconnect_model(model
)
773 if model_name
in self
.models
:
774 self
.models
.remove(model_name
)
776 await controller
.destroy_model(uuid
)
778 # Wait until model is destroyed
779 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
782 if total_timeout
is None:
784 end
= time
.time() + total_timeout
785 while time
.time() < end
:
787 models
= await controller
.list_models()
788 if model_name
not in models
:
790 "The model {} ({}) was destroyed".format(model_name
, uuid
)
793 except asyncio
.CancelledError
:
795 except Exception as e
:
797 await asyncio
.sleep(5)
799 "Timeout waiting for model {} to be destroyed {}".format(
800 model_name
, last_exception
804 await self
.disconnect_controller(controller
)
806 async def destroy_application(self
, model
: Model
, application_name
: str):
810 :param: model: Model object
811 :param: application_name: Application name
814 "Destroying application {} in model {}".format(
815 application_name
, model
.info
.name
818 application
= model
.applications
.get(application_name
)
820 await application
.destroy()
822 self
.log
.warning("Application not found: {}".format(application_name
))
824 async def destroy_machine(
825 self
, model
: Model
, machine_id
: str, total_timeout
: float = 3600
830 :param: model: Model object
831 :param: machine_id: Machine id
832 :param: total_timeout: Timeout in seconds
834 machines
= await model
.get_machines()
835 if machine_id
in machines
:
836 machine
= machines
[machine_id
]
837 await machine
.destroy(force
=True)
839 end
= time
.time() + total_timeout
841 # wait for machine removal
842 machines
= await model
.get_machines()
843 while machine_id
in machines
and time
.time() < end
:
845 "Waiting for machine {} is destroyed".format(machine_id
)
847 await asyncio
.sleep(0.5)
848 machines
= await model
.get_machines()
849 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
851 self
.log
.debug("Machine not found: {}".format(machine_id
))
853 async def configure_application(
854 self
, model_name
: str, application_name
: str, config
: dict = None
856 """Configure application
858 :param: model_name: Model name
859 :param: application_name: Application name
860 :param: config: Config to apply to the charm
862 self
.log
.debug("Configuring application {}".format(application_name
))
866 controller
= await self
.get_controller()
867 model
= await self
.get_model(controller
, model_name
)
868 application
= self
._get
_application
(
869 model
, application_name
=application_name
,
871 await application
.set_config(config
)
873 await self
.disconnect_model(model
)
874 await self
.disconnect_controller(controller
)
876 def _get_api_endpoints_db(self
) -> [str]:
878 Get API Endpoints from DB
880 :return: List of API endpoints
882 self
.log
.debug("Getting endpoints from database")
884 juju_info
= self
.db
.get_one(
885 DB_DATA
.api_endpoints
.table
,
886 q_filter
=DB_DATA
.api_endpoints
.filter,
889 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
890 return juju_info
[DB_DATA
.api_endpoints
.key
]
892 def _update_api_endpoints_db(self
, endpoints
: [str]):
894 Update API endpoints in Database
896 :param: List of endpoints
898 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
900 juju_info
= self
.db
.get_one(
901 DB_DATA
.api_endpoints
.table
,
902 q_filter
=DB_DATA
.api_endpoints
.filter,
905 # If it doesn't, then create it
909 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
911 except DbException
as e
:
912 # Racing condition: check if another N2VC worker has created it
913 juju_info
= self
.db
.get_one(
914 DB_DATA
.api_endpoints
.table
,
915 q_filter
=DB_DATA
.api_endpoints
.filter,
921 DB_DATA
.api_endpoints
.table
,
922 DB_DATA
.api_endpoints
.filter,
923 {DB_DATA
.api_endpoints
.key
: endpoints
},
926 def handle_exception(self
, loop
, context
):
927 # All unhandled exceptions by libjuju are handled here.
930 async def health_check(self
, interval
: float = 300.0):
932 Health check to make sure controller and controller_model connections are OK
934 :param: interval: Time in seconds between checks
938 controller
= await self
.get_controller()
939 # self.log.debug("VCA is alive")
940 except Exception as e
:
941 self
.log
.error("Health check to VCA failed: {}".format(e
))
943 await self
.disconnect_controller(controller
)
944 await asyncio
.sleep(interval
)
946 async def list_models(self
, contains
: str = None) -> [str]:
947 """List models with certain names
949 :param: contains: String that is contained in model name
951 :retur: [models] Returns list of model names
954 controller
= await self
.get_controller()
956 models
= await controller
.list_models()
958 models
= [model
for model
in models
if contains
in model
]
961 await self
.disconnect_controller(controller
)
963 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
964 """List models with certain names
966 :param: model_name: Model name
968 :return: Returns list of offers
971 controller
= await self
.get_controller()
973 return await controller
.list_offers(model_name
)
975 await self
.disconnect_controller(controller
)