2 # -*- coding: utf-8 -*-
14 from dbbase
import DbException
15 from fsbase
import FsException
16 from msgbase
import MsgException
17 from os
import environ
18 from vca
import DeployApplication
, RemoveApplication
19 from copy
import deepcopy
20 from http
import HTTPStatus
22 class LcmException(Exception):
28 def __init__(self
, config_file
):
30 Init, Connect to database, filesystem storage, and messaging
31 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
34 # contains created tasks/futures to be able to cancel
37 self
.logger
= logging
.getLogger('lcm')
39 config
= self
.read_config_file(config_file
)
42 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
43 "tenant": config
.get("tenant", "osm"),
44 "logger_name": "lcm.ROclient",
47 self
.vca
= config
["VCA"] # TODO VCA
51 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
52 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
53 config
["database"]["logger_name"] = "lcm.db"
54 config
["storage"]["logger_name"] = "lcm.fs"
55 config
["message"]["logger_name"] = "lcm.msg"
56 if "logfile" in config
["global"]:
57 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
58 maxBytes
=100e6
, backupCount
=9, delay
=0)
59 file_handler
.setFormatter(log_formatter_simple
)
60 self
.logger
.addHandler(file_handler
)
62 str_handler
= logging
.StreamHandler()
63 str_handler
.setFormatter(log_formatter_simple
)
64 self
.logger
.addHandler(str_handler
)
66 if config
["global"].get("loglevel"):
67 self
.logger
.setLevel(config
["global"]["loglevel"])
69 # logging other modules
70 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
71 config
[k1
]["logger_name"] = logname
72 logger_module
= logging
.getLogger(logname
)
73 if "logfile" in config
[k1
]:
74 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
75 maxBytes
=100e6
, backupCount
=9, delay
=0)
76 file_handler
.setFormatter(log_formatter_simple
)
77 logger_module
.addHandler(file_handler
)
78 if "loglevel" in config
[k1
]:
79 logger_module
.setLevel(config
[k1
]["loglevel"])
82 if config
["database"]["driver"] == "mongo":
83 self
.db
= dbmongo
.DbMongo()
84 self
.db
.db_connect(config
["database"])
85 elif config
["database"]["driver"] == "memory":
86 self
.db
= dbmemory
.DbMemory()
87 self
.db
.db_connect(config
["database"])
89 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
90 config
["database"]["driver"]))
92 if config
["storage"]["driver"] == "local":
93 self
.fs
= fslocal
.FsLocal()
94 self
.fs
.fs_connect(config
["storage"])
96 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
97 config
["storage"]["driver"]))
99 if config
["message"]["driver"] == "local":
100 self
.msg
= msglocal
.MsgLocal()
101 self
.msg
.connect(config
["message"])
102 elif config
["message"]["driver"] == "kafka":
103 self
.msg
= msgkafka
.MsgKafka()
104 self
.msg
.connect(config
["message"])
106 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
107 config
["storage"]["driver"]))
108 except (DbException
, FsException
, MsgException
) as e
:
109 self
.logger
.critical(str(e
), exc_info
=True)
110 raise LcmException(str(e
))
def update_nsr_db(self, nsr_id, nsr_desc):
    """
    Store the given nsr descriptor in the "nsrs" collection of the database
    :param nsr_id: identity of the nsr to replace
    :param nsr_desc: full nsr content to write
    :return: None. A DbException is logged as an error and not propagated
    """
    database = self.db
    try:
        database.replace("nsrs", nsr_id, nsr_desc)
    except DbException as db_error:
        # best effort: callers carry on even if the status could not be persisted
        message = "Updating nsr_id={}: {}".format(nsr_id, db_error)
        self.logger.error(message)
