1 # Copyright 2020 Canonical Ltd.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
17 from juju
.controller
import Controller
18 from juju
.client
.connector
import NoConnectionException
19 from juju
.client
import client
22 from juju
.errors
import JujuAPIError
23 from juju
.model
import Model
24 from juju
.machine
import Machine
25 from juju
.application
import Application
26 from juju
.client
._definitions
import FullStatus
27 from n2vc
.juju_watcher
import JujuModelWatcher
28 from n2vc
.provisioner
import AsyncSSHProvisioner
29 from n2vc
.n2vc_conn
import N2VCConnector
30 from n2vc
.exceptions
import (
32 JujuApplicationNotFound
,
33 JujuModelAlreadyExists
,
34 JujuControllerFailedConnecting
,
35 JujuApplicationExists
,
47 loop
: asyncio
.AbstractEventLoop
= None,
48 log
: logging
.Logger
= None,
50 n2vc
: N2VCConnector
= None,
51 apt_mirror
: str = None,
52 enable_os_upgrade
: bool = True,
57 :param: endpoint: Endpoint of the juju controller (host:port)
58 :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
59 :param: username: Juju username
60 :param: password: Juju password
61 :param: cacert: Juju CA Certificate
62 :param: loop: Asyncio loop
65 :param: n2vc: N2VC object
66 :param: apt_mirror: APT Mirror
67 :param: enable_os_upgrade: Enable OS Upgrade
70 self
.endpoints
= [endpoint
] # TODO: Store and get endpoints from DB
71 self
.api_proxy
= api_proxy
72 self
.username
= username
73 self
.password
= password
75 self
.loop
= loop
or asyncio
.get_event_loop()
76 self
.log
= log
or logging
.getLogger("Libjuju")
80 # Generate config for models
81 self
.model_config
= {}
83 self
.model_config
["apt-mirror"] = apt_mirror
84 self
.model_config
["enable-os-refresh-update"] = enable_os_upgrade
85 self
.model_config
["enable-os-upgrade"] = enable_os_upgrade
87 self
.reconnecting
= asyncio
.Lock(loop
=self
.loop
)
88 self
.creating_model
= asyncio
.Lock(loop
=self
.loop
)
91 self
.controller
= Controller(loop
=self
.loop
)
93 self
.loop
.run_until_complete(self
.connect())
95 async def connect(self
):
96 """Connect to the controller"""
98 self
.log
.debug("Connecting from controller")
99 await self
.controller
.connect(
100 endpoint
=self
.endpoints
,
101 username
=self
.username
,
102 password
=self
.password
,
105 e
= self
.controller
.connection().endpoint
106 self
.log
.info("Connected to controller: {}".format(e
))
108 async def disconnect(self
):
109 """Disconnect from controller"""
111 self
.log
.debug("Disconnecting from controller")
112 await self
.controller
.disconnect()
113 self
.log
.info("Disconnected from controller")
115 def controller_connected(self
) -> bool:
116 """Check if the controller connection is open
118 :return: bool: True if connected, False if not connected
123 is_connected
= self
.controller
.connection().is_open
124 except NoConnectionException
:
125 self
.log
.warning("VCA not connected")
128 async def disconnect_model(self
, model
: Model
):
132 :param: model: Model that will be disconnected
135 await model
.disconnect()
139 async def _reconnect(
143 time_between_retries
: int = 3,
144 maximum_retries
: int = 0,
147 Reconnect to the controller
149 :param: retry: Set it to True to retry if the connection fails
150 :param: time_between_retries: Time in seconds between retries
151 :param: maximum_retries Maximum retries. If not set, it will retry forever
153 :raises: Exception if cannot connect to the controller
156 if self
.reconnecting
.locked():
157 # Return if another function is trying to reconnect
159 async with self
.reconnecting
:
163 await asyncio
.wait_for(self
.connect(), timeout
=timeout
)
165 except asyncio
.TimeoutError
:
166 self
.log
.error("Error reconnecting to controller: Timeout")
167 except Exception as e
:
168 self
.log
.error("Error reconnecting to controller: {}".format(e
))
171 maximum_retries_reached
= attempt
== maximum_retries
173 if not retry
or maximum_retries_reached
:
174 raise JujuControllerFailedConnecting("Controller is not connected")
176 await asyncio
.sleep(time_between_retries
)
178 async def add_model(self
, model_name
: str, cloud_name
: str):
182 :param: model_name: Model name
183 :param: cloud_name: Cloud name
186 # Reconnect to the controller if not connected
187 if not self
.controller_connected():
188 await self
._reconnect
()
190 # Raise exception if model already exists
191 if await self
.model_exists(model_name
):
192 raise JujuModelAlreadyExists("Model {} already exists.".format(model_name
))
194 # Block until other workers have finished model creation
195 while self
.creating_model
.locked():
196 await asyncio
.sleep(0.1)
198 # If the model exists, return it from the controller
199 if model_name
in self
.models
:
200 return await self
.get_model(model_name
)
203 self
.log
.debug("Creating model {}".format(model_name
))
204 async with self
.creating_model
:
205 model
= await self
.controller
.add_model(
207 config
=self
.model_config
,
208 cloud_name
=cloud_name
,
209 credential_name
=cloud_name
,
211 await self
.disconnect_model(model
)
212 self
.models
.add(model_name
)
214 async def get_model(self
, model_name
: str) -> Model
:
216 Get model from controller
218 :param: model_name: Model name
220 :return: Model: The created Juju model object
223 # Check if controller is connected
224 if not self
.controller_connected():
225 await self
._reconnect
()
226 return await self
.controller
.get_model(model_name
)
228 async def model_exists(self
, model_name
: str) -> bool:
230 Check if model exists
232 :param: model_name: Model name
237 # Check if controller is connected
238 if not self
.controller_connected():
239 await self
._reconnect
()
241 return model_name
in await self
.controller
.list_models()
243 async def get_model_status(self
, model_name
: str) -> FullStatus
:
247 :param: model_name: Model name
249 :return: Full status object
251 model
= await self
.get_model(model_name
)
252 status
= await model
.get_status()
253 await self
.disconnect_model(model
)
256 async def create_machine(
259 machine_id
: str = None,
260 db_dict
: dict = None,
261 progress_timeout
: float = None,
262 total_timeout
: float = None,
263 series
: str = "xenial",
264 ) -> (Machine
, bool):
268 :param: model_name: Model name
269 :param: machine_id: Machine id
270 :param: db_dict: Dictionary with data of the DB to write the updates
271 :param: progress_timeout: Maximum time between two updates in the model
272 :param: total_timeout: Timeout for the entity to be active
274 :return: (juju.machine.Machine, bool): Machine object and a boolean saying
275 if the machine is new or it already existed
281 "Creating machine (id={}) in model: {}".format(machine_id
, model_name
)
285 model
= await self
.get_model(model_name
)
287 if machine_id
is not None:
289 "Searching machine (id={}) in model {}".format(
290 machine_id
, model_name
294 # Get machines from model and get the machine with machine_id if exists
295 machines
= await model
.get_machines()
296 if machine_id
in machines
:
298 "Machine (id={}) found in model {}".format(
299 machine_id
, model_name
302 machine
= model
.machines
[machine_id
]
304 raise JujuMachineNotFound("Machine {} not found".format(machine_id
))
307 self
.log
.debug("Creating a new machine in model {}".format(model_name
))
310 machine
= await model
.add_machine(
311 spec
=None, constraints
=None, disks
=None, series
=series
315 # Wait until the machine is ready
316 await JujuModelWatcher
.wait_for(
319 progress_timeout
=progress_timeout
,
320 total_timeout
=total_timeout
,
324 except Exception as e
:
327 await self
.disconnect_model(model
)
329 self
.log
.debug("Machine ready at {}".format(machine
.dns_name
))
332 async def provision_machine(
337 private_key_path
: str,
338 db_dict
: dict = None,
339 progress_timeout
: float = None,
340 total_timeout
: float = None,
343 Manually provisioning of a machine
345 :param: model_name: Model name
346 :param: hostname: IP to access the machine
347 :param: username: Username to login to the machine
348 :param: private_key_path: Local path for the private key
349 :param: db_dict: Dictionary with data of the DB to write the updates
350 :param: progress_timeout: Maximum time between two updates in the model
351 :param: total_timeout: Timeout for the entity to be active
353 :return: (Entity): Machine id
356 "Provisioning machine. model: {}, hostname: {}, username: {}".format(
357 model_name
, hostname
, username
362 model
= await self
.get_model(model_name
)
366 provisioner
= AsyncSSHProvisioner(
369 private_key_path
=private_key_path
,
374 params
= await provisioner
.provision_machine()
376 params
.jobs
= ["JobHostUnits"]
378 self
.log
.debug("Adding machine to model")
379 connection
= model
.connection()
380 client_facade
= client
.ClientFacade
.from_connection(connection
)
382 results
= await client_facade
.AddMachines(params
=[params
])
383 error
= results
.machines
[0].error
386 msg
= "Error adding machine: {}".format(error
.message
)
387 self
.log
.error(msg
=msg
)
388 raise ValueError(msg
)
390 machine_id
= results
.machines
[0].machine
392 self
.log
.debug("Installing Juju agent into machine {}".format(machine_id
))
393 asyncio
.ensure_future(
394 provisioner
.install_agent(
395 connection
=connection
,
397 machine_id
=machine_id
,
404 machine_list
= await model
.get_machines()
405 if machine_id
in machine_list
:
406 self
.log
.debug("Machine {} found in model!".format(machine_id
))
407 machine
= model
.machines
.get(machine_id
)
409 await asyncio
.sleep(2)
412 msg
= "Machine {} not found in model".format(machine_id
)
413 self
.log
.error(msg
=msg
)
414 raise JujuMachineNotFound(msg
)
416 await JujuModelWatcher
.wait_for(
419 progress_timeout
=progress_timeout
,
420 total_timeout
=total_timeout
,
424 except Exception as e
:
427 await self
.disconnect_model(model
)
429 self
.log
.debug("Machine provisioned {}".format(machine_id
))
433 async def deploy_charm(
435 application_name
: str,
439 db_dict
: dict = None,
440 progress_timeout
: float = None,
441 total_timeout
: float = None,
447 :param: application_name: Application name
448 :param: path: Local path to the charm
449 :param: model_name: Model name
450 :param: machine_id ID of the machine
451 :param: db_dict: Dictionary with data of the DB to write the updates
452 :param: progress_timeout: Maximum time between two updates in the model
453 :param: total_timeout: Timeout for the entity to be active
454 :param: config: Config for the charm
455 :param: series: Series of the charm
457 :return: (juju.application.Application): Juju application
461 model
= await self
.get_model(model_name
)
465 if application_name
not in model
.applications
:
467 "Deploying charm {} to machine {} in model ~{}".format(
468 application_name
, machine_id
, model_name
471 self
.log
.debug("charm: {}".format(path
))
472 if machine_id
is not None:
473 if machine_id
not in model
.machines
:
474 msg
= "Machine {} not found in model".format(machine_id
)
475 self
.log
.error(msg
=msg
)
476 raise JujuMachineNotFound(msg
)
477 machine
= model
.machines
[machine_id
]
478 series
= machine
.series
480 application
= await model
.deploy(
482 application_name
=application_name
,
490 await JujuModelWatcher
.wait_for(
493 progress_timeout
=progress_timeout
,
494 total_timeout
=total_timeout
,
499 raise JujuApplicationExists("Application {} exists".format(application_name
))
501 except Exception as e
:
504 await self
.disconnect_model(model
)
506 self
.log
.debug("application deployed")
510 async def _get_application(
511 self
, model
: Model
, application_name
: str
515 :param: model: Model object
516 :param: application_name: Application name
518 :return: juju.application.Application (or None if it doesn't exist)
520 if model
.applications
and application_name
in model
.applications
:
521 return model
.applications
[application_name
]
523 async def execute_action(
525 application_name
: str,
528 db_dict
: dict = None,
529 progress_timeout
: float = None,
530 total_timeout
: float = None,
535 :param: application_name: Application name
536 :param: model_name: Model name
537 :param: cloud_name: Cloud name
538 :param: action_name: Name of the action
539 :param: db_dict: Dictionary with data of the DB to write the updates
540 :param: progress_timeout: Maximum time between two updates in the model
541 :param: total_timeout: Timeout for the entity to be active
543 :return: (str, str): (output and status)
545 # Get model and observer
546 model
= await self
.get_model(model_name
)
550 application
= await self
._get
_application
(
551 model
, application_name
=application_name
,
553 if application
is None:
554 raise JujuApplicationNotFound("Cannot execute action")
558 for u
in application
.units
:
559 if await u
.is_leader_from_status():
562 raise Exception("Cannot execute action: leader unit not found")
564 actions
= await application
.get_actions()
566 if action_name
not in actions
:
568 "Action {} not in available actions".format(action_name
)
572 "Executing action {} using params {}".format(action_name
, kwargs
)
574 action
= await unit
.run_action(action_name
, **kwargs
)
576 # Register action with observer and wait for it to finish
577 await JujuModelWatcher
.wait_for(
580 progress_timeout
=progress_timeout
,
581 total_timeout
=total_timeout
,
585 output
= await model
.get_action_output(action_uuid
=action
.entity_id
)
586 status
= await model
.get_action_status(uuid_or_prefix
=action
.entity_id
)
588 status
[action
.entity_id
] if action
.entity_id
in status
else "failed"
591 self
.log
.debug("action completed with status: {}".format(action
.status
))
592 except Exception as e
:
595 await self
.disconnect_model(model
)
597 return output
, status
599 async def get_actions(self
, application_name
: str, model_name
: str) -> dict:
600 """Get list of actions
602 :param: application_name: Application name
603 :param: model_name: Model name
605 :return: Dict with this format
607 "action_name": "Description of the action",
613 model
= await self
.get_model(model_name
)
616 application
= await self
._get
_application
(
617 model
, application_name
=application_name
,
620 # Get list of actions
621 actions
= await application
.get_actions()
623 # Disconnect from model
624 await self
.disconnect_model(model
)
628 async def add_relation(
631 application_name_1
: str,
632 application_name_2
: str,
638 :param: model_name: Model name
639 :param: application_name_1 First application name
640 :param: application_name_2: Second application name
641 :param: relation_1: First relation name
642 :param: relation_2: Second relation name
646 model
= await self
.get_model(model_name
)
648 # Build relation strings
649 r1
= "{}:{}".format(application_name_1
, relation_1
)
650 r2
= "{}:{}".format(application_name_2
, relation_2
)
653 self
.log
.debug("Adding relation: {} -> {}".format(r1
, r2
))
655 await model
.add_relation(relation1
=r1
, relation2
=r2
)
656 except JujuAPIError
as e
:
657 if "not found" in e
.message
:
658 self
.log
.warning("Relation not found: {}".format(e
.message
))
660 if "already exists" in e
.message
:
661 self
.log
.warning("Relation already exists: {}".format(e
.message
))
663 # another exception, raise it
666 await self
.disconnect_model(model
)
668 async def destroy_model(
669 self
, model_name
: str, total_timeout
: float,
674 :param: model_name: Model name
675 :param: total_timeout: Timeout
677 model
= await self
.get_model(model_name
)
678 uuid
= model
.info
.uuid
680 # Destroy applications
681 for application_name
in model
.applications
:
683 await self
.destroy_application(
684 model
, application_name
=application_name
,
686 except Exception as e
:
688 "Error destroying application {} in model {}: {}".format(
689 application_name
, model_name
, e
694 machines
= await model
.get_machines()
695 for machine_id
in machines
:
697 await self
.destroy_machine(
698 model
, machine_id
=machine_id
, total_timeout
=total_timeout
,
700 except asyncio
.CancelledError
:
706 await self
.disconnect_model(model
)
709 self
.models
.remove(model_name
)
710 await self
.controller
.destroy_model(uuid
)
712 # Wait until model is destroyed
713 self
.log
.debug("Waiting for model {} to be destroyed...".format(model_name
))
716 if total_timeout
is None:
718 end
= time
.time() + total_timeout
719 while time
.time() < end
:
721 models
= await self
.controller
.list_models()
722 if model_name
not in models
:
724 "The model {} ({}) was destroyed".format(model_name
, uuid
)
727 except asyncio
.CancelledError
:
729 except Exception as e
:
731 await asyncio
.sleep(5)
733 "Timeout waiting for model {} to be destroyed {}".format(
734 model_name
, last_exception
738 async def destroy_application(self
, model
: Model
, application_name
: str):
742 :param: model: Model object
743 :param: application_name: Application name
746 "Destroying application {} in model {}".format(
747 application_name
, model
.info
.name
750 application
= model
.applications
.get(application_name
)
752 await application
.destroy()
754 self
.log
.warning("Application not found: {}".format(application_name
))
756 async def destroy_machine(
757 self
, model
: Model
, machine_id
: str, total_timeout
: float = 3600
762 :param: model: Model object
763 :param: machine_id: Machine id
764 :param: total_timeout: Timeout in seconds
766 machines
= await model
.get_machines()
767 if machine_id
in machines
:
768 machine
= model
.machines
[machine_id
]
769 # TODO: change this by machine.is_manual when this is upstreamed:
770 # https://github.com/juju/python-libjuju/pull/396
771 if "instance-id" in machine
.safe_data
and machine
.safe_data
[
773 ].startswith("manual:"):
774 await machine
.destroy(force
=True)
777 end
= time
.time() + total_timeout
779 # wait for machine removal
780 machines
= await model
.get_machines()
781 while machine_id
in machines
and time
.time() < end
:
783 "Waiting for machine {} is destroyed".format(machine_id
)
785 await asyncio
.sleep(0.5)
786 machines
= await model
.get_machines()
787 self
.log
.debug("Machine destroyed: {}".format(machine_id
))
789 self
.log
.debug("Machine not found: {}".format(machine_id
))
791 async def configure_application(
792 self
, model_name
: str, application_name
: str, config
: dict = None
794 """Configure application
796 :param: model_name: Model name
797 :param: application_name: Application name
798 :param: config: Config to apply to the charm
801 model
= await self
.get_model(model_name
)
802 application
= await self
._get
_application
(
803 model
, application_name
=application_name
,
805 await application
.set_config(config
)
806 await self
.disconnect_model(model
)