Revert "Tox doesn't like -"
[osm/N2VC.git] / n2vc / libjuju.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import logging
17 from juju.controller import Controller
18 from juju.client.connector import NoConnectionException
19 from juju.client import client
20 import time
21
22 from juju.errors import JujuAPIError
23 from juju.model import Model
24 from juju.machine import Machine
25 from juju.application import Application
26 from juju.client._definitions import FullStatus
27 from n2vc.juju_watcher import JujuModelWatcher
28 from n2vc.provisioner import AsyncSSHProvisioner
29 from n2vc.n2vc_conn import N2VCConnector
30 from n2vc.exceptions import (
31 JujuMachineNotFound,
32 JujuApplicationNotFound,
33 JujuModelAlreadyExists,
34 JujuControllerFailedConnecting,
35 JujuApplicationExists,
36 )
37
38
39 class Libjuju:
def __init__(
    self,
    endpoint: str,
    api_proxy: str,
    username: str,
    password: str,
    cacert: str,
    loop: asyncio.AbstractEventLoop = None,
    log: logging.Logger = None,
    db: dict = None,
    n2vc: N2VCConnector = None,
    apt_mirror: str = None,
    enable_os_upgrade: bool = True,
):
    """
    Build a Libjuju client and connect it to the Juju controller

    The constructor blocks: it runs connect() to completion on the event
    loop before returning.

    :param: endpoint: Endpoint of the juju controller (host:port)
    :param: api_proxy: Endpoint of the juju controller - Reachable from the VNFs
    :param: username: Juju username
    :param: password: Juju password
    :param: cacert: Juju CA Certificate
    :param: loop: Asyncio loop (defaults to asyncio.get_event_loop())
    :param: log: Logger (defaults to the "Libjuju" logger)
    :param: db: DB object
    :param: n2vc: N2VC object
    :param: apt_mirror: APT mirror, added to the config of every new model
    :param: enable_os_upgrade: Value used for both "enable-os-refresh-update"
                               and "enable-os-upgrade" in the model config
    """
    # Controller access details
    self.endpoints = [endpoint]  # TODO: Store and get endpoints from DB
    self.api_proxy = api_proxy
    self.username = username
    self.password = password
    self.cacert = cacert

    # Runtime collaborators
    self.loop = loop or asyncio.get_event_loop()
    self.log = log or logging.getLogger("Libjuju")
    self.db = db
    self.n2vc = n2vc

    # Config handed to controller.add_model() for every model created here
    model_config = {}
    if apt_mirror:
        model_config["apt-mirror"] = apt_mirror
    model_config.update(
        {
            "enable-os-refresh-update": enable_os_upgrade,
            "enable-os-upgrade": enable_os_upgrade,
        }
    )
    self.model_config = model_config

    # Serialize controller reconnections and model creation
    self.reconnecting = asyncio.Lock(loop=self.loop)
    self.creating_model = asyncio.Lock(loop=self.loop)

    # Names of the models created through this instance
    self.models = set()
    self.controller = Controller(loop=self.loop)

    # Connect synchronously so the object is usable right away
    self.loop.run_until_complete(self.connect())
94
async def connect(self):
    """Connect to the Juju controller with the stored endpoints and credentials

    Once connected, logs the endpoint that was actually reached.
    """
    self.log.debug("Connecting to controller")
    await self.controller.connect(
        endpoint=self.endpoints,
        username=self.username,
        password=self.password,
        cacert=self.cacert,
    )
    endpoint = self.controller.connection().endpoint
    self.log.info("Connected to controller: {}".format(endpoint))
107
async def disconnect(self):
    """Close the connection to the Juju controller, logging before and after"""
    log = self.log
    log.debug("Disconnecting from controller")
    await self.controller.disconnect()
    log.info("Disconnected from controller")
114
def controller_connected(self) -> bool:
    """Tell whether the controller connection is open

    If the controller raises NoConnectionException, a warning is logged
    and the controller is reported as not connected.

    :return: bool: True if connected, False if not connected
    """
    try:
        connection = self.controller.connection()
        return connection.is_open
    except NoConnectionException:
        self.log.warning("VCA not connected")
        return False
