| Felipe Vicens | c2033f2 | 2018-11-15 15:09:58 +0100 | [diff] [blame] | 1 | #!/usr/bin/python3 |
| 2 | # -*- coding: utf-8 -*- |
| 3 | |
| 4 | import asyncio |
| 5 | import logging |
| 6 | import logging.handlers |
| 7 | import traceback |
| 8 | import ns |
| 9 | from lcm_utils import LcmException, LcmBase |
| 10 | from osm_common.dbbase import DbException |
| 11 | from time import time |
| 12 | |
| 13 | __author__ = "Felipe Vicens, Pol Alemany, Alfonso Tierno" |
| 14 | |
| 15 | |
class NetsliceLcm(LcmBase):
    """
    Life-cycle manager for network slice instances (NSI).

    A slice operation is carried out by launching the matching operation on every network service listed in
    the slice operation record (through an ``ns.NsLcm`` instance) and then polling the "nslcmops" records
    until all of them reach a final state or a timeout expires.
    """

    # Seconds to wait for previously registered tasks related to the same slice
    _RELATED_TASKS_TIMEOUT = 3600
    # Seconds granted to the whole set of network service operations (two hours)
    _NS_OPERATIONS_TIMEOUT = 2 * 3600
    # Seconds between two consecutive polls of the "nslcmops" records
    _POLL_INTERVAL = 5
    # operationState values after which a network service operation is not polled any more
    # TODO: (future improvement) other possible status: ROLLING_BACK,ROLLED_BACK
    _NS_FINAL_STATES = ("COMPLETED", "PARTIALLY_COMPLETED", "FAILED", "FAILED_TEMP")

    def __init__(self, db, msg, fs, lcm_tasks, ro_config, vca_config, loop):
        """
        Init: creates the logger, stores the loop and the task registry and builds the NsLcm used to operate
        the network services of the slice
        :param db: database handler; forwarded to ns.NsLcm and to LcmBase
        :param msg: messaging handler; forwarded to ns.NsLcm and to LcmBase
        :param fs: filesystem storage handler; forwarded to ns.NsLcm and to LcmBase
        :param lcm_tasks: task registry; used here through lookfor_related(), register() and remove()
        :param ro_config: forwarded unchanged to ns.NsLcm
        :param vca_config: forwarded unchanged to ns.NsLcm
        :param loop: asyncio event loop; stored in self.loop and forwarded to ns.NsLcm
        :return: None
        """
        # logging
        self.logger = logging.getLogger('lcm.netslice')
        self.loop = loop
        self.lcm_tasks = lcm_tasks
        self.ns = ns.NsLcm(db, msg, fs, lcm_tasks, ro_config, vca_config, loop)

        super().__init__(db, msg, fs, self.logger)

    def _collect_ns_status(self, nslcmop_ids, status_prefix):
        """
        Reads every network service operation and summarizes its state
        :param nslcmop_ids: iterable with the _id of the "nslcmops" records to read
        :param status_prefix: text prepended to the detailed-status of every entry
        :return: (nsrs_detailed_list, finished). nsrs_detailed_list contains one dictionary per operation with
            the keys "nsrId", "status" and "detailed-status"; finished is True when every operationState is
            one of _NS_FINAL_STATES
        """
        nsrs_detailed_list = []
        finished = True
        for nslcmop_id in nslcmop_ids:
            nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
            status = nslcmop["operationState"]
            nsrs_detailed_list.append({
                "nsrId": nslcmop["nsInstanceId"],
                "status": status,
                "detailed-status": status_prefix + "; {}".format(nslcmop.get("detailed-status")),
            })
            if status not in self._NS_FINAL_STATES:
                finished = False
        return nsrs_detailed_list, finished

    async def _wait_ns_operations(self, nsir_id, nslcmop_ids, status_prefix, db_nsir_update):
        """
        Polls the network service operations until all of them are in a final state or the timeout expires
        :param nsir_id: _id of the slice record at "nsis" where the progress is stored
        :param nslcmop_ids: iterable with the _id of the "nslcmops" records to watch
        :param status_prefix: text prepended to the detailed-status of every entry
        :param db_nsir_update: pending changes for the slice record. The list of statuses is added to it and
            the whole dictionary is passed to update_db_2 each time the list changes
        :return: (finished, nsrs_detailed_list). finished is False when the timeout expired first
        """
        # TODO: substitute while for await (all task to be done or not)
        # TODO: future improvement due to synchronism -> await asyncio.wait(vca_task_list, timeout=300)
        remaining = self._NS_OPERATIONS_TIMEOUT
        nsrs_detailed_list_old = None
        while remaining > 0:
            nsrs_detailed_list, finished = self._collect_ns_status(nslcmop_ids, status_prefix)

            if nsrs_detailed_list != nsrs_detailed_list_old:
                nsrs_detailed_list_old = nsrs_detailed_list
                # Only the list is written, using a dotted key as done for the rest of "_admin.*" entries.
                # Writing the complete "_admin" read at the beginning of the operation would restore the
                # values it had at that moment (e.g. "_admin.nsilcmop")
                db_nsir_update["_admin.nsrs-detailed-list"] = nsrs_detailed_list
                self.update_db_2("nsis", nsir_id, db_nsir_update)

            if finished:
                return True, nsrs_detailed_list

            # The 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
            await asyncio.sleep(self._POLL_INTERVAL)
            remaining -= self._POLL_INTERVAL
        return False, nsrs_detailed_list_old

    # TODO: check logging_text within the self.logger.info/debug
    async def instantiate(self, nsir_id, nsilcmop_id):
        """
        Instantiates a network slice: launches the instantiation of every network service referenced at
        operationParams.nslcmops_ids of the slice operation and waits until all of them finish.
        Errors are not raised to the caller; they are logged and stored at the "nsis" and "nsilcmops" records.
        When a final operation state is reached a "nsi"/"instantiated" message is written
        :param nsir_id: _id of the slice record at "nsis"
        :param nsilcmop_id: _id of the slice operation record at "nsilcmops"
        :return: None
        """
        logging_text = "Task netslice={} instantiate={} ".format(nsir_id, nsilcmop_id)
        self.logger.debug(logging_text + "Enter")
        # get all needed from database
        exc = None
        db_nsir = None
        db_nsilcmop = None
        db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
        db_nsilcmop_update = {}
        nsilcmop_operation_state = None

        try:
            step = "Getting nsir={} from db".format(nsir_id)
            db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
            step = "Getting nsilcmop={} from db".format(nsilcmop_id)
            db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})

            # look if previous tasks is in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
            if task_dependency:
                step = db_nsilcmop_update["detailed-status"] = \
                    "Waiting for related tasks to be completed: {}".format(task_name)
                self.logger.debug(logging_text + step)
                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
                _, pending = await asyncio.wait(task_dependency, timeout=self._RELATED_TASKS_TIMEOUT)
                if pending:
                    raise LcmException("Timeout waiting related tasks to be completed")

            # Slice status Creating
            db_nsir_update["detailed-status"] = "creating"
            db_nsir_update["operational-status"] = "init"
            self.update_db_2("nsis", nsir_id, db_nsir_update)

            # Iterate over the network services operation ids to instantiate NSs
            # TODO: (future improvement) look another way check the tasks instead of keep asking
            # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives
            nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids")
            for nslcmop_id in nslcmop_ids:
                nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                nsr_id = nslcmop.get("nsInstanceId")
                # the operation id is reported, not the whole operation record
                step = "Launching ns={} instantiate={} task".format(nsr_id, nslcmop_id)
                task = asyncio.ensure_future(self.ns.instantiate(nsr_id, nslcmop_id))
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_instantiate", task)

            # Wait until Network Slice is ready
            step = nsir_status_detailed = " Waiting nsi ready. nsi_id={}".format(nsir_id)
            self.logger.debug(logging_text + step)
            finished, nsrs_detailed_list = await self._wait_ns_operations(nsir_id, nslcmop_ids,
                                                                          nsir_status_detailed, db_nsir_update)
            if not finished:
                raise LcmException("Timeout waiting nsi to be ready. nsi_id={}".format(nsir_id))

            step = "Network Slice Instance is ready. nsi_id={}".format(nsir_id)
            # Only "FAILED" is considered an error; "FAILED_TEMP" and "PARTIALLY_COMPLETED" are not
            if any(item["status"] == "FAILED" for item in nsrs_detailed_list):
                raise LcmException("Error deploying NSI: {}".format(nsir_id))

            db_nsir_update["operational-status"] = "running"
            db_nsir_update["detailed-status"] = "done"
            db_nsir_update["config-status"] = "configured"
            db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED"
            db_nsilcmop_update["statusEnteredTime"] = time()
            db_nsilcmop_update["detailed-status"] = "done"
            return

        except (LcmException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                                 exc_info=True)
        finally:
            # Executed on success too (the 'return' above goes through here)
            if exc:
                if db_nsir:
                    db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                    db_nsir_update["operational-status"] = "failed"
                if db_nsilcmop:
                    db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                    db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED"
                    db_nsilcmop_update["statusEnteredTime"] = time()
            if db_nsir:
                # nsiState is set whatever the result of the operation is
                db_nsir_update["_admin.nsiState"] = "INSTANTIATED"
                db_nsir_update["_admin.nsilcmop"] = None
                self.update_db_2("nsis", nsir_id, db_nsir_update)
            if db_nsilcmop:
                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
            if nsilcmop_operation_state:
                try:
                    await self.msg.aiowrite("nsi", "instantiated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id,
                                                                    "operationState": nsilcmop_operation_state})
                except Exception as e:
                    # the notification is best effort: a failure is logged and the operation result is kept
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_instantiate")

    async def terminate(self, nsir_id, nsilcmop_id):
        """
        Terminates a network slice: launches the termination of every network service referenced at
        operationParams.nslcmops_ids of the slice operation and waits until all of them finish.
        Errors are not raised to the caller; they are logged and stored at the "nsis" and "nsilcmops" records.
        When a final operation state is reached a "nsi"/"terminated" message is written
        :param nsir_id: _id of the slice record at "nsis"
        :param nsilcmop_id: _id of the slice operation record at "nsilcmops"
        :return: None
        """
        logging_text = "Task nsi={} terminate={} ".format(nsir_id, nsilcmop_id)
        self.logger.debug(logging_text + "Enter")
        exc = None
        db_nsir = None
        db_nsilcmop = None
        db_nsir_update = {"_admin.nsilcmop": nsilcmop_id}
        db_nsilcmop_update = {}
        nsilcmop_operation_state = None
        try:
            step = "Getting nsir={} from db".format(nsir_id)
            db_nsir = self.db.get_one("nsis", {"_id": nsir_id})
            step = "Getting nsilcmop={} from db".format(nsilcmop_id)
            db_nsilcmop = self.db.get_one("nsilcmops", {"_id": nsilcmop_id})

            # TODO: Check if makes sense check the nsiState=NOT_INSTANTIATED when terminate
            # CASE: Instance was terminated but there is a second request to terminate the instance
            # NOTE(review): in this case no operationState is stored at the nsilcmop and no message is sent,
            # but the 'finally' block still updates the nsi record - confirm that this is intended
            if db_nsir["_admin"]["nsiState"] == "NOT_INSTANTIATED":
                return

            # Slice status Terminating
            db_nsir_update["operational-status"] = "terminating"
            db_nsir_update["config-status"] = "terminating"
            self.update_db_2("nsis", nsir_id, db_nsir_update)

            # look if previous tasks is in process
            task_name, task_dependency = self.lcm_tasks.lookfor_related("nsi", nsir_id, nsilcmop_id)
            if task_dependency:
                step = db_nsilcmop_update["detailed-status"] = \
                    "Waiting for related tasks to be completed: {}".format(task_name)
                self.logger.debug(logging_text + step)
                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)
                _, pending = await asyncio.wait(task_dependency, timeout=self._RELATED_TASKS_TIMEOUT)
                if pending:
                    raise LcmException("Timeout waiting related tasks to be completed")

            # Iterate over the network services operation ids to terminate NSs
            # TODO: (future improvement) look another way check the tasks instead of keep asking
            # -> https://docs.python.org/3/library/asyncio-task.html#waiting-primitives
            nslcmop_ids = db_nsilcmop["operationParams"].get("nslcmops_ids")
            for nslcmop_id in nslcmop_ids:
                nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
                # NOTE(review): instantiate() and the status polling read "nsInstanceId" from the top level of
                # the nslcmop, here it is read from "operationParams" - confirm which one is the right place
                nsr_id = nslcmop["operationParams"].get("nsInstanceId")
                task = asyncio.ensure_future(self.ns.terminate(nsr_id, nslcmop_id))
                # registered with the name of the operation that is really launched
                self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_terminate", task)

            # Wait until Network Slice is terminated
            step = nsir_status_detailed = " Waiting nsi terminated. nsi_id={}".format(nsir_id)
            self.logger.debug(logging_text + step)
            finished, nsrs_detailed_list = await self._wait_ns_operations(nsir_id, nslcmop_ids,
                                                                          nsir_status_detailed, db_nsir_update)
            if not finished:
                raise LcmException("Timeout waiting nsi to be terminated. nsi_id={}".format(nsir_id))

            step = "Network Slice Instance is terminated. nsi_id={}".format(nsir_id)
            # Only "FAILED" is considered an error; "FAILED_TEMP" and "PARTIALLY_COMPLETED" are not
            if any(item["status"] == "FAILED" for item in nsrs_detailed_list):
                raise LcmException("Error terminating NSI: {}".format(nsir_id))

            db_nsir_update["operational-status"] = "terminated"
            db_nsir_update["config-status"] = "configured"
            db_nsir_update["detailed-status"] = "done"
            db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "COMPLETED"
            db_nsilcmop_update["statusEnteredTime"] = time()
            db_nsilcmop_update["detailed-status"] = "done"
            return

        except (LcmException, DbException) as e:
            self.logger.error(logging_text + "Exit Exception while '{}': {}".format(step, e))
            exc = e
        except asyncio.CancelledError:
            self.logger.error(logging_text + "Cancelled Exception while '{}'".format(step))
            exc = "Operation was cancelled"
        except Exception as e:
            exc = traceback.format_exc()
            self.logger.critical(logging_text + "Exit Exception {} while '{}': {}".format(type(e).__name__, step, e),
                                 exc_info=True)
        finally:
            # Executed on success and on the early 'return' too
            if exc:
                if db_nsir:
                    db_nsir_update["detailed-status"] = "ERROR {}: {}".format(step, exc)
                    db_nsir_update["operational-status"] = "failed"
                if db_nsilcmop:
                    db_nsilcmop_update["detailed-status"] = "FAILED {}: {}".format(step, exc)
                    db_nsilcmop_update["operationState"] = nsilcmop_operation_state = "FAILED"
                    db_nsilcmop_update["statusEnteredTime"] = time()
            if db_nsir:
                # nsiState is set whatever the result of the operation is
                db_nsir_update["_admin.nsilcmop"] = None
                db_nsir_update["_admin.nsiState"] = "TERMINATED"
                self.update_db_2("nsis", nsir_id, db_nsir_update)
            if db_nsilcmop:
                self.update_db_2("nsilcmops", nsilcmop_id, db_nsilcmop_update)

            if nsilcmop_operation_state:
                try:
                    await self.msg.aiowrite("nsi", "terminated", {"nsir_id": nsir_id, "nsilcmop_id": nsilcmop_id,
                                                                  "operationState": nsilcmop_operation_state})
                except Exception as e:
                    # the notification is best effort: a failure is logged and the operation result is kept
                    self.logger.error(logging_text + "kafka_write notification Exception {}".format(e))
            self.logger.debug(logging_text + "Exit")
            self.lcm_tasks.remove("nsi", nsir_id, nsilcmop_id, "nsi_terminate")