118 def vca_deploy_callback(self
, db_nsr
, vnf_index
, status
, task
):
119 # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
120 # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
121 nsr_id
= db_nsr
["_id"]
122 self
.logger
.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id
))
127 exc
= task
.exception()
129 nsr_lcm
= db_nsr
["_admin"]["deploy"]
130 nsr_lcm
["VCA"][vnf_index
]['operational-status'] = "failed"
131 db_nsr
["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index
, exc
)
132 db_nsr
["config-status"] = "failed"
133 self
.update_nsr_db(nsr_id
, db_nsr
)
135 # TODO may be used to be called when VCA monitor status changes
137 # except DbException as e:
138 # self.logger.error("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e))
139 except Exception as e
:
140 self
.logger
.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id
, e
), exc_info
=True)
142 async def create_ns(self
, nsr_id
, order_id
):
143 logging_text
= "Task create_ns={} ".format(nsr_id
)
144 self
.logger
.debug(logging_text
+ "Enter")
145 # get all needed from database
148 step
= "Getting nsr from db"
150 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
153 for c_vnf
in nsd
["constituent-vnfd"]:
154 vnfd_id
= c_vnf
["vnfd-id-ref"]
155 if vnfd_id
not in needed_vnfd
:
156 step
= "Getting vnfd={} from db".format(vnfd_id
)
157 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
161 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
165 db_nsr
["_admin"]["deploy"] = nsr_lcm
166 db_nsr
["detailed-status"] = "creating"
167 db_nsr
["operational-status"] = "init"
169 deloyment_timeout
= 120
171 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
173 # get vnfds, instantiate at RO
174 for vnfd_id
, vnfd
in needed_vnfd
.items():
175 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
176 self
.logger
.debug(logging_text
+ step
)
177 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
180 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
182 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
183 self
.logger
.debug(logging_text
+ "RO vnfd={} exist. Using RO_id={}".format(
184 vnfd_id
, vnfd_list
[0]["uuid"]))
186 vnfd_RO
= deepcopy(vnfd
)
187 vnfd_RO
.pop("_id", None)
188 vnfd_RO
.pop("_admin", None)
189 vnfd_RO
["id"] = vnfd_id_RO
190 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
191 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
192 self
.update_nsr_db(nsr_id
, db_nsr
)
196 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
197 self
.logger
.debug(logging_text
+ step
)
199 nsd_id_RO
= nsd_id
+ "." + nsd_id
[:200]
200 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
202 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
203 self
.logger
.debug(logging_text
+ "RO nsd={} exist. Using RO_id={}".format(
204 nsd_id
, nsd_list
[0]["uuid"]))
206 nsd_RO
= deepcopy(nsd
)
207 nsd_RO
["id"] = nsd_id_RO
208 nsd_RO
.pop("_id", None)
209 nsd_RO
.pop("_admin", None)
210 for c_vnf
in nsd_RO
["constituent-vnfd"]:
211 vnfd_id
= c_vnf
["vnfd-id-ref"]
212 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
213 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
214 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
215 self
.update_nsr_db(nsr_id
, db_nsr
)
218 # if present use it unless in error status
219 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
222 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
223 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
224 desc
= await RO
.show("ns", RO_nsr_id
)
225 except ROclient
.ROClientException
as e
:
226 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
228 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
230 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
231 nsr_lcm
["RO"]["nsr_status"] = ns_status
232 if ns_status
== "ERROR":
233 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
234 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
235 await RO
.delete("ns", RO_nsr_id
)
236 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
239 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
240 self
.logger
.debug(logging_text
+ step
)
242 desc
= await RO
.create("ns", name
=db_nsr
["name"], datacenter
=db_nsr
["datacenter"],
243 scenario
=nsr_lcm
["RO"]["nsd_id"])
244 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
245 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
246 self
.update_nsr_db(nsr_id
, db_nsr
)
248 # wait until NS is ready
249 step
= ns_status_detailed
= "Waiting ns ready at RO"
250 db_nsr
["detailed-status"] = ns_status_detailed
251 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
252 deloyment_timeout
= 600
253 while deloyment_timeout
> 0:
254 desc
= await RO
.show("ns", RO_nsr_id
)
255 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
256 nsr_lcm
["RO"]["nsr_status"] = ns_status
257 if ns_status
== "ERROR":
258 raise ROclient
.ROClientException(ns_status_info
)
259 elif ns_status
== "BUILD":
260 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
261 self
.update_nsr_db(nsr_id
, db_nsr
)
262 elif ns_status
== "ACTIVE":
263 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
266 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
268 await asyncio
.sleep(5, loop
=self
.loop
)
269 deloyment_timeout
-= 5
270 if deloyment_timeout
<= 0:
271 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
272 db_nsr
["detailed-status"] = "Configuring vnfr"
273 self
.update_nsr_db(nsr_id
, db_nsr
)
276 step
= "Looking for needed vnfd to configure"
277 self
.logger
.debug(logging_text
+ step
)
278 for c_vnf
in nsd
["constituent-vnfd"]:
279 vnfd_id
= c_vnf
["vnfd-id-ref"]
280 vnf_index
= str(c_vnf
["member-vnf-index"])
281 vnfd
= needed_vnfd
[vnfd_id
]
282 if vnfd
.get("vnf-configuration") and vnfd
["vnf-configuration"].get("juju"):
283 nsr_lcm
["VCA"][vnf_index
] = {}
285 proxy_charm
= vnfd
["vnf-configuration"]["juju"]["charm"]
287 # Note: The charm needs to exist on disk at the location
288 # specified by charm_path.
289 base_folder
= vnfd
["_admin"]["storage"]
290 charm_path
= "{}{}/{}/charms/{}".format(
292 base_folder
["folder"],
296 task
= asyncio
.ensure_future(
306 task
.add_done_callback(functools
.partial(self
.vca_deploy_callback
, db_nsr
, vnf_index
, None))
307 self
.lcm_tasks
[nsr_id
][order_id
]["create_charm:" + vnf_index
] = task
308 db_nsr
["config-status"] = "configuring" if vnfd_to_config
else "configured"
309 db_nsr
["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config
) if vnfd_to_config
else "done"
310 db_nsr
["operational-status"] = "running"
311 self
.update_nsr_db(nsr_id
, db_nsr
)
313 self
.logger
.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id
))
316 except (ROclient
.ROClientException
, DbException
) as e
:
317 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
319 except Exception as e
:
320 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
324 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
325 db_nsr
["operational-status"] = "failed"
326 self
.update_nsr_db(nsr_id
, db_nsr
)
328 async def delete_ns(self
, nsr_id
, order_id
):
329 logging_text
= "Task delete_ns={} ".format(nsr_id
)
330 self
.logger
.debug(logging_text
+ "Enter")
333 step
= "Getting nsr from db"
335 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
337 nsr_lcm
= db_nsr
["_admin"]["deploy"]
339 db_nsr
["operational-status"] = "terminate"
340 db_nsr
["config-status"] = "terminate"
341 db_nsr
["detailed-status"] = "Deleting charms"
342 self
.update_nsr_db(nsr_id
, db_nsr
)
345 step
= db_nsr
["detailed-status"] = "Deleting charms"
346 self
.logger
.debug(logging_text
+ step
)
347 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
348 if deploy_info
and deploy_info
.get("appliation"):
349 task
= asyncio
.ensure_future(
357 self
.lcm_tasks
[nsr_id
][order_id
]["delete_charm:" + vnf_index
] = task
358 except Exception as e
:
359 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
362 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
364 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
367 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
368 self
.logger
.debug(logging_text
+ step
)
369 desc
= await RO
.delete("ns", RO_nsr_id
)
370 nsr_lcm
["RO"]["nsr_id"] = None
371 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
372 except ROclient
.ROClientException
as e
:
373 if e
.http_code
== 404: # not found
374 nsr_lcm
["RO"]["nsr_id"] = None
375 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
376 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
377 elif e
.http_code
== 409: #conflict
378 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
380 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
381 self
.update_nsr_db(nsr_id
, db_nsr
)
384 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
387 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
388 desc
= await RO
.delete("nsd", RO_nsd_id
)
389 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
390 nsr_lcm
["RO"]["nsd_id"] = None
391 except ROclient
.ROClientException
as e
:
392 if e
.http_code
== 404: # not found
393 nsr_lcm
["RO"]["nsd_id"] = None
394 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
395 elif e
.http_code
== 409: #conflict
396 self
.logger
.debug(logging_text
+ "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
398 self
.logger
.error(logging_text
+ "RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
399 self
.update_nsr_db(nsr_id
, db_nsr
)
401 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
405 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
406 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
407 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
408 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
409 except ROclient
.ROClientException
as e
:
410 if e
.http_code
== 404: # not found
411 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
412 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
413 elif e
.http_code
== 409: #conflict
414 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
416 self
.logger
.error(logging_text
+ "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
417 self
.update_nsr_db(nsr_id
, db_nsr
)
419 # TODO delete from database or mark as deleted???
420 db_nsr
["operational-status"] = "terminated"
421 self
.db
.del_one("nsrs", {"_id": nsr_id
})
422 self
.logger
.debug(logging_text
+ "Exit")
424 except (ROclient
.ROClientException
, DbException
) as e
:
425 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
427 except Exception as e
:
428 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
432 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
433 db_nsr
["operational-status"] = "failed"
434 self
.update_nsr_db(nsr_id
, db_nsr
)
async def test(self, param=None):
    """
    Write one debug line carrying the received parameter; nothing else is done
    :param param: value shown in the log line
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
def cancel_tasks(self, nsr_id):
    """
    Cancel every task registered in self.lcm_tasks for the nsr identified by nsr_id
    :param nsr_id: nsr identity, used as key of self.lcm_tasks
    :return: None. Nothing is modified when no tasks are registered for nsr_id
    """
    registered = self.lcm_tasks.get(nsr_id)
    if not registered:
        return
    for order_id, tasks_set in registered.items():
        for task_name, task in tasks_set.items():
            # cancel() is requested for every task; only accepted requests are logged
            if not task.cancel():
                continue
            self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
    # forget all the orders of this nsr once cancellation has been requested
    self.lcm_tasks[nsr_id] = {}
454 async def read_kafka(self
):
455 self
.logger
.debug("kafka task Enter")
457 # future = asyncio.Future()
460 command
, params
= await self
.msg
.aioread("ns", self
.loop
)
462 if command
== "exit":
465 elif command
.startswith("#"):
467 elif command
== "echo":
470 elif command
== "test":
471 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
472 elif command
== "break":
473 print("put a break in this line of code")
474 elif command
== "create":
475 nsr_id
= params
.strip()
476 self
.logger
.debug("Deploying NS {}".format(nsr_id
))
477 task
= asyncio
.ensure_future(self
.create_ns(nsr_id
, order_id
))
478 if nsr_id
not in self
.lcm_tasks
:
479 self
.lcm_tasks
[nsr_id
] = {}
480 self
.lcm_tasks
[nsr_id
][order_id
] = {"create_ns": task
}
481 elif command
== "delete":
482 nsr_id
= params
.strip()
483 self
.logger
.debug("Deleting NS {}".format(nsr_id
))
484 self
.cancel_tasks(nsr_id
)
485 task
= asyncio
.ensure_future(self
.delete_ns(nsr_id
, order_id
))
486 if nsr_id
not in self
.lcm_tasks
:
487 self
.lcm_tasks
[nsr_id
] = {}
488 self
.lcm_tasks
[nsr_id
][order_id
] = {"delete_ns": task
}
489 elif command
== "show":
491 nsr_id
= params
.strip()
493 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
494 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
495 "{}\n deploy: {}\n tasks: {}".format(
496 nsr_id
, db_nsr
["operational-status"],
497 db_nsr
["config-status"], db_nsr
["detailed-status"],
498 db_nsr
["_admin"]["deploy"], self
.lcm_tasks
.get(nsr_id
)))
499 except Exception as e
:
500 print("nsr {} not found: {}".format(nsr_id
, e
))
502 self
.logger
.critical("unknown command '{}'".format(command
))
503 self
.logger
.debug("kafka task Exit")
507 self
.loop
= asyncio
.get_event_loop()
508 self
.loop
.run_until_complete(self
.read_kafka())
513 def read_config_file(self
, config_file
):
514 # TODO make a [ini] + yaml inside parser
515 # the configparser library is not suitable, because it does not admit comments at the end of line,
516 # and not parse integer or boolean
518 with
open(config_file
) as f
:
520 for k
, v
in environ
.items():
521 if not k
.startswith("OSMLCM_"):
523 k_items
= k
.lower().split("_")
526 for k_item
in k_items
[1:-1]:
527 if k_item
in ("ro", "vca"):
528 # put in capital letter
529 k_item
= k_item
.upper()
531 if k_items
[-1] == "port":
532 c
[k_items
[-1]] = int(v
)
535 except Exception as e
:
536 self
.logger
.warn("skipping environ '{}' on exception '{}'".format(k
, e
))
539 except Exception as e
:
540 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
543 if __name__
== '__main__':
545 config_file
= "lcm.cfg"
546 lcm
= Lcm(config_file
)