Revert "Revert ""Remove unused lines of code"""
[osm/N2VC.git] / n2vc / juju_watcher.py
1 # Copyright 2020 Canonical Ltd.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import asyncio
16 import time
17 from juju.client import client
18 from n2vc.utils import FinalStatus, EntityType
19 from n2vc.exceptions import EntityInvalidException
20 from n2vc.n2vc_conn import N2VCConnector
21 from juju.model import ModelEntity, Model
22 from juju.client.overrides import Delta
23 from websockets.exceptions import ConnectionClosed
24 import logging
25
26 logger = logging.getLogger("__main__")
27
28
class JujuModelWatcher:
    """
    Static helpers to wait for a Juju entity to reach a final state while
    watching the model and (optionally) writing status updates to the DB.
    """

    @staticmethod
    async def wait_for(
        model,
        entity: ModelEntity,
        progress_timeout: float = 3600,
        total_timeout: float = 3600,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Wait for entity to reach its final state.

        Two tasks run concurrently: one blocks until the entity field listed
        in FinalStatus holds one of the final states (bounded by
        total_timeout), the other watches the model for changes (bounded by
        progress_timeout). The call returns as soon as the first of them
        finishes; both tasks are cancelled on the way out.

        :param: model: Model to observe
        :param: entity: Entity object
        :param: progress_timeout: Maximum time between two updates in the model
                                  (None falls back to 3600 seconds)
        :param: total_timeout: Timeout for the entity to be active
                               (None falls back to 3600 seconds)
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: EntityInvalidException if the entity type is not in FinalStatus
        :raises: asyncio.TimeoutError when timeout reaches
        """

        if progress_timeout is None:
            progress_timeout = 3600.0
        if total_timeout is None:
            total_timeout = 3600.0

        entity_type = EntityType.get_entity(type(entity))
        if entity_type not in FinalStatus:
            raise EntityInvalidException("Entity type not found")

        # Get final states and the entity attribute that holds the state
        final_states = FinalStatus[entity_type].status
        field_to_check = FinalStatus[entity_type].field

        # Task that waits until the entity reaches the final state
        wait_for_entity = asyncio.ensure_future(
            asyncio.wait_for(
                model.block_until(
                    lambda: entity.__getattribute__(field_to_check) in final_states
                ),
                timeout=total_timeout,
            )
        )

        # Task that watches the model for changes (and writes them to DB)
        watcher = asyncio.ensure_future(
            JujuModelWatcher.model_watcher(
                model,
                entity_id=entity.entity_id,
                entity_type=entity_type,
                timeout=progress_timeout,
                db_dict=db_dict,
                n2vc=n2vc,
            )
        )

        tasks = [wait_for_entity, watcher]
        try:
            # Execute tasks, and stop when the first one is finished.
            # The watcher task only finishes when it times out, fails, or the
            # model connection is closed.
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            # asyncio.wait() does not re-raise exceptions of the awaited
            # tasks; fetch the results explicitly so that a timeout (or any
            # other failure) reaches the caller instead of being dropped.
            for task in done:
                task.result()
        finally:
            # Cancel whatever is still running
            for task in tasks:
                task.cancel()

    @staticmethod
    async def model_watcher(
        model: Model,
        entity_id: str,
        entity_type: EntityType,
        timeout: float,
        db_dict: dict = None,
        n2vc: N2VCConnector = None,
    ):
        """
        Observes the changes related to a specific entity in a model

        Every delta whose entity type is being listened to resets the progress
        deadline. Deltas that belong to entity_id are additionally written to
        the DB when both n2vc and db_dict are provided.

        :param: model: Model to observe
        :param: entity_id: ID of the entity to be observed
        :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)
        :param: timeout: Maximum time between two updates in the model
        :param: db_dict: Dictionary with data of the DB to write the updates
        :param: n2vc: N2VC Connector object

        :raises: asyncio.TimeoutError when timeout reaches
        """

        allwatcher = client.AllWatcherFacade.from_connection(model.connection())

        # Generate array with entity types to listen
        entity_types = (
            [entity_type, EntityType.UNIT]
            if entity_type == EntityType.APPLICATION  # TODO: Add .ACTION too
            else [entity_type]
        )

        # Get time when it should timeout
        timeout_end = time.time() + timeout

        try:
            while True:
                # Bound the wait for the next change with the time left until
                # the deadline, so the timeout also fires when the model
                # produces no changes at all.
                remaining = timeout_end - time.time()
                if remaining <= 0:
                    raise asyncio.TimeoutError()
                change = await asyncio.wait_for(allwatcher.Next(), timeout=remaining)

                for delta in change.deltas:
                    # Get delta EntityType
                    delta_entity = EntityType.get_entity_from_delta(delta.entity)
                    if delta_entity not in entity_types:
                        continue

                    # Get the id of the entity this delta refers to
                    if entity_type == EntityType.APPLICATION:
                        # Unit deltas are matched through their application
                        delta_entity_id = (
                            delta.data["application"]
                            if delta_entity == EntityType.UNIT
                            else delta.data["name"]
                        )
                    else:
                        delta_entity_id = delta.data["id"]

                    # Update timeout
                    timeout_end = time.time() + timeout

                    # Write only if the entity id matches and there is
                    # somewhere to write to
                    if delta_entity_id != entity_id or n2vc is None or not db_dict:
                        continue

                    (status, status_message, vca_status) = JujuModelWatcher.get_status(
                        delta, entity_type=delta_entity
                    )

                    # Write status to DB
                    status = n2vc.osm_status(delta_entity, status)
                    await n2vc.write_app_status_to_db(
                        db_dict=db_dict,
                        status=status,
                        detailed_status=status_message,
                        vca_status=vca_status,
                        entity_type=delta_entity.value.__name__.lower(),
                    )
        except ConnectionClosed:
            pass
            # This is expected to happen when the
            # entity reaches its final state, because
            # the model connection is closed afterwards

    @staticmethod
    def get_status(delta: Delta, entity_type: EntityType) -> (str, str, str):
        """
        Get status from delta

        :param: delta: Delta generated by the allwatcher
        :param: entity_type: EntityType (p.e. .APPLICATION, .MACHINE, and .ACTION)

        :return (status, message, vca_status); None is returned implicitly
                for any entity type other than MACHINE, ACTION, APPLICATION
                and UNIT
        """
        if entity_type == EntityType.MACHINE:
            return (
                delta.data["agent-status"]["current"],
                delta.data["instance-status"]["message"],
                delta.data["instance-status"]["current"],
            )
        elif entity_type == EntityType.ACTION:
            # Action deltas carry a single status value, used for all fields
            return (
                delta.data["status"],
                delta.data["status"],
                delta.data["status"],
            )
        elif entity_type == EntityType.APPLICATION:
            return (
                delta.data["status"]["current"],
                delta.data["status"]["message"],
                delta.data["status"]["current"],
            )
        elif entity_type == EntityType.UNIT:
            return (
                delta.data["workload-status"]["current"],
                delta.data["workload-status"]["message"],
                delta.data["workload-status"]["current"],
            )