b39efe80692d4948747caeb31f1d508f17580cac
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 import logging
13 import functools
14 from dbbase import DbException
15 from fsbase import FsException
16 from msgbase import MsgException
17 from os import environ
18 # from vca import DeployApplication, RemoveApplication
19 from n2vc.vnf import N2VC
20 import os.path
21 import time
22
23 from copy import deepcopy
24 from http import HTTPStatus
25
26 class LcmException(Exception):
27 pass
28
29
30 class Lcm:
31
32 def __init__(self, config_file):
33 """
34 Init, Connect to database, filesystem storage, and messaging
35 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
36 :return: None
37 """
38 # contains created tasks/futures to be able to cancel
39 self.lcm_tasks = {}
40 # logging
41 self.logger = logging.getLogger('lcm')
42 # load configuration
43 config = self.read_config_file(config_file)
44 self.config = config
45 self.ro_config={
46 "endpoint_url": "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]),
47 "tenant": config.get("tenant", "osm"),
48 "logger_name": "lcm.ROclient",
49 "loglevel": "ERROR",
50 }
51
52 self.vca = config["VCA"] # TODO VCA
53 self.loop = None
54
55 # logging
56 log_format_simple = "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
57 log_formatter_simple = logging.Formatter(log_format_simple, datefmt='%Y-%m-%dT%H:%M:%S')
58 config["database"]["logger_name"] = "lcm.db"
59 config["storage"]["logger_name"] = "lcm.fs"
60 config["message"]["logger_name"] = "lcm.msg"
61 if "logfile" in config["global"]:
62 file_handler = logging.handlers.RotatingFileHandler(config["global"]["logfile"],
63 maxBytes=100e6, backupCount=9, delay=0)
64 file_handler.setFormatter(log_formatter_simple)
65 self.logger.addHandler(file_handler)
66 else:
67 str_handler = logging.StreamHandler()
68 str_handler.setFormatter(log_formatter_simple)
69 self.logger.addHandler(str_handler)
70
71 if config["global"].get("loglevel"):
72 self.logger.setLevel(config["global"]["loglevel"])
73
74 # logging other modules
75 for k1, logname in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
76 config[k1]["logger_name"] = logname
77 logger_module = logging.getLogger(logname)
78 if "logfile" in config[k1]:
79 file_handler = logging.handlers.RotatingFileHandler(config[k1]["logfile"],
80 maxBytes=100e6, backupCount=9, delay=0)
81 file_handler.setFormatter(log_formatter_simple)
82 logger_module.addHandler(file_handler)
83 if "loglevel" in config[k1]:
84 logger_module.setLevel(config[k1]["loglevel"])
85
86 self.n2vc = N2VC(
87 log=self.logger,
88 server=config['VCA']['host'],
89 port=config['VCA']['port'],
90 user=config['VCA']['user'],
91 secret=config['VCA']['secret'],
92 # TODO: This should point to the base folder where charms are stored,
93 # if there is a common one (like object storage). Otherwise, leave
94 # it unset and pass it via DeployCharms
95 # artifacts=config['VCA'][''],
96 artifacts=None,
97 )
98
99 try:
100 if config["database"]["driver"] == "mongo":
101 self.db = dbmongo.DbMongo()
102 self.db.db_connect(config["database"])
103 elif config["database"]["driver"] == "memory":
104 self.db = dbmemory.DbMemory()
105 self.db.db_connect(config["database"])
106 else:
107 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
108 config["database"]["driver"]))
109
110 if config["storage"]["driver"] == "local":
111 self.fs = fslocal.FsLocal()
112 self.fs.fs_connect(config["storage"])
113 else:
114 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
115 config["storage"]["driver"]))
116
117 if config["message"]["driver"] == "local":
118 self.msg = msglocal.MsgLocal()
119 self.msg.connect(config["message"])
120 elif config["message"]["driver"] == "kafka":
121 self.msg = msgkafka.MsgKafka()
122 self.msg.connect(config["message"])
123 else:
124 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
125 config["storage"]["driver"]))
126 except (DbException, FsException, MsgException) as e:
127 self.logger.critical(str(e), exc_info=True)
128 raise LcmException(str(e))
129
130 def update_nsr_db(self, nsr_id, nsr_desc):
131 try:
132 self.db.replace("nsrs", nsr_id, nsr_desc)
133 except DbException as e:
134 self.logger.error("Updating nsr_id={}: {}".format(nsr_id, e))
135
136 def n2vc_callback(self, nsd, vnfd, vnf_member_index, workload_status, *args):
137 """Update the lcm database with the status of the charm.
138
139 Updates the VNF's operational status with the state of the charm:
140 - blocked: The unit needs manual intervention
141 - maintenance: The unit is actively deploying/configuring
142 - waiting: The unit is waiting for another charm to be ready
143 - active: The unit is deployed, configured, and ready
144 - error: The charm has failed and needs attention.
145 - terminated: The charm has been destroyed
146
147 Updates the network service's config-status to reflect the state of all
148 charms.
149 """
150 if workload_status and len(args) == 3:
151 # self.logger.debug("[n2vc_callback] Workload status \"{}\"".format(workload_status))
152 try:
153 (db_nsr, vnf_index, task) = args
154
155 nsr_id = db_nsr["_id"]
156 nsr_lcm = db_nsr["_admin"]["deploy"]
157 nsr_lcm["VCA"][vnf_index]['operational-status'] = workload_status
158
159 if task:
160 if task.cancelled():
161 return
162
163 if task.done():
164 exc = task.exception()
165 if exc:
166 nsr_lcm = db_nsr["_admin"]["deploy"]
167 nsr_lcm["VCA"][vnf_index]['operational-status'] = "failed"
168 db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, exc)
169 db_nsr["config-status"] = "failed"
170 self.update_nsr_db(nsr_id, db_nsr)
171 else:
172 units = len(nsr_lcm["VCA"])
173 active = 0
174 statusmap = {}
175 for vnf_index in nsr_lcm["VCA"]:
176 if 'operational-status' in nsr_lcm["VCA"][vnf_index]:
177
178 if nsr_lcm["VCA"][vnf_index]['operational-status'] not in statusmap:
179 # Initialize it
180 statusmap[nsr_lcm["VCA"][vnf_index]['operational-status']] = 0
181
182 statusmap[nsr_lcm["VCA"][vnf_index]['operational-status']] += 1
183
184 if nsr_lcm["VCA"][vnf_index]['operational-status'] == "active":
185 active += 1
186 else:
187 self.logger.debug("No operational-status")
188
189 cs = ""
190 for status in statusmap:
191 cs += "{} ({}) ".format(status, statusmap[status])
192 db_nsr["config-status"] = cs
193 self.update_nsr_db(nsr_id, db_nsr)
194
195 except Exception as e:
196 # self.logger.critical("Task create_ns={} n2vc_callback Exception {}".format(nsr_id, e), exc_info=True)
197 self.logger.critical("Task create_ns n2vc_callback Exception {}".format(e), exc_info=True)
198 pass
199
200 def vca_deploy_callback(self, db_nsr, vnf_index, status, task):
201 # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
202 # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
203 nsr_id = db_nsr["_id"]
204 self.logger.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id))
205 try:
206 if task.cancelled():
207 return
208 if task.done():
209 exc = task.exception()
210 if exc:
211 nsr_lcm = db_nsr["_admin"]["deploy"]
212 nsr_lcm["VCA"][vnf_index]['operational-status'] = "failed"
213 db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, exc)
214 db_nsr["config-status"] = "failed"
215 self.update_nsr_db(nsr_id, db_nsr)
216 else:
217 # TODO may be used to be called when VCA monitor status changes
218 pass
219 # except DbException as e:
220 # self.logger.error("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e))
221 except Exception as e:
222 self.logger.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e), exc_info=True)
223
224 async def create_ns(self, nsr_id, order_id):
225 logging_text = "Task create_ns={} ".format(nsr_id)
226 self.logger.debug(logging_text + "Enter")
227 # get all needed from database
228 db_nsr = None
229 exc = None
230 step = "Getting nsr from db"
231 try:
232 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
233 nsd = db_nsr["nsd"]
234 needed_vnfd = {}
235 for c_vnf in nsd["constituent-vnfd"]:
236 vnfd_id = c_vnf["vnfd-id-ref"]
237 if vnfd_id not in needed_vnfd:
238 step = "Getting vnfd={} from db".format(vnfd_id)
239 needed_vnfd[vnfd_id] = self.db.get_one("vnfds", {"id": vnfd_id})
240
241 nsr_lcm = {
242 "id": nsr_id,
243 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
244 "nsr_ip": {},
245 "VCA": {},
246 }
247 db_nsr["_admin"]["deploy"] = nsr_lcm
248 db_nsr["detailed-status"] = "creating"
249 db_nsr["operational-status"] = "init"
250
251 deloyment_timeout = 120
252
253 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
254
255 # get vnfds, instantiate at RO
256 for vnfd_id, vnfd in needed_vnfd.items():
257 step = db_nsr["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id)
258 self.logger.debug(logging_text + step)
259 vnfd_id_RO = nsr_id + "." + vnfd_id[:200]
260
261 # look if present
262 vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id_RO})
263 if vnfd_list:
264 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
265 self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
266 vnfd_id, vnfd_list[0]["uuid"]))
267 else:
268 vnfd_RO = deepcopy(vnfd)
269 vnfd_RO.pop("_id", None)
270 vnfd_RO.pop("_admin", None)
271 vnfd_RO["id"] = vnfd_id_RO
272 desc = await RO.create("vnfd", descriptor=vnfd_RO)
273 nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
274 self.update_nsr_db(nsr_id, db_nsr)
275
276 # create nsd at RO
277 nsd_id = nsd["id"]
278 step = db_nsr["detailed-status"] = "Creating nsd={} at RO".format(nsd_id)
279 self.logger.debug(logging_text + step)
280
281 nsd_id_RO = nsd_id + "." + nsd_id[:200]
282 nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id_RO})
283 if nsd_list:
284 nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
285 self.logger.debug(logging_text + "RO nsd={} exist. Using RO_id={}".format(
286 nsd_id, nsd_list[0]["uuid"]))
287 else:
288 nsd_RO = deepcopy(nsd)
289 nsd_RO["id"] = nsd_id_RO
290 nsd_RO.pop("_id", None)
291 nsd_RO.pop("_admin", None)
292 for c_vnf in nsd_RO["constituent-vnfd"]:
293 vnfd_id = c_vnf["vnfd-id-ref"]
294 c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
295 desc = await RO.create("nsd", descriptor=nsd_RO)
296 nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
297 self.update_nsr_db(nsr_id, db_nsr)
298
299 # Crate ns at RO
300 # if present use it unless in error status
301 RO_nsr_id = nsr_lcm["RO"].get("nsr_id")
302 if RO_nsr_id:
303 try:
304 step = db_nsr["detailed-status"] = "Looking for existing ns at RO"
305 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
306 desc = await RO.show("ns", RO_nsr_id)
307 except ROclient.ROClientException as e:
308 if e.http_code != HTTPStatus.NOT_FOUND:
309 raise
310 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
311 if RO_nsr_id:
312 ns_status, ns_status_info = RO.check_ns_status(desc)
313 nsr_lcm["RO"]["nsr_status"] = ns_status
314 if ns_status == "ERROR":
315 step = db_nsr["detailed-status"] = "Deleting ns at RO"
316 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
317 await RO.delete("ns", RO_nsr_id)
318 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = None
319
320 if not RO_nsr_id:
321 step = db_nsr["detailed-status"] = "Creating ns at RO"
322 self.logger.debug(logging_text + step)
323
324 desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
325 scenario=nsr_lcm["RO"]["nsd_id"])
326 RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
327 nsr_lcm["RO"]["nsr_status"] = "BUILD"
328 self.update_nsr_db(nsr_id, db_nsr)
329
330 # wait until NS is ready
331 step = ns_status_detailed = "Waiting ns ready at RO"
332 db_nsr["detailed-status"] = ns_status_detailed
333 self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
334 deloyment_timeout = 600
335 while deloyment_timeout > 0:
336 desc = await RO.show("ns", RO_nsr_id)
337 ns_status, ns_status_info = RO.check_ns_status(desc)
338 nsr_lcm["RO"]["nsr_status"] = ns_status
339 if ns_status == "ERROR":
340 raise ROclient.ROClientException(ns_status_info)
341 elif ns_status == "BUILD":
342 db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
343 self.update_nsr_db(nsr_id, db_nsr)
344 elif ns_status == "ACTIVE":
345 nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
346 break
347 else:
348 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status)
349
350 await asyncio.sleep(5, loop=self.loop)
351 deloyment_timeout -= 5
352 if deloyment_timeout <= 0:
353 raise ROclient.ROClientException("Timeout waiting ns to be ready")
354 db_nsr["detailed-status"] = "Configuring vnfr"
355 self.update_nsr_db(nsr_id, db_nsr)
356
357 vnfd_to_config = 0
358 step = "Looking for needed vnfd to configure"
359 self.logger.debug(logging_text + step)
360 for c_vnf in nsd["constituent-vnfd"]:
361 vnfd_id = c_vnf["vnfd-id-ref"]
362 vnf_index = str(c_vnf["member-vnf-index"])
363 vnfd = needed_vnfd[vnfd_id]
364 if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
365 nsr_lcm["VCA"][vnf_index] = {}
366 vnfd_to_config += 1
367 proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]
368
369 # Note: The charm needs to exist on disk at the location
370 # specified by charm_path.
371 base_folder = vnfd["_admin"]["storage"]
372 charm_path = "{}{}/{}/charms/{}".format(
373 base_folder["path"],
374 base_folder["folder"],
375 base_folder["file"],
376 proxy_charm
377 )
378
379 self.logger.debug("Passing artifacts path '{}' for {}".format(charm_path, proxy_charm))
380 task = asyncio.ensure_future(
381 self.n2vc.DeployCharms(nsd, vnfd, vnf_index, charm_path, self.n2vc_callback, db_nsr, vnf_index, None)
382 )
383 task.add_done_callback(functools.partial(self.n2vc_callback, None, None, None, None, None, db_nsr, vnf_index))
384
385 # task.add_done_callback(functools.partial(self.vca_deploy_callback, db_nsr, vnf_index, None))
386 self.lcm_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
387 db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
388 db_nsr["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config) if vnfd_to_config else "done"
389 db_nsr["operational-status"] = "running"
390 self.update_nsr_db(nsr_id, db_nsr)
391
392 self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
393 return nsr_lcm
394
395 except (ROclient.ROClientException, DbException) as e:
396 self.logger.error(logging_text + "Exit Exception {}".format(e))
397 exc = e
398 except Exception as e:
399 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
400 exc = e
401 finally:
402 if exc and db_nsr:
403 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
404 db_nsr["operational-status"] = "failed"
405 self.update_nsr_db(nsr_id, db_nsr)
406
407 async def delete_ns(self, nsr_id, order_id):
408 logging_text = "Task delete_ns={} ".format(nsr_id)
409 self.logger.debug(logging_text + "Enter")
410 db_nsr = None
411 exc = None
412 step = "Getting nsr from db"
413 try:
414 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
415 nsd = db_nsr["nsd"]
416 nsr_lcm = db_nsr["_admin"]["deploy"]
417
418 db_nsr["operational-status"] = "terminate"
419 db_nsr["config-status"] = "terminate"
420 db_nsr["detailed-status"] = "Deleting charms"
421 self.update_nsr_db(nsr_id, db_nsr)
422
423 try:
424 step = db_nsr["detailed-status"] = "Deleting charms"
425 self.logger.debug(logging_text + step)
426 for vnf_index, deploy_info in nsr_lcm["VCA"].items():
427 if deploy_info and deploy_info.get("appliation"):
428
429 task = asyncio.ensure_future(
430 self.n2vc.RemoveCharms(nsd, vnfd, vnf_index, self.n2vc_callback, db_nsr, vnf_index, None)
431 )
432 self.lcm_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
433 except Exception as e:
434 self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
435 # remove from RO
436
437 RO = ROclient.ROClient(self.loop, datacenter=db_nsr["datacenter"], **self.ro_config)
438 # Delete ns
439 RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
440 if RO_nsr_id:
441 try:
442 step = db_nsr["detailed-status"] = "Deleting ns at RO"
443 self.logger.debug(logging_text + step)
444 desc = await RO.delete("ns", RO_nsr_id)
445 nsr_lcm["RO"]["nsr_id"] = None
446 nsr_lcm["RO"]["nsr_status"] = "DELETED"
447 except ROclient.ROClientException as e:
448 if e.http_code == 404: # not found
449 nsr_lcm["RO"]["nsr_id"] = None
450 nsr_lcm["RO"]["nsr_status"] = "DELETED"
451 self.logger.debug(logging_text + "RO_ns_id={} already deleted".format(RO_nsr_id))
452 elif e.http_code == 409: #conflict
453 self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
454 else:
455 self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
456 self.update_nsr_db(nsr_id, db_nsr)
457
458 # Delete nsd
459 RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
460 if RO_nsd_id:
461 try:
462 step = db_nsr["detailed-status"] = "Deleting nsd at RO"
463 desc = await RO.delete("nsd", RO_nsd_id)
464 self.logger.debug(logging_text + "RO_nsd_id={} deleted".format(RO_nsd_id))
465 nsr_lcm["RO"]["nsd_id"] = None
466 except ROclient.ROClientException as e:
467 if e.http_code == 404: # not found
468 nsr_lcm["RO"]["nsd_id"] = None
469 self.logger.debug(logging_text + "RO_nsd_id={} already deleted".format(RO_nsd_id))
470 elif e.http_code == 409: #conflict
471 self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
472 else:
473 self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
474 self.update_nsr_db(nsr_id, db_nsr)
475
476 for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
477 if not RO_vnfd_id:
478 continue
479 try:
480 step = db_nsr["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id)
481 desc = await RO.delete("vnfd", RO_vnfd_id)
482 self.logger.debug(logging_text + "RO_vnfd_id={} deleted".format(RO_vnfd_id))
483 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
484 except ROclient.ROClientException as e:
485 if e.http_code == 404: # not found
486 nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
487 self.logger.debug(logging_text + "RO_vnfd_id={} already deleted ".format(RO_vnfd_id))
488 elif e.http_code == 409: #conflict
489 self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
490 else:
491 self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
492 self.update_nsr_db(nsr_id, db_nsr)
493
494 # TODO delete from database or mark as deleted???
495 db_nsr["operational-status"] = "terminated"
496 self.db.del_one("nsrs", {"_id": nsr_id})
497 self.logger.debug(logging_text + "Exit")
498
499 except (ROclient.ROClientException, DbException) as e:
500 self.logger.error(logging_text + "Exit Exception {}".format(e))
501 exc = e
502 except Exception as e:
503 self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
504 exc = e
505 finally:
506 if exc and db_nsr:
507 db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
508 db_nsr["operational-status"] = "failed"
509 self.update_nsr_db(nsr_id, db_nsr)
510
511 async def test(self, param=None):
512 self.logger.debug("Starting/Ending test task: {}".format(param))
513
514 def cancel_tasks(self, nsr_id):
515 """
516 Cancel all active tasks of a concrete nsr identified for nsr_id
517 :param nsr_id: nsr identity
518 :return: None, or raises an exception if not possible
519 """
520 if not self.lcm_tasks.get(nsr_id):
521 return
522 for order_id, tasks_set in self.lcm_tasks[nsr_id].items():
523 for task_name, task in tasks_set.items():
524 result = task.cancel()
525 if result:
526 self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
527 self.lcm_tasks[nsr_id] = {}
528
529 async def read_kafka(self):
530 self.logger.debug("kafka task Enter")
531 order_id = 1
532 # future = asyncio.Future()
533
534 while True:
535 command, params = await self.msg.aioread("ns", self.loop)
536 order_id += 1
537 if command == "exit":
538 print("Bye!")
539 break
540 elif command.startswith("#"):
541 continue
542 elif command == "echo":
543 # just for test
544 print(params)
545 elif command == "test":
546 asyncio.Task(self.test(params), loop=self.loop)
547 elif command == "break":
548 print("put a break in this line of code")
549 elif command == "create":
550 nsr_id = params.strip()
551 self.logger.debug("Deploying NS {}".format(nsr_id))
552 task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
553 if nsr_id not in self.lcm_tasks:
554 self.lcm_tasks[nsr_id] = {}
555 self.lcm_tasks[nsr_id][order_id] = {"create_ns": task}
556 elif command == "delete":
557 nsr_id = params.strip()
558 self.logger.debug("Deleting NS {}".format(nsr_id))
559 self.cancel_tasks(nsr_id)
560 task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
561 if nsr_id not in self.lcm_tasks:
562 self.lcm_tasks[nsr_id] = {}
563 self.lcm_tasks[nsr_id][order_id] = {"delete_ns": task}
564 elif command == "show":
565 # just for test
566 nsr_id = params.strip()
567 try:
568 db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
569 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
570 "{}\n deploy: {}\n tasks: {}".format(
571 nsr_id, db_nsr["operational-status"],
572 db_nsr["config-status"], db_nsr["detailed-status"],
573 db_nsr["_admin"]["deploy"], self.lcm_tasks.get(nsr_id)))
574 except Exception as e:
575 print("nsr {} not found: {}".format(nsr_id, e))
576 else:
577 self.logger.critical("unknown command '{}'".format(command))
578 self.logger.debug("kafka task Exit")
579
580
581 def start(self):
582 self.loop = asyncio.get_event_loop()
583 self.loop.run_until_complete(self.read_kafka())
584 self.loop.close()
585 self.loop = None
586
587
588 def read_config_file(self, config_file):
589 # TODO make a [ini] + yaml inside parser
590 # the configparser library is not suitable, because it does not admit comments at the end of line,
591 # and not parse integer or boolean
592 try:
593 with open(config_file) as f:
594 conf = yaml.load(f)
595 for k, v in environ.items():
596 if not k.startswith("OSMLCM_"):
597 continue
598 k_items = k.lower().split("_")
599 c = conf
600 try:
601 for k_item in k_items[1:-1]:
602 if k_item in ("ro", "vca"):
603 # put in capital letter
604 k_item = k_item.upper()
605 c = c[k_item]
606 if k_items[-1] == "port":
607 c[k_items[-1]] = int(v)
608 else:
609 c[k_items[-1]] = v
610 except Exception as e:
611 self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e))
612
613 return conf
614 except Exception as e:
615 self.logger.critical("At config file '{}': {}".format(config_file, e))
616
617
618 if __name__ == '__main__':
619
620 config_file = "lcm.cfg"
621 lcm = Lcm(config_file)
622
623 lcm.start()