2 # -*- coding: utf-8 -*-
8 import logging
.handlers
12 from osm_common
import dbmemory
13 from osm_common
import dbmongo
14 from osm_common
import fslocal
15 from osm_common
import msglocal
16 from osm_common
import msgkafka
17 from osm_common
.dbbase
import DbException
18 from osm_common
.fsbase
import FsException
19 from osm_common
.msgbase
import MsgException
20 from os
import environ
, path
21 from n2vc
.vnf
import N2VC
22 from n2vc
import version
as N2VC_version
24 from copy
import deepcopy
25 from http
import HTTPStatus
29 __author__
= "Alfonso Tierno"
32 class LcmException(Exception):
38 def __init__(self
, config_file
):
40 Init, Connect to database, filesystem storage, and messaging
41 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
48 self
.pings_not_received
= 1
50 # contains created tasks/futures to be able to cancel
51 self
.lcm_ns_tasks
= {}
52 self
.lcm_vim_tasks
= {}
53 self
.lcm_sdn_tasks
= {}
55 self
.logger
= logging
.getLogger('lcm')
57 config
= self
.read_config_file(config_file
)
60 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
61 "tenant": config
.get("tenant", "osm"),
62 "logger_name": "lcm.ROclient",
66 self
.vca
= config
["VCA"] # TODO VCA
70 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
71 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
72 config
["database"]["logger_name"] = "lcm.db"
73 config
["storage"]["logger_name"] = "lcm.fs"
74 config
["message"]["logger_name"] = "lcm.msg"
75 if "logfile" in config
["global"]:
76 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
77 maxBytes
=100e6
, backupCount
=9, delay
=0)
78 file_handler
.setFormatter(log_formatter_simple
)
79 self
.logger
.addHandler(file_handler
)
81 str_handler
= logging
.StreamHandler()
82 str_handler
.setFormatter(log_formatter_simple
)
83 self
.logger
.addHandler(str_handler
)
85 if config
["global"].get("loglevel"):
86 self
.logger
.setLevel(config
["global"]["loglevel"])
88 # logging other modules
89 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
90 config
[k1
]["logger_name"] = logname
91 logger_module
= logging
.getLogger(logname
)
92 if "logfile" in config
[k1
]:
93 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
94 maxBytes
=100e6
, backupCount
=9, delay
=0)
95 file_handler
.setFormatter(log_formatter_simple
)
96 logger_module
.addHandler(file_handler
)
97 if "loglevel" in config
[k1
]:
98 logger_module
.setLevel(config
[k1
]["loglevel"])
102 server
=config
['VCA']['host'],
103 port
=config
['VCA']['port'],
104 user
=config
['VCA']['user'],
105 secret
=config
['VCA']['secret'],
106 # TODO: This should point to the base folder where charms are stored,
107 # if there is a common one (like object storage). Otherwise, leave
108 # it unset and pass it via DeployCharms
109 # artifacts=config['VCA'][''],
112 # check version of N2VC
113 # TODO enhance with int conversion or from distutils.version import LooseVersion
114 # or with list(map(int, version.split(".")))
115 if N2VC_version
< "0.0.2":
116 raise LcmException("Not compatible osm/N2VC version '{}'. Needed '0.0.2' or higher".format(N2VC_version
))
118 if config
["database"]["driver"] == "mongo":
119 self
.db
= dbmongo
.DbMongo()
120 self
.db
.db_connect(config
["database"])
121 elif config
["database"]["driver"] == "memory":
122 self
.db
= dbmemory
.DbMemory()
123 self
.db
.db_connect(config
["database"])
125 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
126 config
["database"]["driver"]))
128 if config
["storage"]["driver"] == "local":
129 self
.fs
= fslocal
.FsLocal()
130 self
.fs
.fs_connect(config
["storage"])
132 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
133 config
["storage"]["driver"]))
135 if config
["message"]["driver"] == "local":
136 self
.msg
= msglocal
.MsgLocal()
137 self
.msg
.connect(config
["message"])
138 elif config
["message"]["driver"] == "kafka":
139 self
.msg
= msgkafka
.MsgKafka()
140 self
.msg
.connect(config
["message"])
142 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
143 config
["storage"]["driver"]))
144 except (DbException
, FsException
, MsgException
) as e
:
145 self
.logger
.critical(str(e
), exc_info
=True)
146 raise LcmException(str(e
))
148 def update_db(self
, item
, _id
, _desc
):
150 self
.db
.replace(item
, _id
, _desc
)
151 except DbException
as e
:
152 self
.logger
.error("Updating {} _id={}: {}".format(item
, _id
, e
))
154 def update_db_2(self
, item
, _id
, _desc
):
156 self
.db
.set_one(item
, {"_id": _id
}, _desc
)
157 except DbException
as e
:
158 self
.logger
.error("Updating {} _id={}: {}".format(item
, _id
, e
))
160 async def vim_create(self
, vim_content
, order_id
):
161 vim_id
= vim_content
["_id"]
162 logging_text
= "Task vim_create={} ".format(vim_id
)
163 self
.logger
.debug(logging_text
+ "Enter")
168 step
= "Getting vim-id='{}' from db".format(vim_id
)
169 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
170 if "_admin" not in db_vim
:
171 db_vim
["_admin"] = {}
172 if "deployed" not in db_vim
["_admin"]:
173 db_vim
["_admin"]["deployed"] = {}
174 db_vim
["_admin"]["deployed"]["RO"] = None
175 if vim_content
.get("config") and vim_content
["config"].get("sdn-controller"):
176 step
= "Getting sdn-controller-id='{}' from db".format(vim_content
["config"]["sdn-controller"])
177 db_sdn
= self
.db
.get_one("sdns", {"_id": vim_content
["config"]["sdn-controller"]})
178 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
179 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
181 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
182 vim_content
["config"]["sdn-controller"]))
184 step
= "Creating vim at RO"
185 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
186 vim_RO
= deepcopy(vim_content
)
187 vim_RO
.pop("_id", None)
188 vim_RO
.pop("_admin", None)
189 vim_RO
.pop("schema_version", None)
190 vim_RO
.pop("schema_type", None)
191 vim_RO
.pop("vim_tenant_name", None)
192 vim_RO
["type"] = vim_RO
.pop("vim_type")
193 vim_RO
.pop("vim_user", None)
194 vim_RO
.pop("vim_password", None)
196 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
197 desc
= await RO
.create("vim", descriptor
=vim_RO
)
198 RO_vim_id
= desc
["uuid"]
199 db_vim
["_admin"]["deployed"]["RO"] = RO_vim_id
200 self
.update_db("vim_accounts", vim_id
, db_vim
)
202 step
= "Creating vim_account at RO"
203 vim_account_RO
= {"vim_tenant_name": vim_content
["vim_tenant_name"],
204 "vim_username": vim_content
["vim_user"],
205 "vim_password": vim_content
["vim_password"]
207 if vim_RO
.get("config"):
208 vim_account_RO
["config"] = vim_RO
["config"]
209 if "sdn-controller" in vim_account_RO
["config"]:
210 del vim_account_RO
["config"]["sdn-controller"]
211 if "sdn-port-mapping" in vim_account_RO
["config"]:
212 del vim_account_RO
["config"]["sdn-port-mapping"]
213 await RO
.attach_datacenter(RO_vim_id
, descriptor
=vim_account_RO
)
214 db_vim
["_admin"]["operationalState"] = "ENABLED"
215 self
.update_db("vim_accounts", vim_id
, db_vim
)
217 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id".format(RO_vim_id
))
220 except (ROclient
.ROClientException
, DbException
) as e
:
221 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
223 except Exception as e
:
224 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
228 db_vim
["_admin"]["operationalState"] = "ERROR"
229 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
230 self
.update_db("vim_accounts", vim_id
, db_vim
)
232 async def vim_edit(self
, vim_content
, order_id
):
233 vim_id
= vim_content
["_id"]
234 logging_text
= "Task vim_edit={} ".format(vim_id
)
235 self
.logger
.debug(logging_text
+ "Enter")
239 step
= "Getting vim-id='{}' from db".format(vim_id
)
241 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
242 if db_vim
.get("_admin") and db_vim
["_admin"].get("deployed") and db_vim
["_admin"]["deployed"].get("RO"):
243 if vim_content
.get("config") and vim_content
["config"].get("sdn-controller"):
244 step
= "Getting sdn-controller-id='{}' from db".format(vim_content
["config"]["sdn-controller"])
245 db_sdn
= self
.db
.get_one("sdns", {"_id": vim_content
["config"]["sdn-controller"]})
246 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get(
248 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
250 raise LcmException("sdn-controller={} is not available. Not deployed at RO".format(
251 vim_content
["config"]["sdn-controller"]))
253 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
254 step
= "Editing vim at RO"
255 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
256 vim_RO
= deepcopy(vim_content
)
257 vim_RO
.pop("_id", None)
258 vim_RO
.pop("_admin", None)
259 vim_RO
.pop("schema_version", None)
260 vim_RO
.pop("schema_type", None)
261 vim_RO
.pop("vim_tenant_name", None)
262 if "vim_type" in vim_RO
:
263 vim_RO
["type"] = vim_RO
.pop("vim_type")
264 vim_RO
.pop("vim_user", None)
265 vim_RO
.pop("vim_password", None)
267 vim_RO
["config"]["sdn-controller"] = RO_sdn_id
268 # TODO make a deep update of sdn-port-mapping
270 await RO
.edit("vim", RO_vim_id
, descriptor
=vim_RO
)
272 step
= "Editing vim-account at RO tenant"
274 if "config" in vim_content
:
275 if "sdn-controller" in vim_content
["config"]:
276 del vim_content
["config"]["sdn-controller"]
277 if "sdn-port-mapping" in vim_content
["config"]:
278 del vim_content
["config"]["sdn-port-mapping"]
279 if not vim_content
["config"]:
280 del vim_content
["config"]
281 for k
in ("vim_tenant_name", "vim_password", "config"):
283 vim_account_RO
[k
] = vim_content
[k
]
284 if "vim_user" in vim_content
:
285 vim_content
["vim_username"] = vim_content
["vim_user"]
286 # vim_account must be edited always even if empty in order to ensure changes are translated to RO
287 # vim_thread. RO will remove and relaunch a new thread for this vim_account
288 await RO
.edit("vim_account", RO_vim_id
, descriptor
=vim_account_RO
)
289 db_vim
["_admin"]["operationalState"] = "ENABLED"
290 self
.update_db("vim_accounts", vim_id
, db_vim
)
292 self
.logger
.debug(logging_text
+ "Exit Ok RO_vim_id".format(RO_vim_id
))
295 except (ROclient
.ROClientException
, DbException
) as e
:
296 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
298 except Exception as e
:
299 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
303 db_vim
["_admin"]["operationalState"] = "ERROR"
304 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
305 self
.update_db("vim_accounts", vim_id
, db_vim
)
307 async def vim_delete(self
, vim_id
, order_id
):
308 logging_text
= "Task vim_delete={} ".format(vim_id
)
309 self
.logger
.debug(logging_text
+ "Enter")
312 step
= "Getting vim from db"
314 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_id
})
315 if db_vim
.get("_admin") and db_vim
["_admin"].get("deployed") and db_vim
["_admin"]["deployed"].get("RO"):
316 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
317 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
318 step
= "Detaching vim from RO tenant"
320 await RO
.detach_datacenter(RO_vim_id
)
321 except ROclient
.ROClientException
as e
:
322 if e
.http_code
== 404: # not found
323 self
.logger
.debug(logging_text
+ "RO_vim_id={} already detached".format(RO_vim_id
))
327 step
= "Deleting vim from RO"
329 await RO
.delete("vim", RO_vim_id
)
330 except ROclient
.ROClientException
as e
:
331 if e
.http_code
== 404: # not found
332 self
.logger
.debug(logging_text
+ "RO_vim_id={} already deleted".format(RO_vim_id
))
337 self
.logger
.error(logging_text
+ "Skipping. There is not RO information at database")
338 self
.db
.del_one("vim_accounts", {"_id": vim_id
})
339 self
.logger
.debug("vim_delete task vim_id={} Exit Ok".format(vim_id
))
342 except (ROclient
.ROClientException
, DbException
) as e
:
343 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
345 except Exception as e
:
346 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
350 db_vim
["_admin"]["operationalState"] = "ERROR"
351 db_vim
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
352 self
.update_db("vim_accounts", vim_id
, db_vim
)
354 async def sdn_create(self
, sdn_content
, order_id
):
355 sdn_id
= sdn_content
["_id"]
356 logging_text
= "Task sdn_create={} ".format(sdn_id
)
357 self
.logger
.debug(logging_text
+ "Enter")
361 step
= "Getting sdn from db"
362 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
363 if "_admin" not in db_sdn
:
364 db_sdn
["_admin"] = {}
365 if "deployed" not in db_sdn
["_admin"]:
366 db_sdn
["_admin"]["deployed"] = {}
367 db_sdn
["_admin"]["deployed"]["RO"] = None
369 step
= "Creating sdn at RO"
370 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
371 sdn_RO
= deepcopy(sdn_content
)
372 sdn_RO
.pop("_id", None)
373 sdn_RO
.pop("_admin", None)
374 sdn_RO
.pop("schema_version", None)
375 sdn_RO
.pop("schema_type", None)
376 sdn_RO
.pop("description", None)
377 desc
= await RO
.create("sdn", descriptor
=sdn_RO
)
378 RO_sdn_id
= desc
["uuid"]
379 db_sdn
["_admin"]["deployed"]["RO"] = RO_sdn_id
380 db_sdn
["_admin"]["operationalState"] = "ENABLED"
381 self
.update_db("sdns", sdn_id
, db_sdn
)
382 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id".format(RO_sdn_id
))
385 except (ROclient
.ROClientException
, DbException
) as e
:
386 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
388 except Exception as e
:
389 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
393 db_sdn
["_admin"]["operationalState"] = "ERROR"
394 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
395 self
.update_db("sdns", sdn_id
, db_sdn
)
397 async def sdn_edit(self
, sdn_content
, order_id
):
398 sdn_id
= sdn_content
["_id"]
399 logging_text
= "Task sdn_edit={} ".format(sdn_id
)
400 self
.logger
.debug(logging_text
+ "Enter")
403 step
= "Getting sdn from db"
405 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
406 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
407 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
408 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
409 step
= "Editing sdn at RO"
410 sdn_RO
= deepcopy(sdn_content
)
411 sdn_RO
.pop("_id", None)
412 sdn_RO
.pop("_admin", None)
413 sdn_RO
.pop("schema_version", None)
414 sdn_RO
.pop("schema_type", None)
415 sdn_RO
.pop("description", None)
417 await RO
.edit("sdn", RO_sdn_id
, descriptor
=sdn_RO
)
418 db_sdn
["_admin"]["operationalState"] = "ENABLED"
419 self
.update_db("sdns", sdn_id
, db_sdn
)
421 self
.logger
.debug(logging_text
+ "Exit Ok RO_sdn_id".format(RO_sdn_id
))
424 except (ROclient
.ROClientException
, DbException
) as e
:
425 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
427 except Exception as e
:
428 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
432 db_sdn
["_admin"]["operationalState"] = "ERROR"
433 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
434 self
.update_db("sdns", sdn_id
, db_sdn
)
436 async def sdn_delete(self
, sdn_id
, order_id
):
437 logging_text
= "Task sdn_delete={} ".format(sdn_id
)
438 self
.logger
.debug(logging_text
+ "Enter")
441 step
= "Getting sdn from db"
443 db_sdn
= self
.db
.get_one("sdns", {"_id": sdn_id
})
444 if db_sdn
.get("_admin") and db_sdn
["_admin"].get("deployed") and db_sdn
["_admin"]["deployed"].get("RO"):
445 RO_sdn_id
= db_sdn
["_admin"]["deployed"]["RO"]
446 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
447 step
= "Deleting sdn from RO"
449 await RO
.delete("sdn", RO_sdn_id
)
450 except ROclient
.ROClientException
as e
:
451 if e
.http_code
== 404: # not found
452 self
.logger
.debug(logging_text
+ "RO_sdn_id={} already deleted".format(RO_sdn_id
))
457 self
.logger
.error(logging_text
+ "Skipping. There is not RO information at database")
458 self
.db
.del_one("sdns", {"_id": sdn_id
})
459 self
.logger
.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id
))
462 except (ROclient
.ROClientException
, DbException
) as e
:
463 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
465 except Exception as e
:
466 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
470 db_sdn
["_admin"]["operationalState"] = "ERROR"
471 db_sdn
["_admin"]["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
472 self
.update_db("sdns", sdn_id
, db_sdn
)
474 def vnfd2RO(self
, vnfd
, new_id
=None):
476 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
477 :param vnfd: input vnfd
478 :param new_id: overrides vnf id if provided
479 :return: copy of vnfd
483 vnfd_RO
= deepcopy(vnfd
)
484 vnfd_RO
.pop("_id", None)
485 vnfd_RO
.pop("_admin", None)
487 vnfd_RO
["id"] = new_id
488 for vdu
in vnfd_RO
["vdu"]:
489 if "cloud-init-file" in vdu
:
490 base_folder
= vnfd
["_admin"]["storage"]
491 clout_init_file
= "{}/{}/cloud_init/{}".format(
492 base_folder
["folder"],
493 base_folder
["pkg-dir"],
494 vdu
["cloud-init-file"]
496 ci_file
= self
.fs
.file_open(clout_init_file
, "r")
497 # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails
498 # convert to base 64 or similar
499 clout_init_content
= ci_file
.read()
502 vdu
.pop("cloud-init-file", None)
503 vdu
["cloud-init"] = clout_init_content
505 except FsException
as e
:
506 raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd
["_id"], e
))
511 def n2vc_callback(self
, model_name
, application_name
, status
, message
, db_nsr
, db_nslcmop
, member_vnf_index
,
514 Callback both for charm status change and task completion
515 :param model_name: Charm model name
516 :param application_name: Charm application name
517 :param status: Can be
518 - blocked: The unit needs manual intervention
519 - maintenance: The unit is actively deploying/configuring
520 - waiting: The unit is waiting for another charm to be ready
521 - active: The unit is deployed, configured, and ready
522 - error: The charm has failed and needs attention.
523 - terminated: The charm has been destroyed
526 :param message: detailed message error
527 :param db_nsr: nsr database content
528 :param db_nslcmop: nslcmop database content
529 :param member_vnf_index: NSD member-vnf-index
530 :param task: None for charm status change, or task for completion task callback
535 update_nsr
= update_nslcmop
= False
537 nsr_id
= db_nsr
["_id"]
538 nslcmop_id
= db_nslcmop
["_id"]
539 nsr_lcm
= db_nsr
["_admin"]["deployed"]
540 ns_action
= db_nslcmop
["lcmOperationType"]
541 logging_text
= "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id
, ns_action
, nslcmop_id
,
546 self
.logger
.debug(logging_text
+ " task Cancelled")
547 # TODO update db_nslcmop
551 exc
= task
.exception()
553 self
.logger
.error(logging_text
+ " task Exception={}".format(exc
))
554 if ns_action
in ("instantiate", "terminate"):
555 nsr_lcm
["VCA"][member_vnf_index
]['operational-status'] = "error"
556 nsr_lcm
["VCA"][member_vnf_index
]['detailed-status'] = str(exc
)
557 elif ns_action
== "action":
558 db_nslcmop
["operationState"] = "FAILED"
559 db_nslcmop
["detailed-status"] = str(exc
)
560 db_nslcmop
["statusEnteredTime"] = time()
561 update_nslcmop
= True
565 self
.logger
.debug(logging_text
+ " task Done")
566 # TODO revise with Adam if action is finished and ok when task is done
567 if ns_action
== "action":
568 db_nslcmop
["operationState"] = "COMPLETED"
569 db_nslcmop
["detailed-status"] = "Done"
570 db_nslcmop
["statusEnteredTime"] = time()
571 update_nslcmop
= True
572 # task is Done, but callback is still ongoing. So ignore
575 self
.logger
.debug(logging_text
+ " Enter status={}".format(status
))
576 if nsr_lcm
["VCA"][member_vnf_index
]['operational-status'] == status
:
577 return # same status, ignore
578 nsr_lcm
["VCA"][member_vnf_index
]['operational-status'] = status
579 nsr_lcm
["VCA"][member_vnf_index
]['detailed-status'] = str(message
)
581 self
.logger
.critical(logging_text
+ " Enter with bad parameters", exc_info
=True)
586 n2vc_error_text
= [] # contain text error list. If empty no one is in error status
587 for vnf_index
, vca_info
in nsr_lcm
["VCA"].items():
588 vca_status
= vca_info
["operational-status"]
589 if vca_status
not in status_map
:
591 status_map
[vca_status
] = 0
592 status_map
[vca_status
] += 1
594 if vca_status
!= "active":
596 elif vca_status
in ("error", "blocked"):
597 n2vc_error_text
.append("member_vnf_index={} {}: {}".format(member_vnf_index
, vca_status
,
598 vca_info
["detailed-status"]))
601 self
.logger
.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id
,
603 db_nsr
["config-status"] = "configured"
604 db_nsr
["detailed-status"] = "done"
605 db_nslcmop
["operationState"] = "COMPLETED"
606 db_nslcmop
["detailed-status"] = "Done"
607 db_nslcmop
["statusEnteredTime"] = time()
608 elif n2vc_error_text
:
609 db_nsr
["config-status"] = "failed"
610 error_text
= "fail configuring " + ";".join(n2vc_error_text
)
611 db_nsr
["detailed-status"] = error_text
612 db_nslcmop
["operationState"] = "FAILED_TEMP"
613 db_nslcmop
["detailed-status"] = error_text
614 db_nslcmop
["statusEnteredTime"] = time()
618 for status
, num
in status_map
.items():
619 cs
+= separator
+ "{}: {}".format(status
, num
)
621 db_nsr
["config-status"] = cs
622 db_nsr
["detailed-status"] = cs
623 db_nslcmop
["detailed-status"] = cs
624 update_nsr
= update_nslcmop
= True
626 except Exception as e
:
627 self
.logger
.critical("[n2vc_callback] vnf_index={} Exception {}".format(member_vnf_index
, e
), exc_info
=True)
631 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
633 self
.update_db("nsrs", nsr_id
, db_nsr
)
634 except Exception as e
:
635 self
.logger
.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
636 member_vnf_index
, e
), exc_info
=True)
638 def ns_params_2_RO(self
, ns_params
):
640 Creates a RO ns descriptor from OSM ns_instantite params
641 :param ns_params: OSM instantiate params
642 :return: The RO ns descriptor
646 def vim_account_2_RO(vim_account
):
647 if vim_account
in vim_2_RO
:
648 return vim_2_RO
[vim_account
]
649 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
650 # if db_vim["_admin"]["operationalState"] == "PROCESSING":
651 # #TODO check if VIM is creating and wait
652 if db_vim
["_admin"]["operationalState"] != "ENABLED":
653 raise LcmException("VIM={} is not available. operationalState={}".format(
654 vim_account
, db_vim
["_admin"]["operationalState"]))
655 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
656 vim_2_RO
[vim_account
] = RO_vim_id
662 # "name": ns_params["nsName"],
663 # "description": ns_params.get("nsDescription"),
664 "datacenter": vim_account_2_RO(ns_params
["vimAccountId"]),
665 # "scenario": ns_params["nsdId"],
669 if ns_params
.get("ssh-authorized-key"):
670 RO_ns_params
["cloud-config"] = {"key-pairs": ns_params
["ssh-authorized-key"]}
671 if ns_params
.get("vnf"):
672 for vnf
in ns_params
["vnf"]:
674 if "vimAccountId" in vnf
:
675 RO_vnf
["datacenter"] = vim_account_2_RO(vnf
["vimAccountId"])
677 RO_ns_params
["vnfs"][vnf
["member-vnf-index"]] = RO_vnf
678 if ns_params
.get("vld"):
679 for vld
in ns_params
["vld"]:
681 if "ip-profile" in vld
:
682 RO_vld
["ip-profile"] = vld
["ip-profile"]
683 if "vim-network-name" in vld
:
685 if isinstance(vld
["vim-network-name"], dict):
686 for vim_account
, vim_net
in vld
["vim-network-name"].items():
687 RO_vld
["sites"].append({
688 "netmap-use": vim_net
,
689 "datacenter": vim_account_2_RO(vim_account
)
691 else: # isinstance str
692 RO_vld
["sites"].append({"netmap-use": vld
["vim-network-name"]})
694 RO_ns_params
["networks"][vld
["name"]] = RO_vld
697 async def ns_instantiate(self
, nsr_id
, nslcmop_id
):
698 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
699 self
.logger
.debug(logging_text
+ "Enter")
700 # get all needed from database
706 step
= "Getting nslcmop={} from db".format(nslcmop_id
)
707 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
708 step
= "Getting nsr={} from db".format(nsr_id
)
709 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
711 nsr_name
= db_nsr
["name"] # TODO short-name??
713 vnfr_filter
= {"nsr-id-ref": nsr_id
, "member-vnf-index-ref": None}
714 for c_vnf
in nsd
["constituent-vnfd"]:
715 vnfd_id
= c_vnf
["vnfd-id-ref"]
716 vnfr_filter
["member-vnf-index-ref"] = c_vnf
["member-vnf-index"]
717 db_vnfr
[c_vnf
["member-vnf-index"]] = self
.db
.get_one("vnfrs", vnfr_filter
)
718 if vnfd_id
not in needed_vnfd
:
719 step
= "Getting vnfd={} from db".format(vnfd_id
)
720 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
722 nsr_lcm
= db_nsr
["_admin"].get("deployed")
724 nsr_lcm
= db_nsr
["_admin"]["deployed"] = {
726 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
730 db_nsr
["detailed-status"] = "creating"
731 db_nsr
["operational-status"] = "init"
733 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
735 # get vnfds, instantiate at RO
736 for vnfd_id
, vnfd
in needed_vnfd
.items():
737 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
738 # self.logger.debug(logging_text + step)
739 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
742 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
744 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
745 self
.logger
.debug(logging_text
+ "vnfd={} exists at RO. Using RO_id={}".format(
746 vnfd_id
, vnfd_list
[0]["uuid"]))
748 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
)
749 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
750 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
751 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
752 self
.logger
.debug(logging_text
+ "vnfd={} created at RO. RO_id={}".format(
753 vnfd_id
, desc
["uuid"]))
754 self
.update_db("nsrs", nsr_id
, db_nsr
)
758 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
759 # self.logger.debug(logging_text + step)
761 nsd_id_RO
= nsr_id
+ "." + nsd_id
[:200]
762 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
764 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
765 self
.logger
.debug(logging_text
+ "nsd={} exists at RO. Using RO_id={}".format(
766 nsd_id
, nsd_list
[0]["uuid"]))
768 nsd_RO
= deepcopy(nsd
)
769 nsd_RO
["id"] = nsd_id_RO
770 nsd_RO
.pop("_id", None)
771 nsd_RO
.pop("_admin", None)
772 for c_vnf
in nsd_RO
["constituent-vnfd"]:
773 vnfd_id
= c_vnf
["vnfd-id-ref"]
774 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
775 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
776 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
777 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
778 self
.logger
.debug(logging_text
+ "nsd={} created at RO. RO_id={}".format(nsd_id
, desc
["uuid"]))
779 self
.update_db("nsrs", nsr_id
, db_nsr
)
782 # if present use it unless in error status
783 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
786 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
787 # self.logger.debug(logging_text + step + " RO_ns_id={}".format(RO_nsr_id))
788 desc
= await RO
.show("ns", RO_nsr_id
)
789 except ROclient
.ROClientException
as e
:
790 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
792 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
794 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
795 nsr_lcm
["RO"]["nsr_status"] = ns_status
796 if ns_status
== "ERROR":
797 step
= db_nsr
["detailed-status"] = "Deleting ns at RO. RO_ns_id={}".format(RO_nsr_id
)
798 self
.logger
.debug(logging_text
+ step
)
799 await RO
.delete("ns", RO_nsr_id
)
800 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
802 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
803 # self.logger.debug(logging_text + step)
804 RO_ns_params
= self
.ns_params_2_RO(db_nsr
.get("instantiate_params"))
805 desc
= await RO
.create("ns", descriptor
=RO_ns_params
,
807 scenario
=nsr_lcm
["RO"]["nsd_id"])
808 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
809 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
810 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
811 self
.logger
.debug(logging_text
+ "ns created at RO. RO_id={}".format(desc
["uuid"]))
812 self
.update_db("nsrs", nsr_id
, db_nsr
)
814 # update VNFR vimAccount
815 step
= "Updating VNFR vimAcccount"
816 for vnf_index
, vnfr
in db_vnfr
.items():
817 if vnfr
.get("vim-account-id"):
819 vnfr
["vim-account-id"] = db_nsr
["instantiate_params"]["vimAccountId"]
820 if db_nsr
["instantiate_params"].get("vnf"):
821 for vnf_params
in db_nsr
["instantiate_params"]["vnf"]:
822 if vnf_params
.get("member-vnf-index") == vnf_index
:
823 if vnf_params
.get("vimAccountId"):
824 vnfr
["vim-account-id"] = vnf_params
.get("vimAccountId")
826 self
.update_db("vnfrs", vnfr
["_id"], vnfr
)
828 # wait until NS is ready
829 step
= ns_status_detailed
= "Waiting ns ready at RO. RO_id={}".format(RO_nsr_id
)
830 db_nsr
["detailed-status"] = ns_status_detailed
831 self
.logger
.debug(logging_text
+ step
)
832 deployment_timeout
= 2 * 3600 # Two hours
833 while deployment_timeout
> 0:
834 desc
= await RO
.show("ns", RO_nsr_id
)
835 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
836 nsr_lcm
["RO"]["nsr_status"] = ns_status
837 if ns_status
== "ERROR":
838 raise ROclient
.ROClientException(ns_status_info
)
839 elif ns_status
== "BUILD":
840 db_nsr_detailed_status_old
= db_nsr
["detailed-status"]
841 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
842 if db_nsr_detailed_status_old
!= db_nsr
["detailed-status"]:
843 self
.update_db("nsrs", nsr_id
, db_nsr
)
844 elif ns_status
== "ACTIVE":
845 step
= "Waiting for management IP address from VIM"
847 ns_RO_info
= nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_info(desc
)
849 except ROclient
.ROClientException
as e
:
850 if e
.http_code
!= 409: # IP address is not ready return code is 409 CONFLICT
853 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
854 await asyncio
.sleep(5, loop
=self
.loop
)
855 deployment_timeout
-= 5
856 if deployment_timeout
<= 0:
857 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
859 step
= "Updating VNFRs"
860 for vnf_index
, vnfr_deployed
in ns_RO_info
.items():
861 vnfr
= db_vnfr
[vnf_index
]
862 vnfr
["ip-address"] = vnfr_deployed
.get("ip_address")
863 for vdu_id
, vdu_deployed
in vnfr_deployed
["vdur"].items():
864 for vdur
in vnfr
["vdur"]:
865 if vdur
["vdu-id-ref"] == vdu_id
:
866 vdur
["vim-id"] = vdu_deployed
.get("vim_id")
867 vdur
["ip-address"] = vdu_deployed
.get("ip_address")
869 self
.update_db("vnfrs", vnfr
["_id"], vnfr
)
871 db_nsr
["detailed-status"] = "Configuring vnfr"
872 self
.update_db("nsrs", nsr_id
, db_nsr
)
874 # The parameters we'll need to deploy a charm
875 number_to_configure
= 0
878 """An inner function to deploy the charm from either vnf or vdu
882 # if number_to_configure == 0:
883 # self.logger.debug("Logging into N2VC...")
884 # task = asyncio.ensure_future(self.n2vc.login())
885 # yield from asyncio.wait_for(task, 30.0)
886 # self.logger.debug("Logged into N2VC!")
888 # # await self.n2vc.login()
890 # Note: The charm needs to exist on disk at the location
891 # specified by charm_path.
892 base_folder
= vnfd
["_admin"]["storage"]
893 storage_params
= self
.fs
.get_params()
894 charm_path
= "{}{}/{}/charms/{}".format(
895 storage_params
["path"],
896 base_folder
["folder"],
897 base_folder
["pkg-dir"],
901 # Setup the runtime parameters for this VNF
902 params
['rw_mgmt_ip'] = db_vnfr
[vnf_index
]["ip-address"]
904 # ns_name will be ignored in the current version of N2VC
905 # but will be implemented for the next point release.
906 model_name
= 'default'
907 application_name
= self
.n2vc
.FormatApplicationName(
913 nsr_lcm
["VCA"][vnf_index
] = {
915 "application": application_name
,
916 "operational-status": "init",
917 "detailed-status": "",
921 self
.logger
.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id
, charm_path
,
923 task
= asyncio
.ensure_future(
924 self
.n2vc
.DeployCharms(
925 model_name
, # The network service name
926 application_name
, # The application name
927 vnfd
, # The vnf descriptor
928 charm_path
, # Path to charm
929 params
, # Runtime params, like mgmt ip
930 {}, # for native charms only
931 self
.n2vc_callback
, # Callback for status changes
932 db_nsr
, # Callback parameter
934 vnf_index
, # Callback parameter
935 None, # Callback parameter (task)
938 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, model_name
, application_name
, None, None,
939 db_nsr
, db_nslcmop
, vnf_index
))
940 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["create_charm:" + vnf_index
] = task
942 # TODO: Make this call inside deploy()
943 # Login to the VCA. If there are multiple calls to login(),
944 # subsequent calls will be a nop and return immediately.
945 await self
.n2vc
.login()
947 step
= "Looking for needed vnfd to configure"
948 self
.logger
.debug(logging_text
+ step
)
949 for c_vnf
in nsd
["constituent-vnfd"]:
950 vnfd_id
= c_vnf
["vnfd-id-ref"]
951 vnf_index
= str(c_vnf
["member-vnf-index"])
952 vnfd
= needed_vnfd
[vnfd_id
]
954 # Check if this VNF has a charm configuration
955 vnf_config
= vnfd
.get("vnf-configuration")
957 if vnf_config
and vnf_config
.get("juju"):
958 proxy_charm
= vnf_config
["juju"]["charm"]
962 if 'initial-config-primitive' in vnf_config
:
963 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
966 number_to_configure
+= 1
968 # Deploy charms for each VDU that supports one.
969 for vdu
in vnfd
['vdu']:
970 vdu_config
= vdu
.get('vdu-configuration')
974 if vdu_config
and vdu_config
.get("juju"):
975 proxy_charm
= vdu_config
["juju"]["charm"]
977 if 'initial-config-primitive' in vdu_config
:
978 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
982 number_to_configure
+= 1
984 if number_to_configure
:
985 db_nsr
["config-status"] = "configuring"
986 db_nsr
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
987 db_nslcmop
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
989 db_nslcmop
["operationState"] = "COMPLETED"
990 db_nslcmop
["statusEnteredTime"] = time()
991 db_nslcmop
["detailed-status"] = "done"
992 db_nsr
["config-status"] = "configured"
993 db_nsr
["detailed-status"] = "done"
994 db_nsr
["operational-status"] = "running"
995 self
.update_db("nsrs", nsr_id
, db_nsr
)
996 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
997 self
.logger
.debug("Task ns_instantiate={} Exit Ok".format(nsr_id
))
1000 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
1001 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(step
, e
))
1003 except Exception as e
:
1004 self
.logger
.critical(logging_text
+ "Exit Exception {} while '{}': {}".format(type(e
).__name
__, step
, e
),
1010 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
1011 db_nsr
["operational-status"] = "failed"
1012 self
.update_db("nsrs", nsr_id
, db_nsr
)
1014 db_nslcmop
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
1015 db_nslcmop
["operationState"] = "FAILED"
1016 db_nslcmop
["statusEnteredTime"] = time()
1017 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
1019 async def ns_terminate(self
, nsr_id
, nslcmop_id
):
1020 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
1021 self
.logger
.debug(logging_text
+ "Enter")
1025 step
= "Getting nsr, nslcmop from db"
1026 failed_detail
= [] # annotates all failed error messages
1030 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1031 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1032 # nsd = db_nsr["nsd"]
1033 nsr_lcm
= deepcopy(db_nsr
["_admin"]["deployed"])
1034 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
1037 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
1038 # #TODO check if VIM is creating and wait
1039 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
1042 "operational-status": "terminating",
1043 "config-status": "terminating",
1044 "detailed-status": "Deleting charms",
1046 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1049 self
.logger
.debug(logging_text
+ step
)
1050 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
1051 if deploy_info
and deploy_info
.get("application"):
1052 task
= asyncio
.ensure_future(
1053 self
.n2vc
.RemoveCharms(
1054 deploy_info
['model'],
1055 deploy_info
['application'],
1056 # self.n2vc_callback,
1062 vca_task_list
.append(task
)
1063 vca_task_dict
[vnf_index
] = task
1064 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
1065 # deploy_info['application'], None, db_nsr,
1066 # db_nslcmop, vnf_index))
1067 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["delete_charm:" + vnf_index
] = task
1068 except Exception as e
:
1069 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
1072 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
1074 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
1077 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
1078 self
.logger
.debug(logging_text
+ step
)
1079 await RO
.delete("ns", RO_nsr_id
)
1080 nsr_lcm
["RO"]["nsr_id"] = None
1081 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
1082 except ROclient
.ROClientException
as e
:
1083 if e
.http_code
== 404: # not found
1084 nsr_lcm
["RO"]["nsr_id"] = None
1085 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
1086 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
1087 elif e
.http_code
== 409: # conflict
1088 failed_detail
.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
1089 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1091 failed_detail
.append("RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
1092 self
.logger
.error(logging_text
+ failed_detail
[-1])
1095 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
1098 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
1099 await RO
.delete("nsd", RO_nsd_id
)
1100 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
1101 nsr_lcm
["RO"]["nsd_id"] = None
1102 except ROclient
.ROClientException
as e
:
1103 if e
.http_code
== 404: # not found
1104 nsr_lcm
["RO"]["nsd_id"] = None
1105 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
1106 elif e
.http_code
== 409: # conflict
1107 failed_detail
.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
1108 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1110 failed_detail
.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
1111 self
.logger
.error(logging_text
+ failed_detail
[-1])
1113 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
1117 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
1118 await RO
.delete("vnfd", RO_vnfd_id
)
1119 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
1120 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
1121 except ROclient
.ROClientException
as e
:
1122 if e
.http_code
== 404: # not found
1123 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
1124 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
1125 elif e
.http_code
== 409: # conflict
1126 failed_detail
.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
1127 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1129 failed_detail
.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
1130 self
.logger
.error(logging_text
+ failed_detail
[-1])
1133 await asyncio
.wait(vca_task_list
, timeout
=300)
1134 for vnf_index
, task
in vca_task_dict
.items():
1135 if task
.cancelled():
1136 failed_detail
.append("VCA[{}] Deletion has been cancelled".format(vnf_index
))
1138 exc
= task
.exception()
1140 failed_detail
.append("VCA[{}] Deletion exception: {}".format(vnf_index
, exc
))
1142 nsr_lcm
["VCA"][vnf_index
] = None
1144 # TODO Should it be cancelled?!!
1146 failed_detail
.append("VCA[{}] Deletion timeout".format(vnf_index
))
1149 self
.logger
.error(logging_text
+ " ;".join(failed_detail
))
1151 "operational-status": "failed",
1152 "detailed-status": "Deletion errors " + "; ".join(failed_detail
),
1153 "_admin.deployed": nsr_lcm
1155 db_nslcmop_update
= {
1156 "detailed-status": "; ".join(failed_detail
),
1157 "operationState": "FAILED",
1158 "statusEnteredTime": time()
1160 elif db_nslcmop
["operationParams"].get("autoremove"):
1161 self
.db
.del_one("nsrs", {"_id": nsr_id
})
1162 self
.db
.del_list("nslcmops", {"nsInstanceId": nsr_id
})
1163 self
.db
.del_list("vnfrs", {"nsr-id-ref": nsr_id
})
1164 self
.logger
.debug(logging_text
+ "Delete from database")
1167 "operational-status": "terminated",
1168 "detailed-status": "Done",
1169 "_admin.deployed": nsr_lcm
,
1170 "_admin.nsState": "NOT_INSTANTIATED"
1172 db_nslcmop_update
= {
1173 "detailed-status": "Done",
1174 "operationState": "COMPLETED",
1175 "statusEnteredTime": time()
1177 self
.logger
.debug(logging_text
+ "Exit")
1179 except (ROclient
.ROClientException
, DbException
) as e
:
1180 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1182 except Exception as e
:
1183 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
1186 if exc
and db_nslcmop
:
1187 db_nslcmop_update
= {
1188 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1189 "operationState": "FAILED",
1190 "statusEnteredTime": time(),
1192 if db_nslcmop_update
:
1193 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1195 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1197 async def ns_action(self
, nsr_id
, nslcmop_id
):
1198 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
1199 self
.logger
.debug(logging_text
+ "Enter")
1200 # get all needed from database
1203 db_nslcmop_update
= None
1206 step
= "Getting information from database"
1207 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1208 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1209 nsr_lcm
= db_nsr
["_admin"].get("deployed")
1210 vnf_index
= db_nslcmop
["operationParams"]["member_vnf_index"]
1212 # TODO check if ns is in a proper status
1213 vca_deployed
= nsr_lcm
["VCA"].get(vnf_index
)
1214 if not vca_deployed
:
1215 raise LcmException("charm for member_vnf_index={} is not deployed".format(vnf_index
))
1216 model_name
= vca_deployed
.get("model")
1217 application_name
= vca_deployed
.get("application")
1218 if not model_name
or not application_name
:
1219 raise LcmException("charm for member_vnf_index={} is not properly deployed".format(vnf_index
))
1220 if vca_deployed
["operational-status"] != "active":
1221 raise LcmException("charm for member_vnf_index={} operational_status={} not 'active'".format(
1222 vnf_index
, vca_deployed
["operational-status"]))
1223 primitive
= db_nslcmop
["operationParams"]["primitive"]
1224 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
1225 callback
= None # self.n2vc_callback
1226 callback_args
= () # [db_nsr, db_nslcmop, vnf_index, None]
1227 await self
.n2vc
.login()
1228 task
= asyncio
.ensure_future(
1229 self
.n2vc
.ExecutePrimitive(
1232 primitive
, callback
,
1237 # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
1238 # db_nsr, db_nslcmop, vnf_index))
1239 # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
1240 # wait until completed with timeout
1241 await asyncio
.wait((task
,), timeout
=300)
1243 result
= "FAILED" # by default
1245 if task
.cancelled():
1246 db_nslcmop
["detailed-status"] = "Task has been cancelled"
1248 exc
= task
.exception()
1250 result_detail
= str(exc
)
1252 self
.logger
.debug(logging_text
+ " task Done")
1253 # TODO revise with Adam if action is finished and ok when task is done or callback is needed
1254 result
= "COMPLETED"
1255 result_detail
= "Done"
1257 # TODO Should it be cancelled?!!
1259 result_detail
= "timeout"
1261 db_nslcmop_update
= {
1262 "detailed-status": result_detail
,
1263 "operationState": result
,
1264 "statusEnteredTime": time()
1266 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(result
, result_detail
))
1267 return # database update is called inside finally
1269 except (DbException
, LcmException
) as e
:
1270 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1272 except Exception as e
:
1273 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
1276 if exc
and db_nslcmop
:
1277 db_nslcmop_update
= {
1278 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1279 "operationState": "FAILED",
1280 "statusEnteredTime": time(),
1282 if db_nslcmop_update
:
1283 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1285 async def test(self
, param
=None):
1286 self
.logger
.debug("Starting/Ending test task: {}".format(param
))
1288 def cancel_tasks(self
, topic
, _id
):
1290 Cancel all active tasks of a concrete nsr or vim identified for _id
1291 :param topic: can be ns or vim_account
1292 :param _id: nsr or vim identity
1293 :return: None, or raises an exception if not possible
1296 lcm_tasks
= self
.lcm_ns_tasks
1297 elif topic
== "vim_account":
1298 lcm_tasks
= self
.lcm_vim_tasks
1299 elif topic
== "sdn":
1300 lcm_tasks
= self
.lcm_sdn_tasks
1302 if not lcm_tasks
.get(_id
):
1304 for order_id
, tasks_set
in lcm_tasks
[_id
].items():
1305 for task_name
, task
in tasks_set
.items():
1306 result
= task
.cancel()
1308 self
.logger
.debug("{} _id={} order_id={} task={} cancelled".format(topic
, _id
, order_id
, task_name
))
1311 async def kafka_ping(self
):
1312 self
.logger
.debug("Task kafka_ping Enter")
1313 consecutive_errors
= 0
1315 kafka_has_received
= False
1316 self
.pings_not_received
= 1
1319 await self
.msg
.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self
.loop
)
1320 # time between pings are low when it is not received and at starting
1321 wait_time
= 5 if not kafka_has_received
else 120
1322 if not self
.pings_not_received
:
1323 kafka_has_received
= True
1324 self
.pings_not_received
+= 1
1325 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1326 if self
.pings_not_received
> 10:
1327 raise LcmException("It is not receiving pings from Kafka bus")
1328 consecutive_errors
= 0
1330 except LcmException
:
1332 except Exception as e
:
1333 # if not first_start is the first time after starting. So leave more time and wait
1334 # to allow kafka starts
1335 if consecutive_errors
== 8 if not first_start
else 30:
1336 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1338 consecutive_errors
+= 1
1339 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1340 wait_time
= 1 if not first_start
else 5
1341 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1343 async def kafka_read(self
):
1344 self
.logger
.debug("Task kafka_read Enter")
1346 # future = asyncio.Future()
1347 consecutive_errors
= 0
1349 while consecutive_errors
< 10:
1351 topics
= ("admin", "ns", "vim_account", "sdn")
1352 topic
, command
, params
= await self
.msg
.aioread(topics
, self
.loop
)
1353 if topic
!= "admin" and command
!= "ping":
1354 self
.logger
.debug("Task kafka_read receives {} {}: {}".format(topic
, command
, params
))
1355 consecutive_errors
= 0
1358 if command
== "exit":
1361 elif command
.startswith("#"):
1363 elif command
== "echo":
1368 elif command
== "test":
1369 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
1372 if topic
== "admin":
1373 if command
== "ping" and params
["to"] == "lcm" and params
["from"] == "lcm":
1374 self
.pings_not_received
= 0
1377 if command
== "instantiate":
1378 # self.logger.debug("Deploying NS {}".format(nsr_id))
1380 nslcmop_id
= nslcmop
["_id"]
1381 nsr_id
= nslcmop
["nsInstanceId"]
1382 task
= asyncio
.ensure_future(self
.ns_instantiate(nsr_id
, nslcmop_id
))
1383 if nsr_id
not in self
.lcm_ns_tasks
:
1384 self
.lcm_ns_tasks
[nsr_id
] = {}
1385 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_instantiate": task
}
1387 elif command
== "terminate":
1388 # self.logger.debug("Deleting NS {}".format(nsr_id))
1390 nslcmop_id
= nslcmop
["_id"]
1391 nsr_id
= nslcmop
["nsInstanceId"]
1392 self
.cancel_tasks(topic
, nsr_id
)
1393 task
= asyncio
.ensure_future(self
.ns_terminate(nsr_id
, nslcmop_id
))
1394 if nsr_id
not in self
.lcm_ns_tasks
:
1395 self
.lcm_ns_tasks
[nsr_id
] = {}
1396 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_terminate": task
}
1398 elif command
== "action":
1399 # self.logger.debug("Update NS {}".format(nsr_id))
1401 nslcmop_id
= nslcmop
["_id"]
1402 nsr_id
= nslcmop
["nsInstanceId"]
1403 task
= asyncio
.ensure_future(self
.ns_action(nsr_id
, nslcmop_id
))
1404 if nsr_id
not in self
.lcm_ns_tasks
:
1405 self
.lcm_ns_tasks
[nsr_id
] = {}
1406 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_action": task
}
1408 elif command
== "show":
1410 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1411 print("nsr:\n _id={}\n operational-status: {}\n config-status: {}"
1412 "\n detailed-status: {}\n deploy: {}\n tasks: {}"
1413 "".format(nsr_id
, db_nsr
["operational-status"], db_nsr
["config-status"],
1414 db_nsr
["detailed-status"],
1415 db_nsr
["_admin"]["deployed"], self
.lcm_ns_tasks
.get(nsr_id
)))
1416 except Exception as e
:
1417 print("nsr {} not found: {}".format(nsr_id
, e
))
1420 elif command
== "deleted":
1421 continue # TODO cleaning of task just in case should be done
1422 elif topic
== "vim_account":
1423 vim_id
= params
["_id"]
1424 if command
== "create":
1425 task
= asyncio
.ensure_future(self
.vim_create(params
, order_id
))
1426 if vim_id
not in self
.lcm_vim_tasks
:
1427 self
.lcm_vim_tasks
[vim_id
] = {}
1428 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_create": task
}
1430 elif command
== "delete":
1431 self
.cancel_tasks(topic
, vim_id
)
1432 task
= asyncio
.ensure_future(self
.vim_delete(vim_id
, order_id
))
1433 if vim_id
not in self
.lcm_vim_tasks
:
1434 self
.lcm_vim_tasks
[vim_id
] = {}
1435 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_delete": task
}
1437 elif command
== "show":
1438 print("not implemented show with vim_account")
1441 elif command
== "edit":
1442 task
= asyncio
.ensure_future(self
.vim_edit(params
, order_id
))
1443 if vim_id
not in self
.lcm_vim_tasks
:
1444 self
.lcm_vim_tasks
[vim_id
] = {}
1445 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_edit": task
}
1447 elif topic
== "sdn":
1448 _sdn_id
= params
["_id"]
1449 if command
== "create":
1450 task
= asyncio
.ensure_future(self
.sdn_create(params
, order_id
))
1451 if _sdn_id
not in self
.lcm_sdn_tasks
:
1452 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1453 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_create": task
}
1455 elif command
== "delete":
1456 self
.cancel_tasks(topic
, _sdn_id
)
1457 task
= asyncio
.ensure_future(self
.sdn_delete(_sdn_id
, order_id
))
1458 if _sdn_id
not in self
.lcm_sdn_tasks
:
1459 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1460 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_delete": task
}
1462 elif command
== "edit":
1463 task
= asyncio
.ensure_future(self
.sdn_edit(params
, order_id
))
1464 if _sdn_id
not in self
.lcm_sdn_tasks
:
1465 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1466 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_edit": task
}
1468 self
.logger
.critical("unknown topic {} and command '{}'".format(topic
, command
))
1469 except Exception as e
:
1470 # if not first_start is the first time after starting. So leave more time and wait
1471 # to allow kafka starts
1472 if consecutive_errors
== 8 if not first_start
else 30:
1473 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1475 consecutive_errors
+= 1
1476 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1477 wait_time
= 2 if not first_start
else 5
1478 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1480 # self.logger.debug("Task kafka_read terminating")
1481 self
.logger
.debug("Task kafka_read exit")
1484 self
.loop
= asyncio
.get_event_loop()
1485 self
.loop
.run_until_complete(asyncio
.gather(
1490 # self.logger.debug("Terminating cancelling creation tasks")
1491 # self.cancel_tasks("ALL", "create")
1493 # while self.is_pending_tasks():
1494 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1495 # await asyncio.sleep(2, loop=self.loop)
1498 # self.cancel_tasks("ALL", "ALL")
1502 self
.db
.db_disconnect()
1504 self
.msg
.disconnect()
1506 self
.fs
.fs_disconnect()
1508 def read_config_file(self
, config_file
):
1509 # TODO make a [ini] + yaml inside parser
1510 # the configparser library is not suitable, because it does not admit comments at the end of line,
1511 # and not parse integer or boolean
1513 with
open(config_file
) as f
:
1515 for k
, v
in environ
.items():
1516 if not k
.startswith("OSMLCM_"):
1518 k_items
= k
.lower().split("_")
1521 for k_item
in k_items
[1:-1]:
1522 if k_item
in ("ro", "vca"):
1523 # put in capital letter
1524 k_item
= k_item
.upper()
1526 if k_items
[-1] == "port":
1527 c
[k_items
[-1]] = int(v
)
1530 except Exception as e
:
1531 self
.logger
.warn("skipping environ '{}' on exception '{}'".format(k
, e
))
1534 except Exception as e
:
1535 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
1540 print("""Usage: {} [options]
1541 -c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
1542 -h|--help: shows this help
1543 """.format(sys
.argv
[0]))
1544 # --log-socket-host HOST: send logs to this host")
1545 # --log-socket-port PORT: send logs using this port (default: 9022)")
1548 if __name__
== '__main__':
1550 # load parameters and configuration
1551 opts
, args
= getopt
.getopt(sys
.argv
[1:], "hc:", ["config=", "help"])
1552 # TODO add "log-socket-host=", "log-socket-port=", "log-file="
1555 if o
in ("-h", "--help"):
1558 elif o
in ("-c", "--config"):
1560 # elif o == "--log-socket-port":
1561 # log_socket_port = a
1562 # elif o == "--log-socket-host":
1563 # log_socket_host = a
1564 # elif o == "--log-file":
1567 assert False, "Unhandled option"
1569 if not path
.isfile(config_file
):
1570 print("configuration file '{}' that not exist".format(config_file
), file=sys
.stderr
)
1573 for config_file
in (__file__
[:__file__
.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
1574 if path
.isfile(config_file
):
1577 print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys
.stderr
)
1579 lcm
= Lcm(config_file
)
1581 except getopt
.GetoptError
as e
:
1582 print(str(e
), file=sys
.stderr
)