2 # -*- coding: utf-8 -*-
15 from dbbase
import DbException
16 from fsbase
import FsException
17 from msgbase
import MsgException
18 from os
import environ
19 # from vca import DeployApplication, RemoveApplication
20 from n2vc
.vnf
import N2VC
24 from copy
import deepcopy
25 from http
import HTTPStatus
29 class LcmException(Exception):
35 def __init__(self
, config_file
):
37 Init, Connect to database, filesystem storage, and messaging
38 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
45 self
.pings_not_received
= 1
47 # contains created tasks/futures to be able to cancel
48 self
.lcm_ns_tasks
= {}
49 self
.lcm_vim_tasks
= {}
50 self
.lcm_sdn_tasks
= {}
52 self
.logger
= logging
.getLogger('lcm')
54 config
= self
.read_config_file(config_file
)
57 "endpoint_url": "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"]),
58 "tenant": config
.get("tenant", "osm"),
59 "logger_name": "lcm.ROclient",
63 self
.vca
= config
["VCA"] # TODO VCA
67 log_format_simple
= "%(asctime)s %(levelname)s %(name)s %(filename)s:%(lineno)s %(message)s"
68 log_formatter_simple
= logging
.Formatter(log_format_simple
, datefmt
='%Y-%m-%dT%H:%M:%S')
69 config
["database"]["logger_name"] = "lcm.db"
70 config
["storage"]["logger_name"] = "lcm.fs"
71 config
["message"]["logger_name"] = "lcm.msg"
72 if "logfile" in config
["global"]:
73 file_handler
= logging
.handlers
.RotatingFileHandler(config
["global"]["logfile"],
74 maxBytes
=100e6
, backupCount
=9, delay
=0)
75 file_handler
.setFormatter(log_formatter_simple
)
76 self
.logger
.addHandler(file_handler
)
78 str_handler
= logging
.StreamHandler()
79 str_handler
.setFormatter(log_formatter_simple
)
80 self
.logger
.addHandler(str_handler
)
82 if config
["global"].get("loglevel"):
83 self
.logger
.setLevel(config
["global"]["loglevel"])
85 # logging other modules
86 for k1
, logname
in {"message": "lcm.msg", "database": "lcm.db", "storage": "lcm.fs"}.items():
87 config
[k1
]["logger_name"] = logname
88 logger_module
= logging
.getLogger(logname
)
89 if "logfile" in config
[k1
]:
90 file_handler
= logging
.handlers
.RotatingFileHandler(config
[k1
]["logfile"],
91 maxBytes
=100e6
, backupCount
=9, delay
=0)
92 file_handler
.setFormatter(log_formatter_simple
)
93 logger_module
.addHandler(file_handler
)
94 if "loglevel" in config
[k1
]:
95 logger_module
.setLevel(config
[k1
]["loglevel"])
99 server
=config
['VCA']['host'],
100 port
=config
['VCA']['port'],
101 user
=config
['VCA']['user'],
102 secret
=config
['VCA']['secret'],
103 # TODO: This should point to the base folder where charms are stored,
104 # if there is a common one (like object storage). Otherwise, leave
105 # it unset and pass it via DeployCharms
106 # artifacts=config['VCA'][''],
111 if config
["database"]["driver"] == "mongo":
112 self
.db
= dbmongo
.DbMongo()
113 self
.db
.db_connect(config
["database"])
114 elif config
["database"]["driver"] == "memory":
115 self
.db
= dbmemory
.DbMemory()
116 self
.db
.db_connect(config
["database"])
118 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
119 config
["database"]["driver"]))
121 if config
["storage"]["driver"] == "local":
122 self
.fs
= fslocal
.FsLocal()
123 self
.fs
.fs_connect(config
["storage"])
125 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
126 config
["storage"]["driver"]))
128 if config
["message"]["driver"] == "local":
129 self
.msg
= msglocal
.MsgLocal()
130 self
.msg
.connect(config
["message"])
131 elif config
["message"]["driver"] == "kafka":
132 self
.msg
= msgkafka
.MsgKafka()
133 self
.msg
.connect(config
["message"])
135 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
136 config
["storage"]["driver"]))
137 except (DbException
, FsException
, MsgException
) as e
:
138 self
.logger
.critical(str(e
), exc_info
=True)
139 raise LcmException(str(e
))
def update_db(self, item, _id, _desc):
    """Replace the stored document `_id` of collection `item` with `_desc`.

    Best effort: a DbException raised by the database driver is logged at
    error level and swallowed, so callers are never interrupted by a failed
    status update.

    :param item: collection/table name, e.g. "nsrs" or "vim_accounts"
    :param _id: identifier of the document to replace
    :param _desc: complete new content of the document
    :return: None
    """
    try:
        self.db.replace(item, _id, _desc)
    except DbException as e:
        self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
def update_db_2(self, item, _id, _desc):
    """Apply a partial update `_desc` to document `_id` of collection `item`.

    Unlike update_db, this goes through the driver's `set_one`, selecting the
    document with the filter {"_id": _id}. A DbException is logged at error
    level and swallowed.

    :param item: collection/table name
    :param _id: identifier of the document to update
    :param _desc: update content handed to `set_one`
    :return: None
    """
    try:
        self.db.set_one(item, {"_id": _id}, _desc)
    except DbException as e:
        self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
