2 # -*- coding: utf-8 -*-
15 from dbbase
import DbException
16 from fsbase
import FsException
17 from msgbase
import MsgException
18 from os
import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc
.vnf
import N2VC
21 from n2vc
import version
as N2VC_version
25 from copy
import deepcopy
26 from http
import HTTPStatus
30 class LcmException(Exception):
36 def __init__(self
, config_file
):
38 Init, Connect to database, filesystem storage, and messaging
39 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
46 self
.pings_not_received
= 1
48 # contains created tasks/futures to be able to cancel
49 self
.lcm_ns_tasks
= {}
50 self
.lcm_vim_tasks
= {}
51 self
.lcm_sdn_tasks
= {}
53 self
.logger
= logging
.getLogger('lcm')
55 config
= self
.read_config_file(config_file
)
58 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
59 "tenant": config
.get("tenant", "osm"),
60 "logger_name": "lcm.ROclient",
64 self
.vca
= config
["VCA"] # TODO VCA
68 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
69 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
70 config
["database"]["logger_name"] = "lcm.db"
71 config
["storage"]["logger_name"] = "lcm.fs"
72 config
["message"]["logger_name"] = "lcm.msg"
73 if "logfile" in config
["global"]:
74 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
75 maxBytes
=100e6
, backupCount
=9, delay
=0)
76 file_handler
.setFormatter(log_formatter_simple
)
77 self
.logger
.addHandler(file_handler
)
79 str_handler
= logging
.StreamHandler()
80 str_handler
.setFormatter(log_formatter_simple
)
81 self
.logger
.addHandler(str_handler
)
83 if config
["global"].get("loglevel"):
84 self
.logger
.setLevel(config
["global"]["loglevel"])
86 # logging other modules
87 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
88 config
[k1
]["logger_name"] = logname
89 logger_module
= logging
.getLogger(logname
)
90 if "logfile" in config
[k1
]:
91 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
92 maxBytes
=100e6
, backupCount
=9, delay
=0)
93 file_handler
.setFormatter(log_formatter_simple
)
94 logger_module
.addHandler(file_handler
)
95 if "loglevel" in config
[k1
]:
96 logger_module
.setLevel(config
[k1
]["loglevel"])
100 server
=config
['VCA']['host'],
101 port
=config
['VCA']['port'],
102 user
=config
['VCA']['user'],
103 secret
=config
['VCA']['secret'],
104 # TODO: This should point to the base folder where charms are stored,
105 # if there is a common one (like object storage). Otherwise, leave
106 # it unset and pass it via DeployCharms
107 # artifacts=config['VCA'][''],
110 # check version of N2VC
111 # TODO enhance with int conversion or from distutils.version import LooseVersion
112 # or with list(map(int, version.split(".")))
113 if N2VC_version
< "0.0.2":
114 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version
))
116 if config
["database"]["driver"] == "mongo":
117 self
.db
= dbmongo
.DbMongo()
118 self
.db
.db_connect(config
["database"])
119 elif config
["database"]["driver"] == "memory":
120 self
.db
= dbmemory
.DbMemory()
121 self
.db
.db_connect(config
["database"])
123 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
124 config
["database"]["driver"]))
126 if config
["storage"]["driver"] == "local":
127 self
.fs
= fslocal
.FsLocal()
128 self
.fs
.fs_connect(config
["storage"])
130 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
131 config
["storage"]["driver"]))
133 if config
["message"]["driver"] == "local":
134 self
.msg
= msglocal
.MsgLocal()
135 self
.msg
.connect(config
["message"])
136 elif config
["message"]["driver"] == "kafka":
137 self
.msg
= msgkafka
.MsgKafka()
138 self
.msg
.connect(config
["message"])
140 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
141 config
["storage"]["driver"]))
142 except (DbException
, FsException
, MsgException
) as e
:
143 self
.logger
.critical(str(e
), exc_info
=True)
144 raise LcmException(str(e
))
146 def update_db(self
, item
, _id
, _desc
):
148 self
.db
.replace(item
, _id
, _desc
)
149 except DbException
as e
:
150 self
.logger
.error("Updating {} _id={}: {}".format(item
, _id
, e
))
152 def update_db_2(self
, item
, _id
, _desc
):
154 self
.db
.set_one(item
, {"_id": _id
}, _desc
)
155 except DbException
as e
:
156 self
.logger
.error("Updating {} _id={}: {}".format(item
, _id
, e
))
158 async def vim_create(self
, vim_content
, order_id
):
159 vim_id
= vim_content
["_id"]
160 logging_text
= "Task vim_create={} ".format(vim_id
)
161 self
.logger
.debug(logging_text
+ "Enter")
165 step
= "Getting vim from db"
166 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
167 if "_admin" not in db_vim
:
168 db_vim
["_admin"] = {}
169 if "deployed" not in db_vim
["_admin"]:
170 db_vim
["_admin"]["deployed"] = {}
171 db_vim
["_admin"]["deployed"]["RO"] = None
173 step
= "Creating vim at RO"
174 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
175 vim_RO
= deepcopy(vim_content
)
176 vim_RO
.pop("_id", None)
177 vim_RO
.pop("_admin", None)
178 vim_RO
.pop("schema_version", None)
179 vim_RO
.pop("schema_type", None)
180 vim_RO
.pop("vim_tenant_name", None)
181 vim_RO
["type"] = vim_RO
.pop("vim_type")
182 vim_RO
.pop("vim_user", None)
183 vim_RO
.pop("vim_password", None)
184 desc
= await RO
.create("vim", descriptor
=vim_RO
)
185 RO_vim_id
= desc
["uuid"]
186 db_vim
["_admin"]["deployed"]["RO"] = RO_vim_id
187 self
.update_db("vim_accounts", vim_id
, db_vim
)
189 step
= "Attach vim to RO tenant"
190 vim_RO
= {"vim_tenant_name": vim_content
["vim_tenant_name"],
191 "vim_username": vim_content
["vim_user"],
192 "vim_password": vim_content
["vim_password"],
193 "config": vim_content
["config"]
195 desc
= await RO
.attach_datacenter(RO_vim_id
, descriptor
=vim_RO
)
196 db_vim
["_admin"]["operationalState"] = "ENABLED"
197 self
.update_db("vim_accounts", vim_id
, db_vim
)
199 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id".format(RO_vim_id
))
202 except (ROclient
.ROClientException
, DbException
) as e
:
203 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
205 except Exception as e
:
206 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
210 db_vim
["_admin"]["operationalState"] = "ERROR"
211 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
212 self
.update_db("vim_accounts", vim_id
, db_vim
)
214 async def vim_edit(self
, vim_content
, order_id
):
215 vim_id
= vim_content
["_id"]
216 logging_text
= "Task vim_edit={} ".format(vim_id
)
217 self
.logger
.debug(logging_text
+ "Enter")
220 step
= "Getting vim from db"
222 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
223 if db_vim
.get("_admin") and db_vim
["_admin"].get("deployed") and db_vim
["_admin"]["deployed"].get("RO"):
224 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
225 step
= "Editing vim at RO"
226 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
227 vim_RO
= deepcopy(vim_content
)
228 vim_RO
.pop("_id", None)
229 vim_RO
.pop("_admin", None)
230 vim_RO
.pop("schema_version", None)
231 vim_RO
.pop("schema_type", None)
232 vim_RO
.pop("vim_tenant_name", None)
233 vim_RO
["type"] = vim_RO
.pop("vim_type")
234 vim_RO
.pop("vim_user", None)
235 vim_RO
.pop("vim_password", None)
237 desc
= await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
239 step
= "Editing vim-account at RO tenant"
241 for k
in ("vim_tenant_name", "vim_password", "config"):
243 vim_RO
[k
] = vim_content
[k
]
244 if "vim_user" in vim_content
:
245 vim_content
["vim_username"] = vim_content
["vim_user"]
247 desc
= await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_RO
)
248 db_vim
["_admin"]["operationalState"] = "ENABLED"
249 self
.update_db("vim_accounts", vim_id
, db_vim
)
251 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id".format(RO_vim_id
))
254 except (ROclient
.ROClientException
, DbException
) as e
:
255 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
257 except Exception as e
:
258 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
262 db_vim
["_admin"]["operationalState"] = "ERROR"
263 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
264 self
.update_db("vim_accounts", vim_id
, db_vim
)
266 async def vim_delete(self
, vim_id
, order_id
):
267 logging_text
= "Task vim_delete={} ".format(vim_id
)
268 self
.logger
.debug(logging_text
+ "Enter")
271 step
= "Getting vim from db"
273 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
274 if db_vim
.get("_admin") and db_vim
["_admin"].get("deployed") and db_vim
["_admin"]["deployed"].get("RO"):
275 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
276 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
277 step
= "Detaching vim from RO tenant"
279 await RO
.detach_datacenter(RO_vim_id
)
280 except ROclient
.ROClientException
as e
:
281 if e
.http_code
== 404: # not found
282 self
.logger
.debug(logging_text
+ "RO_vim_id={} already detached".format(RO_vim_id
))
286 step
= "Deleting vim from RO"
288 await RO
.delete("vim", RO_vim_id
)
289 except ROclient
.ROClientException
as e
:
290 if e
.http_code
== 404: # not found
291 self
.logger
.debug(logging_text
+ "RO_vim_id={} already deleted".format(RO_vim_id
))
296 self
.logger
.error(logging_text
+ "Skipping. There is not RO information at database")
297 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
298 self
.logger
.debug("vim_delete task vim_id={} Exit Ok".format(vim_id
))
301 except (ROclient
.ROClientException
, DbException
) as e
:
302 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
304 except Exception as e
:
305 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
309 db_vim
["_admin"]["operationalState"] = "ERROR"
310 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
311 self
.update_db("vim_accounts", vim_id
, db_vim
)
313 async def sdn_create(self
, sdn_content
, order_id
):
314 sdn_id
= sdn_content
["_id"]
315 logging_text
= "Task sdn_create={} ".format(sdn_id
)
316 self
.logger
.debug(logging_text
+ "Enter")
320 step
= "Getting sdn from db"
321 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
322 if "_admin" not in db_sdn
:
323 db_sdn
["_admin"] = {}
324 if "deployed" not in db_sdn
["_admin"]:
325 db_sdn
["_admin"]["deployed"] = {}
326 db_sdn
["_admin"]["deployed"]["RO"] = None
328 step
= "Creating sdn at RO"
329 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
330 sdn_RO
= deepcopy(sdn_content
)
331 sdn_RO
.pop("_id", None)
332 sdn_RO
.pop("_admin", None)
333 sdn_RO
.pop("schema_version", None)
334 sdn_RO
.pop("schema_type", None)
335 sdn_RO
.pop("description", None)
336 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
337 RO_sdn_id
= desc
["uuid"]
338 db_sdn
["_admin"]["deployed"]["RO"] = RO_sdn_id
339 db_sdn
["_admin"]["operationalState"] = "ENABLED"
340 self
.update_db("sdns", sdn_id
, db_sdn
)
341 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id".format(RO_sdn_id
))
344 except (ROclient
.ROClientException
, DbException
) as e
:
345 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
347 except Exception as e
:
348 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
352 db_sdn
["_admin"]["operationalState"] = "ERROR"
353 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
354 self
.update_db("sdns", sdn_id
, db_sdn
)
356 async def sdn_edit(self
, sdn_content
, order_id
):
357 sdn_id
= sdn_content
["_id"]
358 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
359 self
.logger
.debug(logging_text
+ "Enter")
362 step
= "Getting sdn from db"
364 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
365 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
366 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
367 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
368 step
= "Editing sdn at RO"
369 sdn_RO
= deepcopy(sdn_content
)
370 sdn_RO
.pop("_id", None)
371 sdn_RO
.pop("_admin", None)
372 sdn_RO
.pop("schema_version", None)
373 sdn_RO
.pop("schema_type", None)
374 sdn_RO
.pop("description", None)
376 desc
= await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
377 db_sdn
["_admin"]["operationalState"] = "ENABLED"
378 self
.update_db("sdns", sdn_id
, db_sdn
)
380 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id".format(RO_sdn_id
))
383 except (ROclient
.ROClientException
, DbException
) as e
:
384 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
386 except Exception as e
:
387 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
391 db_sdn
["_admin"]["operationalState"] = "ERROR"
392 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
393 self
.update_db("sdns", sdn_id
, db_sdn
)
395 async def sdn_delete(self
, sdn_id
, order_id
):
396 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
397 self
.logger
.debug(logging_text
+ "Enter")
400 step
= "Getting sdn from db"
402 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
403 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
404 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
405 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
406 step
= "Deleting sdn from RO"
408 await RO
.delete("sdn", RO_sdn_id
)
409 except ROclient
.ROClientException
as e
:
410 if e
.http_code
== 404: # not found
411 self
.logger
.debug(logging_text
+ "RO_sdn_id={} already deleted".format(RO_sdn_id
))
416 self
.logger
.error(logging_text
+ "Skipping. There is not RO information at database")
417 self
.db
.del_one("sdns", {"_id": sdn_id
})
418 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
421 except (ROclient
.ROClientException
, DbException
) as e
:
422 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
424 except Exception as e
:
425 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
429 db_sdn
["_admin"]["operationalState"] = "ERROR"
430 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
431 self
.update_db("sdns", sdn_id
, db_sdn
)
433 def vnfd2RO(self
, vnfd
, new_id
=None):
435 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
436 :param vnfd: input vnfd
437 :param new_id: overrides vnf id if provided
438 :return: copy of vnfd
442 vnfd_RO
= deepcopy(vnfd
)
443 vnfd_RO
.pop("_id", None)
444 vnfd_RO
.pop("_admin", None)
446 vnfd_RO
["id"] = new_id
447 for vdu
in vnfd_RO
["vdu"]:
448 if "cloud-init-file" in vdu
:
449 base_folder
= vnfd
["_admin"]["storage"]
450 clout_init_file
= "{}/{}/cloud_init/{}".format(
451 base_folder
["folder"],
452 base_folder
["pkg-dir"],
453 vdu
["cloud-init-file"]
455 ci_file
= self
.fs
.file_open(clout_init_file
, "r")
456 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert to base 64 or similar
457 clout_init_content
= ci_file
.read()
460 vdu
.pop("cloud-init-file", None)
461 vdu
["cloud-init"] = clout_init_content
463 except FsException
as e
:
464 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd
["_id"], e
))
469 def n2vc_callback(self
, model_name
, application_name
, status
, message
, db_nsr
, db_nslcmop
, vnf_member_index
, task
=None):
471 Callback both for charm status change and task completion
472 :param model_name: Charm model name
473 :param application_name: Charm application name
474 :param status: Can be
475 - blocked: The unit needs manual intervention
476 - maintenance: The unit is actively deploying/configuring
477 - waiting: The unit is waiting for another charm to be ready
478 - active: The unit is deployed, configured, and ready
479 - error: The charm has failed and needs attention.
480 - terminated: The charm has been destroyed
483 :param message: detailed message error
484 :param db_nsr: nsr database content
485 :param db_nslcmop: nslcmop database content
486 :param vnf_member_index: NSD vnf-member-index
487 :param task: None for charm status change, or task for completion task callback
492 update_nsr
= update_nslcmop
= False
494 nsr_id
= db_nsr
["_id"]
495 nslcmop_id
= db_nslcmop
["_id"]
496 nsr_lcm
= db_nsr
["_admin"]["deployed"]
497 ns_action
= db_nslcmop
["lcmOperationType"]
498 logging_text
= "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id
, ns_action
, nslcmop_id
,
503 self
.logger
.debug(logging_text
+ " task Cancelled")
504 # TODO update db_nslcmop
508 exc
= task
.exception()
510 self
.logger
.error(logging_text
+ " task Exception={}".format(exc
))
511 if ns_action
in ("instantiate", "terminate"):
512 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = "error"
513 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = str(exc
)
514 elif ns_action
== "action":
515 db_nslcmop
["operationState"] = "FAILED"
516 db_nslcmop
["detailed-status"] = str(exc
)
517 db_nslcmop
["statusEnteredTime"] = time()
518 update_nslcmop
= True
522 self
.logger
.debug(logging_text
+ " task Done")
523 # TODO revise with Adam if action is finished and ok when task is done
524 if ns_action
== "action":
525 db_nslcmop
["operationState"] = "COMPLETED"
526 db_nslcmop
["detailed-status"] = "Done"
527 db_nslcmop
["statusEnteredTime"] = time()
528 update_nslcmop
= True
529 # task is Done, but callback is still ongoing. So ignore
532 self
.logger
.debug(logging_text
+ " Enter status={}".format(status
))
533 if nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] == status
:
534 return # same status, ignore
535 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = status
536 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = str(message
)
538 self
.logger
.critical(logging_text
+ " Enter with bad parameters", exc_info
=True)
543 n2vc_error_text
= [] # contain text error list. If empty no one is in error status
544 for vnf_index
, vca_info
in nsr_lcm
["VCA"].items():
545 vca_status
= vca_info
["operational-status"]
546 if vca_status
not in status_map
:
548 status_map
[vca_status
] = 0
549 status_map
[vca_status
] += 1
551 if vca_status
!= "active":
553 elif vca_status
in ("error", "blocked"):
554 n2vc_error_text
.append("member_vnf_index={} {}: {}".format(vnf_member_index
, vca_status
,
555 vca_info
["detailed-status"]))
558 self
.logger
.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id
, vnf_member_index
))
559 db_nsr
["config-status"] = "configured"
560 db_nsr
["detailed-status"] = "done"
561 db_nslcmop
["operationState"] = "COMPLETED"
562 db_nslcmop
["detailed-status"] = "Done"
563 db_nslcmop
["statusEnteredTime"] = time()
564 elif n2vc_error_text
:
565 db_nsr
["config-status"] = "failed"
566 error_text
= "fail configuring " + ";".join(n2vc_error_text
)
567 db_nsr
["detailed-status"] = error_text
568 db_nslcmop
["operationState"] = "FAILED_TEMP"
569 db_nslcmop
["detailed-status"] = error_text
570 db_nslcmop
["statusEnteredTime"] = time()
574 for status
, num
in status_map
.items():
575 cs
+= separator
+ "{}: {}".format(status
, num
)
577 db_nsr
["config-status"] = cs
578 db_nsr
["detailed-status"] = cs
579 db_nslcmop
["detailed-status"] = cs
580 update_nsr
= update_nslcmop
= True
582 except Exception as e
:
583 self
.logger
.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index
, e
), exc_info
=True)
587 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
589 self
.update_db("nsrs", nsr_id
, db_nsr
)
590 except Exception as e
:
591 self
.logger
.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
592 vnf_member_index
, e
), exc_info
=True)
594 def ns_params_2_RO(self
, ns_params
):
596 Creates a RO ns descriptor from OSM ns_instantite params
597 :param ns_params: OSM instantiate params
598 :return: The RO ns descriptor
601 def vim_account_2_RO(vim_account
):
602 if vim_account
in vim_2_RO
:
603 return vim_2_RO
[vim_account
]
604 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
605 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
606 # #TODO check if VIM is creating and wait
607 if db_vim
["_admin"]["operationalState"] != "ENABLED":
608 raise LcmException("VIM={} is not available. operationalState={}".format(
609 vim_account
, db_vim
["_admin"]["operationalState"]))
610 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
611 vim_2_RO
[vim_account
] = RO_vim_id
617 # "name": ns_params["nsName"],
618 # "description": ns_params.get("nsDescription"),
619 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
620 # "scenario": ns_params["nsdId"],
624 if ns_params
.get("ssh-authorized-key"):
625 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh-authorized-key"]}
626 if ns_params
.get("vnf"):
627 for vnf
in ns_params
["vnf"]:
629 if "vimAccountId" in vnf
:
630 RO_vnf
["datacenter"] = vim_account_2_RO(vnf
["vimAccountId"])
632 RO_ns_params
["vnfs"][vnf
["member-vnf-index"]] = RO_vnf
633 if ns_params
.get("vld"):
634 for vld
in ns_params
["vld"]:
636 if "ip-profile" in vld
:
637 RO_vld
["ip-profile"] = vld
["ip-profile"]
638 if "vim-network-name" in vld
:
640 if isinstance(vld
["vim-network-name"], dict):
641 for vim_account
, vim_net
in vld
["vim-network-name"].items():
642 RO_vld
["sites"].append({
643 "netmap-use": vim_net
,
644 "datacenter": vim_account_2_RO(vim_account
)
646 else: #isinstance str
647 RO_vld
["sites"].append({"netmap-use": vld
["vim-network-name"]})
649 RO_ns_params
["networks"][vld
["name"]] = RO_vld
652 async def ns_instantiate(self
, nsr_id
, nslcmop_id
):
653 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
654 self
.logger
.debug(logging_text
+ "Enter")
655 # get all needed from database
660 step
= "Getting nsr, nslcmop, RO_vims from db"
662 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
663 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
665 nsr_name
= db_nsr
["name"] # TODO short-name??
667 vnfr_filter
= {"nsr-id-ref": nsr_id
, "member-vnf-index-ref": None}
668 for c_vnf
in nsd
["constituent-vnfd"]:
669 vnfd_id
= c_vnf
["vnfd-id-ref"]
670 vnfr_filter
["member-vnf-index-ref"] = c_vnf
["member-vnf-index"]
671 db_vnfr
[c_vnf
["member-vnf-index"]] = self
.db
.get_one("vnfrs", vnfr_filter
)
672 if vnfd_id
not in needed_vnfd
:
673 step
= "Getting vnfd={} from db".format(vnfd_id
)
674 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
676 nsr_lcm
= db_nsr
["_admin"].get("deployed")
678 nsr_lcm
= db_nsr
["_admin"]["deployed"] = {
680 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
684 db_nsr
["detailed-status"] = "creating"
685 db_nsr
["operational-status"] = "init"
687 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
689 # get vnfds, instantiate at RO
690 for vnfd_id
, vnfd
in needed_vnfd
.items():
691 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
692 self
.logger
.debug(logging_text
+ step
)
693 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
696 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
698 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
699 self
.logger
.debug(logging_text
+ "RO vnfd={} exist. Using RO_id={}".format(
700 vnfd_id
, vnfd_list
[0]["uuid"]))
702 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
)
703 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
704 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
705 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
706 self
.update_db("nsrs", nsr_id
, db_nsr
)
710 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
711 self
.logger
.debug(logging_text
+ step
)
713 nsd_id_RO
= nsd_id
+ "." + nsd_id
[:200]
714 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
716 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
717 self
.logger
.debug(logging_text
+ "RO nsd={} exist. Using RO_id={}".format(
718 nsd_id
, nsd_list
[0]["uuid"]))
720 nsd_RO
= deepcopy(nsd
)
721 nsd_RO
["id"] = nsd_id_RO
722 nsd_RO
.pop("_id", None)
723 nsd_RO
.pop("_admin", None)
724 for c_vnf
in nsd_RO
["constituent-vnfd"]:
725 vnfd_id
= c_vnf
["vnfd-id-ref"]
726 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
727 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
728 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
729 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
730 self
.update_db("nsrs", nsr_id
, db_nsr
)
733 # if present use it unless in error status
734 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
737 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
738 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
739 desc
= await RO
.show("ns", RO_nsr_id
)
740 except ROclient
.ROClientException
as e
:
741 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
743 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
745 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
746 nsr_lcm
["RO"]["nsr_status"] = ns_status
747 if ns_status
== "ERROR":
748 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
749 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
750 await RO
.delete("ns", RO_nsr_id
)
751 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
753 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
754 self
.logger
.debug(logging_text
+ step
)
755 RO_ns_params
= self
.ns_params_2_RO(db_nsr
.get("instantiate_params"))
756 desc
= await RO
.create("ns", descriptor
=RO_ns_params
,
758 scenario
=nsr_lcm
["RO"]["nsd_id"])
759 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
760 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
761 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
763 self
.update_db("nsrs", nsr_id
, db_nsr
)
764 # update VNFR vimAccount
765 step
= "Updating VNFR vimAcccount"
766 for vnf_index
, vnfr
in db_vnfr
.items():
767 if vnfr
.get("vim-account-id"):
769 if db_nsr
["instantiate_params"].get("vnf") and db_nsr
["instantiate_params"]["vnf"].get(vnf_index
) \
770 and db_nsr
["instantiate_params"]["vnf"][vnf_index
].get("vimAccountId"):
771 vnfr
["vim-account-id"] = db_nsr
["instantiate_params"]["vnf"][vnf_index
]["vimAccountId"]
773 vnfr
["vim-account-id"] = db_nsr
["instantiate_params"]["vimAccountId"]
774 self
.update_db("vnfrs", vnfr
["_id"], vnfr
)
776 # wait until NS is ready
777 step
= ns_status_detailed
= "Waiting ns ready at RO"
778 db_nsr
["detailed-status"] = ns_status_detailed
779 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
780 deployment_timeout
= 2*3600 # Two hours
781 while deployment_timeout
> 0:
782 desc
= await RO
.show("ns", RO_nsr_id
)
783 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
784 nsr_lcm
["RO"]["nsr_status"] = ns_status
785 if ns_status
== "ERROR":
786 raise ROclient
.ROClientException(ns_status_info
)
787 elif ns_status
== "BUILD":
788 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
789 self
.update_db("nsrs", nsr_id
, db_nsr
)
790 elif ns_status
== "ACTIVE":
791 step
= "Getting ns VIM information"
792 ns_RO_info
= nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_info(desc
)
795 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
797 await asyncio
.sleep(5, loop
=self
.loop
)
798 deployment_timeout
-= 5
799 if deployment_timeout
<= 0:
800 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
801 step
= "Updating VNFRs"
802 for vnf_index
, vnfr_deployed
in ns_RO_info
.items():
803 vnfr
= db_vnfr
[vnf_index
]
804 vnfr
["ip-address"] = vnfr_deployed
.get("ip_address")
805 for vdu_id
, vdu_deployed
in vnfr_deployed
["vdur"].items():
806 for vdur
in vnfr
["vdur"]:
807 if vdur
["vdu-id-ref"] == vdu_id
:
808 vdur
["vim-id"] = vdu_deployed
.get("vim_id")
809 vdur
["ip-address"] = vdu_deployed
.get("ip_address")
811 self
.update_db("vnfrs", vnfr
["_id"], vnfr
)
813 db_nsr
["detailed-status"] = "Configuring vnfr"
814 self
.update_db("nsrs", nsr_id
, db_nsr
)
816 # The parameters we'll need to deploy a charm
817 number_to_configure
= 0
820 """An inner function to deploy the charm from either vnf or vdu
824 # if number_to_configure == 0:
825 # self.logger.debug("Logging into N2VC...")
826 # task = asyncio.ensure_future(self.n2vc.login())
827 # yield from asyncio.wait_for(task, 30.0)
828 # self.logger.debug("Logged into N2VC!")
830 ## await self.n2vc.login()
832 # Note: The charm needs to exist on disk at the location
833 # specified by charm_path.
834 base_folder
= vnfd
["_admin"]["storage"]
835 storage_params
= self
.fs
.get_params()
836 charm_path
= "{}{}/{}/charms/{}".format(
837 storage_params
["path"],
838 base_folder
["folder"],
839 base_folder
["pkg-dir"],
843 # Setup the runtime parameters for this VNF
844 params
['rw_mgmt_ip'] = db_vnfr
[vnf_index
]["ip-address"]
846 # ns_name will be ignored in the current version of N2VC
847 # but will be implemented for the next point release.
848 model_name
= 'default'
849 application_name
= self
.n2vc
.FormatApplicationName(
855 nsr_lcm
["VCA"][vnf_index
] = {
857 "application": application_name
,
858 "operational-status": "init",
859 "detailed-status": "",
863 self
.logger
.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id
, charm_path
, proxy_charm
))
864 task
= asyncio
.ensure_future(
865 self
.n2vc
.DeployCharms(
866 model_name
, # The network service name
867 application_name
, # The application name
868 vnfd
, # The vnf descriptor
869 charm_path
, # Path to charm
870 params
, # Runtime params, like mgmt ip
871 {}, # for native charms only
872 self
.n2vc_callback
, # Callback for status changes
873 db_nsr
, # Callback parameter
875 vnf_index
, # Callback parameter
876 None, # Callback parameter (task)
879 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, model_name
, application_name
, None, None,
880 db_nsr
, db_nslcmop
, vnf_index
))
881 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["create_charm:" + vnf_index
] = task
883 # TODO: Make this call inside deploy()
884 # Login to the VCA. If there are multiple calls to login(),
885 # subsequent calls will be a nop and return immediately.
886 await self
.n2vc
.login()
888 step
= "Looking for needed vnfd to configure"
889 self
.logger
.debug(logging_text
+ step
)
890 for c_vnf
in nsd
["constituent-vnfd"]:
891 vnfd_id
= c_vnf
["vnfd-id-ref"]
892 vnf_index
= str(c_vnf
["member-vnf-index"])
893 vnfd
= needed_vnfd
[vnfd_id
]
895 # Check if this VNF has a charm configuration
896 vnf_config
= vnfd
.get("vnf-configuration")
898 if vnf_config
and vnf_config
.get("juju"):
899 proxy_charm
= vnf_config
["juju"]["charm"]
903 if 'initial-config-primitive' in vnf_config
:
904 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
907 number_to_configure
+= 1
909 # Deploy charms for each VDU that supports one.
910 for vdu
in vnfd
['vdu']:
911 vdu_config
= vdu
.get('vdu-configuration')
915 if vdu_config
and vdu_config
.get("juju"):
916 proxy_charm
= vdu_config
["juju"]["charm"]
918 if 'initial-config-primitive' in vdu_config
:
919 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
923 number_to_configure
+= 1
925 if number_to_configure
:
926 db_nsr
["config-status"] = "configuring"
927 db_nsr
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
928 db_nslcmop
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
930 db_nslcmop
["operationState"] = "COMPLETED"
931 db_nslcmop
["detailed-status"] = "done"
932 db_nsr
["config-status"] = "configured"
933 db_nsr
["detailed-status"] = "done"
934 db_nsr
["operational-status"] = "running"
935 self
.update_db("nsrs", nsr_id
, db_nsr
)
936 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
937 self
.logger
.debug("Task ns_instantiate={} Exit Ok".format(nsr_id
))
940 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
941 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
943 except Exception as e
:
944 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
949 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
950 db_nsr
["operational-status"] = "failed"
951 self
.update_db("nsrs", nsr_id
, db_nsr
)
953 db_nslcmop
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
954 db_nslcmop
["operationState"] = "FAILED"
955 db_nslcmop
["statusEnteredTime"] = time()
956 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
958 async def ns_terminate(self
, nsr_id
, nslcmop_id
):
959 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
960 self
.logger
.debug(logging_text
+ "Enter")
964 step
= "Getting nsr, nslcmop from db"
965 failed_detail
= [] # annotates all failed error messages
969 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
970 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
971 # nsd = db_nsr["nsd"]
972 nsr_lcm
= deepcopy(db_nsr
["_admin"]["deployed"])
973 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
976 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
977 # #TODO check if VIM is creating and wait
978 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
981 "operational-status": "terminating",
982 "config-status": "terminating",
983 "detailed-status": "Deleting charms",
985 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
988 self
.logger
.debug(logging_text
+ step
)
989 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
990 if deploy_info
and deploy_info
.get("application"):
991 task
= asyncio
.ensure_future(
992 self
.n2vc
.RemoveCharms(
993 deploy_info
['model'],
994 deploy_info
['application'],
995 # self.n2vc_callback,
1001 vca_task_list
.append(task
)
1002 vca_task_dict
[vnf_index
] = task
1003 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1004 # deploy_info['application'], None, db_nsr,
1005 # db_nslcmop, vnf_index))
1006 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["delete_charm:" + vnf_index
] = task
1007 except Exception as e
:
1008 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
1011 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1013 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
1016 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
1017 self
.logger
.debug(logging_text
+ step
)
1018 desc
= await RO
.delete("ns", RO_nsr_id
)
1019 nsr_lcm
["RO"]["nsr_id"] = None
1020 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
1021 except ROclient
.ROClientException
as e
:
1022 if e
.http_code
== 404: # not found
1023 nsr_lcm
["RO"]["nsr_id"] = None
1024 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
1025 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
1026 elif e
.http_code
== 409: #conflict
1027 failed_detail
.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
1028 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1030 failed_detail
.append("RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
1031 self
.logger
.error(logging_text
+ failed_detail
[-1])
1034 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
1037 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
1038 desc
= await RO
.delete("nsd", RO_nsd_id
)
1039 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
1040 nsr_lcm
["RO"]["nsd_id"] = None
1041 except ROclient
.ROClientException
as e
:
1042 if e
.http_code
== 404: # not found
1043 nsr_lcm
["RO"]["nsd_id"] = None
1044 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
1045 elif e
.http_code
== 409: #conflict
1046 failed_detail
.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
1047 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1049 failed_detail
.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
1050 self
.logger
.error(logging_text
+ failed_detail
[-1])
1052 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
1056 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
1057 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
1058 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
1059 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
1060 except ROclient
.ROClientException
as e
:
1061 if e
.http_code
== 404: # not found
1062 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
1063 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
1064 elif e
.http_code
== 409: #conflict
1065 failed_detail
.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
1066 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1068 failed_detail
.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
1069 self
.logger
.error(logging_text
+ failed_detail
[-1])
1072 await asyncio
.wait(vca_task_list
, timeout
=300)
1073 for vnf_index
, task
in vca_task_dict
.items():
1074 if task
.cancelled():
1075 failed_detail
.append("VCA[{}] Deletion has been cancelled".format(vnf_index
))
1077 exc
= task
.exception()
1079 failed_detail
.append("VCA[{}] Deletion exception: {}".format(vnf_index
, exc
))
1081 nsr_lcm
["VCA"][vnf_index
] = None
1083 # TODO Should it be cancelled?!!
1085 failed_detail
.append("VCA[{}] Deletion timeout".format(vnf_index
))
1088 self
.logger
.error(logging_text
+ " ;".join(failed_detail
))
1090 "operational-status": "failed",
1091 "detailed-status": "Deletion errors " + "; ".join(failed_detail
),
1092 "_admin": {"deployed": nsr_lcm
, }
1094 db_nslcmop_update
= {
1095 "detailed-status": "; ".join(failed_detail
),
1096 "operationState": "FAILED",
1097 "statusEnteredTime": time()
1099 elif db_nslcmop
["operationParams"].get("autoremove"):
1100 self
.db
.del_one("nsrs", {"_id": nsr_id
})
1101 self
.db
.del_list("nslcmops", {"nsInstanceId": nsr_id
})
1102 self
.db
.del_list("vnfrs", {"nsr-id-ref": nsr_id
})
1105 "operational-status": "terminated",
1106 "detailed-status": "Done",
1107 "_admin": {"deployed": nsr_lcm
, "nsState": "NOT_INSTANTIATED"}
1109 db_nslcmop_update
= {
1110 "detailed-status": "Done",
1111 "operationState": "COMPLETED",
1112 "statusEnteredTime": time()
1114 self
.logger
.debug(logging_text
+ "Exit")
1116 except (ROclient
.ROClientException
, DbException
) as e
:
1117 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1119 except Exception as e
:
1120 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
1123 if exc
and db_nslcmop
:
1124 db_nslcmop_update
= {
1125 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1126 "operationState": "FAILED",
1127 "statusEnteredTime": time(),
1129 if db_nslcmop_update
:
1130 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1132 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1134 async def ns_action(self
, nsr_id
, nslcmop_id
):
1135 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
1136 self
.logger
.debug(logging_text
+ "Enter")
1137 # get all needed from database
1140 db_nslcmop_update
= None
1143 step
= "Getting information from database"
1144 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1145 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1146 nsr_lcm
= db_nsr
["_admin"].get("deployed")
1147 vnf_index
= db_nslcmop
["operationParams"]["vnf_member_index"]
1149 #TODO check if ns is in a proper status
1150 vca_deployed
= nsr_lcm
["VCA"].get(vnf_index
)
1151 if not vca_deployed
:
1152 raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index
))
1153 model_name
= vca_deployed
.get("model")
1154 application_name
= vca_deployed
.get("application")
1155 if not model_name
or not application_name
:
1156 raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index
))
1157 if vca_deployed
["operational-status"] != "active":
1158 raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
1159 vnf_index
, vca_deployed
["operational-status"]))
1160 primitive
= db_nslcmop
["operationParams"]["primitive"]
1161 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
1162 callback
= None # self.n2vc_callback
1163 callback_args
= () # [db_nsr, db_nslcmop, vnf_index, None]
1164 await self
.n2vc
.login()
1165 task
= asyncio
.ensure_future(
1166 self
.n2vc
.ExecutePrimitive(
1169 primitive
, callback
,
1174 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1175 # db_nsr, db_nslcmop, vnf_index))
1176 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1177 # wait until completed with timeout
1178 await asyncio
.wait((task
,), timeout
=300)
1180 result
= "FAILED" # by default
1182 if task
.cancelled():
1183 db_nslcmop
["detailed-status"] = "Task has been cancelled"
1185 exc
= task
.exception()
1187 result_detail
= str(exc
)
1189 self
.logger
.debug(logging_text
+ " task Done")
1190 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1191 result
= "COMPLETED"
1192 result_detail
= "Done"
1194 # TODO Should it be cancelled?!!
1196 result_detail
= "timeout"
1198 db_nslcmop_update
= {
1199 "detailed-status": result_detail
,
1200 "operationState": result
,
1201 "statusEnteredTime": time()
1203 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(result
, result_detail
))
1204 return # database update is called inside finally
1206 except (DbException
, LcmException
) as e
:
1207 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1209 except Exception as e
:
1210 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
1213 if exc
and db_nslcmop
:
1214 db_nslcmop_update
= {
1215 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1216 "operationState": "FAILED",
1217 "statusEnteredTime": time(),
1219 if db_nslcmop_update
:
1220 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1222 async def test(self
, param
=None):
1223 self
.logger
.debug("Starting/Ending test task: {}".format(param
))
1225 def cancel_tasks(self
, topic
, _id
):
1227 Cancel all active tasks of a concrete nsr or vim identified for _id
1228 :param topic: can be ns or vim_account
1229 :param _id: nsr or vim identity
1230 :return: None, or raises an exception if not possible
1233 lcm_tasks
= self
.lcm_ns_tasks
1234 elif topic
== "vim_account":
1235 lcm_tasks
= self
.lcm_vim_tasks
1237 lcm_tasks
= self
.lcm_sdn_tasks
1239 if not lcm_tasks
.get(_id
):
1241 for order_id
, tasks_set
in lcm_tasks
[_id
].items():
1242 for task_name
, task
in tasks_set
.items():
1243 result
= task
.cancel()
1245 self
.logger
.debug("{} _id={} order_id={} task={} cancelled".format(topic
, _id
, order_id
, task_name
))
1248 async def kafka_ping(self
):
1249 self
.logger
.debug("Task kafka_ping Enter")
1250 consecutive_errors
= 0
1252 kafka_has_received
= False
1253 self
.pings_not_received
= 1
1256 await self
.msg
.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self
.loop
)
1257 # time between pings are low when it is not received and at starting
1258 wait_time
= 5 if not kafka_has_received
else 120
1259 if not self
.pings_not_received
:
1260 kafka_has_received
= True
1261 self
.pings_not_received
+= 1
1262 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1263 if self
.pings_not_received
> 10:
1264 raise LcmException("It is not receiving pings from Kafka bus")
1265 consecutive_errors
= 0
1267 except LcmException
:
1269 except Exception as e
:
1270 # if not first_start is the first time after starting. So leave more time and wait
1271 # to allow kafka starts
1272 if consecutive_errors
== 8 if not first_start
else 30:
1273 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1275 consecutive_errors
+= 1
1276 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1277 wait_time
= 1 if not first_start
else 5
1278 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1280 async def kafka_read(self
):
1281 self
.logger
.debug("Task kafka_read Enter")
1283 # future = asyncio.Future()
1284 consecutive_errors
= 0
1286 while consecutive_errors
< 10:
1288 topics
= ("admin", "ns", "vim_account", "sdn")
1289 topic
, command
, params
= await self
.msg
.aioread(topics
, self
.loop
)
1290 self
.logger
.debug("Task kafka_read receives {} {}: {}".format(topic
, command
, params
))
1291 consecutive_errors
= 0
1294 if command
== "exit":
1297 elif command
.startswith("#"):
1299 elif command
== "echo":
1304 elif command
== "test":
1305 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
1308 if topic
== "admin":
1309 if command
== "ping" and params
["to"] == "lcm" and params
["from"] == "lcm":
1310 self
.pings_not_received
= 0
1313 if command
== "instantiate":
1314 # self.logger.debug("Deploying NS {}".format(nsr_id))
1316 nslcmop_id
= nslcmop
["_id"]
1317 nsr_id
= nslcmop
["nsInstanceId"]
1318 task
= asyncio
.ensure_future(self
.ns_instantiate(nsr_id
, nslcmop_id
))
1319 if nsr_id
not in self
.lcm_ns_tasks
:
1320 self
.lcm_ns_tasks
[nsr_id
] = {}
1321 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_instantiate": task
}
1323 elif command
== "terminate":
1324 # self.logger.debug("Deleting NS {}".format(nsr_id))
1326 nslcmop_id
= nslcmop
["_id"]
1327 nsr_id
= nslcmop
["nsInstanceId"]
1328 self
.cancel_tasks(topic
, nsr_id
)
1329 task
= asyncio
.ensure_future(self
.ns_terminate(nsr_id
, nslcmop_id
))
1330 if nsr_id
not in self
.lcm_ns_tasks
:
1331 self
.lcm_ns_tasks
[nsr_id
] = {}
1332 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_terminate": task
}
1334 elif command
== "action":
1335 # self.logger.debug("Update NS {}".format(nsr_id))
1337 nslcmop_id
= nslcmop
["_id"]
1338 nsr_id
= nslcmop
["nsInstanceId"]
1339 task
= asyncio
.ensure_future(self
.ns_action(nsr_id
, nslcmop_id
))
1340 if nsr_id
not in self
.lcm_ns_tasks
:
1341 self
.lcm_ns_tasks
[nsr_id
] = {}
1342 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_action": task
}
1344 elif command
== "show":
1346 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1348 "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
1349 "{}\n deploy: {}\n tasks: {}".format(
1350 nsr_id
, db_nsr
["operational-status"],
1351 db_nsr
["config-status"], db_nsr
["detailed-status"],
1352 db_nsr
["_admin"]["deployed"], self
.lcm_ns_tasks
.get(nsr_id
)))
1353 except Exception as e
:
1354 print("nsr {} not found: {}".format(nsr_id
, e
))
1357 elif command
== "deleted":
1358 continue # TODO cleaning of task just in case should be done
1359 elif topic
== "vim_account":
1360 vim_id
= params
["_id"]
1361 if command
== "create":
1362 task
= asyncio
.ensure_future(self
.vim_create(params
, order_id
))
1363 if vim_id
not in self
.lcm_vim_tasks
:
1364 self
.lcm_vim_tasks
[vim_id
] = {}
1365 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_create": task
}
1367 elif command
== "delete":
1368 self
.cancel_tasks(topic
, vim_id
)
1369 task
= asyncio
.ensure_future(self
.vim_delete(vim_id
, order_id
))
1370 if vim_id
not in self
.lcm_vim_tasks
:
1371 self
.lcm_vim_tasks
[vim_id
] = {}
1372 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_delete": task
}
1374 elif command
== "show":
1375 print("not implemented show with vim_account")
1378 elif command
== "edit":
1379 task
= asyncio
.ensure_future(self
.vim_edit(vim_id
, order_id
))
1380 if vim_id
not in self
.lcm_vim_tasks
:
1381 self
.lcm_vim_tasks
[vim_id
] = {}
1382 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_edit": task
}
1384 elif topic
== "sdn":
1385 _sdn_id
= params
["_id"]
1386 if command
== "create":
1387 task
= asyncio
.ensure_future(self
.sdn_create(params
, order_id
))
1388 if _sdn_id
not in self
.lcm_sdn_tasks
:
1389 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1390 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_create": task
}
1392 elif command
== "delete":
1393 self
.cancel_tasks(topic
, _sdn_id
)
1394 task
= asyncio
.ensure_future(self
.sdn_delete(_sdn_id
, order_id
))
1395 if _sdn_id
not in self
.lcm_sdn_tasks
:
1396 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1397 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_delete": task
}
1399 elif command
== "edit":
1400 task
= asyncio
.ensure_future(self
.sdn_edit(_sdn_id
, order_id
))
1401 if _sdn_id
not in self
.lcm_sdn_tasks
:
1402 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1403 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_edit": task
}
1405 self
.logger
.critical("unknown topic {} and command '{}'".format(topic
, command
))
1406 except Exception as e
:
1407 # if not first_start is the first time after starting. So leave more time and wait
1408 # to allow kafka starts
1409 if consecutive_errors
== 8 if not first_start
else 30:
1410 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1412 consecutive_errors
+= 1
1413 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1414 wait_time
= 2 if not first_start
else 5
1415 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1417 # self.logger.debug("Task kafka_read terminating")
1418 self
.logger
.debug("Task kafka_read exit")
1421 self
.loop
= asyncio
.get_event_loop()
1422 self
.loop
.run_until_complete(asyncio
.gather(
1427 # self.logger.debug("Terminating cancelling creation tasks")
1428 # self.cancel_tasks("ALL", "create")
1430 # while self.is_pending_tasks():
1431 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1432 # await asyncio.sleep(2, loop=self.loop)
1435 # self.cancel_tasks("ALL", "ALL")
1439 self
.db
.db_disconnect()
1441 self
.msg
.disconnect()
1443 self
.fs
.fs_disconnect()
1446 def read_config_file(self
, config_file
):
1447 # TODO make a [ini] + yaml inside parser
1448 # the configparser library is not suitable, because it does not admit comments at the end of line,
1449 # and not parse integer or boolean
1451 with
open(config_file
) as f
:
1453 for k
, v
in environ
.items():
1454 if not k
.startswith("OSMLCM_"):
1456 k_items
= k
.lower().split("_")
1459 for k_item
in k_items
[1:-1]:
1460 if k_item
in ("ro", "vca"):
1461 # put in capital letter
1462 k_item
= k_item
.upper()
1464 if k_items
[-1] == "port":
1465 c
[k_items
[-1]] = int(v
)
1468 except Exception as e
:
1469 self
.logger
.warn("skipping environ '{}' on exception '{}'".format(k
, e
))
1472 except Exception as e
:
1473 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
1477 if __name__
== '__main__':
1479 config_file
= "lcm.cfg"
1480 lcm
= Lcm(config_file
)