2 # -*- coding: utf-8 -*-
15 from dbbase
import DbException
16 from fsbase
import FsException
17 from msgbase
import MsgException
18 from os
import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc
.vnf
import N2VC
24 from copy
import deepcopy
25 from http
import HTTPStatus
class LcmException(Exception):
    """Error raised by the LCM itself, e.g. on an invalid configuration driver or an unreadable descriptor file."""
34 def __init__(self
, config_file
):
36 Init, Connect to database, filesystem storage, and messaging
37 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
44 # contains created tasks/futures to be able to cancel
46 self
.lcm_ns_tasks
= {}
47 self
.lcm_vim_tasks
= {}
48 self
.lcm_sdn_tasks
= {}
50 self
.logger
= logging
.getLogger('lcm')
52 config
= self
.read_config_file(config_file
)
55 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
56 "tenant": config
.get("tenant", "osm"),
57 "logger_name": "lcm.ROclient",
61 self
.vca
= config
["VCA"] # TODO VCA
65 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
66 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
67 config
["database"]["logger_name"] = "lcm.db"
68 config
["storage"]["logger_name"] = "lcm.fs"
69 config
["message"]["logger_name"] = "lcm.msg"
70 if "logfile" in config
["global"]:
71 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
72 maxBytes
=100e6
, backupCount
=9, delay
=0)
73 file_handler
.setFormatter(log_formatter_simple
)
74 self
.logger
.addHandler(file_handler
)
76 str_handler
= logging
.StreamHandler()
77 str_handler
.setFormatter(log_formatter_simple
)
78 self
.logger
.addHandler(str_handler
)
80 if config
["global"].get("loglevel"):
81 self
.logger
.setLevel(config
["global"]["loglevel"])
83 # logging other modules
84 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
85 config
[k1
]["logger_name"] = logname
86 logger_module
= logging
.getLogger(logname
)
87 if "logfile" in config
[k1
]:
88 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
89 maxBytes
=100e6
, backupCount
=9, delay
=0)
90 file_handler
.setFormatter(log_formatter_simple
)
91 logger_module
.addHandler(file_handler
)
92 if "loglevel" in config
[k1
]:
93 logger_module
.setLevel(config
[k1
]["loglevel"])
97 server
=config
['VCA']['host'],
98 port
=config
['VCA']['port'],
99 user
=config
['VCA']['user'],
100 secret
=config
['VCA']['secret'],
101 # TODO: This should point to the base folder where charms are stored,
102 # if there is a common one (like object storage). Otherwise, leave
103 # it unset and pass it via DeployCharms
104 # artifacts=config['VCA'][''],
109 if config
["database"]["driver"] == "mongo":
110 self
.db
= dbmongo
.DbMongo()
111 self
.db
.db_connect(config
["database"])
112 elif config
["database"]["driver"] == "memory":
113 self
.db
= dbmemory
.DbMemory()
114 self
.db
.db_connect(config
["database"])
116 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
117 config
["database"]["driver"]))
119 if config
["storage"]["driver"] == "local":
120 self
.fs
= fslocal
.FsLocal()
121 self
.fs
.fs_connect(config
["storage"])
123 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
124 config
["storage"]["driver"]))
126 if config
["message"]["driver"] == "local":
127 self
.msg
= msglocal
.MsgLocal()
128 self
.msg
.connect(config
["message"])
129 elif config
["message"]["driver"] == "kafka":
130 self
.msg
= msgkafka
.MsgKafka()
131 self
.msg
.connect(config
["message"])
133 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
134 config
["storage"]["driver"]))
135 except (DbException
, FsException
, MsgException
) as e
:
136 self
.logger
.critical(str(e
), exc_info
=True)
137 raise LcmException(str(e
))
def update_db(self, item, _id, _desc):
    """
    Replace one database entry. Database errors are logged, not propagated, so callers
    that are already reporting a failure are not interrupted by a second one.
    :param item: database table/collection name (e.g. "vims", "sdns", "nsrs")
    :param _id: identity of the entry to replace
    :param _desc: new content of the entry
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as e:
        self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
async def create_vim(self, vim_content, order_id):
    """
    Create a vim at RO and attach it to the RO tenant, storing the progress at database table "vims".
    :param vim_content: vim descriptor. Must contain "_id", "vim_type", "vim_tenant_name", "vim_user",
        "vim_password" and "config"
    :param order_id: identifier of the order that launched this task (not used here)
    :return: the uuid assigned by RO. On failure nothing is raised: the error is logged and written to
        the database entry ("operationalState": "ERROR") and None is returned
    """
    # NOTE(review): the try/except/finally scaffolding and the return value were restored from a
    # damaged listing - confirm against version control.
    vim_id = vim_content["_id"]
    logging_text = "Task create_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        # make sure _admin.deploy exists before recording the RO identity
        db_vim.setdefault("_admin", {}).setdefault("deploy", {})
        db_vim["_admin"]["deploy"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        vim_RO = deepcopy(vim_content)
        # internal fields and tenant credentials are not part of the RO vim descriptor
        for key in ("_id", "_admin", "schema_version", "schema_type",
                    "vim_tenant_name", "vim_user", "vim_password"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
        self.update_db("vims", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
            "config": vim_content["config"],
        }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vims", vim_id, db_vim)

        # the placeholder was missing, so the identity was never written to the log
        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # setdefault: the failure may have happened before "_admin" was available
            db_vim.setdefault("_admin", {})["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
async def edit_vim(self, vim_content, order_id):
    """
    Apply the changes of a vim to RO (both the vim and its tenant account).
    :param vim_content: fields to modify; must contain "_id". Only the fields present are sent to RO
    :param order_id: identifier of the order that launched this task (not used here)
    :return: the RO uuid of the vim, or None when the database entry has no RO information (nothing is
        edited in that case). On failure nothing is raised: the error is logged and written to the
        database entry ("operationalState": "ERROR") and None is returned
    """
    # NOTE(review): the try/except/finally scaffolding, the "if vim_RO" guards and the return value
    # were restored from a damaged listing - confirm against version control.
    vim_id = vim_content["_id"]
    logging_text = "Task edit_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    RO_vim_id = None  # bound up-front: it is logged even when there is no RO information
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        RO_vim_id = ((db_vim.get("_admin") or {}).get("deploy") or {}).get("RO")
        if RO_vim_id:
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            # internal fields and tenant credentials are not part of the RO vim descriptor
            for key in ("_id", "_admin", "schema_version", "schema_type",
                        "vim_tenant_name", "vim_user", "vim_password"):
                vim_RO.pop(key, None)
            # an edition may not carry the vim type; only rename it when present
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_RO = {k: vim_content[k] for k in ("vim_tenant_name", "vim_password", "config")
                      if k in vim_content}
            if "vim_user" in vim_content:
                # must go into the descriptor sent to RO, not back into the input content
                vim_RO["vim_username"] = vim_content["vim_user"]
            if vim_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim.setdefault("_admin", {})["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
async def delete_vim(self, vim_id, order_id):
    """
    Detach a vim from the RO tenant, delete it from RO and finally remove it from database table "vims".
    A "not found" (404) answer from RO is treated as already done. When the database entry has no RO
    information, only the database entry is removed.
    :param vim_id: identity of the vim at the database
    :param order_id: identifier of the order that launched this task (not used here)
    :return: None. On failure nothing is raised: the error is logged and written to the database
        entry ("operationalState": "ERROR")
    """
    # NOTE(review): the try/else/raise scaffolding was restored from a damaged listing - confirm
    # against version control.
    logging_text = "Task delete_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        RO_vim_id = ((db_vim.get("_admin") or {}).get("deploy") or {}).get("RO")
        if RO_vim_id:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:  # anything but "not found" is a real failure
                    raise
                self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:  # anything but "not found" is a real failure
                    raise
                self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
        else:
            # nothing to delete at RO
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vims", {"_id": vim_id})
        self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # setdefault: entries without RO information may not have "_admin" at all
            db_vim.setdefault("_admin", {})["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
async def create_sdn(self, sdn_content, order_id):
    """
    Create a sdn controller at RO, storing the progress at database table "sdns".
    :param sdn_content: sdn descriptor; must contain "_id"
    :param order_id: identifier of the order that launched this task (not used here)
    :return: the uuid assigned by RO. On failure nothing is raised: the error is logged and written to
        the database entry ("operationalState": "ERROR") and None is returned
    """
    # NOTE(review): the try/except/finally scaffolding and the return value were restored from a
    # damaged listing - confirm against version control.
    sdn_id = sdn_content["_id"]
    logging_text = "Task create_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        # make sure _admin.deploy exists before recording the RO identity
        db_sdn.setdefault("_admin", {}).setdefault("deploy", {})
        db_sdn["_admin"]["deploy"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        # internal fields are not part of the RO descriptor
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        # the placeholder was missing, so the identity was never written to the log
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn.setdefault("_admin", {})["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
async def edit_sdn(self, sdn_content, order_id):
    """
    Apply the changes of a sdn controller to RO.
    :param sdn_content: fields to modify; must contain "_id"
    :param order_id: identifier of the order that launched this task (not used here)
    :return: the RO uuid of the sdn, or None when the database entry has no RO information (nothing is
        edited in that case). On failure nothing is raised: the error is logged and written to the
        database entry ("operationalState": "ERROR") and None is returned
    """
    # NOTE(review): the try/except/finally scaffolding, the "if sdn_RO" guard and the return value
    # were restored from a damaged listing - confirm against version control.
    sdn_id = sdn_content["_id"]
    logging_text = "Task edit_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    RO_sdn_id = None  # bound up-front: it is logged even when there is no RO information
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        RO_sdn_id = ((db_sdn.get("_admin") or {}).get("deploy") or {}).get("RO")
        if RO_sdn_id:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            # internal fields are not part of the RO descriptor
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn.setdefault("_admin", {})["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
async def delete_sdn(self, sdn_id, order_id):
    """
    Delete a sdn controller from RO and remove it from database table "sdns".
    A "not found" (404) answer from RO is treated as already deleted. When the database entry has no
    RO information, only the database entry is removed.
    :param sdn_id: identity of the sdn at the database
    :param order_id: identifier of the order that launched this task (not used here)
    :return: None. On failure nothing is raised: the error is logged and written to the database
        entry ("operationalState": "ERROR")
    """
    # NOTE(review): the try/else/raise scaffolding was restored from a damaged listing - confirm
    # against version control.
    logging_text = "Task delete_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        RO_sdn_id = ((db_sdn.get("_admin") or {}).get("deploy") or {}).get("RO")
        if RO_sdn_id:
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code != 404:  # anything but "not found" is a real failure
                    raise
                self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
        else:
            # nothing to delete at RO
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # setdefault: entries without RO information may not have "_admin" at all
            db_sdn.setdefault("_admin", {})["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
def vnfd2RO(self, vnfd, new_id=None):
    """
    Creates a new vnfd descriptor for RO based on the input OSM IM vnfd.
    The input is not modified. Every vdu "cloud-init-file" entry is replaced by a "cloud-init" entry
    holding the content of that file, read from the storage.
    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd
    :raises LcmException: if a cloud-init file cannot be read from the storage
    """
    # NOTE(review): the try/finally scaffolding and the "if new_id" guard were restored from a
    # damaged listing - confirm against version control.
    vnfd_RO = deepcopy(vnfd)
    vnfd_RO.pop("_id", None)
    vnfd_RO.pop("_admin", None)
    if new_id:
        vnfd_RO["id"] = new_id
    for vdu in vnfd_RO["vdu"]:
        if "cloud-init-file" not in vdu:
            continue
        # storage location is read from the original vnfd: "_admin" was removed from the copy
        base_folder = vnfd["_admin"]["storage"]
        cloud_init_file = "{}/{}/cloud_init/{}".format(
            base_folder["folder"],
            base_folder["pkg-dir"],
            vdu["cloud-init-file"]
        )
        try:
            ci_file = self.fs.file_open(cloud_init_file, "r")
            try:
                # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8.
                # If fails convert to base 64 or similar
                cloud_init_content = ci_file.read()
            finally:
                ci_file.close()
        except FsException as e:
            raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e)) from e
        vdu.pop("cloud-init-file", None)
        vdu["cloud-init"] = cloud_init_content
    return vnfd_RO
454 def n2vc_callback(self
, model_name
, application_name
, workload_status
, db_nsr
, vnf_member_index
, task
=None):
455 """Update the lcm database with the status of the charm.
457 Updates the VNF's operational status with the state of the charm:
458 - blocked: The unit needs manual intervention
459 - maintenance: The unit is actively deploying/configuring
460 - waiting: The unit is waiting for another charm to be ready
461 - active: The unit is deployed, configured, and ready
462 - error: The charm has failed and needs attention.
463 - terminated: The charm has been destroyed
467 Updates the network service's config-status to reflect the state of all
472 nsr_id
= db_nsr
["_id"]
473 nsr_lcm
= db_nsr
["_admin"]["deploy"]
476 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(nsr_id
, vnf_member_index
))
480 exc
= task
.exception()
483 "[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(nsr_id
, vnf_member_index
, exc
))
484 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = "error"
485 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = str(exc
)
487 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(nsr_id
, vnf_member_index
))
488 # TODO it seams that task Done, but callback is still ongoing. For the moment comment this two lines
489 # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
490 # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
491 elif workload_status
:
492 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(nsr_id
, vnf_member_index
, workload_status
))
493 if nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] == workload_status
:
494 return # same status, ignore
495 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = workload_status
496 # TODO N2VC some error message in case of error should be obtained from N2VC
497 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = ""
499 self
.logger
.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(nsr_id
, vnf_member_index
), exc_info
=True)
505 for vnf_index
, vca_info
in nsr_lcm
["VCA"].items():
506 vca_status
= vca_info
["operational-status"]
507 if vca_status
not in status_map
:
509 status_map
[vca_status
] = 0
510 status_map
[vca_status
] += 1
512 if vca_status
!= "active":
514 if vca_status
== "error":
516 db_nsr
["config-status"] = "failed"
517 db_nsr
["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index
,
518 vca_info
["detailed-status"])
522 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(nsr_id
, vnf_member_index
))
523 db_nsr
["config-status"] = "configured"
524 db_nsr
["detailed-status"] = "done"
530 for status
, num
in status_map
.items():
531 cs
+= separator
+ "{}: {}".format(status
, num
)
533 db_nsr
["config-status"] = cs
534 self
.update_db("nsrs", nsr_id
, db_nsr
)
536 except Exception as e
:
537 self
.logger
.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(nsr_id
, vnf_member_index
, e
), exc_info
=True)
539 async def create_ns(self
, nsr_id
, order_id
):
540 logging_text
= "Task create_ns={} ".format(nsr_id
)
541 self
.logger
.debug(logging_text
+ "Enter")
542 # get all needed from database
545 step
= "Getting nsr from db"
547 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
549 nsr_name
= db_nsr
["name"] # TODO short-name??
551 for c_vnf
in nsd
["constituent-vnfd"]:
552 vnfd_id
= c_vnf
["vnfd-id-ref"]
553 if vnfd_id
not in needed_vnfd
:
554 step
= "Getting vnfd={} from db".format(vnfd_id
)
555 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
559 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
563 db_nsr
["_admin"]["deploy"] = nsr_lcm
564 db_nsr
["detailed-status"] = "creating"
565 db_nsr
["operational-status"] = "init"
567 deloyment_timeout
= 120
569 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
571 # get vnfds, instantiate at RO
572 for vnfd_id
, vnfd
in needed_vnfd
.items():
573 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
574 self
.logger
.debug(logging_text
+ step
)
575 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
578 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
580 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
581 self
.logger
.debug(logging_text
+ "RO vnfd={} exist. Using RO_id={}".format(
582 vnfd_id
, vnfd_list
[0]["uuid"]))
584 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
)
585 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
586 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
587 self
.update_db("nsrs", nsr_id
, db_nsr
)
591 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
592 self
.logger
.debug(logging_text
+ step
)
594 nsd_id_RO
= nsd_id
+ "." + nsd_id
[:200]
595 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
597 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
598 self
.logger
.debug(logging_text
+ "RO nsd={} exist. Using RO_id={}".format(
599 nsd_id
, nsd_list
[0]["uuid"]))
601 nsd_RO
= deepcopy(nsd
)
602 nsd_RO
["id"] = nsd_id_RO
603 nsd_RO
.pop("_id", None)
604 nsd_RO
.pop("_admin", None)
605 for c_vnf
in nsd_RO
["constituent-vnfd"]:
606 vnfd_id
= c_vnf
["vnfd-id-ref"]
607 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
608 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
609 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
610 self
.update_db("nsrs", nsr_id
, db_nsr
)
613 # if present use it unless in error status
614 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
617 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
618 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
619 desc
= await RO
.show("ns", RO_nsr_id
)
620 except ROclient
.ROClientException
as e
:
621 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
623 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
625 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
626 nsr_lcm
["RO"]["nsr_status"] = ns_status
627 if ns_status
== "ERROR":
628 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
629 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
630 await RO
.delete("ns", RO_nsr_id
)
631 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
634 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
635 self
.logger
.debug(logging_text
+ step
)
637 desc
= await RO
.create("ns", name
=db_nsr
["name"], datacenter
=db_nsr
["datacenter"],
638 scenario
=nsr_lcm
["RO"]["nsd_id"])
639 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
640 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
641 self
.update_db("nsrs", nsr_id
, db_nsr
)
643 # wait until NS is ready
644 step
= ns_status_detailed
= "Waiting ns ready at RO"
645 db_nsr
["detailed-status"] = ns_status_detailed
646 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
647 deloyment_timeout
= 600
648 while deloyment_timeout
> 0:
649 desc
= await RO
.show("ns", RO_nsr_id
)
650 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
651 nsr_lcm
["RO"]["nsr_status"] = ns_status
652 if ns_status
== "ERROR":
653 raise ROclient
.ROClientException(ns_status_info
)
654 elif ns_status
== "BUILD":
655 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
656 self
.update_db("nsrs", nsr_id
, db_nsr
)
657 elif ns_status
== "ACTIVE":
658 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
661 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
663 await asyncio
.sleep(5, loop
=self
.loop
)
664 deloyment_timeout
-= 5
665 if deloyment_timeout
<= 0:
666 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
667 db_nsr
["detailed-status"] = "Configuring vnfr"
668 self
.update_db("nsrs", nsr_id
, db_nsr
)
670 # The parameters we'll need to deploy a charm
671 number_to_configure
= 0
674 """An inner function to deploy the charm from either vnf or vdu
678 # if number_to_configure == 0:
679 # self.logger.debug("Logging into N2VC...")
680 # task = asyncio.ensure_future(self.n2vc.login())
681 # yield from asyncio.wait_for(task, 30.0)
682 # self.logger.debug("Logged into N2VC!")
684 ## await self.n2vc.login()
686 # Note: The charm needs to exist on disk at the location
687 # specified by charm_path.
688 base_folder
= vnfd
["_admin"]["storage"]
689 storage_params
= self
.fs
.get_params()
690 charm_path
= "{}{}/{}/charms/{}".format(
691 storage_params
["path"],
692 base_folder
["folder"],
693 base_folder
["pkg-dir"],
697 # Setup the runtime parameters for this VNF
698 params
['rw_mgmt_ip'] = nsr_lcm
['nsr_ip'][vnf_index
]
700 # ns_name will be ignored in the current version of N2VC
701 # but will be implemented for the next point release.
702 model_name
= 'default'
703 application_name
= self
.n2vc
.FormatApplicationName(
709 nsr_lcm
["VCA"][vnf_index
] = {
711 "application": application_name
,
712 "operational-status": "init",
713 "detailed-status": "",
717 self
.logger
.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id
, charm_path
, proxy_charm
))
718 task
= asyncio
.ensure_future(
719 self
.n2vc
.DeployCharms(
720 model_name
, # The network service name
721 application_name
, # The application name
722 vnfd
, # The vnf descriptor
723 charm_path
, # Path to charm
724 params
, # Runtime params, like mgmt ip
725 {}, # for native charms only
726 self
.n2vc_callback
, # Callback for status changes
727 db_nsr
, # Callback parameter
728 vnf_index
, # Callback parameter
729 None, # Callback parameter (task)
732 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, model_name
, application_name
, None, db_nsr
, vnf_index
))
733 self
.lcm_ns_tasks
[nsr_id
][order_id
]["create_charm:" + vnf_index
] = task
735 # TODO: Make this call inside deploy()
736 # Login to the VCA. If there are multiple calls to login(),
737 # subsequent calls will be a nop and return immediately.
738 await self
.n2vc
.login()
740 step
= "Looking for needed vnfd to configure"
741 self
.logger
.debug(logging_text
+ step
)
742 for c_vnf
in nsd
["constituent-vnfd"]:
743 vnfd_id
= c_vnf
["vnfd-id-ref"]
744 vnf_index
= str(c_vnf
["member-vnf-index"])
745 vnfd
= needed_vnfd
[vnfd_id
]
747 # Deploy charms for each VDU that supports one.
748 for vdu
in vnfd
['vdu']:
749 vdu_config
= vdu
.get('vdu-configuration')
753 if vdu_config
and vdu_config
.get("juju"):
754 proxy_charm
= vdu_config
["juju"]["charm"]
756 if 'initial-config-primitive' in vdu_config
:
757 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
760 # If a VDU doesn't declare it's own charm, check
761 # if the VNF does and deploy that instead.
763 # Check if this VNF has a charm configuration
764 vnf_config
= vnfd
.get("vnf-configuration")
766 if vnf_config
and vnf_config
.get("juju"):
767 proxy_charm
= vnf_config
["juju"]["charm"]
769 if 'initial-config-primitive' in vnf_config
:
770 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
774 number_to_configure
+= 1
776 db_nsr
["config-status"] = "configuring" if number_to_configure
else "configured"
777 db_nsr
["detailed-status"] = "configuring: init: {}".format(number_to_configure
) if number_to_configure
else "done"
778 db_nsr
["operational-status"] = "running"
779 self
.update_db("nsrs", nsr_id
, db_nsr
)
781 self
.logger
.debug("Task create_ns={} Exit Ok".format(nsr_id
))
784 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
785 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
787 except Exception as e
:
788 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
792 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
793 db_nsr
["operational-status"] = "failed"
794 self
.update_db("nsrs", nsr_id
, db_nsr
)
796 async def delete_ns(self
, nsr_id
, order_id
):
797 logging_text
= "Task delete_ns={} ".format(nsr_id
)
798 self
.logger
.debug(logging_text
+ "Enter")
801 step
= "Getting nsr from db"
803 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
805 nsr_lcm
= db_nsr
["_admin"]["deploy"]
807 db_nsr
["operational-status"] = "terminating"
808 db_nsr
["config-status"] = "terminating"
809 db_nsr
["detailed-status"] = "Deleting charms"
810 self
.update_db("nsrs", nsr_id
, db_nsr
)
813 self
.logger
.debug(logging_text
+ step
)
814 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
815 if deploy_info
and deploy_info
.get("application"):
816 # n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
818 # self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback, model_name, application_name)
819 task
= asyncio
.ensure_future(
820 self
.n2vc
.RemoveCharms(
821 deploy_info
['model'],
822 deploy_info
['application'],
828 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
829 # deploy_info['application'],None, db_nsr, vnf_index))
830 self
.lcm_ns_tasks
[nsr_id
][order_id
]["delete_charm:" + vnf_index
] = task
831 except Exception as e
:
832 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
835 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
837 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
840 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
841 self
.logger
.debug(logging_text
+ step
)
842 desc
= await RO
.delete("ns", RO_nsr_id
)
843 nsr_lcm
["RO"]["nsr_id"] = None
844 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
845 except ROclient
.ROClientException
as e
:
846 if e
.http_code
== 404: # not found
847 nsr_lcm
["RO"]["nsr_id"] = None
848 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
849 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
850 elif e
.http_code
== 409: #conflict
851 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
853 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
854 self
.update_db("nsrs", nsr_id
, db_nsr
)
857 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
860 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
861 desc
= await RO
.delete("nsd", RO_nsd_id
)
862 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
863 nsr_lcm
["RO"]["nsd_id"] = None
864 except ROclient
.ROClientException
as e
:
865 if e
.http_code
== 404: # not found
866 nsr_lcm
["RO"]["nsd_id"] = None
867 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
868 elif e
.http_code
== 409: #conflict
869 self
.logger
.debug(logging_text
+ "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
871 self
.logger
.error(logging_text
+ "RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
872 self
.update_db("nsrs", nsr_id
, db_nsr
)
874 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
878 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
879 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
880 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
881 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
882 except ROclient
.ROClientException
as e
:
883 if e
.http_code
== 404: # not found
884 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
885 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
886 elif e
.http_code
== 409: #conflict
887 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
889 self
.logger
.error(logging_text
+ "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
890 self
.update_db("nsrs", nsr_id
, db_nsr
)
892 # TODO delete from database or mark as deleted???
893 db_nsr
["operational-status"] = "terminated"
894 self
.db
.del_one("nsrs", {"_id": nsr_id
})
895 self
.logger
.debug(logging_text
+ "Exit")
897 except (ROclient
.ROClientException
, DbException
) as e
:
898 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
900 except Exception as e
:
901 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
905 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
906 db_nsr
["operational-status"] = "failed"
907 self
.update_db("nsrs", nsr_id
, db_nsr
)
async def test(self, param=None):
    """
    Dummy task: it only writes the received parameter to the debug log.
    :param param: any value; it is shown in the log line
    :return: None
    """
    message = "Starting/Ending test task: {}".format(param)
    self.logger.debug(message)
def cancel_tasks(self, topic, _id):
    """
    Cancel all active tasks of a concrete nsr, vim or sdn identified by _id.
    After requesting the cancellation, the task registry of that _id is emptied.
    :param topic: can be "ns", "vim_account" or "sdn". Any other value is logged and ignored
    :param _id: nsr, vim or sdn identity
    :return: None
    """
    # NOTE(review): the topic selection, the "if cancelled" guard and the final registry reset were
    # restored from a damaged listing - confirm against version control.
    registries = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    lcm_tasks = registries.get(topic)
    if lcm_tasks is None:
        self.logger.warning("cancel_tasks: unknown topic '{}' for _id={}, nothing cancelled".format(topic, _id))
        return

    if not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() answers False when the task was already finished
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    lcm_tasks[_id] = {}
async def read_kafka(self):
    """
    Read commands from the message bus and launch the matching task for each one.

    Messages are awaited from self.msg.aioread() for the topics "ns", "vim_account"
    and "sdn". Every launched task is stored as registry[item_id][order_id] =
    {task_name: task}, where the registry is self.lcm_ns_tasks, self.lcm_vim_tasks or
    self.lcm_sdn_tasks, so that cancel_tasks() can find it later. A "delete" command
    first cancels the tasks already registered for that item.

    The loop ends on the "exit" command, or when an exception arrives after five
    consecutive failures have already been counted. Any successfully read message
    resets that counter.

    :return: None
    """

    def launch(registry, item_id, order, task_name, coro):
        # Schedule the coroutine and remember its task under the item and order number
        task = asyncio.ensure_future(coro)
        registry.setdefault(item_id, {})[order] = {task_name: task}

    self.logger.debug("Task Kafka Enter")
    order_id = 1
    consecutive_errors = 0
    while consecutive_errors < 10:
        try:
            topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
            self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            order_id += 1

            # commands that do not depend on the topic
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params, flush=True)
                continue
            elif command == "test":
                asyncio.ensure_future(self.test(params), loop=self.loop)
                continue

            if topic == "ns":
                nsr_id = params.strip()
                if command == "create":
                    launch(self.lcm_ns_tasks, nsr_id, order_id, "create_ns", self.create_ns(nsr_id, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, nsr_id)
                    launch(self.lcm_ns_tasks, nsr_id, order_id, "delete_ns", self.delete_ns(nsr_id, order_id))
                    continue
                elif command == "show":
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
                            "{}\n deploy: {}\n tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)),
                            flush=True)
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e), flush=True)
                    continue
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "create_vim", self.create_vim(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    launch(self.lcm_vim_tasks, vim_id, order_id, "delete_vim", self.delete_vim(vim_id, order_id))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account", flush=True)
                    continue
                elif command == "edit":
                    launch(self.lcm_vim_tasks, vim_id, order_id, "edit_vim", self.edit_vim(vim_id, order_id))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "create_sdn", self.create_sdn(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "delete_sdn", self.delete_sdn(_sdn_id, order_id))
                    continue
                elif command == "edit":
                    launch(self.lcm_sdn_tasks, _sdn_id, order_id, "edit_sdn", self.edit_sdn(_sdn_id, order_id))
                    continue

            # reached only when no branch above handled the message
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            if consecutive_errors == 5:
                self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
                break
            else:
                consecutive_errors += 1
                self.logger.error("Task Kafka Exception {}".format(e))
                # back off before retrying; no loop argument, asyncio.sleep() rejects it since Python 3.10
                await asyncio.sleep(1)
    self.logger.debug("Task Kafka terminating")
    # TODO cancel the tasks that are still registered and wait, with a timeout, for their termination
    self.logger.debug("Task Kafka exit")
1058 self
.loop
= asyncio
.get_event_loop()
1059 self
.loop
.run_until_complete(self
.read_kafka())
1063 self
.db
.db_disconnect()
1065 self
.msg
.disconnect()
1067 self
.fs
.fs_disconnect()
def read_config_file(self, config_file):
    """
    Load the configuration file and overlay it with OSMLCM_* environment variables.

    An environment variable OSMLCM_<SECTION>_..._<KEY> is lower-cased and split on "_";
    the items between the prefix and the last one are used as nested keys (with "ro" and
    "vca" upper-cased) and the last item is the key that receives the value. A key
    named "port" is converted to int; any other value is stored as the string read from
    the environment. A variable that cannot be applied is skipped with a warning.

    :param config_file: path of the configuration file
    :return: the configuration, as loaded from the file and updated from the environment.
        If the file cannot be read or parsed the error is logged and the process exits
        with status 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: a configuration file must not be able to build arbitrary python objects
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the dictionary that owns the final key
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        exit(1)
1101 if __name__
== '__main__':
1103 config_file
= "lcm.cfg"
1104 lcm
= Lcm(config_file
)