1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
import client
21 from juju
.errors
import JujuAPIError
22 from juju
.model
import Model
23 from juju
.machine
import Machine
24 from juju
.application
import Application
25 from juju
.client
._definitions
import FullStatus
, QueryApplicationOffersResults
26 from n2vc
.juju_watcher
import JujuModelWatcher
27 from n2vc
.provisioner
import AsyncSSHProvisioner
28 from n2vc
.n2vc_conn
import N2VCConnector
29 from n2vc
.exceptions
import (
31 JujuApplicationNotFound
,
32 JujuLeaderUnitNotFound
,
34 JujuModelAlreadyExists
,
35 JujuControllerFailedConnecting
,
36 JujuApplicationExists
,
38 from n2vc
.utils
import DB_DATA
39 from osm_common
.dbbase
import DbException
50 loop
: asyncio
.AbstractEventLoop
= None,
51 log
: logging
.Logger
= None,
53 n2vc
: N2VCConnector
= None,
54 apt_mirror
: str = None,
55 enable_os_upgrade
: bool = True,
60 :param: endpoint: Endpoint of the juju controller (host:port)
61 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
62 :param: username: Juju username
63 :param: password: Juju password
64 :param: cacert: Juju CA Certificate
65 :param: loop: Asyncio loop
68 :param: n2vc: N2VC object
69 :param: apt_mirror: APT Mirror
70 :param: enable_os_upgrade: Enable OS Upgrade
73 self
.log
= log
or logging
.getLogger("Libjuju")
75 db_endpoints
= self
._get
_api
_endpoints
_db
()
76 self
.endpoints
= db_endpoints
or [endpoint
]
77 if db_endpoints
is None:
78 self
._update
_api
_endpoints
_db
(self
.endpoints
)
79 self
.api_proxy
= api_proxy
80 self
.username
= username
81 self
.password
= password
83 self
.loop
= loop
or asyncio
.get_event_loop()
86 # Generate config for models
87 self
.model_config
= {}
89 self
.model_config
["apt-mirror"] = apt_mirror
90 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
91 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
93 self
.loop
.set_exception_handler(self
.handle_exception
)
94 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
97 self
.log
.debug("Libjuju initialized!")
99 self
.health_check_task
= self
.loop
.create_task(self
.health_check())
101 async def get_controller(self
, timeout
: float = 5.0) -> Controller
:
105 :param: timeout: Time in seconds to wait for controller to connect
109 controller
= Controller(loop
=self
.loop
)
110 await asyncio
.wait_for(
112 endpoint
=self
.endpoints
,
113 username
=self
.username
,
114 password
=self
.password
,
119 endpoints
= await controller
.api_endpoints
120 if self
.endpoints
!= endpoints
:
121 self
.endpoints
= endpoints
122 self
._update
_api
_endpoints
_db
(self
.endpoints
)
124 except asyncio
.CancelledError
as e
:
126 except Exception as e
:
128 "Failed connecting to controller: {}...".format(self
.endpoints
)
131 await self
.disconnect_controller(controller
)
132 raise JujuControllerFailedConnecting(e
)
134 async def disconnect(self
):
136 # Cancel health check task
137 self
.health_check_task
.cancel()
138 self
.log
.debug("Libjuju disconnected!")
140 async def disconnect_model(self
, model
: Model
):
144 :param: model: Model that will be disconnected
146 await model
.disconnect()
148 async def disconnect_controller(self
, controller
: Controller
):
150 Disconnect controller
152 :param: controller: Controller that will be disconnected
154 await controller
.disconnect()
156 async def add_model(self
, model_name
: str, cloud_name
: str):
160 :param: model_name: Model name
161 :param: cloud_name: Cloud name
165 controller
= await self
.get_controller()
168 # Raise exception if model already exists
169 if await self
.model_exists(model_name
, controller
=controller
):
170 raise JujuModelAlreadyExists(
171 "Model {} already exists.".format(model_name
)
174 # Block until other workers have finished model creation
175 while self
.creating_model
.locked():
176 await asyncio
.sleep(0.1)
178 # If the model exists, return it from the controller
179 if model_name
in self
.models
:
183 async with self
.creating_model
:
184 self
.log
.debug("Creating model {}".format(model_name
))
185 model
= await controller
.add_model(
187 config
=self
.model_config
,
188 cloud_name
=cloud_name
,
189 credential_name
=cloud_name
,
191 self
.models
.add(model_name
)
194 await self
.disconnect_model(model
)
195 await self
.disconnect_controller(controller
)
198 self
, controller
: Controller
, model_name
: str, id=None
201 Get model from controller
203 :param: controller: Controller
204 :param: model_name: Model name
206 :return: Model: The created Juju model object
208 return await controller
.get_model(model_name
)
210 async def model_exists(
211 self
, model_name
: str, controller
: Controller
= None
214 Check if model exists
216 :param: controller: Controller
217 :param: model_name: Model name
221 need_to_disconnect
= False
223 # Get controller if not passed
225 controller
= await self
.get_controller()
226 need_to_disconnect
= True
228 # Check if model exists
230 return model_name
in await controller
.list_models()
232 if need_to_disconnect
:
233 await self
.disconnect_controller(controller
)
235 async def models_exist(self
, model_names
: [str]) -> (bool, list):
237 Check if models exists
239 :param: model_names: List of strings with model names
241 :return (bool, list[str]): (True if all models exists, List of model names that don't exist)
245 "model_names must be a non-empty array. Given value: {}".format(model_names
)
247 non_existing_models
= []
248 models
= await self
.list_models()
249 existing_models
= list(set(models
).intersection(model_names
))
250 non_existing_models
= list(set(model_names
) - set(existing_models
))
253 len(non_existing_models
) == 0,
257 async def get_model_status(self
, model_name
: str) -> FullStatus
:
261 :param: model_name: Model name
263 :return: Full status object
265 controller
= await self
.get_controller()
266 model
= await self
.get_model(controller
, model_name
)
268 return await model
.get_status()
270 await self
.disconnect_model(model
)
271 await self
.disconnect_controller(controller
)
273 async def create_machine(
276 machine_id
: str = None,
277 db_dict
: dict = None,
278 progress_timeout
: float = None,
279 total_timeout
: float = None,
280 series
: str = "xenial",
282 ) -> (Machine
, bool):
286 :param: model_name: Model name
287 :param: machine_id: Machine id
288 :param: db_dict: Dictionary with data of the DB to write the updates
289 :param: progress_timeout: Maximum time between two updates in the model
290 :param: total_timeout: Timeout for the entity to be active
291 :param: series: Series of the machine (xenial, bionic, focal, ...)
292 :param: wait: Wait until machine is ready
294 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
295 if the machine is new or it already existed
301 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
305 controller
= await self
.get_controller()
308 model
= await self
.get_model(controller
, model_name
)
310 if machine_id
is not None:
312 "Searching machine (id={}) in model {}".format(
313 machine_id
, model_name
317 # Get machines from model and get the machine with machine_id if exists
318 machines
= await model
.get_machines()
319 if machine_id
in machines
:
321 "Machine (id={}) found in model {}".format(
322 machine_id
, model_name
325 machine
= machines
[machine_id
]
327 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
330 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
333 machine
= await model
.add_machine(
334 spec
=None, constraints
=None, disks
=None, series
=series
338 # Wait until the machine is ready
340 "Wait until machine {} is ready in model {}".format(
341 machine
.entity_id
, model_name
345 await JujuModelWatcher
.wait_for(
348 progress_timeout
=progress_timeout
,
349 total_timeout
=total_timeout
,
354 await self
.disconnect_model(model
)
355 await self
.disconnect_controller(controller
)
358 "Machine {} ready at {} in model {}".format(
359 machine
.entity_id
, machine
.dns_name
, model_name
364 async def provision_machine(
369 private_key_path
: str,
370 db_dict
: dict = None,
371 progress_timeout
: float = None,
372 total_timeout
: float = None,
375 Manually provisioning of a machine
377 :param: model_name: Model name
378 :param: hostname: IP to access the machine
379 :param: username: Username to login to the machine
380 :param: private_key_path: Local path for the private key
381 :param: db_dict: Dictionary with data of the DB to write the updates
382 :param: progress_timeout: Maximum time between two updates in the model
383 :param: total_timeout: Timeout for the entity to be active
385 :return: (Entity): Machine id
388 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
389 model_name
, hostname
, username
394 controller
= await self
.get_controller()
397 model
= await self
.get_model(controller
, model_name
)
401 provisioner
= AsyncSSHProvisioner(
404 private_key_path
=private_key_path
,
409 params
= await provisioner
.provision_machine()
411 params
.jobs
= ["JobHostUnits"]
413 self
.log
.debug("Adding machine to model")
414 connection
= model
.connection()
415 client_facade
= client
.ClientFacade
.from_connection(connection
)
417 results
= await client_facade
.AddMachines(params
=[params
])
418 error
= results
.machines
[0].error
421 msg
= "Error adding machine: {}".format(error
.message
)
422 self
.log
.error(msg
=msg
)
423 raise ValueError(msg
)
425 machine_id
= results
.machines
[0].machine
427 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
428 asyncio
.ensure_future(
429 provisioner
.install_agent(
430 connection
=connection
,
432 machine_id
=machine_id
,
433 proxy
=self
.api_proxy
,
439 machine_list
= await model
.get_machines()
440 if machine_id
in machine_list
:
441 self
.log
.debug("Machine {} found in model!".format(machine_id
))
442 machine
= model
.machines
.get(machine_id
)
444 await asyncio
.sleep(2)
447 msg
= "Machine {} not found in model".format(machine_id
)
448 self
.log
.error(msg
=msg
)
449 raise JujuMachineNotFound(msg
)
452 "Wait until machine {} is ready in model {}".format(
453 machine
.entity_id
, model_name
456 await JujuModelWatcher
.wait_for(
459 progress_timeout
=progress_timeout
,
460 total_timeout
=total_timeout
,
464 except Exception as e
:
467 await self
.disconnect_model(model
)
468 await self
.disconnect_controller(controller
)
471 "Machine provisioned {} in model {}".format(machine_id
, model_name
)
476 async def deploy_charm(
478 application_name
: str,
482 db_dict
: dict = None,
483 progress_timeout
: float = None,
484 total_timeout
: float = None,
491 :param: application_name: Application name
492 :param: path: Local path to the charm
493 :param: model_name: Model name
494 :param: machine_id ID of the machine
495 :param: db_dict: Dictionary with data of the DB to write the updates
496 :param: progress_timeout: Maximum time between two updates in the model
497 :param: total_timeout: Timeout for the entity to be active
498 :param: config: Config for the charm
499 :param: series: Series of the charm
500 :param: num_units: Number of units
502 :return: (juju.application.Application): Juju application
505 "Deploying charm {} to machine {} in model ~{}".format(
506 application_name
, machine_id
, model_name
509 self
.log
.debug("charm: {}".format(path
))
512 controller
= await self
.get_controller()
515 model
= await self
.get_model(controller
, model_name
)
519 if application_name
not in model
.applications
:
521 if machine_id
is not None:
522 if machine_id
not in model
.machines
:
523 msg
= "Machine {} not found in model".format(machine_id
)
524 self
.log
.error(msg
=msg
)
525 raise JujuMachineNotFound(msg
)
526 machine
= model
.machines
[machine_id
]
527 series
= machine
.series
529 application
= await model
.deploy(
531 application_name
=application_name
,
540 "Wait until application {} is ready in model {}".format(
541 application_name
, model_name
545 for _
in range(num_units
- 1):
546 m
, _
= await self
.create_machine(model_name
, wait
=False)
547 await application
.add_unit(to
=m
.entity_id
)
549 await JujuModelWatcher
.wait_for(
552 progress_timeout
=progress_timeout
,
553 total_timeout
=total_timeout
,
558 "Application {} is ready in model {}".format(
559 application_name
, model_name
563 raise JujuApplicationExists(
564 "Application {} exists".format(application_name
)
567 await self
.disconnect_model(model
)
568 await self
.disconnect_controller(controller
)
572 def _get_application(self
, model
: Model
, application_name
: str) -> Application
:
575 :param: model: Model object
576 :param: application_name: Application name
578 :return: juju.application.Application (or None if it doesn't exist)
580 if model
.applications
and application_name
in model
.applications
:
581 return model
.applications
[application_name
]
583 async def execute_action(
585 application_name
: str,
588 db_dict
: dict = None,
589 progress_timeout
: float = None,
590 total_timeout
: float = None,
595 :param: application_name: Application name
596 :param: model_name: Model name
597 :param: action_name: Name of the action
598 :param: db_dict: Dictionary with data of the DB to write the updates
599 :param: progress_timeout: Maximum time between two updates in the model
600 :param: total_timeout: Timeout for the entity to be active
602 :return: (str, str): (output and status)
605 "Executing action {} using params {}".format(action_name
, kwargs
)
608 controller
= await self
.get_controller()
611 model
= await self
.get_model(controller
, model_name
)
615 application
= self
._get
_application
(
616 model
, application_name
=application_name
,
618 if application
is None:
619 raise JujuApplicationNotFound("Cannot execute action")
623 for u
in application
.units
:
624 if await u
.is_leader_from_status():
627 raise JujuLeaderUnitNotFound("Cannot execute action: leader unit not found")
629 actions
= await application
.get_actions()
631 if action_name
not in actions
:
632 raise JujuActionNotFound(
633 "Action {} not in available actions".format(action_name
)
636 action
= await unit
.run_action(action_name
, **kwargs
)
639 "Wait until action {} is completed in application {} (model={})".format(
640 action_name
, application_name
, model_name
643 await JujuModelWatcher
.wait_for(
646 progress_timeout
=progress_timeout
,
647 total_timeout
=total_timeout
,
652 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
653 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
655 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
659 "Action {} completed with status {} in application {} (model={})".format(
660 action_name
, action
.status
, application_name
, model_name
664 await self
.disconnect_model(model
)
665 await self
.disconnect_controller(controller
)
667 return output
, status
669 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
670 """Get list of actions
672 :param: application_name: Application name
673 :param: model_name: Model name
675 :return: Dict with this format
677 "action_name": "Description of the action",
682 "Getting list of actions for application {}".format(application_name
)
686 controller
= await self
.get_controller()
689 model
= await self
.get_model(controller
, model_name
)
693 application
= self
._get
_application
(
694 model
, application_name
=application_name
,
697 # Return list of actions
698 return await application
.get_actions()
701 # Disconnect from model and controller
702 await self
.disconnect_model(model
)
703 await self
.disconnect_controller(controller
)
705 async def add_relation(
713 :param: model_name: Model name
714 :param: endpoint_1 First endpoint name
715 ("app:endpoint" format or directly the saas name)
716 :param: endpoint_2: Second endpoint name (^ same format)
719 self
.log
.debug("Adding relation: {} -> {}".format(endpoint_1
, endpoint_2
))
722 controller
= await self
.get_controller()
725 model
= await self
.get_model(controller
, model_name
)
729 await model
.add_relation(endpoint_1
, endpoint_2
)
730 except JujuAPIError
as e
:
731 if "not found" in e
.message
:
732 self
.log
.warning("Relation not found: {}".format(e
.message
))
734 if "already exists" in e
.message
:
735 self
.log
.warning("Relation already exists: {}".format(e
.message
))
737 # another exception, raise it
740 await self
.disconnect_model(model
)
741 await self
.disconnect_controller(controller
)
744 self
, offer_url
: str, model_name
: str,
747 Adds a remote offer to the model. Relations can be created later using "juju relate".
749 :param: offer_url: Offer Url
750 :param: model_name: Model name
752 :raises ParseError if there's a problem parsing the offer_url
753 :raises JujuError if remote offer includes and endpoint
754 :raises JujuAPIError if the operation is not successful
756 controller
= await self
.get_controller()
757 model
= await controller
.get_model(model_name
)
760 await model
.consume(offer_url
)
762 await self
.disconnect_model(model
)
763 await self
.disconnect_controller(controller
)
765 async def destroy_model(self
, model_name
: str, total_timeout
: float):
769 :param: model_name: Model name
770 :param: total_timeout: Timeout
773 controller
= await self
.get_controller()
774 model
= await self
.get_model(controller
, model_name
)
776 self
.log
.debug("Destroying model {}".format(model_name
))
777 uuid
= model
.info
.uuid
780 machines
= await model
.get_machines()
781 for machine_id
in machines
:
783 await self
.destroy_machine(
784 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
786 except asyncio
.CancelledError
:
792 await self
.disconnect_model(model
)
795 if model_name
in self
.models
:
796 self
.models
.remove(model_name
)
798 await controller
.destroy_model(uuid
)
800 # Wait until model is destroyed
801 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
804 if total_timeout
is None:
806 end
= time
.time() + total_timeout
807 while time
.time() < end
:
809 models
= await controller
.list_models()
810 if model_name
not in models
:
812 "The model {} ({}) was destroyed".format(model_name
, uuid
)
815 except asyncio
.CancelledError
:
817 except Exception as e
:
819 await asyncio
.sleep(5)
821 "Timeout waiting for model {} to be destroyed {}".format(
822 model_name
, last_exception
826 await self
.disconnect_controller(controller
)
828 async def destroy_application(self
, model
: Model
, application_name
: str):
832 :param: model: Model object
833 :param: application_name: Application name
836 "Destroying application {} in model {}".format(
837 application_name
, model
.info
.name
840 application
= model
.applications
.get(application_name
)
842 await application
.destroy()
844 self
.log
.warning("Application not found: {}".format(application_name
))
846 async def destroy_machine(
847 self
, model
: Model
, machine_id
: str, total_timeout
: float = 3600
852 :param: model: Model object
853 :param: machine_id: Machine id
854 :param: total_timeout: Timeout in seconds
856 machines
= await model
.get_machines()
857 if machine_id
in machines
:
858 machine
= machines
[machine_id
]
859 await machine
.destroy(force
=True)
861 end
= time
.time() + total_timeout
863 # wait for machine removal
864 machines
= await model
.get_machines()
865 while machine_id
in machines
and time
.time() < end
:
867 "Waiting for machine {} is destroyed".format(machine_id
)
869 await asyncio
.sleep(0.5)
870 machines
= await model
.get_machines()
871 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
873 self
.log
.debug("Machine not found: {}".format(machine_id
))
875 async def configure_application(
876 self
, model_name
: str, application_name
: str, config
: dict = None
878 """Configure application
880 :param: model_name: Model name
881 :param: application_name: Application name
882 :param: config: Config to apply to the charm
884 self
.log
.debug("Configuring application {}".format(application_name
))
888 controller
= await self
.get_controller()
889 model
= await self
.get_model(controller
, model_name
)
890 application
= self
._get
_application
(
891 model
, application_name
=application_name
,
893 await application
.set_config(config
)
895 await self
.disconnect_model(model
)
896 await self
.disconnect_controller(controller
)
898 def _get_api_endpoints_db(self
) -> [str]:
900 Get API Endpoints from DB
902 :return: List of API endpoints
904 self
.log
.debug("Getting endpoints from database")
906 juju_info
= self
.db
.get_one(
907 DB_DATA
.api_endpoints
.table
,
908 q_filter
=DB_DATA
.api_endpoints
.filter,
911 if juju_info
and DB_DATA
.api_endpoints
.key
in juju_info
:
912 return juju_info
[DB_DATA
.api_endpoints
.key
]
914 def _update_api_endpoints_db(self
, endpoints
: [str]):
916 Update API endpoints in Database
918 :param: List of endpoints
920 self
.log
.debug("Saving endpoints {} in database".format(endpoints
))
922 juju_info
= self
.db
.get_one(
923 DB_DATA
.api_endpoints
.table
,
924 q_filter
=DB_DATA
.api_endpoints
.filter,
927 # If it doesn't, then create it
931 DB_DATA
.api_endpoints
.table
, DB_DATA
.api_endpoints
.filter,
933 except DbException
as e
:
934 # Racing condition: check if another N2VC worker has created it
935 juju_info
= self
.db
.get_one(
936 DB_DATA
.api_endpoints
.table
,
937 q_filter
=DB_DATA
.api_endpoints
.filter,
943 DB_DATA
.api_endpoints
.table
,
944 DB_DATA
.api_endpoints
.filter,
945 {DB_DATA
.api_endpoints
.key
: endpoints
},
948 def handle_exception(self
, loop
, context
):
949 # All unhandled exceptions by libjuju are handled here.
952 async def health_check(self
, interval
: float = 300.0):
954 Health check to make sure controller and controller_model connections are OK
956 :param: interval: Time in seconds between checks
960 controller
= await self
.get_controller()
961 # self.log.debug("VCA is alive")
962 except Exception as e
:
963 self
.log
.error("Health check to VCA failed: {}".format(e
))
965 await self
.disconnect_controller(controller
)
966 await asyncio
.sleep(interval
)
968 async def list_models(self
, contains
: str = None) -> [str]:
969 """List models with certain names
971 :param: contains: String that is contained in model name
973 :retur: [models] Returns list of model names
976 controller
= await self
.get_controller()
978 models
= await controller
.list_models()
980 models
= [model
for model
in models
if contains
in model
]
983 await self
.disconnect_controller(controller
)
985 async def list_offers(self
, model_name
: str) -> QueryApplicationOffersResults
:
986 """List models with certain names
988 :param: model_name: Model name
990 :return: Returns list of offers
993 controller
= await self
.get_controller()
995 return await controller
.list_offers(model_name
)
997 await self
.disconnect_controller(controller
)