blob: b39efe80692d4948747caeb31f1d508f17580cac [file] [log] [blame]
tierno0aef0db2018-02-01 19:13:07 +01001#!/usr/bin/python3
2# -*- coding: utf-8 -*-
3
4import asyncio
5import yaml
6import ROclient
7import dbmemory
8import dbmongo
9import fslocal
10import msglocal
tiernof3c4dbc2018-02-05 14:53:28 +010011import msgkafka
tiernoae501922018-02-06 23:17:16 +010012import logging
13import functools
tierno0aef0db2018-02-01 19:13:07 +010014from dbbase import DbException
15from fsbase import FsException
16from msgbase import MsgException
tiernof3c4dbc2018-02-05 14:53:28 +010017from os import environ
Adam Israel354ead92018-03-18 14:46:23 -040018# from vca import DeployApplication, RemoveApplication
19from n2vc.vnf import N2VC
20import os.path
21import time
22
tiernoae501922018-02-06 23:17:16 +010023from copy import deepcopy
24from http import HTTPStatus
tierno0aef0db2018-02-01 19:13:07 +010025
26class LcmException(Exception):
27 pass
28
29
30class Lcm:
31
tiernof3c4dbc2018-02-05 14:53:28 +010032 def __init__(self, config_file):
tierno0aef0db2018-02-01 19:13:07 +010033 """
34 Init, Connect to database, filesystem storage, and messaging
35 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
36 :return: None
37 """
38 # contains created tasks/futures to be able to cancel
39 self.lcm_tasks = {}
tierno0aef0db2018-02-01 19:13:07 +010040 # logging
41 self.logger = logging.getLogger('lcm')
tiernof3c4dbc2018-02-05 14:53:28 +010042 # load configuration
43 config = self.read_config_file(config_file)
44 self.config = config
tiernoae501922018-02-06 23:17:16 +010045 self.ro_config={
46 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
47 "tenant": config.get("tenant", "osm"),
48 "logger_name": "lcm.ROclient",
49 "loglevel": "ERROR",
50 }
Adam Israel354ead92018-03-18 14:46:23 -040051
tierno0aef0db2018-02-01 19:13:07 +010052 self.vca = config["VCA"] # TODO VCA
53 self.loop = None
tiernoae501922018-02-06 23:17:16 +010054
55 # logging
56 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
57 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
58 config["database"]["logger_name"] = "lcm.db"
59 config["storage"]["logger_name"] = "lcm.fs"
60 config["message"]["logger_name"] = "lcm.msg"
61 if "logfile" in config["global"]:
62 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
63 maxBytes=100e6, backupCount=9, delay=0)
64 file_handler.setFormatter(log_formatter_simple)
65 self.logger.addHandler(file_handler)
66 else:
67 str_handler = logging.StreamHandler()
68 str_handler.setFormatter(log_formatter_simple)
69 self.logger.addHandler(str_handler)
70
71 if config["global"].get("loglevel"):
72 self.logger.setLevel(config["global"]["loglevel"])
73
74 # logging other modules
75 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
76 config[k1]["logger_name"] = logname
77 logger_module = logging.getLogger(logname)
78 if "logfile" in config[k1]:
79 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
80 maxBytes=100e6, backupCount=9, delay=0)
81 file_handler.setFormatter(log_formatter_simple)
82 logger_module.addHandler(file_handler)
83 if "loglevel" in config[k1]:
84 logger_module.setLevel(config[k1]["loglevel"])
85
Adam Israel354ead92018-03-18 14:46:23 -040086 self.n2vc = N2VC(
87 log=self.logger,
88 server=config['VCA']['host'],
89 port=config['VCA']['port'],
90 user=config['VCA']['user'],
91 secret=config['VCA']['secret'],
92 # TODO: This should point to the base folder where charms are stored,
93 # if there is a common one (like object storage). Otherwise, leave
94 # it unset and pass it via DeployCharms
95 # artifacts=config['VCA'][''],
96 artifacts=None,
97 )
98
tierno0aef0db2018-02-01 19:13:07 +010099 try:
100 if config["database"]["driver"] == "mongo":
tiernoae501922018-02-06 23:17:16 +0100101 self.db = dbmongo.DbMongo()
tierno0aef0db2018-02-01 19:13:07 +0100102 self.db.db_connect(config["database"])
103 elif config["database"]["driver"] == "memory":
tiernoae501922018-02-06 23:17:16 +0100104 self.db = dbmemory.DbMemory()
tierno0aef0db2018-02-01 19:13:07 +0100105 self.db.db_connect(config["database"])
106 else:
107 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
108 config["database"]["driver"]))
109
110 if config["storage"]["driver"] == "local":
111 self.fs = fslocal.FsLocal()
112 self.fs.fs_connect(config["storage"])
113 else:
114 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
115 config["storage"]["driver"]))
116
117 if config["message"]["driver"] == "local":
tiernoae501922018-02-06 23:17:16 +0100118 self.msg = msglocal.MsgLocal()
tierno0aef0db2018-02-01 19:13:07 +0100119 self.msg.connect(config["message"])
tiernof3c4dbc2018-02-05 14:53:28 +0100120 elif config["message"]["driver"] == "kafka":
121 self.msg = msgkafka.MsgKafka()
122 self.msg.connect(config["message"])
tierno0aef0db2018-02-01 19:13:07 +0100123 else:
124 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
125 config["storage"]["driver"]))
126 except (DbException, FsException, MsgException) as e:
tiernof3c4dbc2018-02-05 14:53:28 +0100127 self.logger.critical(str(e), exc_info=True)
tierno0aef0db2018-02-01 19:13:07 +0100128 raise LcmException(str(e))
129
tiernoae501922018-02-06 23:17:16 +0100130 def update_nsr_db(self, nsr_id, nsr_desc):
tierno0aef0db2018-02-01 19:13:07 +0100131 try:
tiernoae501922018-02-06 23:17:16 +0100132 self.db.replace("nsrs", nsr_id, nsr_desc)
133 except DbException as e:
134 self.logger.error("Updating nsr_id={}: {}".format(nsr_id, e))
tierno0aef0db2018-02-01 19:13:07 +0100135
Adam Israel354ead92018-03-18 14:46:23 -0400136 def n2vc_callback(self, nsd, vnfd, vnf_member_index, workload_status, *args):
137 """Update the lcm database with the status of the charm.
138
139 Updates the VNF's operational status with the state of the charm:
140 - blocked: The unit needs manual intervention
141 - maintenance: The unit is actively deploying/configuring
142 - waiting: The unit is waiting for another charm to be ready
143 - active: The unit is deployed, configured, and ready
144 - error: The charm has failed and needs attention.
145 - terminated: The charm has been destroyed
146
147 Updates the network service's config-status to reflect the state of all
148 charms.
149 """
150 if workload_status and len(args) == 3:
151 # self.logger.debug("[n2vc_callback] Workload status \"{}\"".format(workload_status))
152 try:
153 (db_nsr, vnf_index, task) = args
154
155 nsr_id = db_nsr["_id"]
156 nsr_lcm = db_nsr["_admin"]["deploy"]
157 nsr_lcm["VCA"][vnf_index]['operational-status'] = workload_status
158
159 if task:
160 if task.cancelled():
161 return
162
163 if task.done():
164 exc = task.exception()
165 if exc:
166 nsr_lcm = db_nsr["_admin"]["deploy"]
167 nsr_lcm["VCA"][vnf_index]['operational-status'] = "failed"
168 db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, exc)
169 db_nsr["config-status"] = "failed"
170 self.update_nsr_db(nsr_id, db_nsr)
171 else:
172 units = len(nsr_lcm["VCA"])
173 active = 0
174 statusmap = {}
175 for vnf_index in nsr_lcm["VCA"]:
176 if 'operational-status' in nsr_lcm["VCA"][vnf_index]:
177
178 if nsr_lcm["VCA"][vnf_index]['operational-status'] not in statusmap:
179 # Initialize it
180 statusmap[nsr_lcm["VCA"][vnf_index]['operational-status']] = 0
181
182 statusmap[nsr_lcm["VCA"][vnf_index]['operational-status']] += 1
183
184 if nsr_lcm["VCA"][vnf_index]['operational-status'] == "active":
185 active += 1
186 else:
187 self.logger.debug("No operational-status")
188
189 cs = ""
190 for status in statusmap:
191 cs += "{} ({}) ".format(status, statusmap[status])
192 db_nsr["config-status"] = cs
193 self.update_nsr_db(nsr_id, db_nsr)
194
195 except Exception as e:
196 # self.logger.critical("Task create_ns={} n2vc_callback Exception {}".format(nsr_id, e), exc_info=True)
197 self.logger.critical("Task create_ns n2vc_callback Exception {}".format(e), exc_info=True)
198 pass
199
tiernoae501922018-02-06 23:17:16 +0100200 def vca_deploy_callback(self, db_nsr, vnf_index, status, task):
201 # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
202 # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
203 nsr_id = db_nsr["_id"]
204 self.logger.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id))
205 try:
206 if task.cancelled():
207 return
208 if task.done():
209 exc = task.exception()
210 if exc:
211 nsr_lcm = db_nsr["_admin"]["deploy"]
212 nsr_lcm["VCA"][vnf_index]['operational-status'] = "failed"
213 db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, exc)
214 db_nsr["config-status"] = "failed"
215 self.update_nsr_db(nsr_id, db_nsr)
216 else:
217 # TODO may be used to be called when VCA monitor status changes
218 pass
219 # except DbException as e:
220 # self.logger.error("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e))
221 except Exception as e:
222 self.logger.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e), exc_info=True)
223
224 async def create_ns(self, nsr_id, order_id):
225 logging_text = "Task create_ns={} ".format(nsr_id)
226 self.logger.debug(logging_text + "Enter")
227 # get all needed from database
228 db_nsr = None
229 exc = None
230 step = "Getting nsr from db"
231 try:
232 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
233 nsd = db_nsr["nsd"]
234 needed_vnfd = {}
tierno0aef0db2018-02-01 19:13:07 +0100235 for c_vnf in nsd["constituent-vnfd"]:
236 vnfd_id = c_vnf["vnfd-id-ref"]
tiernoae501922018-02-06 23:17:16 +0100237 if vnfd_id not in needed_vnfd:
238 step = "Getting vnfd={} from db".format(vnfd_id)
239 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
240
241 nsr_lcm = {
242 "id": nsr_id,
243 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
244 "nsr_ip": {},
245 "VCA": {},
246 }
247 db_nsr["_admin"]["deploy"] = nsr_lcm
248 db_nsr["detailed-status"] = "creating"
249 db_nsr["operational-status"] = "init"
250
251 deloyment_timeout = 120
252
253 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
254
255 # get vnfds, instantiate at RO
256 for vnfd_id, vnfd in needed_vnfd.items():
257 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
258 self.logger.debug(logging_text + step)
259 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
tierno0aef0db2018-02-01 19:13:07 +0100260
tiernof3c4dbc2018-02-05 14:53:28 +0100261 # look if present
tiernoae501922018-02-06 23:17:16 +0100262 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
tiernof3c4dbc2018-02-05 14:53:28 +0100263 if vnfd_list:
264 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
tiernoae501922018-02-06 23:17:16 +0100265 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
266 vnfd_id, vnfd_list[0]["uuid"]))
tiernof3c4dbc2018-02-05 14:53:28 +0100267 else:
tiernoae501922018-02-06 23:17:16 +0100268 vnfd_RO = deepcopy(vnfd)
269 vnfd_RO.pop("_id", None)
270 vnfd_RO.pop("_admin", None)
271 vnfd_RO["id"] = vnfd_id_RO
272 desc = await RO.create("vnfd", descriptor=vnfd_RO)
tierno0aef0db2018-02-01 19:13:07 +0100273 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
tiernoae501922018-02-06 23:17:16 +0100274 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100275
276 # create nsd at RO
tiernoae501922018-02-06 23:17:16 +0100277 nsd_id = nsd["id"]
278 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
279 self.logger.debug(logging_text + step)
tiernof3c4dbc2018-02-05 14:53:28 +0100280
tiernoae501922018-02-06 23:17:16 +0100281 nsd_id_RO = nsd_id + "." + nsd_id[:200]
282 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
tiernof3c4dbc2018-02-05 14:53:28 +0100283 if nsd_list:
284 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
tiernoae501922018-02-06 23:17:16 +0100285 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
286 nsd_id, nsd_list[0]["uuid"]))
tiernof3c4dbc2018-02-05 14:53:28 +0100287 else:
tiernoae501922018-02-06 23:17:16 +0100288 nsd_RO = deepcopy(nsd)
289 nsd_RO["id"] = nsd_id_RO
290 nsd_RO.pop("_id", None)
291 nsd_RO.pop("_admin", None)
292 for c_vnf in nsd_RO["constituent-vnfd"]:
293 vnfd_id = c_vnf["vnfd-id-ref"]
294 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
295 desc = await RO.create("nsd", descriptor=nsd_RO)
tierno0aef0db2018-02-01 19:13:07 +0100296 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
tiernoae501922018-02-06 23:17:16 +0100297 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100298
299 # Crate ns at RO
tiernoae501922018-02-06 23:17:16 +0100300 # if present use it unless in error status
301 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
302 if RO_nsr_id:
303 try:
304 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
305 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
306 desc = await RO.show("ns", RO_nsr_id)
307 except ROclient.ROClientException as e:
308 if e.http_code != HTTPStatus.NOT_FOUND:
309 raise
310 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
311 if RO_nsr_id:
312 ns_status, ns_status_info = RO.check_ns_status(desc)
313 nsr_lcm["RO"]["nsr_status"] = ns_status
314 if ns_status == "ERROR":
315 step = db_nsr["detailed-status"] = "Deleting ns at RO"
316 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
317 await RO.delete("ns", RO_nsr_id)
318 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
319
320 if not RO_nsr_id:
321 step = db_nsr["detailed-status"] = "Creating ns at RO"
322 self.logger.debug(logging_text + step)
323
324 desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
325 scenario=nsr_lcm["RO"]["nsd_id"])
326 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
327 nsr_lcm["RO"]["nsr_status"] = "BUILD"
328 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100329
330 # wait until NS is ready
tiernoae501922018-02-06 23:17:16 +0100331 step = ns_status_detailed = "Waiting ns ready at RO"
332 db_nsr["detailed-status"] = ns_status_detailed
333 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
tierno0aef0db2018-02-01 19:13:07 +0100334 deloyment_timeout = 600
335 while deloyment_timeout > 0:
tierno0aef0db2018-02-01 19:13:07 +0100336 desc = await RO.show("ns", RO_nsr_id)
337 ns_status, ns_status_info = RO.check_ns_status(desc)
338 nsr_lcm["RO"]["nsr_status"] = ns_status
339 if ns_status == "ERROR":
340 raise ROclient.ROClientException(ns_status_info)
341 elif ns_status == "BUILD":
tiernoae501922018-02-06 23:17:16 +0100342 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
343 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100344 elif ns_status == "ACTIVE":
345 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
346 break
347 else:
348 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
349
350 await asyncio.sleep(5, loop=self.loop)
351 deloyment_timeout -= 5
352 if deloyment_timeout <= 0:
tiernoae501922018-02-06 23:17:16 +0100353 raise ROclient.ROClientException("Timeout waiting ns to be ready")
tiernof3c4dbc2018-02-05 14:53:28 +0100354 db_nsr["detailed-status"] = "Configuring vnfr"
tiernoae501922018-02-06 23:17:16 +0100355 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100356
tiernoae501922018-02-06 23:17:16 +0100357 vnfd_to_config = 0
358 step = "Looking for needed vnfd to configure"
359 self.logger.debug(logging_text + step)
tierno0aef0db2018-02-01 19:13:07 +0100360 for c_vnf in nsd["constituent-vnfd"]:
361 vnfd_id = c_vnf["vnfd-id-ref"]
tiernoae501922018-02-06 23:17:16 +0100362 vnf_index = str(c_vnf["member-vnf-index"])
363 vnfd = needed_vnfd[vnfd_id]
tierno0aef0db2018-02-01 19:13:07 +0100364 if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
tiernoae501922018-02-06 23:17:16 +0100365 nsr_lcm["VCA"][vnf_index] = {}
366 vnfd_to_config += 1
tierno0aef0db2018-02-01 19:13:07 +0100367 proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]
Adam Israelc887cc22018-02-05 13:52:15 +0100368
369 # Note: The charm needs to exist on disk at the location
370 # specified by charm_path.
tierno0aef0db2018-02-01 19:13:07 +0100371 base_folder = vnfd["_admin"]["storage"]
Adam Israelc887cc22018-02-05 13:52:15 +0100372 charm_path = "{}{}/{}/charms/{}".format(
373 base_folder["path"],
374 base_folder["folder"],
375 base_folder["file"],
376 proxy_charm
377 )
Adam Israel354ead92018-03-18 14:46:23 -0400378
379 self.logger.debug("Passing artifacts path '{}' for {}".format(charm_path, proxy_charm))
Adam Israelc887cc22018-02-05 13:52:15 +0100380 task = asyncio.ensure_future(
Adam Israel354ead92018-03-18 14:46:23 -0400381 self.n2vc.DeployCharms(nsd, vnfd, vnf_index, charm_path, self.n2vc_callback, db_nsr, vnf_index, None)
Adam Israelc887cc22018-02-05 13:52:15 +0100382 )
Adam Israel354ead92018-03-18 14:46:23 -0400383 task.add_done_callback(functools.partial(self.n2vc_callback, None, None, None, None, None, db_nsr, vnf_index))
384
385 # task.add_done_callback(functools.partial(self.vca_deploy_callback, db_nsr, vnf_index, None))
tiernoae501922018-02-06 23:17:16 +0100386 self.lcm_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
387 db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
388 db_nsr["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config) if vnfd_to_config else "done"
tiernof3c4dbc2018-02-05 14:53:28 +0100389 db_nsr["operational-status"] = "running"
tiernoae501922018-02-06 23:17:16 +0100390 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100391
tiernof3c4dbc2018-02-05 14:53:28 +0100392 self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
tierno0aef0db2018-02-01 19:13:07 +0100393 return nsr_lcm
394
tiernoae501922018-02-06 23:17:16 +0100395 except (ROclient.ROClientException, DbException) as e:
396 self.logger.error(logging_text + "Exit Exception {}".format(e))
397 exc = e
Adam Israelc887cc22018-02-05 13:52:15 +0100398 except Exception as e:
tiernoae501922018-02-06 23:17:16 +0100399 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
400 exc = e
401 finally:
402 if exc and db_nsr:
403 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
404 db_nsr["operational-status"] = "failed"
405 self.update_nsr_db(nsr_id, db_nsr)
Adam Israelc887cc22018-02-05 13:52:15 +0100406
tiernoae501922018-02-06 23:17:16 +0100407 async def delete_ns(self, nsr_id, order_id):
408 logging_text = "Task delete_ns={} ".format(nsr_id)
409 self.logger.debug(logging_text + "Enter")
410 db_nsr = None
411 exc = None
412 step = "Getting nsr from db"
413 try:
414 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
415 nsd = db_nsr["nsd"]
416 nsr_lcm = db_nsr["_admin"]["deploy"]
417
418 db_nsr["operational-status"] = "terminate"
419 db_nsr["config-status"] = "terminate"
420 db_nsr["detailed-status"] = "Deleting charms"
421 self.update_nsr_db(nsr_id, db_nsr)
422
tiernof3c4dbc2018-02-05 14:53:28 +0100423 try:
tiernoae501922018-02-06 23:17:16 +0100424 step = db_nsr["detailed-status"] = "Deleting charms"
425 self.logger.debug(logging_text + step)
426 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
427 if deploy_info and deploy_info.get("appliation"):
Adam Israel354ead92018-03-18 14:46:23 -0400428
tiernoae501922018-02-06 23:17:16 +0100429 task = asyncio.ensure_future(
Adam Israel354ead92018-03-18 14:46:23 -0400430 self.n2vc.RemoveCharms(nsd, vnfd, vnf_index, self.n2vc_callback, db_nsr, vnf_index, None)
tiernoae501922018-02-06 23:17:16 +0100431 )
432 self.lcm_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
433 except Exception as e:
434 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
435 # remove from RO
436
437 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
438 # Delete ns
439 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
440 if RO_nsr_id:
441 try:
442 step = db_nsr["detailed-status"] = "Deleting ns at RO"
443 self.logger.debug(logging_text + step)
444 desc = await RO.delete("ns", RO_nsr_id)
tiernof3c4dbc2018-02-05 14:53:28 +0100445 nsr_lcm["RO"]["nsr_id"] = None
446 nsr_lcm["RO"]["nsr_status"] = "DELETED"
tiernoae501922018-02-06 23:17:16 +0100447 except ROclient.ROClientException as e:
448 if e.http_code == 404: # not found
449 nsr_lcm["RO"]["nsr_id"] = None
450 nsr_lcm["RO"]["nsr_status"] = "DELETED"
451 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
452 elif e.http_code == 409: #conflict
453 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
454 else:
455 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
456 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100457
tiernoae501922018-02-06 23:17:16 +0100458 # Delete nsd
459 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
460 if RO_nsd_id:
461 try:
462 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
463 desc = await RO.delete("nsd", RO_nsd_id)
464 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
tiernof3c4dbc2018-02-05 14:53:28 +0100465 nsr_lcm["RO"]["nsd_id"] = None
tiernoae501922018-02-06 23:17:16 +0100466 except ROclient.ROClientException as e:
467 if e.http_code == 404: # not found
468 nsr_lcm["RO"]["nsd_id"] = None
469 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
470 elif e.http_code == 409: #conflict
471 self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
472 else:
473 self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
474 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100475
tiernoae501922018-02-06 23:17:16 +0100476 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
477 if not RO_vnfd_id:
478 continue
479 try:
480 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
481 desc = await RO.delete("vnfd", RO_vnfd_id)
482 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
tierno0aef0db2018-02-01 19:13:07 +0100483 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
tiernoae501922018-02-06 23:17:16 +0100484 except ROclient.ROClientException as e:
485 if e.http_code == 404: # not found
486 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
487 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
488 elif e.http_code == 409: #conflict
489 self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
490 else:
491 self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
492 self.update_nsr_db(nsr_id, db_nsr)
tierno0aef0db2018-02-01 19:13:07 +0100493
tiernoae501922018-02-06 23:17:16 +0100494 # TODO delete from database or mark as deleted???
495 db_nsr["operational-status"] = "terminated"
496 self.db.del_one("nsrs", {"_id": nsr_id})
497 self.logger.debug(logging_text + "Exit")
498
499 except (ROclient.ROClientException, DbException) as e:
500 self.logger.error(logging_text + "Exit Exception {}".format(e))
501 exc = e
502 except Exception as e:
503 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
504 exc = e
505 finally:
506 if exc and db_nsr:
507 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
508 db_nsr["operational-status"] = "failed"
509 self.update_nsr_db(nsr_id, db_nsr)
tiernof3c4dbc2018-02-05 14:53:28 +0100510
tierno0aef0db2018-02-01 19:13:07 +0100511 async def test(self, param=None):
512 self.logger.debug("Starting/Ending test task: {}".format(param))
513
tierno0aef0db2018-02-01 19:13:07 +0100514 def cancel_tasks(self, nsr_id):
515 """
516 Cancel all active tasks of a concrete nsr identified for nsr_id
517 :param nsr_id: nsr identity
518 :return: None, or raises an exception if not possible
519 """
520 if not self.lcm_tasks.get(nsr_id):
521 return
522 for order_id, tasks_set in self.lcm_tasks[nsr_id].items():
523 for task_name, task in tasks_set.items():
524 result = task.cancel()
525 if result:
526 self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
527 self.lcm_tasks[nsr_id] = {}
528
tierno0aef0db2018-02-01 19:13:07 +0100529 async def read_kafka(self):
530 self.logger.debug("kafka task Enter")
531 order_id = 1
532 # future = asyncio.Future()
533
534 while True:
tiernof3c4dbc2018-02-05 14:53:28 +0100535 command, params = await self.msg.aioread("ns", self.loop)
tierno0aef0db2018-02-01 19:13:07 +0100536 order_id += 1
537 if command == "exit":
538 print("Bye!")
539 break
540 elif command.startswith("#"):
541 continue
542 elif command == "echo":
tiernoae501922018-02-06 23:17:16 +0100543 # just for test
tierno0aef0db2018-02-01 19:13:07 +0100544 print(params)
545 elif command == "test":
546 asyncio.Task(self.test(params), loop=self.loop)
547 elif command == "break":
548 print("put a break in this line of code")
549 elif command == "create":
550 nsr_id = params.strip()
551 self.logger.debug("Deploying NS {}".format(nsr_id))
tiernoae501922018-02-06 23:17:16 +0100552 task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
tierno0aef0db2018-02-01 19:13:07 +0100553 if nsr_id not in self.lcm_tasks:
554 self.lcm_tasks[nsr_id] = {}
555 self.lcm_tasks[nsr_id][order_id] = {"create_ns": task}
556 elif command == "delete":
557 nsr_id = params.strip()
558 self.logger.debug("Deleting NS {}".format(nsr_id))
559 self.cancel_tasks(nsr_id)
tiernoae501922018-02-06 23:17:16 +0100560 task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
tierno0aef0db2018-02-01 19:13:07 +0100561 if nsr_id not in self.lcm_tasks:
562 self.lcm_tasks[nsr_id] = {}
563 self.lcm_tasks[nsr_id][order_id] = {"delete_ns": task}
564 elif command == "show":
tiernoae501922018-02-06 23:17:16 +0100565 # just for test
tierno0aef0db2018-02-01 19:13:07 +0100566 nsr_id = params.strip()
tiernoae501922018-02-06 23:17:16 +0100567 try:
568 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
569 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
570 "{}\n deploy: {}\n tasks: {}".format(
571 nsr_id, db_nsr["operational-status"],
572 db_nsr["config-status"], db_nsr["detailed-status"],
573 db_nsr["_admin"]["deploy"], self.lcm_tasks.get(nsr_id)))
574 except Exception as e:
575 print("nsr {} not found: {}".format(nsr_id, e))
tierno0aef0db2018-02-01 19:13:07 +0100576 else:
tiernoae501922018-02-06 23:17:16 +0100577 self.logger.critical("unknown command '{}'".format(command))
tierno0aef0db2018-02-01 19:13:07 +0100578 self.logger.debug("kafka task Exit")
579
580
581 def start(self):
582 self.loop = asyncio.get_event_loop()
583 self.loop.run_until_complete(self.read_kafka())
584 self.loop.close()
585 self.loop = None
586
587
tiernof3c4dbc2018-02-05 14:53:28 +0100588 def read_config_file(self, config_file):
589 # TODO make a [ini] + yaml inside parser
590 # the configparser library is not suitable, because it does not admit comments at the end of line,
591 # and not parse integer or boolean
592 try:
593 with open(config_file) as f:
594 conf = yaml.load(f)
595 for k, v in environ.items():
596 if not k.startswith("OSMLCM_"):
597 continue
598 k_items = k.lower().split("_")
599 c = conf
600 try:
601 for k_item in k_items[1:-1]:
602 if k_item in ("ro", "vca"):
603 # put in capital letter
604 k_item = k_item.upper()
605 c = c[k_item]
606 if k_items[-1] == "port":
607 c[k_items[-1]] = int(v)
608 else:
609 c[k_items[-1]] = v
610 except Exception as e:
611 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
612
613 return conf
614 except Exception as e:
615 self.logger.critical("At config file '{}': {}".format(config_file, e))
tierno0aef0db2018-02-01 19:13:07 +0100616
617
tierno0aef0db2018-02-01 19:13:07 +0100618if __name__ == '__main__':
619
620 config_file = "lcm.cfg"
tiernof3c4dbc2018-02-05 14:53:28 +0100621 lcm = Lcm(config_file)
tierno0aef0db2018-02-01 19:13:07 +0100622
tierno0aef0db2018-02-01 19:13:07 +0100623 lcm.start()