127
async def disconnect_model(self, model: Model):
    """
    Disconnect model (best effort)

    Errors raised while disconnecting are logged and otherwise ignored, so
    this is safe to call from ``finally`` blocks. Task cancellation is
    always propagated.

    :param: model: Model that will be disconnected
    """
    try:
        await model.disconnect()
    except asyncio.CancelledError:
        # On Python < 3.8 CancelledError derives from Exception; it must not
        # be swallowed, or the calling task could never be cancelled.
        raise
    except Exception as e:
        self.log.debug("Error disconnecting model: {}".format(e))
138
139 async def _reconnect(
140 self,
141 retry: bool = False,
142 timeout: int = 5,
143 time_between_retries: int = 3,
144 maximum_retries: int = 0,
145 ):
146 """
147 Reconnect to the controller
148
149 :param: retry: Set it to True to retry if the connection fails
150 :param: time_between_retries: Time in seconds between retries
151 :param: maximum_retries Maximum retries. If not set, it will retry forever
152
153 :raises: Exception if cannot connect to the controller
154 """
155
156 if self.reconnecting.locked():
157 # Return if another function is trying to reconnect
158 return
159 async with self.reconnecting:
160 attempt = 0
161 while True:
162 try:
163 await asyncio.wait_for(self.connect(), timeout=timeout)
164 break
165 except asyncio.TimeoutError:
166 self.log.error("Error reconnecting to controller: Timeout")
167 except Exception as e:
168 self.log.error("Error reconnecting to controller: {}".format(e))
169
170 attempt += 1
171 maximum_retries_reached = attempt == maximum_retries
172
173 if not retry or maximum_retries_reached:
174 raise JujuControllerFailedConnecting("Controller is not connected")
175 else:
176 await asyncio.sleep(time_between_retries)
177
async def add_model(self, model_name: str, cloud_name: str):
    """
    Create model

    Model creation is serialized with the ``creating_model`` lock. If a
    concurrent call created the same model while this one was waiting for
    the lock, this call returns without creating it again and without
    opening a model connection.

    :param: model_name: Model name
    :param: cloud_name: Cloud name (also used as the credential name)

    :raises: JujuModelAlreadyExists if the controller already lists the
             model when the call starts
    """
    # Reconnect to the controller if not connected
    if not self.controller_connected():
        await self._reconnect()

    # Raise exception if model already exists
    if await self.model_exists(model_name):
        raise JujuModelAlreadyExists("Model {} already exists.".format(model_name))

    # Acquiring the lock blocks until other workers have finished model
    # creation (no need to poll locked()).
    async with self.creating_model:
        # Created by a concurrent call while we were waiting: nothing to do
        if model_name in self.models:
            return

        # Create the model
        self.log.debug("Creating model {}".format(model_name))
        model = await self.controller.add_model(
            model_name,
            config=self.model_config,
            cloud_name=cloud_name,
            credential_name=cloud_name,
        )
        await self.disconnect_model(model)
        self.models.add(model_name)
213
async def get_model(self, model_name: str) -> Model:
    """
    Get a model from the controller

    Reconnects to the controller first when its connection is not open.
    The other methods of this class release the returned model with
    disconnect_model() once they are done with it.

    :param: model_name: Model name

    :return: Model: The Juju model object
    """
    connected = self.controller_connected()
    if not connected:
        await self._reconnect()
    model = await self.controller.get_model(model_name)
    return model
227
async def model_exists(self, model_name: str) -> bool:
    """
    Tell whether the controller lists a model with the given name

    :param: model_name: Model name

    :return bool: True if the model is listed by the controller
    """
    # Make sure the controller connection is usable
    if not self.controller_connected():
        await self._reconnect()

    existing_models = await self.controller.list_models()
    return model_name in existing_models
