1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 This module implements a thread that reads from kafka bus reading VIM messages.
18 It is based on asyncio.
19 It is in charge of load tasks assigned to VIMs that nobody is in chage of it
25 from http
import HTTPStatus
27 from osm_common
import dbmongo
, dbmemory
, msglocal
, msgkafka
28 from osm_common
.dbbase
import DbException
29 from osm_common
.msgbase
import MsgException
30 from osm_ng_ro
.ns
import NsException
33 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
36 class VimAdminException(Exception):
38 def __init__(self
, message
, http_code
=HTTPStatus
.BAD_REQUEST
):
39 self
.http_code
= http_code
40 Exception.__init
__(self
, message
)
43 class VimAdminThread(threading
.Thread
):
44 MAX_TIME_LOCKED
= 3600 # 1h
45 MAX_TIME_UNATTENDED
= 600 # 10min
46 TIME_CHECK_UNUSED_VIM
= 3600 * 2 # 2h
47 kafka_topics
= ("vim_account", "wim_account", "sdn")
49 def __init__(self
, config
, engine
):
52 :param config: configuration parameters of database and messaging
53 :param engine: an instance of Engine class, used for deleting instances
55 threading
.Thread
.__init
__(self
)
56 self
.to_terminate
= False
62 self
.last_rotask_time
= 0
63 self
.next_check_unused_vim
= time() + self
.TIME_CHECK_UNUSED_VIM
64 self
.logger
= logging
.getLogger("ro.vimadmin")
65 self
.aiomain_task_kafka
= None # asyncio task for receiving vim actions from kafka bus
66 self
.aiomain_task_vim
= None # asyncio task for watching ro_tasks not processed by nobody
68 async def vim_watcher(self
):
69 """ Reads database periodically looking for tasks not processed by nobody because of a reboot
70 in order to load this vim"""
71 # firstly read VIMS not processed
72 for target_database
in ("vim_accounts", "wim_accounts", "sdns"):
73 unattended_targets
= self
.db
.get_list(target_database
,
74 q_filter
={"_admin.operations.operationState": "PROCESSING"})
75 for target
in unattended_targets
:
76 target_id
= "{}:{}".format(target_database
[:3], target
["_id"])
77 self
.logger
.info("ordered to check {}".format(target_id
))
78 self
.engine
.check_vim(target_id
)
80 while not self
.to_terminate
:
83 if not self
.last_rotask_time
:
84 self
.last_rotask_time
= 0
85 ro_tasks
= self
.db
.get_list("ro_tasks",
86 q_filter
={"target_id.ncont": self
.engine
.get_assigned_vims(),
87 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
88 "locked_at.lt": now
- self
.MAX_TIME_LOCKED
,
89 "to_check_at.gt": self
.last_rotask_time
,
90 "to_check_at.lte": now
- self
.MAX_TIME_UNATTENDED
})
91 self
.last_rotask_time
= now
- self
.MAX_TIME_UNATTENDED
92 for ro_task
in ro_tasks
:
93 # if already checked ignore
94 if ro_task
["target_id"] in processed_vims
:
96 processed_vims
.append(ro_task
["target_id"])
97 # if already assigned ignore
98 if ro_task
["target_id"] in self
.engine
.get_assigned_vims():
100 # if there is some task locked on this VIM, there is an RO working on it, so ignore
101 if self
.db
.get_list("ro_tasks",
102 q_filter
={"target_id": ro_task
["target_id"],
103 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
104 "locked_at.gt": now
- self
.MAX_TIME_LOCKED
}):
106 # unattended, assign vim
107 self
.engine
.assign_vim(ro_task
["target_id"])
108 self
.logger
.debug("ordered to load {}. Inactivity detected".format(ro_task
["target_id"]))
110 # every 2 hours check if there are vims without any ro_task and unload it
111 if now
> self
.next_check_unused_vim
:
112 self
.next_check_unused_vim
= now
+ self
.TIME_CHECK_UNUSED_VIM
113 self
.engine
.unload_unused_vims()
114 await asyncio
.sleep(self
.MAX_TIME_UNATTENDED
, loop
=self
.loop
)
116 async def aiomain(self
):
118 while not self
.to_terminate
:
120 if not self
.aiomain_task_kafka
:
121 # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
122 await self
.msg
.aiowrite("vim_account", "echo", "dummy message", loop
=self
.loop
)
124 self
.logger
.debug("Starting vim_account subscription task")
125 self
.aiomain_task_kafka
= asyncio
.ensure_future(
126 self
.msg
.aioread(self
.kafka_topics
, loop
=self
.loop
, group_id
=False,
127 aiocallback
=self
._msg
_callback
),
129 if not self
.aiomain_task_vim
:
130 self
.aiomain_task_vim
= asyncio
.ensure_future(
133 done
, _
= await asyncio
.wait([self
.aiomain_task_kafka
, self
.aiomain_task_vim
],
134 timeout
=None, loop
=self
.loop
, return_when
=asyncio
.FIRST_COMPLETED
)
136 if self
.aiomain_task_kafka
in done
:
137 exc
= self
.aiomain_task_kafka
.exception()
138 self
.logger
.error("kafka subscription task exception: {}".format(exc
))
139 self
.aiomain_task_kafka
= None
140 if self
.aiomain_task_vim
in done
:
141 exc
= self
.aiomain_task_vim
.exception()
142 self
.logger
.error("vim_account watcher task exception: {}".format(exc
))
143 self
.aiomain_task_vim
= None
144 except asyncio
.CancelledError
:
147 except Exception as e
:
148 if self
.to_terminate
:
151 # logging only first time
152 self
.logger
.critical("Error accessing kafka '{}'. Retrying ...".format(e
))
153 kafka_working
= False
154 await asyncio
.sleep(10, loop
=self
.loop
)
161 self
.loop
= asyncio
.new_event_loop()
164 if self
.config
["database"]["driver"] == "mongo":
165 self
.db
= dbmongo
.DbMongo()
166 self
.db
.db_connect(self
.config
["database"])
167 elif self
.config
["database"]["driver"] == "memory":
168 self
.db
= dbmemory
.DbMemory()
169 self
.db
.db_connect(self
.config
["database"])
171 raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
172 self
.config
["database"]["driver"]))
174 config_msg
= self
.config
["message"].copy()
175 config_msg
["loop"] = self
.loop
176 if config_msg
["driver"] == "local":
177 self
.msg
= msglocal
.MsgLocal()
178 self
.msg
.connect(config_msg
)
179 elif config_msg
["driver"] == "kafka":
180 self
.msg
= msgkafka
.MsgKafka()
181 self
.msg
.connect(config_msg
)
183 raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
184 config_msg
["driver"]))
185 except (DbException
, MsgException
) as e
:
186 raise VimAdminException(str(e
), http_code
=e
.http_code
)
188 self
.logger
.info("Starting")
189 while not self
.to_terminate
:
191 self
.loop
.run_until_complete(asyncio
.ensure_future(self
.aiomain(), loop
=self
.loop
))
192 # except asyncio.CancelledError:
193 # break # if cancelled it should end, breaking loop
194 except Exception as e
:
195 if not self
.to_terminate
:
196 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
198 self
.logger
.info("Finishing")
202 async def _msg_callback(self
, topic
, command
, params
):
204 Callback to process a received message from kafka
205 :param topic: topic received
206 :param command: command received
207 :param params: rest of parameters
211 if command
== "echo":
213 if topic
in self
.kafka_topics
:
214 target
= topic
[0:3] # vim, wim or sdn
215 target_id
= target
+ ":" + params
["_id"]
216 if command
in ("edited", "edit"):
217 self
.engine
.reload_vim(target_id
)
218 self
.logger
.debug("ordered to reload {}".format(target_id
))
219 elif command
in ("deleted", "delete"):
220 self
.engine
.unload_vim(target_id
)
221 self
.logger
.debug("ordered to unload {}".format(target_id
))
222 elif command
in ("create", "created"):
223 self
.engine
.check_vim(target_id
)
224 self
.logger
.debug("ordered to check {}".format(target_id
))
226 except (NsException
, DbException
, MsgException
) as e
:
227 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
228 except Exception as e
:
229 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
234 Close all connections
239 self
.db
.db_disconnect()
241 self
.msg
.disconnect()
242 except (DbException
, MsgException
) as e
:
243 raise VimAdminException(str(e
), http_code
=e
.http_code
)
247 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
251 self
.to_terminate
= True
252 if self
.aiomain_task_kafka
:
253 self
.loop
.call_soon_threadsafe(self
.aiomain_task_kafka
.cancel
)
254 if self
.aiomain_task_vim
:
255 self
.loop
.call_soon_threadsafe(self
.aiomain_task_vim
.cancel
)