WIP - Fix N2VC integration issues
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 from dbbase import DbException
15 from fsbase import FsException
16 from msgbase import MsgException
17 from os import environ
18 # from vca import DeployApplication, RemoveApplication
19 from n2vc.vnf import N2VC
20 import os.path
21 import time
22
23 from copy import deepcopy
24 from http import HTTPStatus
25
class LcmException(Exception):
    """Error raised by Lcm on invalid configuration values or failed database/storage/messaging connections."""
28
29
30 class Lcm:
31
def __init__(self, config_file):
    """
    Init, Connect to database, filesystem storage, and messaging
    :param config_file: path of the configuration file, loaded with read_config_file. The resulting two level
        dictionary must contain the sections 'global', 'RO', 'VCA', 'database', 'storage' and 'message'
    :return: None
    :raises LcmException: on an unknown 'driver' value, or when a database/storage/message connection fails
    """
    # Imported by name at the top of the method: a bare "import logging" does not load the 'handlers'
    # submodule, and "import logging.handlers" here would turn 'logging' into a local name
    from logging.handlers import RotatingFileHandler

    # contains created tasks/futures to be able to cancel
    self.lcm_tasks = {}
    # logging
    self.logger = logging.getLogger('lcm')
    # load configuration
    config = self.read_config_file(config_file)
    self.config = config
    self.ro_config = {
        "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
        "tenant": config.get("tenant", "osm"),
        "logger_name": "lcm.ROclient",
        "loglevel": "ERROR",
    }

    self.vca = config["VCA"]  # TODO VCA
    self.loop = None

    # logging
    log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
    log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
    if "logfile" in config["global"]:
        file_handler = RotatingFileHandler(config["global"]["logfile"],
                                           maxBytes=100e6, backupCount=9, delay=0)
        file_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(file_handler)
    else:
        str_handler = logging.StreamHandler()
        str_handler.setFormatter(log_formatter_simple)
        self.logger.addHandler(str_handler)

    if config["global"].get("loglevel"):
        self.logger.setLevel(config["global"]["loglevel"])

    # logging other modules. The logger name is stored at each section so that the driver can read it
    for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
        config[k1]["logger_name"] = logname
        logger_module = logging.getLogger(logname)
        if "logfile" in config[k1]:
            file_handler = RotatingFileHandler(config[k1]["logfile"],
                                               maxBytes=100e6, backupCount=9, delay=0)
            file_handler.setFormatter(log_formatter_simple)
            logger_module.addHandler(file_handler)
        if "loglevel" in config[k1]:
            logger_module.setLevel(config[k1]["loglevel"])

    self.n2vc = N2VC(
        log=self.logger,
        server=config['VCA']['host'],
        port=config['VCA']['port'],
        user=config['VCA']['user'],
        secret=config['VCA']['secret'],
        # TODO: This should point to the base folder where charms are stored,
        # if there is a common one (like object storage). Otherwise, leave
        # it unset and pass it via DeployCharms
        # artifacts=config['VCA'][''],
        artifacts=None,
    )

    try:
        if config["database"]["driver"] == "mongo":
            self.db = dbmongo.DbMongo()
            self.db.db_connect(config["database"])
        elif config["database"]["driver"] == "memory":
            self.db = dbmemory.DbMemory()
            self.db.db_connect(config["database"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
                config["database"]["driver"]))

        if config["storage"]["driver"] == "local":
            self.fs = fslocal.FsLocal()
            self.fs.fs_connect(config["storage"])
        else:
            raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                config["storage"]["driver"]))

        if config["message"]["driver"] == "local":
            self.msg = msglocal.MsgLocal()
            self.msg.connect(config["message"])
        elif config["message"]["driver"] == "kafka":
            self.msg = msgkafka.MsgKafka()
            self.msg.connect(config["message"])
        else:
            # report the offending value of the [message] section (not the [storage] one)
            raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                config["message"]["driver"]))
    except (DbException, FsException, MsgException) as e:
        self.logger.critical(str(e), exc_info=True)
        raise LcmException(str(e)) from e
