1 # -*- coding: utf-8 -*-
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
This module implements a thread that reads VIM messages from the kafka bus.
It is based on asyncio.
It is in charge of loading tasks assigned to VIMs that nobody is in charge of.
import asyncio
import logging
import threading
from http import HTTPStatus
from time import time

from osm_common import dbmongo, dbmemory, msglocal, msgkafka
from osm_common.dbbase import DbException
from osm_common.msgbase import MsgException
from osm_ng_ro.ns import NsException
33 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
class VimAdminException(Exception):
    """Error raised by the VIM admin thread, carrying an HTTP status code."""

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        """
        :param message: human readable description of the error
        :param http_code: HTTP status associated with the error, BAD_REQUEST by default
        """
        super().__init__(message)
        self.http_code = http_code
43 class VimAdminThread(threading
.Thread
):
44 MAX_TIME_LOCKED
= 3600 # 1h
45 MAX_TIME_UNATTENDED
= 60 # 600 # 10min
46 kafka_topics
= ("vim_account", "wim_account", "sdn")
48 def __init__(self
, config
, engine
):
51 :param config: configuration parameters of database and messaging
52 :param engine: an instance of Engine class, used for deleting instances
54 threading
.Thread
.__init
__(self
)
55 self
.to_terminate
= False
61 self
.last_rotask_time
= 0
62 self
.logger
= logging
.getLogger("ro.vimadmin")
63 self
.aiomain_task_kafka
= None # asyncio task for receiving vim actions from kafka bus
64 self
.aiomain_task_vim
= None # asyncio task for watching ro_tasks not processed by nobody
66 async def vim_watcher(self
):
67 """ Reads database periodically looking for tasks not processed by nobody because of a restar
68 in order to load this vim"""
69 while not self
.to_terminate
:
71 if not self
.last_rotask_time
:
72 self
.last_rotask_time
= 0
73 ro_tasks
= self
.db
.get_list("ro_tasks",
74 q_filter
={"target_id.ncont": self
.engine
.assignment_list
,
75 "tasks.status": ['SCHEDULED', 'BUILD', 'DONE', 'FAILED'],
76 "locked_at.lt": now
- self
.MAX_TIME_LOCKED
,
77 "to_check_at.gt": self
.last_rotask_time
,
78 "to_check_at.lte": now
- self
.MAX_TIME_UNATTENDED
})
79 self
.last_rotask_time
= now
- self
.MAX_TIME_UNATTENDED
80 for ro_task
in ro_tasks
:
81 if ro_task
["target_id"] not in self
.engine
.assignment_list
:
82 self
.engine
.assign_vim(ro_task
["target_id"])
83 self
.logger
.debug("ordered to load {}. Inactivity detected".format(ro_task
["target_id"]))
85 await asyncio
.sleep(300, loop
=self
.loop
)
87 async def aiomain(self
):
89 while not self
.to_terminate
:
91 if not self
.aiomain_task_kafka
:
92 # await self.msg.aiowrite("admin", "echo", "dummy message", loop=self.loop)
93 await self
.msg
.aiowrite("vim_account", "echo", "dummy message", loop
=self
.loop
)
95 self
.logger
.debug("Starting vim_account subscription task")
96 self
.aiomain_task_kafka
= asyncio
.ensure_future(
97 self
.msg
.aioread(self
.kafka_topics
, loop
=self
.loop
, group_id
=False,
98 aiocallback
=self
._msg
_callback
),
100 if not self
.aiomain_task_vim
:
101 self
.aiomain_task_vim
= asyncio
.ensure_future(
104 done
, _
= await asyncio
.wait([self
.aiomain_task_kafka
, self
.aiomain_task_vim
],
105 timeout
=None, loop
=self
.loop
, return_when
=asyncio
.FIRST_COMPLETED
)
107 if self
.aiomain_task_kafka
in done
:
108 exc
= self
.aiomain_task_kafka
.exception()
109 self
.logger
.error("kafka subscription task exception: {}".format(exc
))
110 self
.aiomain_task_kafka
= None
111 if self
.aiomain_task_vim
in done
:
112 exc
= self
.aiomain_task_vim
.exception()
113 self
.logger
.error("vim_account watcher task exception: {}".format(exc
))
114 self
.aiomain_task_vim
= None
115 except asyncio
.CancelledError
:
118 except Exception as e
:
119 if self
.to_terminate
:
122 # logging only first time
123 self
.logger
.critical("Error accessing kafka '{}'. Retrying ...".format(e
))
124 kafka_working
= False
125 await asyncio
.sleep(10, loop
=self
.loop
)
132 self
.loop
= asyncio
.new_event_loop()
135 if self
.config
["database"]["driver"] == "mongo":
136 self
.db
= dbmongo
.DbMongo()
137 self
.db
.db_connect(self
.config
["database"])
138 elif self
.config
["database"]["driver"] == "memory":
139 self
.db
= dbmemory
.DbMemory()
140 self
.db
.db_connect(self
.config
["database"])
142 raise VimAdminException("Invalid configuration param '{}' at '[database]':'driver'".format(
143 self
.config
["database"]["driver"]))
145 config_msg
= self
.config
["message"].copy()
146 config_msg
["loop"] = self
.loop
147 if config_msg
["driver"] == "local":
148 self
.msg
= msglocal
.MsgLocal()
149 self
.msg
.connect(config_msg
)
150 elif config_msg
["driver"] == "kafka":
151 self
.msg
= msgkafka
.MsgKafka()
152 self
.msg
.connect(config_msg
)
154 raise VimAdminException("Invalid configuration param '{}' at '[message]':'driver'".format(
155 config_msg
["driver"]))
156 except (DbException
, MsgException
) as e
:
157 raise VimAdminException(str(e
), http_code
=e
.http_code
)
159 self
.logger
.debug("Starting")
160 while not self
.to_terminate
:
162 self
.loop
.run_until_complete(asyncio
.ensure_future(self
.aiomain(), loop
=self
.loop
))
163 # except asyncio.CancelledError:
164 # break # if cancelled it should end, breaking loop
165 except Exception as e
:
166 if not self
.to_terminate
:
167 self
.logger
.exception("Exception '{}' at messaging read loop".format(e
), exc_info
=True)
169 self
.logger
.debug("Finishing")
173 async def _msg_callback(self
, topic
, command
, params
):
175 Callback to process a received message from kafka
176 :param topic: topic received
177 :param command: command received
178 :param params: rest of parameters
182 if command
== "echo":
184 if topic
in self
.kafka_topics
:
185 target
= topic
[0:3] # vim, wim or sdn
186 target_id
= target
+ ":" + params
["_id"]
187 if command
in ("edited", "edit"):
188 self
.engine
.reload_vim(target_id
)
189 self
.logger
.debug("ordered to reload {}".format(target_id
))
190 elif command
in ("deleted", "delete"):
191 self
.engine
.unload_vim(target_id
)
192 self
.logger
.debug("ordered to unload {}".format(target_id
))
193 elif command
in ("create", "created"):
194 self
.engine
.check_vim(target_id
)
195 self
.logger
.debug("ordered to check {}".format(target_id
))
197 except (NsException
, DbException
, MsgException
) as e
:
198 self
.logger
.error("Error while processing topic={} command={}: {}".format(topic
, command
, e
))
199 except Exception as e
:
200 self
.logger
.exception("Exception while processing topic={} command={}: {}".format(topic
, command
, e
),
205 Close all connections
210 self
.db
.db_disconnect()
212 self
.msg
.disconnect()
213 except (DbException
, MsgException
) as e
:
214 raise VimAdminException(str(e
), http_code
=e
.http_code
)
218 This is a threading safe method to terminate this thread. Termination is done asynchronous afterwards,
222 self
.to_terminate
= True
223 if self
.aiomain_task_kafka
:
224 self
.loop
.call_soon_threadsafe(self
.aiomain_task_kafka
.cancel
)
225 if self
.aiomain_task_vim
:
226 self
.loop
.call_soon_threadsafe(self
.aiomain_task_vim
.cancel
)