aa24caa1ecec27d77ef36c20525be0728b50f895
2 # -*- coding: utf-8 -*-
15 from dbbase
import DbException
16 from fsbase
import FsException
17 from msgbase
import MsgException
18 from os
import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc
.vnf
import N2VC
24 from copy
import deepcopy
25 from http
import HTTPStatus
class LcmException(Exception):
    """
    Error raised by the LCM engine itself: invalid configuration values, failures connecting to the
    database/storage/message backends, and descriptor files that cannot be read.
    """
34 def __init__(self
, config_file
):
36 Init, Connect to database, filesystem storage, and messaging
37 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
44 # contains created tasks/futures to be able to cancel
46 self
.lcm_ns_tasks
= {}
47 self
.lcm_vim_tasks
= {}
48 self
.lcm_sdn_tasks
= {}
50 self
.logger
= logging
.getLogger('lcm')
52 config
= self
.read_config_file(config_file
)
55 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
56 "tenant": config
.get("tenant", "osm"),
57 "logger_name": "lcm.ROclient",
61 self
.vca
= config
["VCA"] # TODO VCA
65 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
66 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
67 config
["database"]["logger_name"] = "lcm.db"
68 config
["storage"]["logger_name"] = "lcm.fs"
69 config
["message"]["logger_name"] = "lcm.msg"
70 if "logfile" in config
["global"]:
71 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
72 maxBytes
=100e6
, backupCount
=9, delay
=0)
73 file_handler
.setFormatter(log_formatter_simple
)
74 self
.logger
.addHandler(file_handler
)
76 str_handler
= logging
.StreamHandler()
77 str_handler
.setFormatter(log_formatter_simple
)
78 self
.logger
.addHandler(str_handler
)
80 if config
["global"].get("loglevel"):
81 self
.logger
.setLevel(config
["global"]["loglevel"])
83 # logging other modules
84 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
85 config
[k1
]["logger_name"] = logname
86 logger_module
= logging
.getLogger(logname
)
87 if "logfile" in config
[k1
]:
88 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
89 maxBytes
=100e6
, backupCount
=9, delay
=0)
90 file_handler
.setFormatter(log_formatter_simple
)
91 logger_module
.addHandler(file_handler
)
92 if "loglevel" in config
[k1
]:
93 logger_module
.setLevel(config
[k1
]["loglevel"])
97 server
=config
['VCA']['host'],
98 port
=config
['VCA']['port'],
99 user
=config
['VCA']['user'],
100 secret
=config
['VCA']['secret'],
101 # TODO: This should point to the base folder where charms are stored,
102 # if there is a common one (like object storage). Otherwise, leave
103 # it unset and pass it via DeployCharms
104 # artifacts=config['VCA'][''],
109 if config
["database"]["driver"] == "mongo":
110 self
.db
= dbmongo
.DbMongo()
111 self
.db
.db_connect(config
["database"])
112 elif config
["database"]["driver"] == "memory":
113 self
.db
= dbmemory
.DbMemory()
114 self
.db
.db_connect(config
["database"])
116 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
117 config
["database"]["driver"]))
119 if config
["storage"]["driver"] == "local":
120 self
.fs
= fslocal
.FsLocal()
121 self
.fs
.fs_connect(config
["storage"])
123 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
124 config
["storage"]["driver"]))
126 if config
["message"]["driver"] == "local":
127 self
.msg
= msglocal
.MsgLocal()
128 self
.msg
.connect(config
["message"])
129 elif config
["message"]["driver"] == "kafka":
130 self
.msg
= msgkafka
.MsgKafka()
131 self
.msg
.connect(config
["message"])
133 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
134 config
["storage"]["driver"]))
135 except (DbException
, FsException
, MsgException
) as e
:
136 self
.logger
.critical(str(e
), exc_info
=True)
137 raise LcmException(str(e
))
def update_db(self, item, _id, _desc):
    """
    Replace a stored document; a database failure is logged, never raised.

    :param item: name of the collection to write, e.g. "vims", "sdns" or "nsrs"
    :param _id: identity of the document to replace
    :param _desc: complete new content of the document
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as e:
        # Deliberately best effort: this is also called from the error/cleanup paths of the tasks,
        # where raising again would hide the original failure.
        self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
async def create_vim(self, vim_content, order_id):
    """
    Create a VIM at RO and attach it to the RO tenant, recording the outcome in the "vims" collection.

    On success the record gets _admin.deploy.RO = <RO uuid> and _admin.operationalState = "ENABLED".
    On any failure the exception is logged (not propagated) and the record is marked
    operationalState = "ERROR" with a detailed-status naming the step that failed.

    :param vim_content: VIM document; must contain "_id", "vim_type", "vim_tenant_name", "vim_user",
        "vim_password" and "config"
    :param order_id: identifier of the order that triggered the task (not used by this coroutine)
    :return: the RO uuid of the created VIM, or None on failure
        (NOTE(review): the original return line is not visible in this view - confirm)
    """
    vim_id = vim_content["_id"]
    logging_text = "Task create_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if "_admin" not in db_vim:
            db_vim["_admin"] = {}
        if "deploy" not in db_vim["_admin"]:
            db_vim["_admin"]["deploy"] = {}
        db_vim["_admin"]["deploy"]["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        vim_RO = deepcopy(vim_content)
        # Internal bookkeeping and tenant credentials are not part of the RO "vim" descriptor;
        # the credentials are sent below when attaching the VIM to the tenant.
        for key in ("_id", "_admin", "schema_version", "schema_type",
                    "vim_tenant_name", "vim_user", "vim_password"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        # Persist the RO id right away so that a later failure still leaves a way to clean up at RO
        db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
        self.update_db("vims", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
            "config": vim_content["config"],
        }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vims", vim_id, db_vim)

        # The original message had no "{}" placeholder, so the id was silently dropped
        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc is not None and db_vim:
            # "_admin" may be absent or malformed if the failure happened before it was normalised
            if not isinstance(db_vim.get("_admin"), dict):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
async def edit_vim(self, vim_content, order_id):
    """
    Propagate an edition of a VIM to RO: first the VIM itself, then its tenant account.

    Nothing is sent to RO when the stored record has no _admin.deploy.RO identifier.
    On any failure the exception is logged (not propagated) and the record is marked
    operationalState = "ERROR" with a detailed-status naming the step that failed.

    :param vim_content: dictionary with "_id" plus the VIM fields to change
    :param order_id: identifier of the order that triggered the task (not used by this coroutine)
    :return: the RO uuid of the VIM, or None when the VIM is not deployed at RO or on failure
        (NOTE(review): the original return line is not visible in this view - confirm)
    """
    vim_id = vim_content["_id"]
    logging_text = "Task edit_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    # Bound up front: the exit log below also runs when the record has no RO information
    RO_vim_id = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            for key in ("_id", "_admin", "schema_version", "schema_type",
                        "vim_tenant_name", "vim_user", "vim_password"):
                vim_RO.pop(key, None)
            # An edition may omit the type; only rename the key when it is present
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_RO = {k: vim_content[k] for k in ("vim_tenant_name", "vim_password", "config")
                      if k in vim_content}
            if "vim_user" in vim_content:
                # RO names this field "vim_username". The original stored it back into vim_content,
                # so a user change never reached RO.
                vim_RO["vim_username"] = vim_content["vim_user"]
            if vim_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vims", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc is not None and db_vim:
            if not isinstance(db_vim.get("_admin"), dict):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
async def delete_vim(self, vim_id, order_id):
    """
    Detach a VIM from the RO tenant, delete it at RO and remove its record from the "vims" collection.

    A "not found" answer from RO on detach or delete is treated as already done. When the record
    holds no RO identifier the RO steps are skipped and only the database record is removed.
    On any other failure the exception is logged (not propagated) and the record is marked
    operationalState = "ERROR".

    :param vim_id: identity of the VIM record
    :param order_id: identifier of the order that triggered the task (not used by this coroutine)
    :return: None
    """
    logging_text = "Task delete_vim={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vims", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
        else:
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vims", {"_id": vim_id})
        self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc is not None and db_vim:
            # Records without "_admin" reach this point too (see the "Skipping" branch); without
            # this guard the cleanup itself raised KeyError.
            if not isinstance(db_vim.get("_admin"), dict):
                db_vim["_admin"] = {}
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vims", vim_id, db_vim)
async def create_sdn(self, sdn_content, order_id):
    """
    Create an SDN controller at RO and record the outcome in the "sdns" collection.

    On success the record gets _admin.deploy.RO = <RO uuid> and _admin.operationalState = "ENABLED".
    On any failure the exception is logged (not propagated) and the record is marked
    operationalState = "ERROR" with a detailed-status naming the step that failed.

    :param sdn_content: SDN document; must contain "_id"
    :param order_id: identifier of the order that triggered the task (not used by this coroutine)
    :return: the RO uuid of the created SDN controller, or None on failure
        (NOTE(review): the original return line is not visible in this view - confirm)
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task create_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if "_admin" not in db_sdn:
            db_sdn["_admin"] = {}
        if "deploy" not in db_sdn["_admin"]:
            db_sdn["_admin"]["deploy"] = {}
        db_sdn["_admin"]["deploy"]["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        # Internal bookkeeping fields are not part of the RO descriptor
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        # The original message had no "{}" placeholder, so the id was silently dropped
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc is not None and db_sdn:
            if not isinstance(db_sdn.get("_admin"), dict):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
async def edit_sdn(self, sdn_content, order_id):
    """
    Propagate an edition of an SDN controller to RO.

    Nothing is sent to RO when the stored record has no _admin.deploy.RO identifier.
    On any failure the exception is logged (not propagated) and the record is marked
    operationalState = "ERROR" with a detailed-status naming the step that failed.

    :param sdn_content: dictionary with "_id" plus the SDN fields to change
    :param order_id: identifier of the order that triggered the task (not used by this coroutine)
    :return: the RO uuid of the SDN controller, or None when it is not deployed at RO or on failure
        (NOTE(review): the original return line is not visible in this view - confirm)
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task edit_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    # Bound up front: the exit log below also runs when the record has no RO information
    RO_sdn_id = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            # Skip the RO call when nothing but bookkeeping fields was supplied
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc is not None and db_sdn:
            if not isinstance(db_sdn.get("_admin"), dict):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
async def delete_sdn(self, sdn_id, order_id):
    """
    Delete an SDN controller at RO and remove its record from the "sdns" collection.

    A "not found" answer from RO is treated as already deleted. When the record holds no RO
    identifier the RO step is skipped and only the database record is removed.
    On any other failure the exception is logged (not propagated) and the record is marked
    operationalState = "ERROR".

    :param sdn_id: identity of the SDN record
    :param order_id: identifier of the order that triggered the task (not used by this coroutine)
    :return: None
    """
    logging_text = "Task delete_sdn={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code != HTTPStatus.NOT_FOUND:
                    raise
                self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
        else:
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc is not None and db_sdn:
            # Records without "_admin" reach this point too (see the "Skipping" branch); without
            # this guard the cleanup itself raised KeyError.
            if not isinstance(db_sdn.get("_admin"), dict):
                db_sdn["_admin"] = {}
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
def vnfd2RO(self, vnfd, new_id=None):
    """
    Build the RO version of an OSM IM vnfd. The input descriptor is not modified.

    The copy loses "_id" and "_admin", optionally gets a new "id", and every vdu that references a
    "cloud-init-file" gets that file's content inlined under "cloud-init" instead.

    :param vnfd: input vnfd; "_admin.storage" ("folder" and "pkg-dir") is read only when some vdu
        has a "cloud-init-file"
    :param new_id: overrides the vnfd "id" if provided (a falsy value keeps the original id)
    :return: the new descriptor
    :raises LcmException: if a cloud-init file cannot be read from the storage
    """
    try:
        vnfd_RO = deepcopy(vnfd)
        vnfd_RO.pop("_id", None)
        vnfd_RO.pop("_admin", None)
        if new_id:
            vnfd_RO["id"] = new_id
        # A descriptor without vdus is passed through instead of failing with KeyError
        for vdu in vnfd_RO.get("vdu", ()):
            if "cloud-init-file" not in vdu:
                continue
            # Storage data lives under "_admin", which was removed from the copy: read the input
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_file = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"]
            )
            ci_file = self.fs.file_open(cloud_init_file, "r")
            try:
                # TODO: detect if binary or text. Propose to read as binary and try to decode to
                # utf8. If that fails, convert to base64 or similar
                cloud_init_content = ci_file.read()
            finally:
                ci_file.close()
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_content
        return vnfd_RO
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e)) from e
454 def n2vc_callback(self
, model_name
, application_name
, workload_status
, db_nsr
, vnf_member_index
, task
=None):
455 """Update the lcm database with the status of the charm.
457 Updates the VNF's operational status with the state of the charm:
458 - blocked: The unit needs manual intervention
459 - maintenance: The unit is actively deploying/configuring
460 - waiting: The unit is waiting for another charm to be ready
461 - active: The unit is deployed, configured, and ready
462 - error: The charm has failed and needs attention.
463 - terminated: The charm has been destroyed
467 Updates the network service's config-status to reflect the state of all
472 nsr_id
= db_nsr
["_id"]
473 nsr_lcm
= db_nsr
["_admin"]["deploy"]
476 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(nsr_id
, vnf_member_index
))
480 exc
= task
.exception()
483 "[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(nsr_id
, vnf_member_index
, exc
))
484 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = "error"
485 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = str(exc
)
487 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(nsr_id
, vnf_member_index
))
488 # TODO it seems that the task is Done, but the callback is still ongoing. For the moment comment out these two lines
489 # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
490 # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
491 elif workload_status
:
492 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(nsr_id
, vnf_member_index
, workload_status
))
493 if nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] == workload_status
:
494 return # same status, ignore
495 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = workload_status
496 # TODO N2VC some error message in case of error should be obtained from N2VC
497 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = ""
499 self
.logger
.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(nsr_id
, vnf_member_index
), exc_info
=True)
505 for vnf_index
, vca_info
in nsr_lcm
["VCA"].items():
506 vca_status
= vca_info
["operational-status"]
507 if vca_status
not in status_map
:
509 status_map
[vca_status
] = 0
510 status_map
[vca_status
] += 1
512 if vca_status
!= "active":
514 if vca_status
== "error":
516 db_nsr
["config-status"] = "failed"
517 db_nsr
["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index
,
518 vca_info
["detailed-status"])
522 self
.logger
.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(nsr_id
, vnf_member_index
))
523 db_nsr
["config-status"] = "configured"
524 db_nsr
["detailed-status"] = "done"
530 for status
, num
in status_map
.items():
531 cs
+= separator
+ "{}: {}".format(status
, num
)
533 db_nsr
["config-status"] = cs
534 self
.update_db("nsrs", nsr_id
, db_nsr
)
536 except Exception as e
:
537 self
.logger
.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(nsr_id
, vnf_member_index
, e
), exc_info
=True)
539 async def create_ns(self
, nsr_id
, order_id
):
540 logging_text
= "Task create_ns={} ".format(nsr_id
)
541 self
.logger
.debug(logging_text
+ "Enter")
542 # get all needed from database
545 step
= "Getting nsr from db"
547 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
549 nsr_name
= db_nsr
["name"] # TODO short-name??
551 for c_vnf
in nsd
["constituent-vnfd"]:
552 vnfd_id
= c_vnf
["vnfd-id-ref"]
553 if vnfd_id
not in needed_vnfd
:
554 step
= "Getting vnfd={} from db".format(vnfd_id
)
555 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
559 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
563 db_nsr
["_admin"]["deploy"] = nsr_lcm
564 db_nsr
["detailed-status"] = "creating"
565 db_nsr
["operational-status"] = "init"
567 deloyment_timeout
= 120
569 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
571 # get vnfds, instantiate at RO
572 for vnfd_id
, vnfd
in needed_vnfd
.items():
573 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
574 self
.logger
.debug(logging_text
+ step
)
575 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
578 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
580 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
581 self
.logger
.debug(logging_text
+ "RO vnfd={} exist. Using RO_id={}".format(
582 vnfd_id
, vnfd_list
[0]["uuid"]))
584 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
)
585 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
586 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
587 self
.update_db("nsrs", nsr_id
, db_nsr
)
591 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
592 self
.logger
.debug(logging_text
+ step
)
594 nsd_id_RO
= nsd_id
+ "." + nsd_id
[:200]
595 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
597 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
598 self
.logger
.debug(logging_text
+ "RO nsd={} exist. Using RO_id={}".format(
599 nsd_id
, nsd_list
[0]["uuid"]))
601 nsd_RO
= deepcopy(nsd
)
602 nsd_RO
["id"] = nsd_id_RO
603 nsd_RO
.pop("_id", None)
604 nsd_RO
.pop("_admin", None)
605 for c_vnf
in nsd_RO
["constituent-vnfd"]:
606 vnfd_id
= c_vnf
["vnfd-id-ref"]
607 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
608 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
609 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
610 self
.update_db("nsrs", nsr_id
, db_nsr
)
613 # if present use it unless in error status
614 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
617 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
618 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
619 desc
= await RO
.show("ns", RO_nsr_id
)
620 except ROclient
.ROClientException
as e
:
621 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
623 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
625 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
626 nsr_lcm
["RO"]["nsr_status"] = ns_status
627 if ns_status
== "ERROR":
628 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
629 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
630 await RO
.delete("ns", RO_nsr_id
)
631 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
634 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
635 self
.logger
.debug(logging_text
+ step
)
637 desc
= await RO
.create("ns", name
=db_nsr
["name"], datacenter
=db_nsr
["datacenter"],
638 scenario
=nsr_lcm
["RO"]["nsd_id"])
639 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
640 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
641 self
.update_db("nsrs", nsr_id
, db_nsr
)
643 # wait until NS is ready
644 step
= ns_status_detailed
= "Waiting ns ready at RO"
645 db_nsr
["detailed-status"] = ns_status_detailed
646 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
647 deloyment_timeout
= 600
648 while deloyment_timeout
> 0:
649 desc
= await RO
.show("ns", RO_nsr_id
)
650 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
651 nsr_lcm
["RO"]["nsr_status"] = ns_status
652 if ns_status
== "ERROR":
653 raise ROclient
.ROClientException(ns_status_info
)
654 elif ns_status
== "BUILD":
655 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
656 self
.update_db("nsrs", nsr_id
, db_nsr
)
657 elif ns_status
== "ACTIVE":
658 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
661 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
663 await asyncio
.sleep(5, loop
=self
.loop
)
664 deloyment_timeout
-= 5
665 if deloyment_timeout
<= 0:
666 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
667 db_nsr
["detailed-status"] = "Configuring vnfr"
668 self
.update_db("nsrs", nsr_id
, db_nsr
)
671 step
= "Looking for needed vnfd to configure"
672 self
.logger
.debug(logging_text
+ step
)
673 for c_vnf
in nsd
["constituent-vnfd"]:
674 vnfd_id
= c_vnf
["vnfd-id-ref"]
675 vnf_index
= str(c_vnf
["member-vnf-index"])
676 vnfd
= needed_vnfd
[vnfd_id
]
677 if vnfd
.get("vnf-configuration") and vnfd
["vnf-configuration"].get("juju"):
679 proxy_charm
= vnfd
["vnf-configuration"]["juju"]["charm"]
681 # Note: The charm needs to exist on disk at the location
682 # specified by charm_path.
683 base_folder
= vnfd
["_admin"]["storage"]
684 storage_params
= self
.fs
.get_params()
685 charm_path
= "{}{}/{}/charms/{}".format(
686 storage_params
["path"],
687 base_folder
["folder"],
688 base_folder
["pkg-dir"],
692 # Setup the runtime parameters for this VNF
694 'rw_mgmt_ip': nsr_lcm
['nsr_ip'][vnf_index
],
697 # model_name will be ignored in the current version of N2VC
698 # but will be implemented for the next point release.
699 model_name
= 'default'
700 application_name
= self
.n2vc
.FormatApplicationName(
701 nsr_name
, # 'default',
705 # TODO N2VC implement this inside n2vc.FormatApplicationName
706 application_name
= application_name
[:50]
708 nsr_lcm
["VCA"][vnf_index
] = {
710 "application": application_name
,
711 "operational-status": "init",
712 "detailed-status": "",
716 self
.logger
.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id
, charm_path
, proxy_charm
))
717 task
= asyncio
.ensure_future(
718 self
.n2vc
.DeployCharms(
719 model_name
, # The network service name
720 application_name
, # The application name
721 vnfd
, # The vnf descriptor
722 charm_path
, # Path to charm
723 params
, # Runtime params, like mgmt ip
724 {}, # for native charms only
725 self
.n2vc_callback
, # Callback for status changes
726 db_nsr
, # Callback parameter
727 vnf_index
, # Callback parameter
728 None, # Callback parameter (task)
731 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, model_name
, application_name
,
732 None, db_nsr
, vnf_index
))
734 self
.lcm_ns_tasks
[nsr_id
][order_id
]["create_charm:" + vnf_index
] = task
735 db_nsr
["config-status"] = "configuring" if vnfd_to_config
else "configured"
736 db_nsr
["detailed-status"] = "configuring: init: {}".format(vnfd_to_config
) if vnfd_to_config
else "done"
737 db_nsr
["operational-status"] = "running"
738 self
.update_db("nsrs", nsr_id
, db_nsr
)
740 self
.logger
.debug("Task create_ns={} Exit Ok".format(nsr_id
))
743 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
744 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
746 except Exception as e
:
747 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
751 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
752 db_nsr
["operational-status"] = "failed"
753 self
.update_db("nsrs", nsr_id
, db_nsr
)
755 async def delete_ns(self
, nsr_id
, order_id
):
756 logging_text
= "Task delete_ns={} ".format(nsr_id
)
757 self
.logger
.debug(logging_text
+ "Enter")
760 step
= "Getting nsr from db"
762 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
764 nsr_lcm
= db_nsr
["_admin"]["deploy"]
766 db_nsr
["operational-status"] = "terminating"
767 db_nsr
["config-status"] = "terminating"
768 db_nsr
["detailed-status"] = "Deleting charms"
769 self
.update_db("nsrs", nsr_id
, db_nsr
)
772 self
.logger
.debug(logging_text
+ step
)
773 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
774 if deploy_info
and deploy_info
.get("application"):
775 # n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
777 # self.n2vc.RemoveCharms(model_name, application_name, self.n2vc_callback, model_name, application_name)
778 task
= asyncio
.ensure_future(
779 self
.n2vc
.RemoveCharms(
780 deploy_info
['model'],
781 deploy_info
['application'],
787 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
788 # deploy_info['application'],None, db_nsr, vnf_index))
789 self
.lcm_ns_tasks
[nsr_id
][order_id
]["delete_charm:" + vnf_index
] = task
790 except Exception as e
:
791 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
794 RO
= ROclient
.ROClient(self
.loop
, datacenter
=db_nsr
["datacenter"], **self
.ro_config
)
796 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
799 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
800 self
.logger
.debug(logging_text
+ step
)
801 desc
= await RO
.delete("ns", RO_nsr_id
)
802 nsr_lcm
["RO"]["nsr_id"] = None
803 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
804 except ROclient
.ROClientException
as e
:
805 if e
.http_code
== 404: # not found
806 nsr_lcm
["RO"]["nsr_id"] = None
807 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
808 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
809 elif e
.http_code
== 409: #conflict
810 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
812 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
813 self
.update_db("nsrs", nsr_id
, db_nsr
)
816 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
819 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
820 desc
= await RO
.delete("nsd", RO_nsd_id
)
821 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
822 nsr_lcm
["RO"]["nsd_id"] = None
823 except ROclient
.ROClientException
as e
:
824 if e
.http_code
== 404: # not found
825 nsr_lcm
["RO"]["nsd_id"] = None
826 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
827 elif e
.http_code
== 409: #conflict
828 self
.logger
.debug(logging_text
+ "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
830 self
.logger
.error(logging_text
+ "RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
831 self
.update_db("nsrs", nsr_id
, db_nsr
)
833 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
837 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
838 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
839 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
840 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
841 except ROclient
.ROClientException
as e
:
842 if e
.http_code
== 404: # not found
843 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
844 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
845 elif e
.http_code
== 409: #conflict
846 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
848 self
.logger
.error(logging_text
+ "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
849 self
.update_db("nsrs", nsr_id
, db_nsr
)
851 # TODO delete from database or mark as deleted???
852 db_nsr
["operational-status"] = "terminated"
853 self
.db
.del_one("nsrs", {"_id": nsr_id
})
854 self
.logger
.debug(logging_text
+ "Exit")
856 except (ROclient
.ROClientException
, DbException
) as e
:
857 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
859 except Exception as e
:
860 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
864 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
865 db_nsr
["operational-status"] = "failed"
866 self
.update_db("nsrs", nsr_id
, db_nsr
)
async def test(self, param=None):
    """
    Diagnostic coroutine: writes the received parameter to the debug log and finishes.

    :param param: free value echoed in the log message
    :return: None
    """
    self.logger.debug("Starting/Ending test task: {}".format(param))
def cancel_tasks(self, topic, _id):
    """
    Cancel all registered tasks of a concrete nsr, vim or sdn identified by _id.

    :param topic: "ns", "vim_account" or "sdn"; any other value is ignored
    :param _id: nsr, vim or sdn identity
    :return: None
    """
    registries = {
        "ns": self.lcm_ns_tasks,
        "vim_account": self.lcm_vim_tasks,
        "sdn": self.lcm_sdn_tasks,
    }
    lcm_tasks = registries.get(topic)
    # Unknown topic, or nothing registered for this _id: nothing to cancel
    if not lcm_tasks or not lcm_tasks.get(_id):
        return
    for order_id, tasks_set in lcm_tasks[_id].items():
        for task_name, task in tasks_set.items():
            # cancel() returns a false value when the task had already finished
            if task.cancel():
                self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
    # NOTE(review): the statement after the loop is not visible in this view; the registry entry
    # is presumably reset here so cancelled tasks are not kept - confirm against the original.
    lcm_tasks[_id] = {}
async def read_kafka(self):
    """
    Read orders from the "ns", "vim_account" and "sdn" topics and launch one asyncio task for each of them.
    Every launched task is stored at self.lcm_<ns|vim|sdn>_tasks[<_id>][<order_id>] = {<task name>: <task>}, so that
    cancel_tasks can find it later. order_id is a counter incremented with every message received.
    The loop ends when the "exit" command is received, or when reading/processing a message fails again after
    max_consecutive_errors consecutive failures.
    :return: None
    """
    self.logger.debug("Task Kafka Enter")
    order_id = 1
    consecutive_errors = 0
    max_consecutive_errors = 5

    def launch(lcm_tasks, _id, task_name, coro):
        # schedule coro in the running loop and register the task under the current order_id
        task = asyncio.ensure_future(coro)
        lcm_tasks.setdefault(_id, {})[order_id] = {task_name: task}

    while True:
        try:
            topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
            self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
            consecutive_errors = 0
            order_id += 1

            # commands that do not depend on the topic
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                # just for test
                print(params, flush=True)
                continue
            elif command == "test":
                asyncio.ensure_future(self.test(params))
                continue

            if topic == "ns":
                nsr_id = params.strip()
                if command == "create":
                    launch(self.lcm_ns_tasks, nsr_id, "create_ns", self.create_ns(nsr_id, order_id))
                    continue
                elif command == "delete":
                    # a deletion supersedes whatever is still running for this nsr
                    self.cancel_tasks(topic, nsr_id)
                    launch(self.lcm_ns_tasks, nsr_id, "delete_ns", self.delete_ns(nsr_id, order_id))
                    continue
                elif command == "show":
                    try:
                        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                        print(
                            "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
                            "{}\n deploy: {}\n tasks: {}".format(
                                nsr_id, db_nsr["operational-status"],
                                db_nsr["config-status"], db_nsr["detailed-status"],
                                db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)),
                            flush=True)
                    except Exception as e:
                        print("nsr {} not found: {}".format(nsr_id, e), flush=True)
                    continue
            elif topic == "vim_account":
                vim_id = params["_id"]
                if command == "create":
                    launch(self.lcm_vim_tasks, vim_id, "create_vim", self.create_vim(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, vim_id)
                    launch(self.lcm_vim_tasks, vim_id, "delete_vim", self.delete_vim(vim_id, order_id))
                    continue
                elif command == "show":
                    print("not implemented show with vim_account", flush=True)
                    continue
                elif command == "edit":
                    launch(self.lcm_vim_tasks, vim_id, "edit_vim", self.edit_vim(vim_id, order_id))
                    continue
            elif topic == "sdn":
                _sdn_id = params["_id"]
                if command == "create":
                    launch(self.lcm_sdn_tasks, _sdn_id, "create_sdn", self.create_sdn(params, order_id))
                    continue
                elif command == "delete":
                    self.cancel_tasks(topic, _sdn_id)
                    launch(self.lcm_sdn_tasks, _sdn_id, "delete_sdn", self.delete_sdn(_sdn_id, order_id))
                    continue
                elif command == "edit":
                    launch(self.lcm_sdn_tasks, _sdn_id, "edit_sdn", self.edit_sdn(_sdn_id, order_id))
                    continue
            # reached only when no branch above has handled the message
            self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
        except Exception as e:
            if consecutive_errors == max_consecutive_errors:
                self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
                break
            consecutive_errors += 1
            self.logger.error("Task Kafka Exception {}".format(e))
            # wait a bit before retrying; no 'loop' argument, it was removed from asyncio.sleep in python 3.10
            await asyncio.sleep(1)
    self.logger.debug("Task Kafka terminating")
    # TODO cancel, or wait with a timeout for, the tasks that are still pending before leaving
    self.logger.debug("Task Kafka exit")
def start(self):
    """
    Run the kafka reading task until it ends; after that close the event loop and the database, message and
    storage connections. The cleanup is also done when the reading task finishes with an exception.
    :return: None
    """
    self.loop = asyncio.get_event_loop()
    try:
        self.loop.run_until_complete(self.read_kafka())
    finally:
        self.loop.close()
        self.loop = None
        # each connection is released only if it is set
        if self.db:
            self.db.db_disconnect()
        if self.msg:
            self.msg.disconnect()
        if self.fs:
            self.fs.fs_disconnect()
def read_config_file(self, config_file):
    """
    Load the configuration file and override its values with the OSMLCM_* environment variables.
    A variable OSMLCM_<SECTION>[_<SUBSECTION>...]_<KEY> sets conf[section]...[key]; the sections "ro" and "vca" are
    looked up in capital letters, and a key called "port" is converted to integer. A variable that cannot be
    applied is skipped with a warning.
    :param config_file: path of the configuration file
    :return: the configuration dictionary. On any error reading the file it logs a critical message and exits
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: the configuration only needs plain YAML types
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the dictionary that holds the last item
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        raise SystemExit(1)
# Script entry point: build the Lcm instance from the configuration file of the working directory
if __name__ == '__main__':
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)