async def vim_create(self, vim_content, order_id):
    """Create a VIM at RO and attach it to the RO tenant.

    Reads the vim_accounts record, creates the "vim" at RO, stores the RO uuid
    under _admin.deployed.RO, attaches the datacenter with the tenant
    credentials and finally marks the record ENABLED. On any failure the
    record is marked ERROR with the failing step in "detailed-status".

    :param vim_content: vim account content; must contain "_id", "vim_type",
        "vim_tenant_name", "vim_user", "vim_password" and "config"
    :param order_id: not used by this method
    :return: the RO uuid of the created vim, or None if the operation failed
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_create={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    try:
        step = "Getting vim from db"
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        deployed = db_vim.setdefault("_admin", {}).setdefault("deployed", {})
        deployed["RO"] = None

        step = "Creating vim at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        vim_RO = deepcopy(vim_content)
        # Strip OSM-only fields and the credentials; the latter are sent in
        # the attach step below.
        for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name",
                    "vim_user", "vim_password"):
            vim_RO.pop(key, None)
        vim_RO["type"] = vim_RO.pop("vim_type")
        desc = await RO.create("vim", descriptor=vim_RO)
        RO_vim_id = desc["uuid"]
        deployed["RO"] = RO_vim_id
        self.update_db("vim_accounts", vim_id, db_vim)

        step = "Attach vim to RO tenant"
        vim_RO = {
            "vim_tenant_name": vim_content["vim_tenant_name"],
            "vim_username": vim_content["vim_user"],
            "vim_password": vim_content["vim_password"],
            "config": vim_content["config"],
        }
        await RO.attach_datacenter(RO_vim_id, descriptor=vim_RO)
        db_vim["_admin"]["operationalState"] = "ENABLED"
        self.update_db("vim_accounts", vim_id, db_vim)

        # The original format string had no placeholder, so the id was dropped.
        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            db_vim["_admin"]["operationalState"] = "ERROR"
            db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
async def vim_edit(self, vim_content, order_id):
    """Propagate an edition of a VIM account to RO.

    Only acts when the vim_accounts record has an RO uuid under
    _admin.deployed.RO. First edits the "vim" at RO, then the "vim_account"
    (tenant name, user, password, config), and marks the record ENABLED. On
    failure the record is marked ERROR with the failing step.

    :param vim_content: edited vim account content; must contain "_id",
        every other field is optional
    :param order_id: not used by this method
    :return: the RO uuid of the vim, or None when the vim is not deployed at
        RO or the operation failed
    """
    vim_id = vim_content["_id"]
    logging_text = "Task vim_edit={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    # Bound up-front so the exit log/return cannot hit an unbound local when
    # the record has no RO deployment information.
    RO_vim_id = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            step = "Editing vim at RO"
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            vim_RO = deepcopy(vim_content)
            for key in ("_id", "_admin", "schema_version", "schema_type", "vim_tenant_name",
                        "vim_user", "vim_password"):
                vim_RO.pop(key, None)
            # An edition may be partial, so "vim_type" is optional here.
            if "vim_type" in vim_RO:
                vim_RO["type"] = vim_RO.pop("vim_type")
            if vim_RO:
                await RO.edit("vim", RO_vim_id, descriptor=vim_RO)

            step = "Editing vim-account at RO tenant"
            vim_RO = {}
            for k in ("vim_tenant_name", "vim_password", "config"):
                if k in vim_content:
                    vim_RO[k] = vim_content[k]
            if "vim_user" in vim_content:
                # RO names this field "vim_username" (see vim_create); the
                # original wrote it into vim_content, so it was never sent.
                vim_RO["vim_username"] = vim_content["vim_user"]
            if vim_RO:
                await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
            db_vim["_admin"]["operationalState"] = "ENABLED"
            self.update_db("vim_accounts", vim_id, db_vim)

        self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
        return RO_vim_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # "_admin" is optional in the record (see the check above).
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
async def vim_delete(self, vim_id, order_id):
    """Delete a VIM from RO and then from the vim_accounts collection.

    When the record has an RO uuid under _admin.deployed.RO, the datacenter is
    detached from the RO tenant and the vim is deleted at RO; a 404 from RO in
    either step is treated as "already done". The database record is removed
    afterwards, whether or not RO information was present. On failure the
    record is kept and marked ERROR with the failing step.

    :param vim_id: _id of the vim account to delete
    :param order_id: not used by this method
    :return: None
    """
    logging_text = "Task vim_delete={} ".format(vim_id)
    self.logger.debug(logging_text + "Enter")
    db_vim = None
    exc = None
    step = "Getting vim from db"
    try:
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_id})
        if db_vim.get("_admin") and db_vim["_admin"].get("deployed") and db_vim["_admin"]["deployed"].get("RO"):
            RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Detaching vim from RO tenant"
            try:
                await RO.detach_datacenter(RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
                else:
                    raise

            step = "Deleting vim from RO"
            try:
                await RO.delete("vim", RO_vim_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
                else:
                    raise
        else:
            # nothing to delete at RO
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("vim_accounts", {"_id": vim_id})
        self.logger.debug("vim_delete task vim_id={} Exit Ok".format(vim_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_vim:
            # "_admin" is optional in the record (see the check above).
            admin = db_vim.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("vim_accounts", vim_id, db_vim)
async def sdn_create(self, sdn_content, order_id):
    """Create an SDN controller at RO.

    Reads the sdns record, creates the "sdn" at RO from a copy of
    `sdn_content` without the OSM-only fields, stores the RO uuid under
    _admin.deployed.RO and marks the record ENABLED. On failure the record is
    marked ERROR with the failing step in "detailed-status".

    :param sdn_content: sdn controller content; must contain "_id"
    :param order_id: not used by this method
    :return: the RO uuid of the created sdn, or None if the operation failed
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_create={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    try:
        step = "Getting sdn from db"
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        deployed = db_sdn.setdefault("_admin", {}).setdefault("deployed", {})
        deployed["RO"] = None

        step = "Creating sdn at RO"
        RO = ROclient.ROClient(self.loop, **self.ro_config)
        sdn_RO = deepcopy(sdn_content)
        for key in ("_id", "_admin", "schema_version", "schema_type"):
            sdn_RO.pop(key, None)
        desc = await RO.create("sdn", descriptor=sdn_RO)
        RO_sdn_id = desc["uuid"]
        deployed["RO"] = RO_sdn_id
        db_sdn["_admin"]["operationalState"] = "ENABLED"
        self.update_db("sdns", sdn_id, db_sdn)
        # The original format string had no placeholder, so the id was dropped.
        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            db_sdn["_admin"]["operationalState"] = "ERROR"
            db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
async def sdn_edit(self, sdn_content, order_id):
    """Propagate an edition of an SDN controller to RO.

    Only acts when the sdns record has an RO uuid under _admin.deployed.RO:
    the edited content, without OSM-only fields, is sent to RO and the record
    is marked ENABLED. On failure the record is marked ERROR with the failing
    step.

    :param sdn_content: edited sdn controller content; must contain "_id"
    :param order_id: not used by this method
    :return: the RO uuid of the sdn, or None when it is not deployed at RO or
        the operation failed
    """
    sdn_id = sdn_content["_id"]
    logging_text = "Task sdn_edit={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    # Bound up-front so the exit log/return cannot hit an unbound local when
    # the record has no RO deployment information.
    RO_sdn_id = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Editing sdn at RO"
            sdn_RO = deepcopy(sdn_content)
            for key in ("_id", "_admin", "schema_version", "schema_type"):
                sdn_RO.pop(key, None)
            if sdn_RO:
                await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
            db_sdn["_admin"]["operationalState"] = "ENABLED"
            self.update_db("sdns", sdn_id, db_sdn)

        self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
        return RO_sdn_id

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # "_admin" is optional in the record (see the check above).
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
async def sdn_delete(self, sdn_id, order_id):
    """Delete an SDN controller from RO and then from the sdns collection.

    When the record has an RO uuid under _admin.deployed.RO the sdn is deleted
    at RO; a 404 from RO is treated as "already deleted". The database record
    is removed afterwards, whether or not RO information was present. On
    failure the record is kept and marked ERROR with the failing step.

    :param sdn_id: _id of the sdn controller to delete
    :param order_id: not used by this method
    :return: None
    """
    logging_text = "Task sdn_delete={} ".format(sdn_id)
    self.logger.debug(logging_text + "Enter")
    db_sdn = None
    exc = None
    step = "Getting sdn from db"
    try:
        db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
        if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
            RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
            RO = ROclient.ROClient(self.loop, **self.ro_config)
            step = "Deleting sdn from RO"
            try:
                await RO.delete("sdn", RO_sdn_id)
            except ROclient.ROClientException as e:
                if e.http_code == 404:  # not found
                    self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
                else:
                    raise
        else:
            # nothing to delete at RO
            self.logger.error(logging_text + "Skipping. There is not RO information at database")
        self.db.del_one("sdns", {"_id": sdn_id})
        self.logger.debug("sdn_delete task sdn_id={} Exit Ok".format(sdn_id))
        return None

    except (ROclient.ROClientException, DbException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
        exc = e
    finally:
        if exc and db_sdn:
            # "_admin" is optional in the record (see the check above).
            admin = db_sdn.setdefault("_admin", {})
            admin["operationalState"] = "ERROR"
            admin["detailed-status"] = "ERROR {}: {}".format(step, exc)
            self.update_db("sdns", sdn_id, db_sdn)
def vnfd2RO(self, vnfd, new_id=None):
    """Create a new vnfd descriptor for RO based on the input OSM IM vnfd.

    The input is deep-copied and left untouched. In the copy, "_id" and
    "_admin" are removed, "id" is overridden when `new_id` is given, and every
    vdu "cloud-init-file" entry is replaced by a "cloud-init" entry holding
    the content of that file, read through self.fs from
    <folder>/<pkg-dir>/cloud_init/<file>.

    :param vnfd: input vnfd
    :param new_id: overrides vnf id if provided
    :return: copy of vnfd
    :raises LcmException: if a cloud-init file cannot be read from storage
    """
    vnfd_RO = deepcopy(vnfd)
    vnfd_RO.pop("_id", None)
    vnfd_RO.pop("_admin", None)
    if new_id:
        vnfd_RO["id"] = new_id
    try:
        for vdu in vnfd_RO["vdu"]:
            if "cloud-init-file" not in vdu:
                continue
            # Storage info lives in "_admin", which was stripped from the copy.
            base_folder = vnfd["_admin"]["storage"]
            cloud_init_path = "{}/{}/cloud_init/{}".format(
                base_folder["folder"],
                base_folder["pkg-dir"],
                vdu["cloud-init-file"]
            )
            ci_file = self.fs.file_open(cloud_init_path, "r")
            try:
                # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8.
                # If fails convert to base 64 or similar
                cloud_init_content = ci_file.read()
            finally:
                ci_file.close()
            vdu.pop("cloud-init-file", None)
            vdu["cloud-init"] = cloud_init_content
    except FsException as e:
        raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e)) from e
    return vnfd_RO