242
async def get_model_status(self, model_name: str) -> FullStatus:
    """
    Get model status

    The model connection is released even if fetching the status fails.

    :param: model_name: Model name

    :return: Full status object
    """
    model = await self.get_model(model_name)
    try:
        return await model.get_status()
    finally:
        await self.disconnect_model(model)
255
async def create_machine(
    self,
    model_name: str,
    machine_id: str = None,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    series: str = "xenial",
) -> (Machine, bool):
    """
    Create machine

    When machine_id is given, the existing machine with that id is returned.
    Otherwise a new machine is added to the model and this waits, through
    JujuModelWatcher, until it is ready.

    :param: model_name: Model name
    :param: machine_id: Machine id of an existing machine (optional)
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: series: Series of the new machine (unused when machine_id is given)

    :return: (juju.machine.Machine, bool): Machine object and a boolean saying
            if the machine is new or it already existed

    :raises: JujuMachineNotFound if machine_id is given but not in the model
    """
    new = False
    machine = None

    self.log.debug(
        "Creating machine (id={}) in model: {}".format(machine_id, model_name)
    )

    # Get model
    model = await self.get_model(model_name)
    try:
        if machine_id is not None:
            self.log.debug(
                "Searching machine (id={}) in model {}".format(
                    machine_id, model_name
                )
            )

            # Get machines from model and get the machine with machine_id if exists
            machines = await model.get_machines()
            if machine_id in machines:
                self.log.debug(
                    "Machine (id={}) found in model {}".format(
                        machine_id, model_name
                    )
                )
                machine = model.machines[machine_id]
            else:
                raise JujuMachineNotFound("Machine {} not found".format(machine_id))

        if machine is None:
            self.log.debug("Creating a new machine in model {}".format(model_name))

            # Create machine
            machine = await model.add_machine(
                spec=None, constraints=None, disks=None, series=series
            )
            new = True

            # Wait until the machine is ready
            await JujuModelWatcher.wait_for(
                model=model,
                entity=machine,
                progress_timeout=progress_timeout,
                total_timeout=total_timeout,
                db_dict=db_dict,
                n2vc=self.n2vc,
            )
    finally:
        # Always release the model connection; exceptions propagate unchanged
        await self.disconnect_model(model)

    self.log.debug("Machine ready at {}".format(machine.dns_name))
    return machine, new
