Revert "Tox doesn't like -"
[osm/N2VC.git] / n2vc / juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import time
17 from juju.client import client
18 from n2vc.utils import FinalStatus, EntityType
19 from n2vc.exceptions import EntityInvalidException
20 from n2vc.n2vc_conn import N2VCConnector
21 from juju.model import ModelEntity, Model
22 from juju.client.overrides import Delta
23
24 import logging
25
# Module-level logger. It is deliberately looked up under the literal name
# "__main__" (not this module's own name), so records go to that logger.
logger = logging.getLogger(name="__main__")
27
28
class JujuModelWatcher:
    """
    Static helpers to wait for a Juju entity to reach a final state while
    watching the model for changes and (optionally) writing them to the DB.
    """

    @staticmethod
    async def wait_for(
        model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Wait for entity to reach its final state.

        Two tasks run concurrently: one blocks until the entity field listed in
        FinalStatus holds one of the final states (bounded by total_timeout),
        the other watches the model for changes (bounded by progress_timeout).
        The call ends as soon as either task finishes, and both are cancelled.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model
                                  (None falls back to 3600 seconds)
        :param: total_timeout: Timeout for the entity to be active
                               (None falls back to 3600 seconds)
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: EntityInvalidException when the entity type has no FinalStatus
        :raises: asyncio.TimeoutError when timeout reaches
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = EntityType.get_entity(type(entity))
        if entity_type not in FinalStatus:
            raise EntityInvalidException("Entity type not found")

        # Get final states
        final_states = FinalStatus[entity_type].status
        field_to_check = FinalStatus[entity_type].field

        # Task that waits until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(
                    lambda: entity.__getattribute__(field_to_check) in final_states
                ),
                timeout=total_timeout,
            )
        )

        # Task that watches the model for changes (and writes them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Run both tasks and stop when the first one finishes.
            # The watcher task never finishes on its own; it only ends by
            # raising (e.g. when it times out).
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            # asyncio.wait() does not raise the exceptions of the tasks it
            # waited for, so retrieve the outcome explicitly. Otherwise a
            # timeout (or any watcher failure) would be silently dropped and
            # the caller would believe the entity reached its final state.
            if wait_for_entity in done:
                wait_for_entity.result()
            else:
                watcher.result()
        finally:
            # Cancel tasks (no-op for the ones already finished)
            for task in tasks:
                task.cancel()

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: EntityType,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Observes the changes related to a specific entity in a model

        Runs until it raises: every delta whose entity type is being listened
        to renews the deadline, and deltas that match entity_id are written to
        the DB when both n2vc and db_dict are provided.

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: asyncio.TimeoutError when timeout reaches
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, EntityType.UNIT]
            if entity_type == EntityType.APPLICATION  # TODO: Add .ACTION too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        while True:
            # Bound the wait for the next change by the time left until the
            # deadline; checking the clock only after Next() returns would
            # never time out on a model that emits no changes at all.
            remaining = timeout_end - time.time()
            if remaining <= 0:
                raise asyncio.TimeoutError()
            change = await asyncio.wait_for(allwatcher.Next(), timeout=remaining)

            for delta in change.deltas:
                # Get delta EntityType
                delta_entity = EntityType.get_entity_from_delta(delta.entity)
                if delta_entity not in entity_types:
                    continue

                # Get entity id: when watching an application, unit deltas are
                # matched through the application they belong to
                if entity_type == EntityType.APPLICATION:
                    if delta_entity == EntityType.UNIT:
                        delta_entity_id = delta.data["application"]
                    else:
                        delta_entity_id = delta.data["name"]
                else:
                    delta_entity_id = delta.data["id"]

                # Write only if the entity id matches
                write = delta_entity_id == entity_id

                # Update timeout
                timeout_end = time.time() + timeout
                (status, status_message, vca_status) = JujuModelWatcher.get_status(
                    delta, entity_type=delta_entity
                )

                if write and n2vc is not None and db_dict:
                    # Write status to DB
                    status = n2vc.osm_status(delta_entity, status)
                    await n2vc.write_app_status_to_db(
                        db_dict=db_dict,
                        status=status,
                        detailed_status=status_message,
                        vca_status=vca_status,
                        entity_type=delta_entity.value.__name__.lower(),
                    )

    @staticmethod
    def get_status(delta: Delta, entity_type: EntityType) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher
        :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)

        :return (status, message, vca_status); falls through and returns None
                when entity_type is none of MACHINE, ACTION, APPLICATION, UNIT
        """
        if entity_type == EntityType.MACHINE:
            return (
                delta.data["agent-status"]["current"],
                delta.data["instance-status"]["message"],
                delta.data["instance-status"]["current"],
            )
        elif entity_type == EntityType.ACTION:
            # Actions expose a single status value, used for all three fields
            return (
                delta.data["status"],
                delta.data["status"],
                delta.data["status"],
            )
        elif entity_type == EntityType.APPLICATION:
            return (
                delta.data["status"]["current"],
                delta.data["status"]["message"],
                delta.data["status"]["current"],
            )
        elif entity_type == EntityType.UNIT:
            return (
                delta.data["workload-status"]["current"],
                delta.data["workload-status"]["message"],
                delta.data["workload-status"]["current"],
            )