462 def n2vc_callback(self
, model_name
, application_name
, workload_status
, db_nsr
, db_nslcmop
, vnf_member_index
, task
=None):
463 """Update the lcm database with the status of the charm.
465 Updates the VNF's operational status with the state of the charm:
466 - blocked: The unit needs manual intervention
467 - maintenance: The unit is actively deploying/configuring
468 - waiting: The unit is waiting for another charm to be ready
469 - active: The unit is deployed, configured, and ready
470 - error: The charm has failed and needs attention.
471 - terminated: The charm has been destroyed
475 Updates the network service's config-status to reflect the state of all
480 update_nsr
= update_nslcmop
= False
482 nsr_id
= db_nsr
["_id"]
483 nslcmop_id
= db_nslcmop
["_id"]
484 nsr_lcm
= db_nsr
["_admin"]["deployed"]
485 ns_action
= db_nslcmop
["lcmOperationType"]
486 logging_text
= "Task ns={} {}={} [n2vc_callback] vnf_index={}".format(nsr_id
, ns_action
, nslcmop_id
,
491 self
.logger
.debug(logging_text
+ " task Cancelled")
492 # TODO update db_nslcmop
496 exc
= task
.exception()
498 self
.logger
.error(logging_text
+ " task Exception={}".format(exc
))
499 if ns_action
in ("instantiate", "terminate"):
500 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = "error"
501 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = str(exc
)
502 elif ns_action
== "action":
503 db_nslcmop
["operationState"] = "FAILED"
504 db_nslcmop
["detailedStatus"] = str(exc
)
505 db_nslcmop
["statusEnteredTime"] = time()
506 update_nslcmop
= True
510 self
.logger
.debug(logging_text
+ " task Done")
511 # TODO revise with Adam if action is finished and ok when task is done
512 if ns_action
== "action":
513 db_nslcmop
["operationState"] = "COMPLETED"
514 db_nslcmop
["detailedStatus"] = "Done"
515 db_nslcmop
["statusEnteredTime"] = time()
516 update_nslcmop
= True
517 # task is Done, but callback is still ongoing. So ignore
519 elif workload_status
:
520 self
.logger
.debug(logging_text
+ " Enter workload_status={}".format(workload_status
))
521 if nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] == workload_status
:
522 return # same status, ignore
523 nsr_lcm
["VCA"][vnf_member_index
]['operational-status'] = workload_status
524 # TODO N2VC some error message in case of error should be obtained from N2VC
525 nsr_lcm
["VCA"][vnf_member_index
]['detailed-status'] = ""
527 self
.logger
.critical(logging_text
+ " Enter with bad parameters", exc_info
=True)
533 for vnf_index
, vca_info
in nsr_lcm
["VCA"].items():
534 vca_status
= vca_info
["operational-status"]
535 if vca_status
not in status_map
:
537 status_map
[vca_status
] = 0
538 status_map
[vca_status
] += 1
540 if vca_status
!= "active":
542 if vca_status
== "error":
544 db_nsr
["config-status"] = "failed"
545 error_text
= "fail configuring vnf_index={} {}".format(vnf_member_index
,
546 vca_info
["detailed-status"])
547 db_nsr
["detailed-status"] = error_text
548 db_nslcmop
["operationState"] = "FAILED_TEMP"
549 db_nslcmop
["detailedStatus"] = error_text
550 db_nslcmop
["statusEnteredTime"] = time()
554 self
.logger
.debug("[n2vc_callback] ns_instantiate={} vnf_index={} All active".format(nsr_id
, vnf_member_index
))
555 db_nsr
["config-status"] = "configured"
556 db_nsr
["detailed-status"] = "done"
557 db_nslcmop
["operationState"] = "COMPLETED"
558 db_nslcmop
["detailedStatus"] = "Done"
559 db_nslcmop
["statusEnteredTime"] = time()
565 for status
, num
in status_map
.items():
566 cs
+= separator
+ "{}: {}".format(status
, num
)
568 db_nsr
["config-status"] = cs
569 db_nslcmop
["detailedStatus"] = cs
570 update_nsr
= update_nslcmop
= True
572 except Exception as e
:
573 self
.logger
.critical("[n2vc_callback] vnf_index={} Exception {}".format(vnf_member_index
, e
), exc_info
=True)
577 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
579 self
.update_db("nsrs", nsr_id
, db_nsr
)
580 except Exception as e
:
581 self
.logger
.critical("[n2vc_callback] vnf_index={} Update database Exception {}".format(
582 vnf_member_index
, e
), exc_info
=True)
def ns_params_2_RO(self, ns_params):
    """Create a RO ns descriptor from OSM ns instantiate params.

    OSM vim account ids are translated to RO datacenter uuids by reading the
    vim_accounts collection; each account is looked up at most once per call.

    :param ns_params: OSM instantiate params
    :return: The RO ns descriptor, or None when `ns_params` is empty
    :raises LcmException: if a referenced vim account is not ENABLED
    """
    vim_2_RO = {}  # cache: OSM vim account id -> RO datacenter uuid

    def vim_account_2_RO(vim_account):
        if vim_account in vim_2_RO:
            return vim_2_RO[vim_account]
        db_vim = self.db.get_one("vim_accounts", {"_id": vim_account})
        # if db_vim["_admin"]["operationalState"] == "PROCESSING":
        #     #TODO check if VIM is creating and wait
        if db_vim["_admin"]["operationalState"] != "ENABLED":
            raise LcmException("VIM={} is not available. operationalSstatus={}".format(
                vim_account, db_vim["_admin"]["operationalState"]))
        RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
        vim_2_RO[vim_account] = RO_vim_id
        return RO_vim_id

    if not ns_params:
        return None
    RO_ns_params = {
        # "name": ns_params["nsName"],
        # "description": ns_params.get("nsDescription"),
        "datacenter": vim_account_2_RO(ns_params["vimAccountId"]),
        # "scenario": ns_params["nsdId"],
        "vnfs": {},
        "networks": {},
    }
    if ns_params.get("ssh-authorized-key"):
        RO_ns_params["cloud-config"] = {"key-pairs": ns_params["ssh-authorized-key"]}

    for vnf in ns_params.get("vnf") or ():
        RO_vnf = {}
        if "vimAccountId" in vnf:
            RO_vnf["datacenter"] = vim_account_2_RO(vnf["vimAccountId"])
        # Only vnfs that actually override something are passed to RO.
        if RO_vnf:
            RO_ns_params["vnfs"][vnf["member-vnf-index"]] = RO_vnf

    for vld in ns_params.get("vld") or ():
        RO_vld = {}
        if "ip-profile" in vld:
            RO_vld["ip-profile"] = vld["ip-profile"]
        if "vim-network-name" in vld:
            RO_vld["sites"] = []
            if isinstance(vld["vim-network-name"], dict):
                # One network name per vim account.
                for vim_account, vim_net in vld["vim-network-name"].items():
                    RO_vld["sites"].append({
                        "netmap-use": vim_net,
                        "datacenter": vim_account_2_RO(vim_account)
                    })
            else:  # isinstance str
                RO_vld["sites"].append({"netmap-use": vld["vim-network-name"]})
        if RO_vld:
            RO_ns_params["networks"][vld["name"]] = RO_vld
    return RO_ns_params
642 async def ns_instantiate(self
, nsr_id
, nslcmop_id
):
643 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
644 self
.logger
.debug(logging_text
+ "Enter")
645 # get all needed from database
649 step
= "Getting nsr, nslcmop, RO_vims from db"
651 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
652 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
654 nsr_name
= db_nsr
["name"] # TODO short-name??
657 for c_vnf
in nsd
["constituent-vnfd"]:
658 vnfd_id
= c_vnf
["vnfd-id-ref"]
659 if vnfd_id
not in needed_vnfd
:
660 step
= "Getting vnfd={} from db".format(vnfd_id
)
661 needed_vnfd
[vnfd_id
] = self
.db
.get_one("vnfds", {"id": vnfd_id
})
663 nsr_lcm
= db_nsr
["_admin"].get("deployed")
665 nsr_lcm
= db_nsr
["_admin"]["deployed"] = {
667 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
671 db_nsr
["detailed-status"] = "creating"
672 db_nsr
["operational-status"] = "init"
674 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
676 # get vnfds, instantiate at RO
677 for vnfd_id
, vnfd
in needed_vnfd
.items():
678 step
= db_nsr
["detailed-status"] = "Creating vnfd={} at RO".format(vnfd_id
)
679 self
.logger
.debug(logging_text
+ step
)
680 vnfd_id_RO
= nsr_id
+ "." + vnfd_id
[:200]
683 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id_RO
})
685 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
686 self
.logger
.debug(logging_text
+ "RO vnfd={} exist. Using RO_id={}".format(
687 vnfd_id
, vnfd_list
[0]["uuid"]))
689 vnfd_RO
= self
.vnfd2RO(vnfd
, vnfd_id_RO
)
690 desc
= await RO
.create("vnfd", descriptor
=vnfd_RO
)
691 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
692 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
693 self
.update_db("nsrs", nsr_id
, db_nsr
)
697 step
= db_nsr
["detailed-status"] = "Creating nsd={} at RO".format(nsd_id
)
698 self
.logger
.debug(logging_text
+ step
)
700 nsd_id_RO
= nsd_id
+ "." + nsd_id
[:200]
701 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id_RO
})
703 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
704 self
.logger
.debug(logging_text
+ "RO nsd={} exist. Using RO_id={}".format(
705 nsd_id
, nsd_list
[0]["uuid"]))
707 nsd_RO
= deepcopy(nsd
)
708 nsd_RO
["id"] = nsd_id_RO
709 nsd_RO
.pop("_id", None)
710 nsd_RO
.pop("_admin", None)
711 for c_vnf
in nsd_RO
["constituent-vnfd"]:
712 vnfd_id
= c_vnf
["vnfd-id-ref"]
713 c_vnf
["vnfd-id-ref"] = nsr_id
+ "." + vnfd_id
[:200]
714 desc
= await RO
.create("nsd", descriptor
=nsd_RO
)
715 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
716 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
717 self
.update_db("nsrs", nsr_id
, db_nsr
)
720 # if present use it unless in error status
721 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
724 step
= db_nsr
["detailed-status"] = "Looking for existing ns at RO"
725 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
726 desc
= await RO
.show("ns", RO_nsr_id
)
727 except ROclient
.ROClientException
as e
:
728 if e
.http_code
!= HTTPStatus
.NOT_FOUND
:
730 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
732 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
733 nsr_lcm
["RO"]["nsr_status"] = ns_status
734 if ns_status
== "ERROR":
735 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
736 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
737 await RO
.delete("ns", RO_nsr_id
)
738 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = None
741 step
= db_nsr
["detailed-status"] = "Creating ns at RO"
742 self
.logger
.debug(logging_text
+ step
)
743 RO_ns_params
= self
.ns_params_2_RO(db_nsr
.get("instantiate_params"))
744 desc
= await RO
.create("ns", descriptor
=RO_ns_params
,
746 scenario
=nsr_lcm
["RO"]["nsd_id"])
747 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"] = desc
["uuid"]
748 db_nsr
["_admin"]["nsState"] = "INSTANTIATED"
749 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
750 self
.update_db("nsrs", nsr_id
, db_nsr
)
752 # wait until NS is ready
753 step
= ns_status_detailed
= "Waiting ns ready at RO"
754 db_nsr
["detailed-status"] = ns_status_detailed
755 self
.logger
.debug(logging_text
+ step
+ " RO_ns_id={}".format(RO_nsr_id
))
756 deployment_timeout
= 2*3600 # Two hours
757 while deployment_timeout
> 0:
758 desc
= await RO
.show("ns", RO_nsr_id
)
759 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
760 nsr_lcm
["RO"]["nsr_status"] = ns_status
761 if ns_status
== "ERROR":
762 raise ROclient
.ROClientException(ns_status_info
)
763 elif ns_status
== "BUILD":
764 db_nsr
["detailed-status"] = ns_status_detailed
+ "; {}".format(ns_status_info
)
765 self
.update_db("nsrs", nsr_id
, db_nsr
)
766 elif ns_status
== "ACTIVE":
767 step
= "Getting ns VNF management IP address"
768 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
771 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
773 await asyncio
.sleep(5, loop
=self
.loop
)
774 deployment_timeout
-= 5
775 if deployment_timeout
<= 0:
776 raise ROclient
.ROClientException("Timeout waiting ns to be ready")
777 db_nsr
["detailed-status"] = "Configuring vnfr"
778 self
.update_db("nsrs", nsr_id
, db_nsr
)
780 # The parameters we'll need to deploy a charm
781 number_to_configure
= 0
784 """An inner function to deploy the charm from either vnf or vdu
788 # if number_to_configure == 0:
789 # self.logger.debug("Logging into N2VC...")
790 # task = asyncio.ensure_future(self.n2vc.login())
791 # yield from asyncio.wait_for(task, 30.0)
792 # self.logger.debug("Logged into N2VC!")
794 ## await self.n2vc.login()
796 # Note: The charm needs to exist on disk at the location
797 # specified by charm_path.
798 base_folder
= vnfd
["_admin"]["storage"]
799 storage_params
= self
.fs
.get_params()
800 charm_path
= "{}{}/{}/charms/{}".format(
801 storage_params
["path"],
802 base_folder
["folder"],
803 base_folder
["pkg-dir"],
807 # Setup the runtime parameters for this VNF
808 params
['rw_mgmt_ip'] = nsr_lcm
['nsr_ip']["vnf"][vnf_index
]
810 # ns_name will be ignored in the current version of N2VC
811 # but will be implemented for the next point release.
812 model_name
= 'default'
813 application_name
= self
.n2vc
.FormatApplicationName(
819 nsr_lcm
["VCA"][vnf_index
] = {
821 "application": application_name
,
822 "operational-status": "init",
823 "detailed-status": "",
827 self
.logger
.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id
, charm_path
, proxy_charm
))
828 task
= asyncio
.ensure_future(
829 self
.n2vc
.DeployCharms(
830 model_name
, # The network service name
831 application_name
, # The application name
832 vnfd
, # The vnf descriptor
833 charm_path
, # Path to charm
834 params
, # Runtime params, like mgmt ip
835 {}, # for native charms only
836 self
.n2vc_callback
, # Callback for status changes
837 db_nsr
, # Callback parameter
839 vnf_index
, # Callback parameter
840 None, # Callback parameter (task)
843 task
.add_done_callback(functools
.partial(self
.n2vc_callback
, model_name
, application_name
, None,
844 db_nsr
, db_nslcmop
, vnf_index
))
845 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["create_charm:" + vnf_index
] = task
847 # TODO: Make this call inside deploy()
848 # Login to the VCA. If there are multiple calls to login(),
849 # subsequent calls will be a nop and return immediately.
850 await self
.n2vc
.login()
852 step
= "Looking for needed vnfd to configure"
853 self
.logger
.debug(logging_text
+ step
)
854 for c_vnf
in nsd
["constituent-vnfd"]:
855 vnfd_id
= c_vnf
["vnfd-id-ref"]
856 vnf_index
= str(c_vnf
["member-vnf-index"])
857 vnfd
= needed_vnfd
[vnfd_id
]
859 # Check if this VNF has a charm configuration
860 vnf_config
= vnfd
.get("vnf-configuration")
862 if vnf_config
and vnf_config
.get("juju"):
863 proxy_charm
= vnf_config
["juju"]["charm"]
867 if 'initial-config-primitive' in vnf_config
:
868 params
['initial-config-primitive'] = vnf_config
['initial-config-primitive']
871 number_to_configure
+= 1
873 # Deploy charms for each VDU that supports one.
874 for vdu
in vnfd
['vdu']:
875 vdu_config
= vdu
.get('vdu-configuration')
879 if vdu_config
and vdu_config
.get("juju"):
880 proxy_charm
= vdu_config
["juju"]["charm"]
882 if 'initial-config-primitive' in vdu_config
:
883 params
['initial-config-primitive'] = vdu_config
['initial-config-primitive']
887 number_to_configure
+= 1
889 if number_to_configure
:
890 db_nsr
["config-status"] = "configuring"
891 db_nsr
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
892 db_nslcmop
["detailed-status"] = "configuring: init: {}".format(number_to_configure
)
894 db_nslcmop
["operationState"] = "COMPLETED"
895 db_nslcmop
["detailed-status"] = "done"
896 db_nsr
["config-status"] = "configured"
897 db_nsr
["detailed-status"] = "done"
898 db_nsr
["operational-status"] = "running"
899 self
.update_db("nsrs", nsr_id
, db_nsr
)
900 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
901 self
.logger
.debug("Task ns_instantiate={} Exit Ok".format(nsr_id
))
904 except (ROclient
.ROClientException
, DbException
, LcmException
) as e
:
905 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
907 except Exception as e
:
908 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
913 db_nsr
["detailed-status"] = "ERROR {}: {}".format(step
, exc
)
914 db_nsr
["operational-status"] = "failed"
915 self
.update_db("nsrs", nsr_id
, db_nsr
)
917 db_nslcmop
["detailed-status"] = "FAILED {}: {}".format(step
, exc
)
918 db_nslcmop
["operationState"] = "FAILED"
919 db_nslcmop
["statusEnteredTime"] = time()
920 self
.update_db("nslcmops", nslcmop_id
, db_nslcmop
)
922 async def ns_terminate(self
, nsr_id
, nslcmop_id
):
923 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
924 self
.logger
.debug(logging_text
+ "Enter")
928 step
= "Getting nsr, nslcmop from db"
929 failed_detail
= [] # annotates all failed error messages
933 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
934 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
935 # nsd = db_nsr["nsd"]
936 nsr_lcm
= deepcopy(db_nsr
["_admin"]["deployed"])
937 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
940 # db_vim = self.db.get_one("vim_accounts", {"_id": db_nsr["datacenter"]})
941 # #TODO check if VIM is creating and wait
942 # RO_vim_id = db_vim["_admin"]["deployed"]["RO"]
945 "operational-status": "terminating",
946 "config-status": "terminating",
947 "detailed-status": "Deleting charms",
949 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
952 self
.logger
.debug(logging_text
+ step
)
953 for vnf_index
, deploy_info
in nsr_lcm
["VCA"].items():
954 if deploy_info
and deploy_info
.get("application"):
955 task
= asyncio
.ensure_future(
956 self
.n2vc
.RemoveCharms(
957 deploy_info
['model'],
958 deploy_info
['application'],
959 # self.n2vc_callback,
965 vca_task_list
.append(task
)
966 vca_task_dict
[vnf_index
] = task
967 # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
968 # deploy_info['application'], None, db_nsr,
969 # db_nslcmop, vnf_index))
970 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
]["delete_charm:" + vnf_index
] = task
971 except Exception as e
:
972 self
.logger
.debug(logging_text
+ "Failed while deleting charms: {}".format(e
))
975 RO
= ROclient
.ROClient(self
.loop
, **self
.ro_config
)
977 RO_nsr_id
= nsr_lcm
["RO"].get("nsr_id")
980 step
= db_nsr
["detailed-status"] = "Deleting ns at RO"
981 self
.logger
.debug(logging_text
+ step
)
982 desc
= await RO
.delete("ns", RO_nsr_id
)
983 nsr_lcm
["RO"]["nsr_id"] = None
984 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
985 except ROclient
.ROClientException
as e
:
986 if e
.http_code
== 404: # not found
987 nsr_lcm
["RO"]["nsr_id"] = None
988 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
989 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(RO_nsr_id
))
990 elif e
.http_code
== 409: #conflict
991 failed_detail
.append("RO_ns_id={} delete conflict: {}".format(RO_nsr_id
, e
))
992 self
.logger
.debug(logging_text
+ failed_detail
[-1])
994 failed_detail
.append("RO_ns_id={} delete error: {}".format(RO_nsr_id
, e
))
995 self
.logger
.error(logging_text
+ failed_detail
[-1])
998 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
1001 step
= db_nsr
["detailed-status"] = "Deleting nsd at RO"
1002 desc
= await RO
.delete("nsd", RO_nsd_id
)
1003 self
.logger
.debug(logging_text
+ "RO_nsd_id={} deleted".format(RO_nsd_id
))
1004 nsr_lcm
["RO"]["nsd_id"] = None
1005 except ROclient
.ROClientException
as e
:
1006 if e
.http_code
== 404: # not found
1007 nsr_lcm
["RO"]["nsd_id"] = None
1008 self
.logger
.debug(logging_text
+ "RO_nsd_id={} already deleted".format(RO_nsd_id
))
1009 elif e
.http_code
== 409: #conflict
1010 failed_detail
.append("RO_nsd_id={} delete conflict: {}".format(RO_nsd_id
, e
))
1011 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1013 failed_detail
.append("RO_nsd_id={} delete error: {}".format(RO_nsd_id
, e
))
1014 self
.logger
.error(logging_text
+ failed_detail
[-1])
1016 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
1020 step
= db_nsr
["detailed-status"] = "Deleting vnfd={} at RO".format(vnf_id
)
1021 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
1022 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} deleted".format(RO_vnfd_id
))
1023 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
1024 except ROclient
.ROClientException
as e
:
1025 if e
.http_code
== 404: # not found
1026 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
1027 self
.logger
.debug(logging_text
+ "RO_vnfd_id={} already deleted ".format(RO_vnfd_id
))
1028 elif e
.http_code
== 409: #conflict
1029 failed_detail
.append("RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id
, e
))
1030 self
.logger
.debug(logging_text
+ failed_detail
[-1])
1032 failed_detail
.append("RO_vnfd_id={} delete error: {}".format(RO_vnfd_id
, e
))
1033 self
.logger
.error(logging_text
+ failed_detail
[-1])
1036 await asyncio
.wait(vca_task_list
, timeout
=300)
1037 for vnf_index
, task
in vca_task_dict
.items():
1038 if task
.cancelled():
1039 failed_detail
.append("VCA[{}] Deletion has been cancelled".format(vnf_index
))
1041 exc
= task
.exception()
1043 failed_detail
.append("VCA[{}] Deletion exception: {}".format(vnf_index
, exc
))
1045 nsr_lcm
["VCA"][vnf_index
] = None
1047 # TODO Should it be cancelled?!!
1049 failed_detail
.append("VCA[{}] Deletion timeout".format(vnf_index
))
1052 self
.logger
.error(logging_text
+ " ;".join(failed_detail
))
1054 "operational-status": "failed",
1055 "detailed-status": "Deletion errors " + "; ".join(failed_detail
),
1056 "_admin": {"deployed": nsr_lcm
, }
1058 db_nslcmop_update
= {
1059 "detailedStatus": "; ".join(failed_detail
),
1060 "operationState": "FAILED",
1061 "statusEnteredTime": time()
1063 elif db_nslcmop
["operationParams"].get("autoremove"):
1064 self
.db
.del_one("nsrs", {"_id": nsr_id
})
1065 self
.db
.del_list("nslcmops", {"nsInstanceId": nsr_id
})
1068 "operational-status": "terminated",
1069 "detailed-status": "Done",
1070 "_admin": {"deployed": nsr_lcm
, "nsState": "NOT_INSTANTIATED"}
1072 db_nslcmop_update
= {
1073 "detailedStatus": "Done",
1074 "operationState": "COMPLETED",
1075 "statusEnteredTime": time()
1077 self
.logger
.debug(logging_text
+ "Exit")
1079 except (ROclient
.ROClientException
, DbException
) as e
:
1080 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
1082 except Exception as e
:
1083 self
.logger
.critical(logging_text
+ "Exit Exception {}".format(e
), exc_info
=True)
1086 if exc
and db_nslcmop
:
1087 db_nslcmop_update
= {
1088 "detailed-status": "FAILED {}: {}".format(step
, exc
),
1089 "operationState": "FAILED",
1090 "statusEnteredTime": time(),
1092 if db_nslcmop_update
:
1093 self
.update_db_2("nslcmops", nslcmop_id
, db_nslcmop_update
)
1095 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
async def ns_action(self, nsr_id, nslcmop_id):
    """
    Execute a primitive over the charm deployed for one vnf of a network service and record the result
    in the "nslcmops" record.
    :param nsr_id: _id of the "nsrs" database record
    :param nslcmop_id: _id of the "nslcmops" record; its operationParams supply vnf_member_index,
        primitive and primitive_params
    :return: None. The result is written to the "nslcmops" record; exceptions are logged, not raised
    """
    logging_text = "Task ns={} action={} ".format(nsr_id, nslcmop_id)
    self.logger.debug(logging_text + "Enter")
    # get all needed from database
    db_nsr = None
    db_nslcmop = None
    db_nslcmop_update = None
    exc = None
    step = "Getting nsr, nslcmop"
    try:
        db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"].get("deployed")
        vnf_index = db_nslcmop["operationParams"]["vnf_member_index"]

        # TODO check if ns is in a proper status
        # a missing "deployed" or "VCA" section is reported like a non deployed charm
        vca_deployed = ((nsr_lcm or {}).get("VCA") or {}).get(vnf_index)
        if not vca_deployed:
            raise LcmException("charm for vnf_member_index={} is not deployed".format(vnf_index))
        model_name = vca_deployed.get("model")
        application_name = vca_deployed.get("application")
        if not model_name or not application_name:
            raise LcmException("charm for vnf_member_index={} is not properly deployed".format(vnf_index))
        if vca_deployed["operational-status"] != "active":
            raise LcmException("charm for vnf_member_index={} operational_status={} not 'active'".format(
                vnf_index, vca_deployed["operational-status"]))
        primitive = db_nslcmop["operationParams"]["primitive"]
        primitive_params = db_nslcmop["operationParams"]["primitive_params"]
        callback = None  # self.n2vc_callback
        callback_args = ()  # [db_nsr, db_nslcmop, vnf_index, None]
        await self.n2vc.login()
        task = asyncio.ensure_future(
            self.n2vc.ExecutePrimitive(
                model_name,
                application_name,
                primitive, callback,
                *callback_args,
                **primitive_params
            )
        )
        # task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name, None,
        #                                          db_nsr, db_nslcmop, vnf_index))
        # self.lcm_ns_tasks[nsr_id][nslcmop_id]["action: " + primitive] = task
        # wait until completed with timeout
        await asyncio.wait((task,), timeout=300)

        result = "FAILED"  # by default
        result_detail = ""
        if task.cancelled():
            # stored in result_detail so that it reaches the database update below
            result_detail = "Task has been cancelled"
        elif task.done():
            # a dedicated name is used so that the outer 'exc' (read by 'finally') is not clobbered
            task_exc = task.exception()
            if task_exc:
                result_detail = str(task_exc)
            else:
                self.logger.debug(logging_text + " task Done")
                # TODO revise with Adam if action is finished and ok when task is done or callback is needed
                result = "COMPLETED"
                result_detail = "Done"
        else:
            # TODO Should it be cancelled?!!
            task.cancel()
            result_detail = "timeout"

        db_nslcmop_update = {
            "detailedStatus": result_detail,
            "operationState": result,
            "statusEnteredTime": time()
        }
        self.logger.debug(logging_text + " task Done with result {} {}".format(result, result_detail))
        return  # database update is called inside finally

    except (DbException, LcmException) as e:
        self.logger.error(logging_text + "Exit Exception {}".format(e))
        exc = e
    except Exception as e:
        self.logger.critical(logging_text + "Exit Exception {} {}".format(type(e).__name__, e), exc_info=True)
        exc = e
    finally:
        if exc and db_nslcmop:
            # NOTE(review): this path writes "detailed-status" whereas the normal path writes
            # "detailedStatus"; confirm which key the nslcmops schema expects
            db_nslcmop_update = {
                "detailed-status": "FAILED {}: {}".format(step, exc),
                "operationState": "FAILED",
                "statusEnteredTime": time(),
            }
        if db_nslcmop_update:
            self.update_db_2("nslcmops", nslcmop_id, db_nslcmop_update)
async def test(self, param=None):
    """
    Dummy task used to check that tasks can be launched: it only writes a debug trace.
    :param param: any value; it is included in the trace
    :return: None
    """
    trace = "Starting/Ending test task: {}".format(param)
    self.logger.debug(trace)
1188 def cancel_tasks(self
, topic
, _id
):
1190 Cancel all active tasks of a concrete nsr or vim identified for _id
1191 :param topic: can be ns or vim_account
1192 :param _id: nsr or vim identity
1193 :return: None, or raises an exception if not possible
1196 lcm_tasks
= self
.lcm_ns_tasks
1197 elif topic
== "vim_account":
1198 lcm_tasks
= self
.lcm_vim_tasks
1200 lcm_tasks
= self
.lcm_sdn_tasks
1202 if not lcm_tasks
.get(_id
):
1204 for order_id
, tasks_set
in lcm_tasks
[_id
].items():
1205 for task_name
, task
in tasks_set
.items():
1206 result
= task
.cancel()
1208 self
.logger
.debug("{} _id={} order_id={} task={} cancelled".format(topic
, _id
, order_id
, task_name
))
async def kafka_ping(self):
    """
    Periodically write a ping message to the "admin" topic addressed to lcm itself. The counter
    self.pings_not_received is incremented on every ping sent and is reset by kafka_read when the ping
    comes back, so a growing counter means that the bus is not delivering messages.
    :return: does not return normally
    :raises LcmException: when more than 10 consecutive pings have not been received
    :raises Exception: the last write error, once the allowed number of consecutive errors is reached
    """
    self.logger.debug("Task kafka_ping Enter")
    consecutive_errors = 0
    first_start = True
    kafka_has_received = False
    self.pings_not_received = 1
    while True:
        try:
            await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
            # time between pings are low when it is not received and at starting
            wait_time = 5 if not kafka_has_received else 120
            if not self.pings_not_received:
                kafka_has_received = True
            self.pings_not_received += 1
            # no 'loop' argument: it was removed from asyncio.sleep in python 3.10
            await asyncio.sleep(wait_time)
            if self.pings_not_received > 10:
                raise LcmException("It is not receiving pings from Kafka bus")
            consecutive_errors = 0
            first_start = False
        except LcmException:
            raise
        except Exception as e:
            # if not first_start is the first time after starting. So leave more time and wait
            # to allow kafka starts
            # The limit is computed apart: 'a == 8 if c else 30' parses as '(a == 8) if c else 30',
            # which is always true at first start and aborted the task on the very first error
            max_errors = 30 if first_start else 8
            if consecutive_errors == max_errors:
                self.logger.error("Task kafka_ping task exit error too many errors. Exception: {}".format(e))
                raise
            consecutive_errors += 1
            self.logger.error("Task kafka_ping retrying after Exception {}".format(e))
            wait_time = 1 if not first_start else 5
            await asyncio.sleep(wait_time)
1243 async def kafka_read(self
):
1244 self
.logger
.debug("Task kafka_read Enter")
1246 # future = asyncio.Future()
1247 consecutive_errors
= 0
1249 while consecutive_errors
< 10:
1251 topics
= ("admin", "ns", "vim_account", "sdn")
1252 topic
, command
, params
= await self
.msg
.aioread(topics
, self
.loop
)
1253 self
.logger
.debug("Task kafka_read receives {} {}: {}".format(topic
, command
, params
))
1254 consecutive_errors
= 0
1257 if command
== "exit":
1260 elif command
.startswith("#"):
1262 elif command
== "echo":
1267 elif command
== "test":
1268 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
1271 if topic
== "admin":
1272 if command
== "ping" and params
["to"] == "lcm" and params
["from"] == "lcm":
1273 self
.pings_not_received
= 0
1276 if command
== "instantiate":
1277 # self.logger.debug("Deploying NS {}".format(nsr_id))
1279 nslcmop_id
= nslcmop
["_id"]
1280 nsr_id
= nslcmop
["nsInstanceId"]
1281 task
= asyncio
.ensure_future(self
.ns_instantiate(nsr_id
, nslcmop_id
))
1282 if nsr_id
not in self
.lcm_ns_tasks
:
1283 self
.lcm_ns_tasks
[nsr_id
] = {}
1284 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_instantiate": task
}
1286 elif command
== "terminate":
1287 # self.logger.debug("Deleting NS {}".format(nsr_id))
1289 nslcmop_id
= nslcmop
["_id"]
1290 nsr_id
= nslcmop
["nsInstanceId"]
1291 self
.cancel_tasks(topic
, nsr_id
)
1292 task
= asyncio
.ensure_future(self
.ns_terminate(nsr_id
, nslcmop_id
))
1293 if nsr_id
not in self
.lcm_ns_tasks
:
1294 self
.lcm_ns_tasks
[nsr_id
] = {}
1295 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_terminate": task
}
1297 elif command
== "action":
1298 # self.logger.debug("Update NS {}".format(nsr_id))
1300 nslcmop_id
= nslcmop
["_id"]
1301 nsr_id
= nslcmop
["nsInstanceId"]
1302 task
= asyncio
.ensure_future(self
.ns_action(nsr_id
, nslcmop_id
))
1303 if nsr_id
not in self
.lcm_ns_tasks
:
1304 self
.lcm_ns_tasks
[nsr_id
] = {}
1305 self
.lcm_ns_tasks
[nsr_id
][nslcmop_id
] = {"ns_action": task
}
1307 elif command
== "show":
1309 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1311 "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
1312 "{}\n deploy: {}\n tasks: {}".format(
1313 nsr_id
, db_nsr
["operational-status"],
1314 db_nsr
["config-status"], db_nsr
["detailed-status"],
1315 db_nsr
["_admin"]["deployed"], self
.lcm_ns_tasks
.get(nsr_id
)))
1316 except Exception as e
:
1317 print("nsr {} not found: {}".format(nsr_id
, e
))
1320 elif command
== "deleted":
1321 continue # TODO cleaning of task just in case should be done
1322 elif topic
== "vim_account":
1323 vim_id
= params
["_id"]
1324 if command
== "create":
1325 task
= asyncio
.ensure_future(self
.vim_create(params
, order_id
))
1326 if vim_id
not in self
.lcm_vim_tasks
:
1327 self
.lcm_vim_tasks
[vim_id
] = {}
1328 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_create": task
}
1330 elif command
== "delete":
1331 self
.cancel_tasks(topic
, vim_id
)
1332 task
= asyncio
.ensure_future(self
.vim_delete(vim_id
, order_id
))
1333 if vim_id
not in self
.lcm_vim_tasks
:
1334 self
.lcm_vim_tasks
[vim_id
] = {}
1335 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_delete": task
}
1337 elif command
== "show":
1338 print("not implemented show with vim_account")
1341 elif command
== "edit":
1342 task
= asyncio
.ensure_future(self
.vim_edit(vim_id
, order_id
))
1343 if vim_id
not in self
.lcm_vim_tasks
:
1344 self
.lcm_vim_tasks
[vim_id
] = {}
1345 self
.lcm_vim_tasks
[vim_id
][order_id
] = {"vim_edit": task
}
1347 elif topic
== "sdn":
1348 _sdn_id
= params
["_id"]
1349 if command
== "create":
1350 task
= asyncio
.ensure_future(self
.sdn_create(params
, order_id
))
1351 if _sdn_id
not in self
.lcm_sdn_tasks
:
1352 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1353 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_create": task
}
1355 elif command
== "delete":
1356 self
.cancel_tasks(topic
, _sdn_id
)
1357 task
= asyncio
.ensure_future(self
.sdn_delete(_sdn_id
, order_id
))
1358 if _sdn_id
not in self
.lcm_sdn_tasks
:
1359 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1360 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_delete": task
}
1362 elif command
== "edit":
1363 task
= asyncio
.ensure_future(self
.sdn_edit(_sdn_id
, order_id
))
1364 if _sdn_id
not in self
.lcm_sdn_tasks
:
1365 self
.lcm_sdn_tasks
[_sdn_id
] = {}
1366 self
.lcm_sdn_tasks
[_sdn_id
][order_id
] = {"sdn_edit": task
}
1368 self
.logger
.critical("unknown topic {} and command '{}'".format(topic
, command
))
1369 except Exception as e
:
1370 # if not first_start is the first time after starting. So leave more time and wait
1371 # to allow kafka starts
1372 if consecutive_errors
== 8 if not first_start
else 30:
1373 self
.logger
.error("Task kafka_read task exit error too many errors. Exception: {}".format(e
))
1375 consecutive_errors
+= 1
1376 self
.logger
.error("Task kafka_read retrying after Exception {}".format(e
))
1377 wait_time
= 2 if not first_start
else 5
1378 await asyncio
.sleep(wait_time
, loop
=self
.loop
)
1380 # self.logger.debug("Task kafka_read terminating")
1381 self
.logger
.debug("Task kafka_read exit")
1384 self
.loop
= asyncio
.get_event_loop()
1385 self
.loop
.run_until_complete(asyncio
.gather(
1390 # self.logger.debug("Terminating cancelling creation tasks")
1391 # self.cancel_tasks("ALL", "create")
1393 # while self.is_pending_tasks():
1394 # self.logger.debug("Task kafka_read terminating. Waiting for tasks termination")
1395 # await asyncio.sleep(2, loop=self.loop)
1398 # self.cancel_tasks("ALL", "ALL")
1402 self
.db
.db_disconnect()
1404 self
.msg
.disconnect()
1406 self
.fs
.fs_disconnect()
def read_config_file(self, config_file):
    """
    Load the configuration file and overwrite its values with the OSMLCM_* environment variables.
    A variable OSMLCM_<SECTION>_..._<KEY> sets conf[<section>]...[<key>]; the name is lower-cased except the
    sections "ro" and "vca" that are converted to capital letters, and a <key> equal to "port" is stored as
    integer. Variables that cannot be applied are skipped with a warning.
    :param config_file: path of the configuration file
    :return: two level dictionary with the configuration. On any error reading the file it is logged as
        critical and the process terminates with exit code 1
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: a configuration file never needs arbitrary python object construction
            # relies on the module-level 'yaml' import of this file
            conf = yaml.safe_load(f)
        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down to the dictionary that holds the final key
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                # Logger.warn is a deprecated alias of Logger.warning
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
    except Exception as e:
        self.logger.critical("At config file '{}': {}".format(config_file, e))
        raise SystemExit(1)
1440 if __name__
== '__main__':
1442 config_file
= "lcm.cfg"
1443 lcm
= Lcm(config_file
)