331
async def provision_machine(
    self,
    model_name: str,
    hostname: str,
    username: str,
    private_key_path: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
) -> str:
    """
    Manually provisioning of a machine

    Provisions the host over SSH, registers it in the model, installs the
    Juju agent in the background and waits until the machine is ready.

    :param: model_name: Model name
    :param: hostname: IP to access the machine
    :param: username: Username to login to the machine
    :param: private_key_path: Local path for the private key
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active

    :return: (Entity): Machine id

    :raises: ValueError if the controller reports an error adding the machine
    :raises: JujuMachineNotFound if the machine does not appear in the model
    """
    self.log.debug(
        "Provisioning machine. model: {}, hostname: {}, username: {}".format(
            model_name, hostname, username
        )
    )

    # Get model
    model = await self.get_model(model_name)

    try:
        # Get provisioner
        provisioner = AsyncSSHProvisioner(
            host=hostname,
            user=username,
            private_key_path=private_key_path,
            log=self.log,
        )

        # Provision machine
        params = await provisioner.provision_machine()

        params.jobs = ["JobHostUnits"]

        self.log.debug("Adding machine to model")
        connection = model.connection()
        client_facade = client.ClientFacade.from_connection(connection)

        results = await client_facade.AddMachines(params=[params])
        error = results.machines[0].error

        if error:
            msg = "Error adding machine: {}".format(error.message)
            self.log.error(msg=msg)
            raise ValueError(msg)

        machine_id = results.machines[0].machine

        self.log.debug("Installing Juju agent into machine {}".format(machine_id))
        install_agent_task = asyncio.ensure_future(
            provisioner.install_agent(
                connection=connection,
                nonce=params.nonce,
                machine_id=machine_id,
                api=self.api_proxy,
            )
        )

        def _log_install_agent_error(task):
            # The agent is installed in the background: retrieve and log a
            # failure so it is not lost as an unretrieved task exception.
            if not task.cancelled() and task.exception() is not None:
                self.log.error(
                    "Error installing Juju agent into machine {}: {}".format(
                        machine_id, task.exception()
                    )
                )

        install_agent_task.add_done_callback(_log_install_agent_error)

        # Poll up to 10 times, 2 seconds apart, for the machine to show up
        machine = None
        for _ in range(10):
            machine_list = await model.get_machines()
            if machine_id in machine_list:
                self.log.debug("Machine {} found in model!".format(machine_id))
                machine = model.machines.get(machine_id)
                break
            await asyncio.sleep(2)

        if machine is None:
            msg = "Machine {} not found in model".format(machine_id)
            self.log.error(msg=msg)
            raise JujuMachineNotFound(msg)

        await JujuModelWatcher.wait_for(
            model=model,
            entity=machine,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    finally:
        # Always release the model connection; exceptions propagate unchanged
        await self.disconnect_model(model)

    self.log.debug("Machine provisioned {}".format(machine_id))

    return machine_id
432
async def deploy_charm(
    self,
    application_name: str,
    path: str,
    model_name: str,
    machine_id: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    config: dict = None,
    series: str = None,
):
    """Deploy charm

    :param: application_name: Application name
    :param: path: Local path to the charm
    :param: model_name: Model name
    :param: machine_id ID of the machine
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: config: Config for the charm
    :param: series: Series of the charm (replaced by the target machine's
                    series when machine_id is given)

    :return: (juju.application.Application): Juju application

    :raises: JujuApplicationExists if the application is already deployed
    :raises: JujuMachineNotFound if machine_id is not in the model
    """
    # Get model
    model = await self.get_model(model_name)

    try:
        if application_name in model.applications:
            raise JujuApplicationExists("Application {} exists".format(application_name))

        self.log.debug(
            "Deploying charm {} to machine {} in model {}".format(
                application_name, machine_id, model_name
            )
        )
        self.log.debug("charm: {}".format(path))
        if machine_id is not None:
            if machine_id not in model.machines:
                msg = "Machine {} not found in model".format(machine_id)
                self.log.error(msg=msg)
                raise JujuMachineNotFound(msg)
            # Deploy with the series of the target machine
            machine = model.machines[machine_id]
            series = machine.series

        application = await model.deploy(
            entity_url=path,
            application_name=application_name,
            channel="stable",
            num_units=1,
            series=series,
            to=machine_id,
            config=config,
        )

        await JujuModelWatcher.wait_for(
            model=model,
            entity=application,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
    finally:
        # Always release the model connection; exceptions propagate unchanged
        await self.disconnect_model(model)

    self.log.debug("application deployed")

    return application
509
510 async def _get_application(
511 self, model: Model, application_name: str
512 ) -> Application:
513 """Get application
514
515 :param: model: Model object
516 :param: application_name: Application name
517
518 :return: juju.application.Application (or None if it doesn't exist)
519 """
520 if model.applications and application_name in model.applications:
521 return model.applications[application_name]
522
async def execute_action(
    self,
    application_name: str,
    model_name: str,
    action_name: str,
    db_dict: dict = None,
    progress_timeout: float = None,
    total_timeout: float = None,
    **kwargs
):
    """Execute action

    The action runs on the leader unit of the application.

    :param: application_name: Application name
    :param: model_name: Model name
    :param: action_name: Name of the action
    :param: db_dict: Dictionary with data of the DB to write the updates
    :param: progress_timeout: Maximum time between two updates in the model
    :param: total_timeout: Timeout for the entity to be active
    :param: kwargs: Parameters passed to the action

    :return: (str, str): (output and status)

    :raises: JujuApplicationNotFound if the application is not in the model
    :raises: Exception if no leader unit is found or the action is not
             available
    """
    # Get model
    model = await self.get_model(model_name)

    try:
        # Get application
        application = await self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot execute action")

        # Get the leader unit; stop querying unit status once it is found
        unit = None
        for candidate in application.units:
            if await candidate.is_leader_from_status():
                unit = candidate
                break
        if unit is None:
            raise Exception("Cannot execute action: leader unit not found")

        actions = await application.get_actions()

        if action_name not in actions:
            raise Exception(
                "Action {} not in available actions".format(action_name)
            )

        self.log.debug(
            "Executing action {} using params {}".format(action_name, kwargs)
        )
        action = await unit.run_action(action_name, **kwargs)

        # Register action with observer and wait for it to finish
        await JujuModelWatcher.wait_for(
            model=model,
            entity=action,
            progress_timeout=progress_timeout,
            total_timeout=total_timeout,
            db_dict=db_dict,
            n2vc=self.n2vc,
        )
        output = await model.get_action_output(action_uuid=action.entity_id)
        status = await model.get_action_status(uuid_or_prefix=action.entity_id)
        # Report "failed" when the controller returns no status for this action
        status = (
            status[action.entity_id] if action.entity_id in status else "failed"
        )

        self.log.debug("action completed with status: {}".format(action.status))
    finally:
        # Always release the model connection; exceptions propagate unchanged
        await self.disconnect_model(model)

    return output, status
598
async def get_actions(self, application_name: str, model_name: str) -> dict:
    """Get list of actions

    The model connection is released even if the lookup fails.

    :param: application_name: Application name
    :param: model_name: Model name

    :return: Dict with this format
        {
            "action_name": "Description of the action",
            ...
        }

    :raises: JujuApplicationNotFound if the application is not in the model
    """
    # Get model
    model = await self.get_model(model_name)

    try:
        # Get application
        application = await self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot get actions")

        # Get list of actions
        return await application.get_actions()
    finally:
        # Disconnect from model
        await self.disconnect_model(model)
627
async def add_relation(
    self,
    model_name: str,
    application_name_1: str,
    application_name_2: str,
    relation_1: str,
    relation_2: str,
):
    """Relate two application endpoints

    Juju API errors saying the relation endpoint was not found, or that the
    relation already exists, are logged as warnings and ignored; any other
    Juju API error is raised. The model is always disconnected.

    :param: model_name: Model name
    :param: application_name_1 First application name
    :param: application_name_2: Second application name
    :param: relation_1: First relation name
    :param: relation_2: Second relation name
    """
    model = await self.get_model(model_name)

    # Endpoints use the "<application>:<relation>" form
    endpoint_1 = "{}:{}".format(application_name_1, relation_1)
    endpoint_2 = "{}:{}".format(application_name_2, relation_2)

    self.log.debug("Adding relation: {} -> {}".format(endpoint_1, endpoint_2))
    try:
        await model.add_relation(relation1=endpoint_1, relation2=endpoint_2)
    except JujuAPIError as e:
        tolerated = (
            ("not found", "Relation not found: {}"),
            ("already exists", "Relation already exists: {}"),
        )
        for fragment, template in tolerated:
            if fragment in e.message:
                self.log.warning(template.format(e.message))
                return
        raise
    finally:
        await self.disconnect_model(model)
667
async def destroy_model(
    self, model_name: str, total_timeout: float,
):
    """
    Destroy model

    Destroys every application and machine of the model (errors are logged
    and do not stop the process), destroys the model itself and then polls
    the controller until the model is no longer listed.

    :param: model_name: Model name
    :param: total_timeout: Timeout in seconds, used for each machine removal
                           and for the model removal (3600 when None)

    :raises: Exception if the model is still listed when the timeout expires
    """
    # Resolve the default up front: destroy_machine below needs a number too
    if total_timeout is None:
        total_timeout = 3600

    model = await self.get_model(model_name)
    try:
        uuid = model.info.uuid

        # Destroy applications (iterate over a snapshot of the names)
        for application_name in list(model.applications):
            try:
                await self.destroy_application(
                    model, application_name=application_name,
                )
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.log.error(
                    "Error destroying application {} in model {}: {}".format(
                        application_name, model_name, e
                    )
                )

        # Destroy machines
        machines = await model.get_machines()
        for machine_id in machines:
            try:
                await self.destroy_machine(
                    model, machine_id=machine_id, total_timeout=total_timeout,
                )
            except asyncio.CancelledError:
                raise
            except Exception as e:
                self.log.error(
                    "Error destroying machine {} in model {}: {}".format(
                        machine_id, model_name, e
                    )
                )
    finally:
        # Disconnect model
        await self.disconnect_model(model)

    # Destroy model. discard(): the model may not have been created through
    # this instance, so it is not necessarily in self.models.
    self.models.discard(model_name)
    await self.controller.destroy_model(uuid)

    # Wait until model is destroyed
    self.log.debug("Waiting for model {} to be destroyed...".format(model_name))
    last_exception = ""

    end = time.time() + total_timeout
    while time.time() < end:
        try:
            models = await self.controller.list_models()
            if model_name not in models:
                self.log.debug(
                    "The model {} ({}) was destroyed".format(model_name, uuid)
                )
                return
        except asyncio.CancelledError:
            raise
        except Exception as e:
            last_exception = e
        await asyncio.sleep(5)
    raise Exception(
        "Timeout waiting for model {} to be destroyed {}".format(
            model_name, last_exception
        )
    )
737
async def destroy_application(self, model: Model, application_name: str):
    """
    Destroy one application of a model

    An application that is not in the model is only reported with a
    warning.

    :param: model: Model object
    :param: application_name: Application name
    """
    self.log.debug(
        "Destroying application {} in model {}".format(
            application_name, model.info.name
        )
    )
    application = model.applications.get(application_name)
    if not application:
        self.log.warning("Application not found: {}".format(application_name))
        return
    await application.destroy()
755
async def destroy_machine(
    self, model: Model, machine_id: str, total_timeout: float = 3600
):
    """
    Destroy machine

    Only manually provisioned machines (instance-id starting with
    "manual:") are destroyed here, with force; the model is then polled
    every 0.5 seconds until the machine disappears or the timeout expires.
    Other machines are left untouched.

    :param: model: Model object
    :param: machine_id: Machine id
    :param: total_timeout: Timeout in seconds (None means 3600)
    """
    # destroy_model may pass None through; fall back to the default
    if total_timeout is None:
        total_timeout = 3600

    machines = await model.get_machines()
    if machine_id not in machines:
        self.log.debug("Machine not found: {}".format(machine_id))
        return

    machine = model.machines[machine_id]
    # TODO: change this by machine.is_manual when this is upstreamed:
    # https://github.com/juju/python-libjuju/pull/396
    is_manual = "instance-id" in machine.safe_data and machine.safe_data[
        "instance-id"
    ].startswith("manual:")
    if not is_manual:
        return

    await machine.destroy(force=True)

    # max timeout
    end = time.time() + total_timeout

    # wait for machine removal
    machines = await model.get_machines()
    while machine_id in machines and time.time() < end:
        self.log.debug("Waiting for machine {} is destroyed".format(machine_id))
        await asyncio.sleep(0.5)
        machines = await model.get_machines()

    if machine_id in machines:
        self.log.warning(
            "Timeout waiting for machine {} to be destroyed".format(machine_id)
        )
    else:
        self.log.debug("Machine destroyed: {}".format(machine_id))
790
async def configure_application(
    self, model_name: str, application_name: str, config: dict = None
):
    """Configure application

    Does nothing when config is empty or None. The model connection is
    released even if applying the config fails.

    :param: model_name: Model name
    :param: application_name: Application name
    :param: config: Config to apply to the charm

    :raises: JujuApplicationNotFound if the application is not in the model
    """
    if not config:
        return

    model = await self.get_model(model_name)
    try:
        application = await self._get_application(
            model, application_name=application_name,
        )
        if application is None:
            raise JujuApplicationNotFound("Cannot configure application")
        await application.set_config(config)
    finally:
        await self.disconnect_model(model)