129
def update_nsr_db(self, nsr_id, nsr_desc):
    """
    Replace the ns record stored at the "nsrs" collection. A DbException is logged as error and not propagated
    :param nsr_id: identity of the ns record to replace
    :param nsr_desc: new content of the ns record
    :return: None
    """
    try:
        self.db.replace("nsrs", nsr_id, nsr_desc)
    except DbException as db_error:
        failure_text = "Updating nsr_id={}: {}".format(nsr_id, db_error)
        self.logger.error(failure_text)
135
def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
    """Update the lcm database with the status of the charm.

    Updates the VNF's operational status with the state of the charm:
    - blocked: The unit needs manual intervention
    - maintenance: The unit is actively deploying/configuring
    - waiting: The unit is waiting for another charm to be ready
    - active: The unit is deployed, configured, and ready
    - error: The charm has failed and needs attention.
    - terminated: The charm has been destroyed

    Updates the network service's config-status to reflect the state of all
    charms.

    :param model_name: not used by this method
    :param application_name: not used by this method
    :param workload_status: new status of the charm. Can be None when called as the done-callback of a task
    :param db_nsr: content of the ns record. It is modified in place and written to database
    :param vnf_member_index: key of the affected entry at db_nsr["_admin"]["deploy"]["VCA"]
    :param task: task/future to inspect when this method is used as its done-callback. A failed task marks
        the vnf and the ns configuration as "failed"
    :return: None. Exceptions are logged as critical and not propagated
    """

    if db_nsr is None or (not workload_status and not task):
        self.logger.error("Task create_ns n2vc_callback Enter with bad parameters")
        return

    try:
        nsr_id = db_nsr["_id"]
        vca_deploy = db_nsr["_admin"]["deploy"]["VCA"]

        if workload_status:
            self.logger.debug("[n2vc_callback] Workload status \"{}\"".format(workload_status))
            vca_deploy[vnf_member_index]['operational-status'] = workload_status

        if task:
            # done-callback usage: only a failed task changes the database
            if task.cancelled():
                return

            if task.done():
                exc = task.exception()
                if exc:
                    vca_deploy[vnf_member_index]['operational-status'] = "failed"
                    db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index, exc)
                    db_nsr["config-status"] = "failed"
                    self.update_nsr_db(nsr_id, db_nsr)
            return

        # status-change usage: summarize how many charms are at each status, reading the status of every
        # vnf (not only the one that has just changed)
        statusmap = {}
        for vca_info in vca_deploy.values():
            vca_status = vca_info['operational-status']
            statusmap[vca_status] = statusmap.get(vca_status, 0) + 1

        db_nsr["config-status"] = "".join(
            "{} ({}) ".format(status, count) for status, count in statusmap.items())
        self.update_nsr_db(nsr_id, db_nsr)

    except Exception as e:
        self.logger.critical("Task create_ns n2vc_callback Exception {}".format(e), exc_info=True)
203
def vca_deploy_callback(self, db_nsr, vnf_index, status, task):
    """
    Done-callback that records at database the failure of a VCA deployment task
    :param db_nsr: content of the ns record. Modified in place and written to database if the task failed
    :param vnf_index: key of the affected entry at db_nsr["_admin"]["deploy"]["VCA"]
    :param status: not used by this method
    :param task: the task/future to inspect
    :return: None. Exceptions raised while inspecting the task are logged as critical and not propagated
    """
    # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
    # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
    nsr_id = db_nsr["_id"]
    self.logger.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id))
    try:
        if task.cancelled() or not task.done():
            return
        failure = task.exception()
        if not failure:
            # TODO may be used to be called when VCA monitor status changes
            return
        vca_entry = db_nsr["_admin"]["deploy"]["VCA"][vnf_index]
        vca_entry['operational-status'] = "failed"
        db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, failure)
        db_nsr["config-status"] = "failed"
        self.update_nsr_db(nsr_id, db_nsr)
    except Exception as error:
        self.logger.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, error),
                             exc_info=True)
