ceabda5b80fef69b53699e4189a4e83f8b230a5a
2 # -*- coding: utf-8 -*-
14 from dbbase
import DbException
15 from fsbase
import FsException
16 from msgbase
import MsgException
17 from os
import environ
18 # from vca import DeployApplication, RemoveApplication
19 from n2vc
.vnf
import N2VC
23 from copy
import deepcopy
24 from http
import HTTPStatus
26 class LcmException(Exception):
32 def __init__(self
, config_file
):
34 Init, Connect to database, filesystem storage, and messaging
35 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
38 # contains created tasks/futures to be able to cancel
41 self
.logger
= logging
.getLogger('lcm')
43 config
= self
.read_config_file(config_file
)
46 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
47 "tenant": config
.get("tenant", "osm"),
48 "logger_name": "lcm.ROclient",
52 self
.vca
= config
["VCA"] # TODO VCA
56 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
57 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
58 config
["database"]["logger_name"] = "lcm.db"
59 config
["storage"]["logger_name"] = "lcm.fs"
60 config
["message"]["logger_name"] = "lcm.msg"
61 if "logfile" in config
["global"]:
62 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
63 maxBytes
=100e6
, backupCount
=9, delay
=0)
64 file_handler
.setFormatter(log_formatter_simple
)
65 self
.logger
.addHandler(file_handler
)
67 str_handler
= logging
.StreamHandler()
68 str_handler
.setFormatter(log_formatter_simple
)
69 self
.logger
.addHandler(str_handler
)
71 if config
["global"].get("loglevel"):
72 self
.logger
.setLevel(config
["global"]["loglevel"])
74 # logging other modules
75 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
76 config
[k1
]["logger_name"] = logname
77 logger_module
= logging
.getLogger(logname
)
78 if "logfile" in config
[k1
]:
79 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
80 maxBytes
=100e6
, backupCount
=9, delay
=0)
81 file_handler
.setFormatter(log_formatter_simple
)
82 logger_module
.addHandler(file_handler
)
83 if "loglevel" in config
[k1
]:
84 logger_module
.setLevel(config
[k1
]["loglevel"])
88 server
=config
['VCA']['host'],
89 port
=config
['VCA']['port'],
90 user
=config
['VCA']['user'],
91 secret
=config
['VCA']['secret'],
92 # TODO: This should point to the base folder where charms are stored,
93 # if there is a common one (like object storage). Otherwise, leave
94 # it unset and pass it via DeployCharms
95 # artifacts=config['VCA'][''],
100 if config
["database"]["driver"] == "mongo":
101 self
.db
= dbmongo
.DbMongo()
102 self
.db
.db_connect(config
["database"])
103 elif config
["database"]["driver"] == "memory":
104 self
.db
= dbmemory
.DbMemory()
105 self
.db
.db_connect(config
["database"])
107 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
108 config
["database"]["driver"]))
110 if config
["storage"]["driver"] == "local":
111 self
.fs
= fslocal
.FsLocal()
112 self
.fs
.fs_connect(config
["storage"])
114 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
115 config
["storage"]["driver"]))
117 if config
["message"]["driver"] == "local":
118 self
.msg
= msglocal
.MsgLocal()
119 self
.msg
.connect(config
["message"])
120 elif config
["message"]["driver"] == "kafka":
121 self
.msg
= msgkafka
.MsgKafka()
122 self
.msg
.connect(config
["message"])
124 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
125 config
["storage"]["driver"]))
126 except (DbException
, FsException
, MsgException
) as e
:
127 self
.logger
.critical(str(e
), exc_info
=True)
128 raise LcmException(str(e
))
130 def update_nsr_db(self
, nsr_id
, nsr_desc
):
132 self
.db
.replace("nsrs", nsr_id
, nsr_desc
)
133 except DbException
as e
:
134 self
.logger
.error("Updating nsr_id={}: {}".format(nsr_id
, e
))
136 def n2vc_callback(self
, model_name
, application_name
, workload_status
, db_nsr
, vnf_member_index
, task
=None):
137 """Update the lcm database with the status of the charm.
139 Updates the VNF's operational status with the state of the charm:
140 - blocked: The unit needs manual intervention
141 - maintenance: The unit is actively deploying/configuring
142 - waiting: The unit is waiting for another charm to be ready
143 - active: The unit is deployed, configured, and ready
144 - error: The charm has failed and needs attention.
145 - terminated: The charm has been destroyed
147 Updates the network service's config-status to reflect the state of all
151 if not workload_status
and not task
:
152 self
.logger
.error("Task create_ns={} n2vc_callback Enter with bad parameters")
155 if not workload_status
:
156 self
.logger
.error("Task create_ns={} n2vc_callback Enter with bad parameters, no workload_status")
160 self
.logger
.debug("[n2vc_callback] Workload status \"{}\"".format(workload_status
))
161 nsr_id
= db_nsr
["_id"]
162 nsr_lcm
= db_nsr
["_admin"]["deploy"]
163 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = workload_status
170 exc
= task
.exception()
172 nsr_lcm
= db_nsr
["_admin"]["deploy"]
173 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = "failed"
174 db_nsr
["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index
, exc
)
175 db_nsr
["config-status"] = "failed"
176 self
.update_nsr_db(nsr_id
, db_nsr
)
178 vca_status
= nsr_lcm
["VCA"][vnf_member_index
]['operational-status']
180 units
= len(nsr_lcm
["VCA"])
183 for vnf_index
in nsr_lcm
["VCA"]:
184 if vca_status
not in statusmap
:
186 statusmap
[vca_status
] = 0
188 statusmap
[vca_status
] += 1
190 if vca_status
== "active":
194 for status
in statusmap
:
195 cs
+= "{} ({}) ".format(status
, statusmap
[status
])
196 db_nsr
["config-status"] = cs
197 self
.update_nsr_db(nsr_id
, db_nsr
)
199 except Exception as e
:
200 # self.logger.critical("Task create_ns={} n2vc_callback Exception {}".format(nsr_id, e), exc_info=True)
201 self
.logger
.critical("Task create_ns n2vc_callback Exception {}".format(e
), exc_info
=True)
204 def vca_deploy_callback(self
, db_nsr
, vnf_index
, status
, task
):
205 # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
206 # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
207 nsr_id
= db_nsr
["_id"]
208 self
.logger
.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id
))
213 exc
= task
.exception()
215 nsr_lcm
= db_nsr
["_admin"]["deploy"]
216 nsr_lcm
["VCA"][vnf_index
]['operational-status'] = "failed"
217 db_nsr
["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index
, exc
)
218 db_nsr
["config-status"] = "failed"
219 self
.update_nsr_db(nsr_id
, db_nsr
)
221 # TODO may be used to be called when VCA monitor status changes
223 # except DbException as e:
224 # self.logger.error("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e))
225 except Exception as e
:
226 self
.logger
.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id
, e
), exc_info
=True)
228 async def create_ns(self
, nsr_id
, order_id
):
229 logging_text
= "Task create_ns={} ".format(nsr_id
)
230 self
.logger
.debug(logging_text
+ "Enter")
231 # get all needed from database
234 step
= "Getting nsr from db"
236 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
239 for c_vnf
in nsd
["constituent-vnfd"]:
240 vnfd_id
= c_vnf
["vnfd-id-ref"]
241 if vnfd_id
not in needed_vnfd
:
242 step
= "Getting vnfd={} from db".format(vnfd_id
)
243 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
247 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
251 db_nsr
["_admin"]["deploy"] = nsr_lcm
252 db_nsr
["detailed-status"] = "creating"
253 db_nsr
["operational-status"] = "init"
255 deloyment_timeout
= 120
257 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
259 # get vnfds, instantiate at RO
260 for vnfd_id
, vnfd
in needed_vnfd
.items():
261 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
262 self
.logger
.debug(logging_text
+ step
)
263 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
266 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
268 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
269 self
.logger
.debug(logging_text
+ "RO vnfd={} exist. Using RO_id={}".format(
270 vnfd_id
, vnfd_list
[0]["uuid"]))
272 vnfd_RO
= deepcopy(vnfd
)
273 vnfd_RO
.pop("_id", None)
274 vnfd_RO
.pop("_admin", None)
275 vnfd_RO
["id"] = vnfd_id_RO
276 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
277 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
278 self
.update_nsr_db(nsr_id
, db_nsr
)
282 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
283 self
.logger
.debug(logging_text
+ step
)
285 nsd_id_RO
= nsd_id
+ "." + nsd_id
[:200]
286 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
288 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
289 self
.logger
.debug(logging_text
+ "RO nsd={} exist. Using RO_id={}".format(
290 nsd_id
, nsd_list
[0]["uuid"]))
292 nsd_RO
= deepcopy(nsd
)
293 nsd_RO
["id"] = nsd_id_RO
294 nsd_RO
.pop("_id", None)
295 nsd_RO
.pop("_admin", None)
296 for c_vnf
in nsd_RO
["constituent-vnfd"]:
297 vnfd_id
= c_vnf
["vnfd-id-ref"]
298 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
299 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
300 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
301 self
.update_nsr_db(nsr_id
, db_nsr
)
304 # if present use it unless in error status
305 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
308 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
309 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
310 desc
= await RO
.show("ns", RO_nsr_id
)
311 except ROclient
.ROClientException
as e
:
312 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
314 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
316 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
317 nsr_lcm
["RO"]["nsr_status"] = ns_status
318 if ns_status
== "ERROR":
319 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
320 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
321 await RO
.delete("ns", RO_nsr_id
)
322 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
325 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
326 self
.logger
.debug(logging_text
+ step
)
328 desc
= await RO
.create("ns", name
=db_nsr
["name"], datacenter
=db_nsr
["datacenter"],
329 scenario
=nsr_lcm
["RO"]["nsd_id"])
330 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
331 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
332 self
.update_nsr_db(nsr_id
, db_nsr
)
334 # wait until NS is ready
335 step
= ns_status_detailed
= "Waiting ns ready at RO"
336 db_nsr
["detailed-status"] = ns_status_detailed
337 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
338 deloyment_timeout
= 600
339 while deloyment_timeout
> 0:
340 desc
= await RO
.show("ns", RO_nsr_id
)
341 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
342 nsr_lcm
["RO"]["nsr_status"] = ns_status
343 if ns_status
== "ERROR":
344 raise ROclient
.ROClientException(ns_status_info
)
345 elif ns_status
== "BUILD":
346 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
347 self
.update_nsr_db(nsr_id
, db_nsr
)
348 elif ns_status
== "ACTIVE":
349 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
352 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
354 await asyncio
.sleep(5, loop
=self
.loop
)
355 deloyment_timeout
-= 5
356 if deloyment_timeout
<= 0:
357 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
358 db_nsr
["detailed-status"] = "Configuring vnfr"
359 self
.update_nsr_db(nsr_id
, db_nsr
)
362 step
= "Looking for needed vnfd to configure"
363 self
.logger
.debug(logging_text
+ step
)
364 for c_vnf
in nsd
["constituent-vnfd"]:
365 vnfd_id
= c_vnf
["vnfd-id-ref"]
366 vnf_index
= str(c_vnf
["member-vnf-index"])
367 vnfd
= needed_vnfd
[vnfd_id
]
368 if vnfd
.get("vnf-configuration") and vnfd
["vnf-configuration"].get("juju"):
370 proxy_charm
= vnfd
["vnf-configuration"]["juju"]["charm"]
372 # Note: The charm needs to exist on disk at the location
373 # specified by charm_path.
374 base_folder
= vnfd
["_admin"]["storage"]
375 charm_path
= "{}{}/{}/charms/{}".format(
377 base_folder
["folder"],
382 # Setup the runtime parameters for this VNF
384 'rw_mgmt_ip': nsr_lcm
['nsr_ip'][vnf_index
],
387 # ns_name will be ignored in the current version of N2VC
388 # but will be implemented for the next point release.
390 application_name
= self
.n2vc
.FormatApplicationName(
396 nsr_lcm
["VCA"][vnf_index
] = {
398 "application": application_name
,
399 "operational-status": "init",
403 self
.logger
.debug("Passing artifacts path '{}' for {}".format(charm_path
, proxy_charm
))
404 task
= asyncio
.ensure_future(
405 self
.n2vc
.DeployCharms(
406 ns_name
, # The network service name
407 application_name
, # The application name
408 vnfd
, # The vnf descriptor
409 charm_path
, # Path to charm
410 params
, # Runtime params, like mgmt ip
411 {}, # for native charms only
412 self
.n2vc_callback
, # Callback for status changes
413 db_nsr
, # Callback parameter
414 vnf_index
, # Callback parameter
415 None, # Callback parameter (task)
418 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, None, None, None, None, db_nsr
))
420 self
.lcm_tasks
[nsr_id
][order_id
]["create_charm:" + vnf_index
] = task
421 db_nsr
["config-status"] = "configuring" if vnfd_to_config
else "configured"
422 db_nsr
["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config
) if vnfd_to_config
else "done"
423 db_nsr
["operational-status"] = "running"
424 self
.update_nsr_db(nsr_id
, db_nsr
)
426 self
.logger
.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id
))
429 except (ROclient
.ROClientException
, DbException
) as e
:
430 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
432 except Exception as e
:
433 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
437 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
438 db_nsr
["operational-status"] = "failed"
439 self
.update_nsr_db(nsr_id
, db_nsr
)
441 async def delete_ns(self
, nsr_id
, order_id
):
442 logging_text
= "Task delete_ns={} ".format(nsr_id
)
443 self
.logger
.debug(logging_text
+ "Enter")
446 step
= "Getting nsr from db"
448 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
450 nsr_lcm
= db_nsr
["_admin"]["deploy"]
452 db_nsr
["operational-status"] = "terminate"
453 db_nsr
["config-status"] = "terminate"
454 db_nsr
["detailed-status"] = "Deleting charms"
455 self
.update_nsr_db(nsr_id
, db_nsr
)
458 step
= db_nsr
["detailed-status"] = "Deleting charms"
459 self
.logger
.debug(logging_text
+ step
)
460 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
461 if deploy_info
and deploy_info
.get("application"):
462 # n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
464 # self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback, model_name, application_name)
465 task
= asyncio
.ensure_future(
466 self
.n2vc
.RemoveCharms(
467 deploy_info
['model'],
468 deploy_info
['application'],
475 self
.lcm_tasks
[nsr_id
][order_id
]["delete_charm:" + vnf_index
] = task
476 except Exception as e
:
477 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
480 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
482 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
485 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
486 self
.logger
.debug(logging_text
+ step
)
487 desc
= await RO
.delete("ns", RO_nsr_id
)
488 nsr_lcm
["RO"]["nsr_id"] = None
489 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
490 except ROclient
.ROClientException
as e
:
491 if e
.http_code
== 404: # not found
492 nsr_lcm
["RO"]["nsr_id"] = None
493 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
494 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
495 elif e
.http_code
== 409: #conflict
496 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
498 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
499 self
.update_nsr_db(nsr_id
, db_nsr
)
502 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
505 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
506 desc
= await RO
.delete("nsd", RO_nsd_id
)
507 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
508 nsr_lcm
["RO"]["nsd_id"] = None
509 except ROclient
.ROClientException
as e
:
510 if e
.http_code
== 404: # not found
511 nsr_lcm
["RO"]["nsd_id"] = None
512 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
513 elif e
.http_code
== 409: #conflict
514 self
.logger
.debug(logging_text
+ "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
516 self
.logger
.error(logging_text
+ "RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
517 self
.update_nsr_db(nsr_id
, db_nsr
)
519 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
523 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
524 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
525 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
526 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
527 except ROclient
.ROClientException
as e
:
528 if e
.http_code
== 404: # not found
529 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
530 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
531 elif e
.http_code
== 409: #conflict
532 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
534 self
.logger
.error(logging_text
+ "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
535 self
.update_nsr_db(nsr_id
, db_nsr
)
537 # TODO delete from database or mark as deleted???
538 db_nsr
["operational-status"] = "terminated"
539 self
.db
.del_one("nsrs", {"_id": nsr_id
})
540 self
.logger
.debug(logging_text
+ "Exit")
542 except (ROclient
.ROClientException
, DbException
) as e
:
543 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
545 except Exception as e
:
546 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
550 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
551 db_nsr
["operational-status"] = "failed"
552 self
.update_nsr_db(nsr_id
, db_nsr
)
554 async def test(self
, param
=None):
555 self
.logger
.debug("Starting/Ending test task: {}".format(param
))
557 def cancel_tasks(self
, nsr_id
):
559 Cancel all active tasks of a concrete nsr identified for nsr_id
560 :param nsr_id: nsr identity
561 :return: None, or raises an exception if not possible
563 if not self
.lcm_tasks
.get(nsr_id
):
565 for order_id
, tasks_set
in self
.lcm_tasks
[nsr_id
].items():
566 for task_name
, task
in tasks_set
.items():
567 result
= task
.cancel()
569 self
.logger
.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id
, order_id
, task_name
))
570 self
.lcm_tasks
[nsr_id
] = {}
572 async def read_kafka(self
):
573 self
.logger
.debug("kafka task Enter")
575 # future = asyncio.Future()
578 command
, params
= await self
.msg
.aioread("ns", self
.loop
)
580 if command
== "exit":
583 elif command
.startswith("#"):
585 elif command
== "echo":
588 elif command
== "test":
589 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
590 elif command
== "break":
591 print("put a break in this line of code")
592 elif command
== "create":
593 nsr_id
= params
.strip()
594 self
.logger
.debug("Deploying NS {}".format(nsr_id
))
595 task
= asyncio
.ensure_future(self
.create_ns(nsr_id
, order_id
))
596 if nsr_id
not in self
.lcm_tasks
:
597 self
.lcm_tasks
[nsr_id
] = {}
598 self
.lcm_tasks
[nsr_id
][order_id
] = {"create_ns": task
}
599 elif command
== "delete":
600 nsr_id
= params
.strip()
601 self
.logger
.debug("Deleting NS {}".format(nsr_id
))
602 self
.cancel_tasks(nsr_id
)
603 task
= asyncio
.ensure_future(self
.delete_ns(nsr_id
, order_id
))
604 if nsr_id
not in self
.lcm_tasks
:
605 self
.lcm_tasks
[nsr_id
] = {}
606 self
.lcm_tasks
[nsr_id
][order_id
] = {"delete_ns": task
}
607 elif command
== "show":
609 nsr_id
= params
.strip()
611 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
612 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
613 "{}\n deploy: {}\n tasks: {}".format(
614 nsr_id
, db_nsr
["operational-status"],
615 db_nsr
["config-status"], db_nsr
["detailed-status"],
616 db_nsr
["_admin"]["deploy"], self
.lcm_tasks
.get(nsr_id
)))
617 except Exception as e
:
618 print("nsr {} not found: {}".format(nsr_id
, e
))
620 self
.logger
.critical("unknown command '{}'".format(command
))
621 self
.logger
.debug("kafka task Exit")
625 self
.loop
= asyncio
.get_event_loop()
626 self
.loop
.run_until_complete(self
.read_kafka())
631 def read_config_file(self
, config_file
):
632 # TODO make a [ini] + yaml inside parser
633 # the configparser library is not suitable, because it does not admit comments at the end of line,
634 # and not parse integer or boolean
636 with
open(config_file
) as f
:
638 for k
, v
in environ
.items():
639 if not k
.startswith("OSMLCM_"):
641 k_items
= k
.lower().split("_")
644 for k_item
in k_items
[1:-1]:
645 if k_item
in ("ro", "vca"):
646 # put in capital letter
647 k_item
= k_item
.upper()
649 if k_items
[-1] == "port":
650 c
[k_items
[-1]] = int(v
)
653 except Exception as e
:
654 self
.logger
.warn("skipping environ '{}' on exception '{}'".format(k
, e
))
657 except Exception as e
:
658 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
661 if __name__
== '__main__':
663 config_file
= "lcm.cfg"
664 lcm
= Lcm(config_file
)