227
async def create_ns(self, nsr_id, order_id):
    """
    Task that deploys a ns: creates the vnfds, the nsd and the ns at RO, waits until the ns is active at RO
    and launches a charm deployment task for every vnf with a juju configuration. Progress and errors are
    written at the ns record
    :param nsr_id: "_id" of the ns record at the "nsrs" collection
    :param order_id: key at self.lcm_tasks[nsr_id] where the charm deployment tasks are registered
    :return: the content stored at db_nsr["_admin"]["deploy"] on success. None on failure; in this case the
        exception is logged and stored at the "detailed-status" of the ns record
    """
    logging_text = "Task create_ns={} ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    exc = None
    step = "Getting nsr from db"
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsd = db_nsr["nsd"]
        needed_vnfd = {}
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            if vnfd_id not in needed_vnfd:
                step = "Getting vnfd={} from db".format(vnfd_id)
                needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})

        nsr_lcm = {
            "id": nsr_id,
            "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
            "nsr_ip": {},
            "VCA": {},
        }
        db_nsr["_admin"]["deploy"] = nsr_lcm
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)

        # get vnfds, instantiate at RO
        for vnfd_id, vnfd in needed_vnfd.items():
            step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
            self.logger.debug(logging_text + step)
            vnfd_id_RO = nsr_id + "." + vnfd_id[:200]

            # look if present
            vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
            if vnfd_list:
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
                    vnfd_id, vnfd_list[0]["uuid"]))
            else:
                vnfd_RO = deepcopy(vnfd)
                vnfd_RO.pop("_id", None)
                vnfd_RO.pop("_admin", None)
                vnfd_RO["id"] = vnfd_id_RO
                desc = await RO.create("vnfd", descriptor=vnfd_RO)
                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
            self.update_nsr_db(nsr_id, db_nsr)

        # create nsd at RO
        nsd_id = nsd["id"]
        step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
        self.logger.debug(logging_text + step)

        # same naming than the vnfds: prefixed with the nsr_id, so that each ns gets its own RO descriptors
        nsd_id_RO = nsr_id + "." + nsd_id[:200]
        nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
        if nsd_list:
            nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
            self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
                nsd_id, nsd_list[0]["uuid"]))
        else:
            nsd_RO = deepcopy(nsd)
            nsd_RO["id"] = nsd_id_RO
            nsd_RO.pop("_id", None)
            nsd_RO.pop("_admin", None)
            # point to the vnfd names used at RO
            for c_vnf in nsd_RO["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
            desc = await RO.create("nsd", descriptor=nsd_RO)
            nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
        self.update_nsr_db(nsr_id, db_nsr)

        # Create ns at RO
        # if present use it unless in error status
        RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
                self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                desc = await RO.show("ns", RO_nsr_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
            if RO_nsr_id:
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    step = db_nsr["detailed-status"] = "Deleting ns at RO"
                    self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
                    await RO.delete("ns", RO_nsr_id)
                    RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None

        if not RO_nsr_id:
            step = db_nsr["detailed-status"] = "Creating ns at RO"
            self.logger.debug(logging_text + step)

            desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
        self.update_nsr_db(nsr_id, db_nsr)

        # wait until NS is ready
        step = ns_status_detailed = "Waiting ns ready at RO"
        db_nsr["detailed-status"] = ns_status_detailed
        self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
        deployment_timeout = 600
        while deployment_timeout > 0:
            desc = await RO.show("ns", RO_nsr_id)
            ns_status, ns_status_info = RO.check_ns_status(desc)
            nsr_lcm["RO"]["nsr_status"] = ns_status
            if ns_status == "ERROR":
                raise ROclient.ROClientException(ns_status_info)
            elif ns_status == "BUILD":
                db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
                self.update_nsr_db(nsr_id, db_nsr)
            elif ns_status == "ACTIVE":
                nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                break
            else:
                # an explicit exception instead of 'assert', that is removed when running with -O
                raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

            # no 'loop' argument: it was removed from asyncio.sleep at python 3.10; the running loop is used
            await asyncio.sleep(5)
            deployment_timeout -= 5
        if deployment_timeout <= 0:
            raise ROclient.ROClientException("Timeout waiting ns to be ready")
        db_nsr["detailed-status"] = "Configuring vnfr"
        self.update_nsr_db(nsr_id, db_nsr)

        vnfd_to_config = 0
        step = "Looking for needed vnfd to configure"
        self.logger.debug(logging_text + step)
        for c_vnf in nsd["constituent-vnfd"]:
            vnfd_id = c_vnf["vnfd-id-ref"]
            vnf_index = str(c_vnf["member-vnf-index"])
            vnfd = needed_vnfd[vnfd_id]
            if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
                vnfd_to_config += 1
                proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]

                # Note: The charm needs to exist on disk at the location
                # specified by charm_path.
                base_folder = vnfd["_admin"]["storage"]
                charm_path = "{}{}/{}/charms/{}".format(
                    base_folder["path"],
                    base_folder["folder"],
                    base_folder["file"],
                    proxy_charm
                )

                # Setup the runtime parameters for this VNF
                params = {
                    'rw_mgmt_ip': nsr_lcm['nsr_ip'][vnf_index],
                }

                # ns_name will be ignored in the current version of N2VC
                # but will be implemented for the next point release.
                ns_name = 'default'
                application_name = self.n2vc.FormatApplicationName(
                    ns_name,
                    vnfd['name'],
                    vnf_index,
                )

                nsr_lcm["VCA"][vnf_index] = {
                    "model": ns_name,
                    "application": application_name,
                    "operational-status": "init",
                    "vnfd_id": vnfd_id,
                }

                self.logger.debug("Passing artifacts path '{}' for {}".format(charm_path, proxy_charm))
                task = asyncio.ensure_future(
                    self.n2vc.DeployCharms(
                        ns_name,             # The network service name
                        application_name,    # The application name
                        vnfd,                # The vnf descriptor
                        charm_path,          # Path to charm
                        params,              # Runtime params, like mgmt ip
                        {},                  # for native charms only
                        self.n2vc_callback,  # Callback for status changes
                        db_nsr,              # Callback parameter
                        vnf_index,           # Callback parameter
                        None,                # Callback parameter (task)
                    )
                )
                # The finished task is appended as last argument ('task'). The bound arguments follow the
                # n2vc_callback signature: model_name, application_name, workload_status, db_nsr,
                # vnf_member_index
                task.add_done_callback(functools.partial(
                    self.n2vc_callback, ns_name, application_name, None, db_nsr, vnf_index))

                self.lcm_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
        db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
        db_nsr["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config) if vnfd_to_config else "done"
        db_nsr["operational-status"] = "running"
        self.update_nsr_db(nsr_id, db_nsr)

        self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
        return nsr_lcm

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # 'step' tells at which stage the deployment has failed
        if exc and db_nsr:
            db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
            db_nsr["operational-status"] = "failed"
            self.update_nsr_db(nsr_id, db_nsr)
440
async def delete_ns(self, nsr_id, order_id):
    """
    Task that removes a ns: launches the removal of its charms, deletes the ns, nsd and vnfds at RO and
    finally deletes the ns record from database. A "not found" answer from RO is considered as already
    deleted; other RO delete errors are logged and the deletion continues
    :param nsr_id: "_id" of the ns record at the "nsrs" collection
    :param order_id: key at self.lcm_tasks[nsr_id] where the charm removal tasks are registered
    :return: None. On failure the exception is logged and stored at the "detailed-status" of the ns record
    """
    logging_text = "Task delete_ns={} ".format(nsr_id)
    self.logger.debug(logging_text + "Enter")
    db_nsr = None
    exc = None
    step = "Getting nsr from db"
    try:
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"]["deploy"]

        db_nsr["operational-status"] = "terminate"
        db_nsr["config-status"] = "terminate"
        step = db_nsr["detailed-status"] = "Deleting charms"
        self.update_nsr_db(nsr_id, db_nsr)

        try:
            self.logger.debug(logging_text + step)
            for vnf_index, deploy_info in nsr_lcm["VCA"].items():
                if deploy_info and deploy_info.get("application"):
                    task = asyncio.ensure_future(
                        self.n2vc.RemoveCharms(
                            deploy_info['model'],
                            deploy_info['application'],
                            self.n2vc_callback,
                            db_nsr,
                            vnf_index,
                        )
                    )

                    self.lcm_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
        except Exception as e:
            # best effort: the removal at RO continues, but the failure must be visible at the log
            self.logger.error(logging_text + "Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
        # Delete ns
        RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
        if RO_nsr_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting ns at RO"
                self.logger.debug(logging_text + step)
                await RO.delete("ns", RO_nsr_id)
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            except ROclient.ROClientException as e:
                if e.http_code == HTTPStatus.NOT_FOUND:
                    nsr_lcm["RO"]["nsr_id"] = None
                    nsr_lcm["RO"]["nsr_status"] = "DELETED"
                    self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
                elif e.http_code == HTTPStatus.CONFLICT:
                    self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
                else:
                    self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
            self.update_nsr_db(nsr_id, db_nsr)

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            try:
                step = db_nsr["detailed-status"] = "Deleting nsd at RO"
                await RO.delete("nsd", RO_nsd_id)
                self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
                nsr_lcm["RO"]["nsd_id"] = None
            except ROclient.ROClientException as e:
                if e.http_code == HTTPStatus.NOT_FOUND:
                    nsr_lcm["RO"]["nsd_id"] = None
                    self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
                elif e.http_code == HTTPStatus.CONFLICT:
                    self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
                else:
                    self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
            self.update_nsr_db(nsr_id, db_nsr)

        # Delete vnfds
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            try:
                step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
                await RO.delete("vnfd", RO_vnfd_id)
                self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            except ROclient.ROClientException as e:
                if e.http_code == HTTPStatus.NOT_FOUND:
                    nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
                    self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
                elif e.http_code == HTTPStatus.CONFLICT:
                    self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
                else:
                    self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
            self.update_nsr_db(nsr_id, db_nsr)

        # TODO delete from database or mark as deleted???
        db_nsr["operational-status"] = "terminated"
        self.db.del_one("nsrs", {"_id": nsr_id})
        self.logger.debug(logging_text + "Exit")

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        # 'step' tells at which stage the deletion has failed
        if exc and db_nsr:
            db_nsr["detailed-status"] = "ERROR {}: {}".format(step, exc)
            db_nsr["operational-status"] = "failed"
            self.update_nsr_db(nsr_id, db_nsr)
553
async def test(self, param=None):
    """
    Dummy task, launched by the "test" command, that only writes a debug log entry
    :param param: value included at the log entry
    :return: None
    """
    log_entry = "Starting/Ending test task: {}".format(param)
    self.logger.debug(log_entry)
556
def cancel_tasks(self, nsr_id):
    """
    Request the cancellation of every task registered for a nsr and forget them
    :param nsr_id: nsr identity
    :return: None. Nothing is done if there are no tasks registered for this nsr
    """
    orders = self.lcm_tasks.get(nsr_id)
    if not orders:
        return
    for order_id in orders:
        for task_name, task in orders[order_id].items():
            # cancel() returns False when the task cannot be cancelled; only real cancellations are logged
            if task.cancel():
                self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
    self.lcm_tasks[nsr_id] = {}
571
async def read_kafka(self):
    """
    Main task: reads commands of the "ns" topic from the message bus until "exit" is received. The "create"
    and "delete" commands launch a create_ns/delete_ns task, registered at self.lcm_tasks[nsr_id][order_id].
    The rest of commands ("echo", "test", "break", "show") are testing facilities
    :return: None
    """
    self.logger.debug("kafka task Enter")
    order_id = 1
    # future = asyncio.Future()

    while True:
        command, params = await self.msg.aioread("ns", self.loop)
        # every message read consumes an order number, including comments and unknown commands
        order_id += 1
        if command == "exit":
            print("Bye!")
            break
        if command.startswith("#"):
            continue

        if command == "echo":
            # just for test
            print(params)
        elif command == "test":
            asyncio.Task(self.test(params), loop=self.loop)
        elif command == "break":
            print("put a break in this line of code")
        elif command == "create":
            nsr_id = params.strip()
            self.logger.debug("Deploying NS {}".format(nsr_id))
            task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
            self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"create_ns": task}
        elif command == "delete":
            nsr_id = params.strip()
            self.logger.debug("Deleting NS {}".format(nsr_id))
            # tasks in progress for this ns are cancelled before launching the deletion
            self.cancel_tasks(nsr_id)
            task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
            self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"delete_ns": task}
        elif command == "show":
            # just for test
            nsr_id = params.strip()
            try:
                db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                report = ("nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
                          "{}\n deploy: {}\n tasks: {}")
                print(report.format(
                    nsr_id, db_nsr["operational-status"],
                    db_nsr["config-status"], db_nsr["detailed-status"],
                    db_nsr["_admin"]["deploy"], self.lcm_tasks.get(nsr_id)))
            except Exception as e:
                print("nsr {} not found: {}".format(nsr_id, e))
        else:
            self.logger.critical("unknown command '{}'".format(command))
    self.logger.debug("kafka task Exit")
622
623
def start(self):
    """
    Run read_kafka at the asyncio event loop until it finishes, and close the loop afterwards
    :return: None. An exception raised by read_kafka is propagated, after closing the loop
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(self.read_kafka())
    finally:
        # also on failure: do not leave the loop open nor keep a reference to it
        self.loop.close()
        self.loop = None
629
630
def read_config_file(self, config_file):
    """
    Load the yaml configuration file and override its values with the environment variables that start
    with "OSMLCM_". The variable name, in lower case and split by "_", gives the path inside the
    configuration: e.g. OSMLCM_DATABASE_HOST sets conf["database"]["host"]. The "ro" and "vca" path items are
    converted to capital letters, and the value of a key called "port" is converted to integer. A variable
    that cannot be applied is skipped with a warning
    :param config_file: path of the yaml configuration file
    :return: dictionary with the configuration
    :raises LcmException: if the file cannot be read or parsed, or its content is not a dictionary
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load builds only plain data types. yaml.load without an explicit Loader can construct
            # arbitrary objects and is rejected by the recent versions of PyYAML
            conf = yaml.safe_load(f)
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        # do not return None: the caller would fail later with a misleading TypeError
        raise LcmException("At config file '{}': {}".format(config_file, e)) from e

    if not isinstance(conf, dict):
        self.logger.critical("At config file '{}': content is not a dictionary".format(config_file))
        raise LcmException("At config file '{}': content is not a dictionary".format(config_file))

    for k, v in environ.items():
        if not k.startswith("OSMLCM_"):
            continue
        k_items = k.lower().split("_")
        c = conf
        try:
            # walk down to the section; the first item is the "osmlcm" prefix and the last one the key
            for k_item in k_items[1:-1]:
                if k_item in ("ro", "vca"):
                    # put in capital letter
                    k_item = k_item.upper()
                c = c[k_item]
            if k_items[-1] == "port":
                c[k_items[-1]] = int(v)
            else:
                c[k_items[-1]] = v
        except Exception as e:
            self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

    return conf
659
660
if __name__ == '__main__':
    # Script entry point: build the Lcm from the configuration file at the working directory and serve
    # commands until the "exit" command is received
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)
    lcm.start()