1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
, get_juju_ee_ref
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
60 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
64 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
65 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
66 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
67 timeout_charm_delete
= 10 * 60
68 timeout_primitive
= 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
71 SUBOPERATION_STATUS_NOT_FOUND
= -1
72 SUBOPERATION_STATUS_NEW
= -2
73 SUBOPERATION_STATUS_SKIP
= -3
74 task_name_deploy_vca
= "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
168 def increment_ip_mac(ip_mac
, vm_index
=1):
169 if not isinstance(ip_mac
, str):
172 # try with ipv4 look for last dot
173 i
= ip_mac
.rfind(".")
176 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i
= ip_mac
.rfind(":")
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
)
187 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
192 # TODO filter RO descriptor fields...
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict
['deploymentStatus'] = ro_descriptor
198 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
200 except Exception as e
:
201 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
214 nsr_id
= filter.get('_id')
216 # read ns record from database
217 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
218 current_ns_status
= nsr
.get('nsState')
220 # get vca status for NS
221 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
225 db_dict
['vcaStatus'] = status_dict
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
293 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
295 env
= Environment(undefined
=StrictUndefined
)
296 template
= env
.from_string(cloud_init_text
)
297 return template
.render(additional_params
or {})
298 except UndefinedError
as e
:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
302 except (TemplateError
, TemplateNotFound
) as e
:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id
, vdu_id
, e
))
306 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
307 cloud_init_content
= cloud_init_file
= None
309 if vdu
.get("cloud-init-file"):
310 base_folder
= vnfd
["_admin"]["storage"]
311 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
312 vdu
["cloud-init-file"])
313 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
314 cloud_init_content
= ci_file
.read()
315 elif vdu
.get("cloud-init"):
316 cloud_init_content
= vdu
["cloud-init"]
318 return cloud_init_content
319 except FsException
as e
:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
323 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
324 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
325 additional_params
= vdur
.get("additionalParams")
326 return parse_yaml_strings(additional_params
)
328 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
330 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
337 vnfd_RO
= deepcopy(vnfd
)
338 # remove unused by RO configuration, monitoring, scaling and internal keys
339 vnfd_RO
.pop("_id", None)
340 vnfd_RO
.pop("_admin", None)
341 vnfd_RO
.pop("monitoring-param", None)
342 vnfd_RO
.pop("scaling-group-descriptor", None)
343 vnfd_RO
.pop("kdu", None)
344 vnfd_RO
.pop("k8s-cluster", None)
346 vnfd_RO
["id"] = new_id
348 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
349 for vdu
in get_iterable(vnfd_RO
, "vdu"):
350 vdu
.pop("cloud-init-file", None)
351 vdu
.pop("cloud-init", None)
355 def ip_profile_2_RO(ip_profile
):
356 RO_ip_profile
= deepcopy(ip_profile
)
357 if "dns-server" in RO_ip_profile
:
358 if isinstance(RO_ip_profile
["dns-server"], list):
359 RO_ip_profile
["dns-address"] = []
360 for ds
in RO_ip_profile
.pop("dns-server"):
361 RO_ip_profile
["dns-address"].append(ds
['address'])
363 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
364 if RO_ip_profile
.get("ip-version") == "ipv4":
365 RO_ip_profile
["ip-version"] = "IPv4"
366 if RO_ip_profile
.get("ip-version") == "ipv6":
367 RO_ip_profile
["ip-version"] = "IPv6"
368 if "dhcp-params" in RO_ip_profile
:
369 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
372 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
373 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
374 if db_vim
["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account
, db_vim
["_admin"]["operationalState"]))
377 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
380 def get_ro_wim_id_for_wim_account(self
, wim_account
):
381 if isinstance(wim_account
, str):
382 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
383 if db_wim
["_admin"]["operationalState"] != "ENABLED":
384 raise LcmException("WIM={} is not available. operationalState={}".format(
385 wim_account
, db_wim
["_admin"]["operationalState"]))
386 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
391 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
393 db_vdu_push_list
= []
394 db_update
= {"_admin.modified": time()}
396 for vdu_id
, vdu_count
in vdu_create
.items():
397 vdur
= next((vdur
for vdur
in reversed(db_vnfr
["vdur"]) if vdur
["vdu-id-ref"] == vdu_id
), None)
399 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
402 for count
in range(vdu_count
):
403 vdur_copy
= deepcopy(vdur
)
404 vdur_copy
["status"] = "BUILD"
405 vdur_copy
["status-detailed"] = None
406 vdur_copy
["ip-address"]: None
407 vdur_copy
["_id"] = str(uuid4())
408 vdur_copy
["count-index"] += count
+ 1
409 vdur_copy
["id"] = "{}-{}".format(vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"])
410 vdur_copy
.pop("vim_info", None)
411 for iface
in vdur_copy
["interfaces"]:
412 if iface
.get("fixed-ip"):
413 iface
["ip-address"] = self
.increment_ip_mac(iface
["ip-address"], count
+1)
415 iface
.pop("ip-address", None)
416 if iface
.get("fixed-mac"):
417 iface
["mac-address"] = self
.increment_ip_mac(iface
["mac-address"], count
+1)
419 iface
.pop("mac-address", None)
420 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
421 db_vdu_push_list
.append(vdur_copy
)
422 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
424 for vdu_id
, vdu_count
in vdu_delete
.items():
426 indexes_to_delete
= [iv
[0] for iv
in enumerate(db_vnfr
["vdur"]) if iv
[1]["vdu-id-ref"] == vdu_id
]
427 db_update
.update({"vdur.{}.status".format(i
): "DELETING" for i
in indexes_to_delete
[-vdu_count
:]})
429 # it must be deleted one by one because common.db does not allow otherwise
430 vdus_to_delete
= [v
for v
in reversed(db_vnfr
["vdur"]) if v
["vdu-id-ref"] == vdu_id
]
431 for vdu
in vdus_to_delete
[:vdu_count
]:
432 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, None, pull
={"vdur": {"_id": vdu
["_id"]}})
433 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
434 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
435 # modify passed dictionary db_vnfr
436 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
437 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
439 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
441 Updates database nsr with the RO info for the created vld
442 :param ns_update_nsr: dictionary to be filled with the updated info
443 :param db_nsr: content of db_nsr. This is also modified
444 :param nsr_desc_RO: nsr descriptor from RO
445 :return: Nothing, LcmException is raised on errors
448 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
449 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
450 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
452 vld
["vim-id"] = net_RO
.get("vim_net_id")
453 vld
["name"] = net_RO
.get("vim_name")
454 vld
["status"] = net_RO
.get("status")
455 vld
["status-detailed"] = net_RO
.get("error_msg")
456 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
459 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
461 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
463 for db_vnfr
in db_vnfrs
.values():
464 vnfr_update
= {"status": "ERROR"}
465 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
466 if "status" not in vdur
:
467 vdur
["status"] = "ERROR"
468 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
470 vdur
["status-detailed"] = str(error_text
)
471 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
472 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
473 except DbException
as e
:
474 self
.logger
.error("Cannot update vnf. {}".format(e
))
476 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
478 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
479 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
480 :param nsr_desc_RO: nsr descriptor from RO
481 :return: Nothing, LcmException is raised on errors
483 for vnf_index
, db_vnfr
in db_vnfrs
.items():
484 for vnf_RO
in nsr_desc_RO
["vnfs"]:
485 if vnf_RO
["member_vnf_index"] != vnf_index
:
488 if vnf_RO
.get("ip_address"):
489 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
490 elif not db_vnfr
.get("ip-address"):
491 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
492 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
494 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
495 vdur_RO_count_index
= 0
496 if vdur
.get("pdu-type"):
498 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
499 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
501 if vdur
["count-index"] != vdur_RO_count_index
:
502 vdur_RO_count_index
+= 1
504 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
505 if vdur_RO
.get("ip_address"):
506 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
508 vdur
["ip-address"] = None
509 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
510 vdur
["name"] = vdur_RO
.get("vim_name")
511 vdur
["status"] = vdur_RO
.get("status")
512 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
513 for ifacer
in get_iterable(vdur
, "interfaces"):
514 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
515 if ifacer
["name"] == interface_RO
.get("internal_name"):
516 ifacer
["ip-address"] = interface_RO
.get("ip_address")
517 ifacer
["mac-address"] = interface_RO
.get("mac_address")
520 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
522 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
523 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
526 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
527 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
529 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
530 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
531 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
533 vld
["vim-id"] = net_RO
.get("vim_net_id")
534 vld
["name"] = net_RO
.get("vim_name")
535 vld
["status"] = net_RO
.get("status")
536 vld
["status-detailed"] = net_RO
.get("error_msg")
537 vnfr_update
["vld.{}".format(vld_index
)] = vld
540 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
541 vnf_index
, vld
["id"]))
543 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
547 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
549 def _get_ns_config_info(self
, nsr_id
):
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
557 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
558 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
560 ns_config_info
= {"osm-config-mapping": mapping
}
561 for vca
in vca_deployed_list
:
562 if not vca
["member-vnf-index"]:
564 if not vca
["vdu_id"]:
565 mapping
[vca
["member-vnf-index"]] = vca
["application"]
567 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
569 return ns_config_info
571 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
572 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
576 def get_vim_account(vim_account_id
):
578 if vim_account_id
in db_vims
:
579 return db_vims
[vim_account_id
]
580 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
581 db_vims
[vim_account_id
] = db_vim
584 # modify target_vld info with instantiation parameters
585 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
586 if vld_params
.get("ip-profile"):
587 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
588 if vld_params
.get("provider-network"):
589 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
590 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
591 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
592 if vld_params
.get("wimAccountId"):
593 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
594 target_vld
["vim_info"][target_wim
] = {}
595 for param
in ("vim-network-name", "vim-network-id"):
596 if vld_params
.get(param
):
597 if isinstance(vld_params
[param
], dict):
598 for vim
, vim_net
in vld_params
[param
]:
599 other_target_vim
= "vim:" + vim
600 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
601 else: # isinstance str
602 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
603 if vld_params
.get("common_id"):
604 target_vld
["common_id"] = vld_params
.get("common_id")
606 nslcmop_id
= db_nslcmop
["_id"]
608 "name": db_nsr
["name"],
611 "image": deepcopy(db_nsr
["image"]),
612 "flavor": deepcopy(db_nsr
["flavor"]),
613 "action_id": nslcmop_id
,
614 "cloud_init_content": {},
616 for image
in target
["image"]:
617 image
["vim_info"] = {}
618 for flavor
in target
["flavor"]:
619 flavor
["vim_info"] = {}
621 if db_nslcmop
.get("lcmOperationType") != "instantiate":
622 # get parameters of instantiation:
623 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
624 "lcmOperationType": "instantiate"})[-1]
625 ns_params
= db_nslcmop_instantiate
.get("operationParams")
627 ns_params
= db_nslcmop
.get("operationParams")
628 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
629 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
632 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
633 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
637 "mgmt-network": vld
.get("mgmt-network", False),
638 "type": vld
.get("type"),
641 "vim_network_name": vld
.get("vim-network-name"),
642 "vim_account_id": ns_params
["vimAccountId"]
646 # check if this network needs SDN assist
647 if vld
.get("pci-interfaces"):
648 db_vim
= get_vim_account(ns_params
["vimAccountId"])
649 sdnc_id
= db_vim
["config"].get("sdn-controller")
651 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
652 target_sdn
= "sdn:{}".format(sdnc_id
)
653 target_vld
["vim_info"][target_sdn
] = {
654 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
656 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
657 for nsd_vnf_profile
in nsd_vnf_profiles
:
658 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
659 if cp
["virtual-link-profile-id"] == vld
["id"]:
660 cp2target
["member_vnf:{}.{}".format(
661 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
662 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
663 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
665 # check at nsd descriptor, if there is an ip-profile
667 virtual_link_profiles
= get_virtual_link_profiles(nsd
)
669 for vlp
in virtual_link_profiles
:
670 ip_profile
= find_in_list(nsd
["ip-profiles"],
671 lambda profile
: profile
["name"] == vlp
["ip-profile-ref"])
672 vld_params
["ip-profile"] = ip_profile
["ip-profile-params"]
673 # update vld_params with instantiation params
674 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
675 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
676 if vld_instantiation_params
:
677 vld_params
.update(vld_instantiation_params
)
678 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
679 target
["ns"]["vld"].append(target_vld
)
681 for vnfr
in db_vnfrs
.values():
682 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
683 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
684 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
685 target_vnf
= deepcopy(vnfr
)
686 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
687 for vld
in target_vnf
.get("vld", ()):
688 # check if connected to a ns.vld, to fill target'
689 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
690 lambda cpd
: cpd
.get("id") == vld
["id"])
692 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
693 if cp2target
.get(ns_cp
):
694 vld
["target"] = cp2target
[ns_cp
]
696 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
697 # check if this network needs SDN assist
699 if vld
.get("pci-interfaces"):
700 db_vim
= get_vim_account(vnfr
["vim-account-id"])
701 sdnc_id
= db_vim
["config"].get("sdn-controller")
703 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
704 target_sdn
= "sdn:{}".format(sdnc_id
)
705 vld
["vim_info"][target_sdn
] = {
706 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
708 # check at vnfd descriptor, if there is an ip-profile
710 vnfd_vlp
= find_in_list(
711 get_virtual_link_profiles(vnfd
),
712 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
714 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
715 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
716 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
717 ip_profile_dest_data
= {}
718 if "ip-version" in ip_profile_source_data
:
719 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
720 if "cidr" in ip_profile_source_data
:
721 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
722 if "gateway-ip" in ip_profile_source_data
:
723 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
724 if "dhcp-enabled" in ip_profile_source_data
:
725 ip_profile_dest_data
["dhcp-params"] = {
726 "enabled": ip_profile_source_data
["dhcp-enabled"]
729 vld_params
["ip-profile"] = ip_profile_dest_data
730 # update vld_params with instantiation params
732 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
733 lambda i_vld
: i_vld
["name"] == vld
["id"])
734 if vld_instantiation_params
:
735 vld_params
.update(vld_instantiation_params
)
736 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
739 for vdur
in target_vnf
.get("vdur", ()):
740 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
741 continue # This vdu must not be created
742 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
744 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
747 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
748 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
749 if vdu_configuration
and vdu_configuration
.get("config-access") and \
750 vdu_configuration
.get("config-access").get("ssh-access"):
751 vdur
["ssh-keys"] = ssh_keys_all
752 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
753 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
754 vnf_configuration
.get("config-access").get("ssh-access") and \
755 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
756 vdur
["ssh-keys"] = ssh_keys_all
757 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
758 elif ssh_keys_instantiation
and \
759 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
760 vdur
["ssh-keys"] = ssh_keys_instantiation
762 self
.logger
.debug("NS > vdur > {}".format(vdur
))
764 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
766 if vdud
.get("cloud-init-file"):
767 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
768 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
769 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
770 base_folder
= vnfd
["_admin"]["storage"]
771 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
772 vdud
.get("cloud-init-file"))
773 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
774 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
775 elif vdud
.get("cloud-init"):
776 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
777 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
778 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
779 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
780 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
781 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
782 vdur
["additionalParams"] = deploy_params_vdu
785 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
786 if target_vim
not in ns_flavor
["vim_info"]:
787 ns_flavor
["vim_info"][target_vim
] = {}
790 # in case alternative images are provided we must check if they should be applied
791 # for the vim_type, modify the vim_type taking into account
792 ns_image_id
= int(vdur
["ns-image-id"])
793 if vdur
.get("alt-image-ids"):
794 db_vim
= get_vim_account(vnfr
["vim-account-id"])
795 vim_type
= db_vim
["vim_type"]
796 for alt_image_id
in vdur
.get("alt-image-ids"):
797 ns_alt_image
= target
["image"][int(alt_image_id
)]
798 if vim_type
== ns_alt_image
.get("vim-type"):
799 # must use alternative image
800 self
.logger
.debug("use alternative image id: {}".format(alt_image_id
))
801 ns_image_id
= alt_image_id
802 vdur
["ns-image-id"] = ns_image_id
804 ns_image
= target
["image"][int(ns_image_id
)]
805 if target_vim
not in ns_image
["vim_info"]:
806 ns_image
["vim_info"][target_vim
] = {}
808 vdur
["vim_info"] = {target_vim
: {}}
809 # instantiation parameters
811 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
813 vdur_list
.append(vdur
)
814 target_vnf
["vdur"] = vdur_list
815 target
["vnf"].append(target_vnf
)
817 desc
= await self
.RO
.deploy(nsr_id
, target
)
818 self
.logger
.debug("RO return > {}".format(desc
))
819 action_id
= desc
["action_id"]
820 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
824 "_admin.deployed.RO.operational-status": "running",
825 "detailed-status": " ".join(stage
)
827 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
828 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
829 self
._write
_op
_status
(nslcmop_id
, stage
)
830 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
833 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
=None, start_time
=None, timeout
=600, stage
=None):
834 detailed_status_old
= None
836 start_time
= start_time
or time()
837 while time() <= start_time
+ timeout
:
838 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
839 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
840 if desc_status
["status"] == "FAILED":
841 raise NgRoException(desc_status
["details"])
842 elif desc_status
["status"] == "BUILD":
844 stage
[2] = "VIM: ({})".format(desc_status
["details"])
845 elif desc_status
["status"] == "DONE":
847 stage
[2] = "Deployed at VIM"
850 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
851 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
852 detailed_status_old
= stage
[2]
853 db_nsr_update
["detailed-status"] = " ".join(stage
)
854 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
855 self
._write
_op
_status
(nslcmop_id
, stage
)
856 await asyncio
.sleep(15, loop
=self
.loop
)
857 else: # timeout_ns_deploy
858 raise NgRoException("Timeout waiting ns to deploy")
860 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
864 start_deploy
= time()
871 "action_id": nslcmop_id
873 desc
= await self
.RO
.deploy(nsr_id
, target
)
874 action_id
= desc
["action_id"]
875 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
876 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
877 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
880 delete_timeout
= 20 * 60 # 20 minutes
881 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
883 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
884 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
886 await self
.RO
.delete(nsr_id
)
887 except Exception as e
:
888 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
889 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
890 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
891 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
892 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
893 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
894 failed_detail
.append("delete conflict: {}".format(e
))
895 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
897 failed_detail
.append("delete error: {}".format(e
))
898 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
901 stage
[2] = "Error deleting from VIM"
903 stage
[2] = "Deleted from VIM"
904 db_nsr_update
["detailed-status"] = " ".join(stage
)
905 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
906 self
._write
_op
_status
(nslcmop_id
, stage
)
909 raise LcmException("; ".join(failed_detail
))
912 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
913 n2vc_key_list
, stage
):
916 :param logging_text: preffix text to use at logging
917 :param nsr_id: nsr identity
918 :param nsd: database content of ns descriptor
919 :param db_nsr: database content of ns record
920 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
922 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
923 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
924 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
925 :return: None or exception
928 start_deploy
= time()
929 ns_params
= db_nslcmop
.get("operationParams")
930 if ns_params
and ns_params
.get("timeout_ns_deploy"):
931 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
933 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
935 # Check for and optionally request placement optimization. Database will be updated if placement activated
936 stage
[2] = "Waiting for Placement."
937 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
938 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
939 for vnfr
in db_vnfrs
.values():
940 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
943 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
945 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
946 db_vnfds
, n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
)
947 except Exception as e
:
948 stage
[2] = "ERROR deploying at VIM"
949 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
950 self
.logger
.error("Error deploying at VIM {}".format(e
),
951 exc_info
=not isinstance(e
, (ROclient
.ROClientException
, LcmException
, DbException
,
955 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
957 Wait for kdu to be up, get ip address
958 :param logging_text: prefix use for logging
965 # self.logger.debug(logging_text + "Starting wait_kdu_up")
968 while nb_tries
< 360:
969 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
970 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
972 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
973 if kdur
.get("status"):
974 if kdur
["status"] in ("READY", "ENABLED"):
975 return kdur
.get("ip-address")
977 raise LcmException("target KDU={} is in error state".format(kdu_name
))
979 await asyncio
.sleep(10, loop
=self
.loop
)
981 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
983 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
985 Wait for ip addres at RO, and optionally, insert public key in virtual machine
986 :param logging_text: prefix use for logging
991 :param pub_key: public ssh key to inject, None to skip
992 :param user: user to apply the public ssh key
996 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1000 target_vdu_id
= None
1006 if ro_retries
>= 360: # 1 hour
1007 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1009 await asyncio
.sleep(10, loop
=self
.loop
)
1012 if not target_vdu_id
:
1013 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1015 if not vdu_id
: # for the VNF case
1016 if db_vnfr
.get("status") == "ERROR":
1017 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1018 ip_address
= db_vnfr
.get("ip-address")
1021 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1023 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1024 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1026 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1027 vdur
= db_vnfr
["vdur"][0]
1029 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1031 # New generation RO stores information at "vim_info"
1034 if vdur
.get("vim_info"):
1035 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1036 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1037 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1038 ip_address
= vdur
.get("ip-address")
1041 target_vdu_id
= vdur
["vdu-id-ref"]
1042 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1043 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1045 if not target_vdu_id
:
1048 # inject public key into machine
1049 if pub_key
and user
:
1050 self
.logger
.debug(logging_text
+ "Inserting RO key")
1051 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1052 if vdur
.get("pdu-type"):
1053 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1056 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1058 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1059 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1061 desc
= await self
.RO
.deploy(nsr_id
, target
)
1062 action_id
= desc
["action_id"]
1063 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1066 # wait until NS is deployed at RO
1068 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1069 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1072 result_dict
= await self
.RO
.create_action(
1074 item_id_name
=ro_nsr_id
,
1075 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1077 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1078 if not result_dict
or not isinstance(result_dict
, dict):
1079 raise LcmException("Unknown response from RO when injecting key")
1080 for result
in result_dict
.values():
1081 if result
.get("vim_result") == 200:
1084 raise ROclient
.ROClientException("error injecting key: {}".format(
1085 result
.get("description")))
1087 except NgRoException
as e
:
1088 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1089 except ROclient
.ROClientException
as e
:
1091 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1095 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1101 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1103 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1105 my_vca
= vca_deployed_list
[vca_index
]
1106 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1107 # vdu or kdu: no dependencies
1111 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1112 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1113 configuration_status_list
= db_nsr
["configurationStatus"]
1114 for index
, vca_deployed
in enumerate(configuration_status_list
):
1115 if index
== vca_index
:
1118 if not my_vca
.get("member-vnf-index") or \
1119 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1120 internal_status
= configuration_status_list
[index
].get("status")
1121 if internal_status
== 'READY':
1123 elif internal_status
== 'BROKEN':
1124 raise LcmException("Configuration aborted because dependent charm/s has failed")
1128 # no dependencies, return
1130 await asyncio
.sleep(10)
1133 raise LcmException("Configuration aborted because dependent charm/s timeout")
1135 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1136 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1137 ee_config_descriptor
):
1138 nsr_id
= db_nsr
["_id"]
1139 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1140 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1141 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1142 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1144 'collection': 'nsrs',
1145 'filter': {'_id': nsr_id
},
1146 'path': db_update_entry
1152 element_under_configuration
= nsr_id
1156 vnfr_id
= db_vnfr
["_id"]
1157 osm_config
["osm"]["vnf_id"] = vnfr_id
1159 namespace
= "{nsi}.{ns}".format(
1160 nsi
=nsi_id
if nsi_id
else "",
1164 element_type
= 'VNF'
1165 element_under_configuration
= vnfr_id
1166 namespace
+= ".{}".format(vnfr_id
)
1168 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1169 element_type
= 'VDU'
1170 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1171 osm_config
["osm"]["vdu_id"] = vdu_id
1173 namespace
+= ".{}".format(kdu_name
)
1174 element_type
= 'KDU'
1175 element_under_configuration
= kdu_name
1176 osm_config
["osm"]["kdu_name"] = kdu_name
1179 artifact_path
= "{}/{}/{}/{}".format(
1180 base_folder
["folder"],
1181 base_folder
["pkg-dir"],
1182 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1186 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1188 # get initial_config_primitive_list that applies to this element
1189 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1191 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1193 # add config if not present for NS charm
1194 ee_descriptor_id
= ee_config_descriptor
.get("id")
1195 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1196 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1197 vca_deployed
, ee_descriptor_id
)
1199 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1200 # n2vc_redesign STEP 3.1
1201 # find old ee_id if exists
1202 ee_id
= vca_deployed
.get("ee_id")
1205 deep_get(db_vnfr
, ("vim-account-id",)) or
1206 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1208 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1209 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1210 # create or register execution environment in VCA
1211 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1213 self
._write
_configuration
_status
(
1215 vca_index
=vca_index
,
1217 element_under_configuration
=element_under_configuration
,
1218 element_type
=element_type
1221 step
= "create execution environment"
1222 self
.logger
.debug(logging_text
+ step
)
1226 if vca_type
== "k8s_proxy_charm":
1227 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1228 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1229 namespace
=namespace
,
1230 artifact_path
=artifact_path
,
1232 cloud_name
=vca_k8s_cloud
,
1233 credential_name
=vca_k8s_cloud_credential
,
1235 elif vca_type
== "helm" or vca_type
== "helm-v3":
1236 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1237 namespace
=namespace
,
1241 artifact_path
=artifact_path
,
1245 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1246 namespace
=namespace
,
1249 cloud_name
=vca_cloud
,
1250 credential_name
=vca_cloud_credential
,
1253 elif vca_type
== "native_charm":
1254 step
= "Waiting to VM being up and getting IP address"
1255 self
.logger
.debug(logging_text
+ step
)
1256 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1257 user
=None, pub_key
=None)
1258 credentials
= {"hostname": rw_mgmt_ip
}
1260 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1261 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1262 # merged. Meanwhile let's get username from initial-config-primitive
1263 if not username
and initial_config_primitive_list
:
1264 for config_primitive
in initial_config_primitive_list
:
1265 for param
in config_primitive
.get("parameter", ()):
1266 if param
["name"] == "ssh-username":
1267 username
= param
["value"]
1270 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1271 "'config-access.ssh-access.default-user'")
1272 credentials
["username"] = username
1273 # n2vc_redesign STEP 3.2
1275 self
._write
_configuration
_status
(
1277 vca_index
=vca_index
,
1278 status
='REGISTERING',
1279 element_under_configuration
=element_under_configuration
,
1280 element_type
=element_type
1283 step
= "register execution environment {}".format(credentials
)
1284 self
.logger
.debug(logging_text
+ step
)
1285 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1286 credentials
=credentials
,
1287 namespace
=namespace
,
1289 cloud_name
=vca_cloud
,
1290 credential_name
=vca_cloud_credential
,
1293 # for compatibility with MON/POL modules, the need model and application name at database
1294 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1295 ee_id_parts
= ee_id
.split('.')
1296 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1297 if len(ee_id_parts
) >= 2:
1298 model_name
= ee_id_parts
[0]
1299 application_name
= ee_id_parts
[1]
1300 db_nsr_update
[db_update_entry
+ "model"] = model_name
1301 db_nsr_update
[db_update_entry
+ "application"] = application_name
1303 # n2vc_redesign STEP 3.3
1304 step
= "Install configuration Software"
1306 self
._write
_configuration
_status
(
1308 vca_index
=vca_index
,
1309 status
='INSTALLING SW',
1310 element_under_configuration
=element_under_configuration
,
1311 element_type
=element_type
,
1312 other_update
=db_nsr_update
1315 # TODO check if already done
1316 self
.logger
.debug(logging_text
+ step
)
1318 if vca_type
== "native_charm":
1319 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1320 if config_primitive
:
1321 config
= self
._map
_primitive
_params
(
1327 if vca_type
== "lxc_proxy_charm":
1328 if element_type
== "NS":
1329 num_units
= db_nsr
.get("config-units") or 1
1330 elif element_type
== "VNF":
1331 num_units
= db_vnfr
.get("config-units") or 1
1332 elif element_type
== "VDU":
1333 for v
in db_vnfr
["vdur"]:
1334 if vdu_id
== v
["vdu-id-ref"]:
1335 num_units
= v
.get("config-units") or 1
1337 if vca_type
!= "k8s_proxy_charm":
1338 await self
.vca_map
[vca_type
].install_configuration_sw(
1340 artifact_path
=artifact_path
,
1343 num_units
=num_units
,
1346 # write in db flag of configuration_sw already installed
1347 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1349 # add relations for this VCA (wait for other peers related with this VCA)
1350 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1351 vca_index
=vca_index
, vca_type
=vca_type
)
1353 # if SSH access is required, then get execution environment SSH public
1354 # if native charm we have waited already to VM be UP
1355 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1358 # self.logger.debug("get ssh key block")
1359 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1360 # self.logger.debug("ssh key needed")
1361 # Needed to inject a ssh key
1362 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1363 step
= "Install configuration Software, getting public ssh key"
1364 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1366 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1368 # self.logger.debug("no need to get ssh key")
1369 step
= "Waiting to VM being up and getting IP address"
1370 self
.logger
.debug(logging_text
+ step
)
1372 # n2vc_redesign STEP 5.1
1373 # wait for RO (ip-address) Insert pub_key into VM
1376 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1378 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1379 vdu_index
, user
=user
, pub_key
=pub_key
)
1381 rw_mgmt_ip
= None # This is for a NS configuration
1383 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1385 # store rw_mgmt_ip in deploy params for later replacement
1386 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1388 # n2vc_redesign STEP 6 Execute initial config primitive
1389 step
= 'execute initial config primitive'
1391 # wait for dependent primitives execution (NS -> VNF -> VDU)
1392 if initial_config_primitive_list
:
1393 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1395 # stage, in function of element type: vdu, kdu, vnf or ns
1396 my_vca
= vca_deployed_list
[vca_index
]
1397 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1399 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1400 elif my_vca
.get("member-vnf-index"):
1402 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1405 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1407 self
._write
_configuration
_status
(
1409 vca_index
=vca_index
,
1410 status
='EXECUTING PRIMITIVE'
1413 self
._write
_op
_status
(
1418 check_if_terminated_needed
= True
1419 for initial_config_primitive
in initial_config_primitive_list
:
1420 # adding information on the vca_deployed if it is a NS execution environment
1421 if not vca_deployed
["member-vnf-index"]:
1422 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1423 # TODO check if already done
1424 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1426 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1427 self
.logger
.debug(logging_text
+ step
)
1428 await self
.vca_map
[vca_type
].exec_primitive(
1430 primitive_name
=initial_config_primitive
["name"],
1431 params_dict
=primitive_params_
,
1434 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1435 if check_if_terminated_needed
:
1436 if config_descriptor
.get('terminate-config-primitive'):
1437 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1438 check_if_terminated_needed
= False
1440 # TODO register in database that primitive is done
1442 # STEP 7 Configure metrics
1443 if vca_type
== "helm" or vca_type
== "helm-v3":
1444 prometheus_jobs
= await self
.add_prometheus_metrics(
1446 artifact_path
=artifact_path
,
1447 ee_config_descriptor
=ee_config_descriptor
,
1450 target_ip
=rw_mgmt_ip
,
1453 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1455 step
= "instantiated at VCA"
1456 self
.logger
.debug(logging_text
+ step
)
1458 self
._write
_configuration
_status
(
1460 vca_index
=vca_index
,
1464 except Exception as e
: # TODO not use Exception but N2VC exception
1465 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1466 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1467 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1468 self
._write
_configuration
_status
(
1470 vca_index
=vca_index
,
1473 raise LcmException("{} {}".format(step
, e
)) from e
1475 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1476 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1478 Update db_nsr fields.
1481 :param current_operation:
1482 :param current_operation_id:
1483 :param error_description:
1484 :param error_detail:
1485 :param other_update: Other required changes at database if provided, will be cleared
1489 db_dict
= other_update
or {}
1490 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1491 db_dict
["_admin.current-operation"] = current_operation_id
1492 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1493 db_dict
["currentOperation"] = current_operation
1494 db_dict
["currentOperationID"] = current_operation_id
1495 db_dict
["errorDescription"] = error_description
1496 db_dict
["errorDetail"] = error_detail
1499 db_dict
["nsState"] = ns_state
1500 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1501 except DbException
as e
:
1502 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1504 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1505 operation_state
: str = None, other_update
: dict = None):
1507 db_dict
= other_update
or {}
1508 db_dict
['queuePosition'] = queuePosition
1509 if isinstance(stage
, list):
1510 db_dict
['stage'] = stage
[0]
1511 db_dict
['detailed-status'] = " ".join(stage
)
1512 elif stage
is not None:
1513 db_dict
['stage'] = str(stage
)
1515 if error_message
is not None:
1516 db_dict
['errorMessage'] = error_message
1517 if operation_state
is not None:
1518 db_dict
['operationState'] = operation_state
1519 db_dict
["statusEnteredTime"] = time()
1520 self
.update_db_2("nslcmops", op_id
, db_dict
)
1521 except DbException
as e
:
1522 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1524 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1526 nsr_id
= db_nsr
["_id"]
1527 # configurationStatus
1528 config_status
= db_nsr
.get('configurationStatus')
1530 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1531 enumerate(config_status
) if v
}
1533 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1535 except DbException
as e
:
1536 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1538 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1539 element_under_configuration
: str = None, element_type
: str = None,
1540 other_update
: dict = None):
1542 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1543 # .format(vca_index, status))
1546 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1547 db_dict
= other_update
or {}
1549 db_dict
[db_path
+ 'status'] = status
1550 if element_under_configuration
:
1551 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1553 db_dict
[db_path
+ 'elementType'] = element_type
1554 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1555 except DbException
as e
:
1556 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1557 .format(status
, nsr_id
, vca_index
, e
))
1559 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1561 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1562 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1563 Database is used because the result can be obtained from a different LCM worker in case of HA.
1564 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1565 :param db_nslcmop: database content of nslcmop
1566 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1567 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1568 computed 'vim-account-id'
1571 nslcmop_id
= db_nslcmop
['_id']
1572 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1573 if placement_engine
== "PLA":
1574 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1575 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1576 db_poll_interval
= 5
1577 wait
= db_poll_interval
* 10
1579 while not pla_result
and wait
>= 0:
1580 await asyncio
.sleep(db_poll_interval
)
1581 wait
-= db_poll_interval
1582 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1583 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1586 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1588 for pla_vnf
in pla_result
['vnf']:
1589 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1590 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1593 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1595 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1598 def update_nsrs_with_pla_result(self
, params
):
1600 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1601 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1602 except Exception as e
:
1603 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1605 async def instantiate(self
, nsr_id
, nslcmop_id
):
1608 :param nsr_id: ns instance to deploy
1609 :param nslcmop_id: operation to run
1613 # Try to lock HA task here
1614 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1615 if not task_is_locked_by_me
:
1616 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1619 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1620 self
.logger
.debug(logging_text
+ "Enter")
1622 # get all needed from database
1624 # database nsrs record
1627 # database nslcmops record
1630 # update operation on nsrs
1632 # update operation on nslcmops
1633 db_nslcmop_update
= {}
1635 nslcmop_operation_state
= None
1636 db_vnfrs
= {} # vnf's info indexed by member-index
1638 tasks_dict_info
= {} # from task to info text
1641 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1642 # ^ stage, step, VIM progress
1644 # wait for any previous tasks in process
1645 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1647 stage
[1] = "Sync filesystem from database."
1648 self
.fs
.sync() # TODO, make use of partial sync, only for the needed packages
1650 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1651 stage
[1] = "Reading from database."
1652 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1653 db_nsr_update
["detailed-status"] = "creating"
1654 db_nsr_update
["operational-status"] = "init"
1655 self
._write
_ns
_status
(
1657 ns_state
="BUILDING",
1658 current_operation
="INSTANTIATING",
1659 current_operation_id
=nslcmop_id
,
1660 other_update
=db_nsr_update
1662 self
._write
_op
_status
(
1668 # read from db: operation
1669 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1670 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1671 ns_params
= db_nslcmop
.get("operationParams")
1672 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1673 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1675 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1678 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1679 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1680 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1681 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1683 # nsr_name = db_nsr["name"] # TODO short-name??
1685 # read from db: vnf's of this ns
1686 stage
[1] = "Getting vnfrs from db."
1687 self
.logger
.debug(logging_text
+ stage
[1])
1688 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1690 # read from db: vnfd's for every vnf
1691 db_vnfds
= [] # every vnfd data
1693 # for each vnf in ns, read vnfd
1694 for vnfr
in db_vnfrs_list
:
1695 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1696 vnfd_id
= vnfr
["vnfd-id"]
1697 vnfd_ref
= vnfr
["vnfd-ref"]
1699 # if we haven't this vnfd, read it from db
1700 if vnfd_id
not in db_vnfds
:
1702 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1703 self
.logger
.debug(logging_text
+ stage
[1])
1704 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1707 db_vnfds
.append(vnfd
)
1709 # Get or generates the _admin.deployed.VCA list
1710 vca_deployed_list
= None
1711 if db_nsr
["_admin"].get("deployed"):
1712 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1713 if vca_deployed_list
is None:
1714 vca_deployed_list
= []
1715 configuration_status_list
= []
1716 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1717 db_nsr_update
["configurationStatus"] = configuration_status_list
1718 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1719 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1720 elif isinstance(vca_deployed_list
, dict):
1721 # maintain backward compatibility. Change a dict to list at database
1722 vca_deployed_list
= list(vca_deployed_list
.values())
1723 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1724 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1726 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1727 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1728 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1730 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1731 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1732 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1733 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1735 # n2vc_redesign STEP 2 Deploy Network Scenario
1736 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1737 self
._write
_op
_status
(
1742 stage
[1] = "Deploying KDUs."
1743 # self.logger.debug(logging_text + "Before deploy_kdus")
1744 # Call to deploy_kdus in case exists the "vdu:kdu" param
1745 await self
.deploy_kdus(
1746 logging_text
=logging_text
,
1748 nslcmop_id
=nslcmop_id
,
1751 task_instantiation_info
=tasks_dict_info
,
1754 stage
[1] = "Getting VCA public key."
1755 # n2vc_redesign STEP 1 Get VCA public ssh-key
1756 # feature 1429. Add n2vc public key to needed VMs
1757 n2vc_key
= self
.n2vc
.get_public_key()
1758 n2vc_key_list
= [n2vc_key
]
1759 if self
.vca_config
.get("public_key"):
1760 n2vc_key_list
.append(self
.vca_config
["public_key"])
1762 stage
[1] = "Deploying NS at VIM."
1763 task_ro
= asyncio
.ensure_future(
1764 self
.instantiate_RO(
1765 logging_text
=logging_text
,
1769 db_nslcmop
=db_nslcmop
,
1772 n2vc_key_list
=n2vc_key_list
,
1776 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1777 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1779 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1780 stage
[1] = "Deploying Execution Environments."
1781 self
.logger
.debug(logging_text
+ stage
[1])
1783 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1784 for vnf_profile
in get_vnf_profiles(nsd
):
1785 vnfd_id
= vnf_profile
["vnfd-id"]
1786 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1787 member_vnf_index
= str(vnf_profile
["id"])
1788 db_vnfr
= db_vnfrs
[member_vnf_index
]
1789 base_folder
= vnfd
["_admin"]["storage"]
1795 # Get additional parameters
1796 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1797 if db_vnfr
.get("additionalParamsForVnf"):
1798 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1800 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
1801 if descriptor_config
:
1803 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1806 nslcmop_id
=nslcmop_id
,
1812 member_vnf_index
=member_vnf_index
,
1813 vdu_index
=vdu_index
,
1815 deploy_params
=deploy_params
,
1816 descriptor_config
=descriptor_config
,
1817 base_folder
=base_folder
,
1818 task_instantiation_info
=tasks_dict_info
,
1822 # Deploy charms for each VDU that supports one.
1823 for vdud
in get_vdu_list(vnfd
):
1825 descriptor_config
= get_configuration(vnfd
, vdu_id
)
1826 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1828 if vdur
.get("additionalParams"):
1829 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1831 deploy_params_vdu
= deploy_params
1832 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1833 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1835 self
.logger
.debug("VDUD > {}".format(vdud
))
1836 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1837 if descriptor_config
:
1840 for vdu_index
in range(vdud_count
):
1841 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1843 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1844 member_vnf_index
, vdu_id
, vdu_index
),
1847 nslcmop_id
=nslcmop_id
,
1853 member_vnf_index
=member_vnf_index
,
1854 vdu_index
=vdu_index
,
1856 deploy_params
=deploy_params_vdu
,
1857 descriptor_config
=descriptor_config
,
1858 base_folder
=base_folder
,
1859 task_instantiation_info
=tasks_dict_info
,
1862 for kdud
in get_kdu_list(vnfd
):
1863 kdu_name
= kdud
["name"]
1864 descriptor_config
= get_configuration(vnfd
, kdu_name
)
1865 if descriptor_config
:
1869 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1870 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1871 if kdur
.get("additionalParams"):
1872 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1875 logging_text
=logging_text
,
1878 nslcmop_id
=nslcmop_id
,
1884 member_vnf_index
=member_vnf_index
,
1885 vdu_index
=vdu_index
,
1887 deploy_params
=deploy_params_kdu
,
1888 descriptor_config
=descriptor_config
,
1889 base_folder
=base_folder
,
1890 task_instantiation_info
=tasks_dict_info
,
1894 # Check if this NS has a charm configuration
1895 descriptor_config
= nsd
.get("ns-configuration")
1896 if descriptor_config
and descriptor_config
.get("juju"):
1899 member_vnf_index
= None
1905 # Get additional parameters
1906 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
1907 if db_nsr
.get("additionalParamsForNs"):
1908 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1909 base_folder
= nsd
["_admin"]["storage"]
1911 logging_text
=logging_text
,
1914 nslcmop_id
=nslcmop_id
,
1920 member_vnf_index
=member_vnf_index
,
1921 vdu_index
=vdu_index
,
1923 deploy_params
=deploy_params
,
1924 descriptor_config
=descriptor_config
,
1925 base_folder
=base_folder
,
1926 task_instantiation_info
=tasks_dict_info
,
1930 # rest of staff will be done at finally
1932 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1933 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1935 except asyncio
.CancelledError
:
1936 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1937 exc
= "Operation was cancelled"
1938 except Exception as e
:
1939 exc
= traceback
.format_exc()
1940 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1943 error_list
.append(str(exc
))
1945 # wait for pending tasks
1947 stage
[1] = "Waiting for instantiate pending tasks."
1948 self
.logger
.debug(logging_text
+ stage
[1])
1949 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1950 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1951 stage
[1] = stage
[2] = ""
1952 except asyncio
.CancelledError
:
1953 error_list
.append("Cancelled")
1954 # TODO cancel all tasks
1955 except Exception as exc
:
1956 error_list
.append(str(exc
))
1958 # update operation-status
1959 db_nsr_update
["operational-status"] = "running"
1960 # let's begin with VCA 'configured' status (later we can change it)
1961 db_nsr_update
["config-status"] = "configured"
1962 for task
, task_name
in tasks_dict_info
.items():
1963 if not task
.done() or task
.cancelled() or task
.exception():
1964 if task_name
.startswith(self
.task_name_deploy_vca
):
1965 # A N2VC task is pending
1966 db_nsr_update
["config-status"] = "failed"
1968 # RO or KDU task is pending
1969 db_nsr_update
["operational-status"] = "failed"
1971 # update status at database
1973 error_detail
= ". ".join(error_list
)
1974 self
.logger
.error(logging_text
+ error_detail
)
1975 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
1976 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
1978 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
1979 db_nslcmop_update
["detailed-status"] = error_detail
1980 nslcmop_operation_state
= "FAILED"
1984 error_description_nsr
= error_description_nslcmop
= None
1986 db_nsr_update
["detailed-status"] = "Done"
1987 db_nslcmop_update
["detailed-status"] = "Done"
1988 nslcmop_operation_state
= "COMPLETED"
1991 self
._write
_ns
_status
(
1994 current_operation
="IDLE",
1995 current_operation_id
=None,
1996 error_description
=error_description_nsr
,
1997 error_detail
=error_detail
,
1998 other_update
=db_nsr_update
2000 self
._write
_op
_status
(
2003 error_message
=error_description_nslcmop
,
2004 operation_state
=nslcmop_operation_state
,
2005 other_update
=db_nslcmop_update
,
2008 if nslcmop_operation_state
:
2010 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2011 "operationState": nslcmop_operation_state
},
2013 except Exception as e
:
2014 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2016 self
.logger
.debug(logging_text
+ "Exit")
2017 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2019 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2020 timeout
: int = 3600, vca_type
: str = None) -> bool:
2023 # 1. find all relations for this VCA
2024 # 2. wait for other peers related
2028 vca_type
= vca_type
or "lxc_proxy_charm"
2030 # STEP 1: find all relations for this VCA
2033 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2034 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2037 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2039 # read all ns-configuration relations
2040 ns_relations
= list()
2041 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2043 for r
in db_ns_relations
:
2044 # check if this VCA is in the relation
2045 if my_vca
.get('member-vnf-index') in\
2046 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2047 ns_relations
.append(r
)
2049 # read all vnf-configuration relations
2050 vnf_relations
= list()
2051 db_vnfd_list
= db_nsr
.get('vnfd-id')
2053 for vnfd
in db_vnfd_list
:
2054 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2055 db_vnf_relations
= get_configuration(db_vnfd
, db_vnfd
["id"]).get("relation", [])
2056 if db_vnf_relations
:
2057 for r
in db_vnf_relations
:
2058 # check if this VCA is in the relation
2059 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2060 vnf_relations
.append(r
)
2062 # if no relations, terminate
2063 if not ns_relations
and not vnf_relations
:
2064 self
.logger
.debug(logging_text
+ ' No relations')
2067 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2074 if now
- start
>= timeout
:
2075 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2078 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2079 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2081 # for each defined NS relation, find the VCA's related
2082 for r
in ns_relations
.copy():
2083 from_vca_ee_id
= None
2085 from_vca_endpoint
= None
2086 to_vca_endpoint
= None
2087 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2088 for vca
in vca_list
:
2089 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2090 and vca
.get('config_sw_installed'):
2091 from_vca_ee_id
= vca
.get('ee_id')
2092 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2093 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2094 and vca
.get('config_sw_installed'):
2095 to_vca_ee_id
= vca
.get('ee_id')
2096 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2097 if from_vca_ee_id
and to_vca_ee_id
:
2099 await self
.vca_map
[vca_type
].add_relation(
2100 ee_id_1
=from_vca_ee_id
,
2101 ee_id_2
=to_vca_ee_id
,
2102 endpoint_1
=from_vca_endpoint
,
2103 endpoint_2
=to_vca_endpoint
)
2104 # remove entry from relations list
2105 ns_relations
.remove(r
)
2107 # check failed peers
2109 vca_status_list
= db_nsr
.get('configurationStatus')
2111 for i
in range(len(vca_list
)):
2113 vca_status
= vca_status_list
[i
]
2114 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2115 if vca_status
.get('status') == 'BROKEN':
2116 # peer broken: remove relation from list
2117 ns_relations
.remove(r
)
2118 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2119 if vca_status
.get('status') == 'BROKEN':
2120 # peer broken: remove relation from list
2121 ns_relations
.remove(r
)
2126 # for each defined VNF relation, find the VCA's related
2127 for r
in vnf_relations
.copy():
2128 from_vca_ee_id
= None
2130 from_vca_endpoint
= None
2131 to_vca_endpoint
= None
2132 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2133 for vca
in vca_list
:
2134 key_to_check
= "vdu_id"
2135 if vca
.get("vdu_id") is None:
2136 key_to_check
= "vnfd_id"
2137 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2138 from_vca_ee_id
= vca
.get('ee_id')
2139 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2140 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2141 to_vca_ee_id
= vca
.get('ee_id')
2142 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2143 if from_vca_ee_id
and to_vca_ee_id
:
2145 await self
.vca_map
[vca_type
].add_relation(
2146 ee_id_1
=from_vca_ee_id
,
2147 ee_id_2
=to_vca_ee_id
,
2148 endpoint_1
=from_vca_endpoint
,
2149 endpoint_2
=to_vca_endpoint
)
2150 # remove entry from relations list
2151 vnf_relations
.remove(r
)
2153 # check failed peers
2155 vca_status_list
= db_nsr
.get('configurationStatus')
2157 for i
in range(len(vca_list
)):
2159 vca_status
= vca_status_list
[i
]
2160 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2161 if vca_status
.get('status') == 'BROKEN':
2162 # peer broken: remove relation from list
2163 vnf_relations
.remove(r
)
2164 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2165 if vca_status
.get('status') == 'BROKEN':
2166 # peer broken: remove relation from list
2167 vnf_relations
.remove(r
)
2173 await asyncio
.sleep(5.0)
2175 if not ns_relations
and not vnf_relations
:
2176 self
.logger
.debug('Relations added')
2181 except Exception as e
:
2182 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2185 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2186 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2189 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2191 db_dict_install
= {"collection": "nsrs",
2192 "filter": {"_id": nsr_id
},
2193 "path": nsr_db_path
}
2195 kdu_instance
= self
.k8scluster_map
[k8sclustertype
].generate_kdu_instance_name(
2196 db_dict
=db_dict_install
,
2197 kdu_model
=k8s_instance_info
["kdu-model"],
2199 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2200 await self
.k8scluster_map
[k8sclustertype
].install(
2201 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2202 kdu_model
=k8s_instance_info
["kdu-model"],
2205 db_dict
=db_dict_install
,
2207 kdu_name
=k8s_instance_info
["kdu-name"],
2208 namespace
=k8s_instance_info
["namespace"],
2209 kdu_instance
=kdu_instance
,
2211 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2213 # Obtain services to obtain management service ip
2214 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2215 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2216 kdu_instance
=kdu_instance
,
2217 namespace
=k8s_instance_info
["namespace"])
2219 # Obtain management service info (if exists)
2220 vnfr_update_dict
= {}
2222 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2223 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2224 for mgmt_service
in mgmt_services
:
2225 for service
in services
:
2226 if service
["name"].startswith(mgmt_service
["name"]):
2227 # Mgmt service found, Obtain service ip
2228 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2229 if isinstance(ip
, list) and len(ip
) == 1:
2232 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2234 # Check if must update also mgmt ip at the vnf
2235 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2236 if service_external_cp
:
2237 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2238 vnfr_update_dict
["ip-address"] = ip
2242 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2244 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2245 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2247 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
2248 if kdu_config
and kdu_config
.get("initial-config-primitive") and \
2249 get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None:
2250 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2251 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2253 for initial_config_primitive
in initial_config_primitive_list
:
2254 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2256 await asyncio
.wait_for(
2257 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2258 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2259 kdu_instance
=kdu_instance
,
2260 primitive_name
=initial_config_primitive
["name"],
2261 params
=primitive_params_
, db_dict
={}),
2264 except Exception as e
:
2265 # Prepare update db with error and raise exception
2267 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2268 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2270 # ignore to keep original exception
2272 # reraise original error
2277 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2278 # Launch kdus if present in the descriptor
2280 k8scluster_id_2_uuic
= {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2282 async def _get_cluster_id(cluster_id
, cluster_type
):
2283 nonlocal k8scluster_id_2_uuic
2284 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2285 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2287 # check if K8scluster is creating and wait look if previous tasks in process
2288 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2290 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2291 self
.logger
.debug(logging_text
+ text
)
2292 await asyncio
.wait(task_dependency
, timeout
=3600)
2294 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2295 if not db_k8scluster
:
2296 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2298 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2300 if cluster_type
== "helm-chart-v3":
2302 # backward compatibility for existing clusters that have not been initialized for helm v3
2303 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
2304 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(k8s_credentials
,
2305 reuse_cluster_uuid
=cluster_id
)
2306 db_k8scluster_update
= {}
2307 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
2308 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
2309 db_k8scluster_update
["_admin.helm-chart-v3.created"] = uninstall_sw
2310 db_k8scluster_update
["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2311 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
2312 except Exception as e
:
2313 self
.logger
.error(logging_text
+ "error initializing helm-v3 cluster: {}".format(str(e
)))
2314 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2317 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2318 format(cluster_id
, cluster_type
))
2319 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2322 logging_text
+= "Deploy kdus: "
2325 db_nsr_update
= {"_admin.deployed.K8s": []}
2326 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2329 updated_cluster_list
= []
2330 updated_v3_cluster_list
= []
2332 for vnfr_data
in db_vnfrs
.values():
2333 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2334 # Step 0: Prepare and set parameters
2335 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
2336 vnfd_id
= vnfr_data
.get('vnfd-id')
2337 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2338 kdud
= next(kdud
for kdud
in vnfd_with_id
["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2339 namespace
= kdur
.get("k8s-namespace")
2340 if kdur
.get("helm-chart"):
2341 kdumodel
= kdur
["helm-chart"]
2342 # Default version: helm3, if helm-version is v2 assign v2
2343 k8sclustertype
= "helm-chart-v3"
2344 self
.logger
.debug("kdur: {}".format(kdur
))
2345 if kdur
.get("helm-version") and kdur
.get("helm-version") == "v2":
2346 k8sclustertype
= "helm-chart"
2347 elif kdur
.get("juju-bundle"):
2348 kdumodel
= kdur
["juju-bundle"]
2349 k8sclustertype
= "juju-bundle"
2351 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2352 "juju-bundle. Maybe an old NBI version is running".
2353 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2354 # check if kdumodel is a file and exists
2356 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2357 storage
= deep_get(vnfd_with_id
, ('_admin', 'storage'))
2358 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2359 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2360 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2362 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2363 kdumodel
= self
.fs
.path
+ filename
2364 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2366 except Exception: # it is not a file
2369 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2370 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2371 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2374 if (k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
)\
2375 or (k8sclustertype
== "helm-chart-v3" and cluster_uuid
not in updated_v3_cluster_list
):
2376 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2377 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(cluster_uuid
=cluster_uuid
))
2378 if del_repo_list
or added_repo_dict
:
2379 if k8sclustertype
== "helm-chart":
2380 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2381 updated
= {'_admin.helm_charts_added.' +
2382 item
: name
for item
, name
in added_repo_dict
.items()}
2383 updated_cluster_list
.append(cluster_uuid
)
2384 elif k8sclustertype
== "helm-chart-v3":
2385 unset
= {'_admin.helm_charts_v3_added.' + item
: None for item
in del_repo_list
}
2386 updated
= {'_admin.helm_charts_v3_added.' +
2387 item
: name
for item
, name
in added_repo_dict
.items()}
2388 updated_v3_cluster_list
.append(cluster_uuid
)
2389 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster "
2390 "'{}' to_delete: {}, to_add: {}".
2391 format(k8s_cluster_id
, del_repo_list
, added_repo_dict
))
2392 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2395 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2396 kdur
["kdu-name"], k8s_cluster_id
)
2397 k8s_instance_info
= {"kdu-instance": None,
2398 "k8scluster-uuid": cluster_uuid
,
2399 "k8scluster-type": k8sclustertype
,
2400 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2401 "kdu-name": kdur
["kdu-name"],
2402 "kdu-model": kdumodel
,
2403 "namespace": namespace
}
2404 db_path
= "_admin.deployed.K8s.{}".format(index
)
2405 db_nsr_update
[db_path
] = k8s_instance_info
2406 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2407 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
)
2408 task
= asyncio
.ensure_future(
2409 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, vnfd_with_id
,
2410 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2411 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2412 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2416 except (LcmException
, asyncio
.CancelledError
):
2418 except Exception as e
:
2419 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2420 if isinstance(e
, (N2VCException
, DbException
)):
2421 self
.logger
.error(logging_text
+ msg
)
2423 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2424 raise LcmException(msg
)
2427 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2429 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2430 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2431 base_folder
, task_instantiation_info
, stage
):
2432 # launch instantiate_N2VC in a asyncio task and register task object
2433 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2434 # if not found, create one entry and update database
2435 # fill db_nsr._admin.deployed.VCA.<index>
2437 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2438 if "execution-environment-list" in descriptor_config
:
2439 ee_list
= descriptor_config
.get("execution-environment-list", [])
2440 else: # other types as script are not supported
2443 for ee_item
in ee_list
:
2444 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2445 ee_item
.get("helm-chart")))
2446 ee_descriptor_id
= ee_item
.get("id")
2447 if ee_item
.get("juju"):
2448 vca_name
= ee_item
['juju'].get('charm')
2449 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2450 if ee_item
['juju'].get('cloud') == "k8s":
2451 vca_type
= "k8s_proxy_charm"
2452 elif ee_item
['juju'].get('proxy') is False:
2453 vca_type
= "native_charm"
2454 elif ee_item
.get("helm-chart"):
2455 vca_name
= ee_item
['helm-chart']
2456 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2459 vca_type
= "helm-v3"
2461 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2465 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2466 if not vca_deployed
:
2468 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2469 vca_deployed
.get("vdu_id") == vdu_id
and \
2470 vca_deployed
.get("kdu_name") == kdu_name
and \
2471 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2472 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2475 # not found, create one.
2476 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2478 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2480 target
+= "/kdu/{}".format(kdu_name
)
2482 "target_element": target
,
2483 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2484 "member-vnf-index": member_vnf_index
,
2486 "kdu_name": kdu_name
,
2487 "vdu_count_index": vdu_index
,
2488 "operational-status": "init", # TODO revise
2489 "detailed-status": "", # TODO revise
2490 "step": "initial-deploy", # TODO revise
2492 "vdu_name": vdu_name
,
2494 "ee_descriptor_id": ee_descriptor_id
2498 # create VCA and configurationStatus in db
2500 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2501 "configurationStatus.{}".format(vca_index
): dict()
2503 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2505 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2507 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2508 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2509 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2512 task_n2vc
= asyncio
.ensure_future(
2513 self
.instantiate_N2VC(
2514 logging_text
=logging_text
,
2515 vca_index
=vca_index
,
2521 vdu_index
=vdu_index
,
2522 deploy_params
=deploy_params
,
2523 config_descriptor
=descriptor_config
,
2524 base_folder
=base_folder
,
2525 nslcmop_id
=nslcmop_id
,
2529 ee_config_descriptor
=ee_item
2532 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2533 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2534 member_vnf_index
or "", vdu_id
or "")
2537 def _create_nslcmop(nsr_id
, operation
, params
):
2539 Creates a ns-lcm-opp content to be stored at database.
2540 :param nsr_id: internal id of the instance
2541 :param operation: instantiate, terminate, scale, action, ...
2542 :param params: user parameters for the operation
2543 :return: dictionary following SOL005 format
2545 # Raise exception if invalid arguments
2546 if not (nsr_id
and operation
and params
):
2548 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2554 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2555 "operationState": "PROCESSING",
2556 "statusEnteredTime": now
,
2557 "nsInstanceId": nsr_id
,
2558 "lcmOperationType": operation
,
2560 "isAutomaticInvocation": False,
2561 "operationParams": params
,
2562 "isCancelPending": False,
2564 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2565 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2570 def _format_additional_params(self
, params
):
2571 params
= params
or {}
2572 for key
, value
in params
.items():
2573 if str(value
).startswith("!!yaml "):
2574 params
[key
] = yaml
.safe_load(value
[7:])
2577 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2578 primitive
= seq
.get('name')
2579 primitive_params
= {}
2581 "member_vnf_index": vnf_index
,
2582 "primitive": primitive
,
2583 "primitive_params": primitive_params
,
2586 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2590 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2591 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2592 if op
.get('operationState') == 'COMPLETED':
2593 # b. Skip sub-operation
2594 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2595 return self
.SUBOPERATION_STATUS_SKIP
2597 # c. retry executing sub-operation
2598 # The sub-operation exists, and operationState != 'COMPLETED'
2599 # Update operationState = 'PROCESSING' to indicate a retry.
2600 operationState
= 'PROCESSING'
2601 detailed_status
= 'In progress'
2602 self
._update
_suboperation
_status
(
2603 db_nslcmop
, op_index
, operationState
, detailed_status
)
2604 # Return the sub-operation index
2605 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2606 # with arguments extracted from the sub-operation
2609 # Find a sub-operation where all keys in a matching dictionary must match
2610 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2611 def _find_suboperation(self
, db_nslcmop
, match
):
2612 if db_nslcmop
and match
:
2613 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2614 for i
, op
in enumerate(op_list
):
2615 if all(op
.get(k
) == match
[k
] for k
in match
):
2617 return self
.SUBOPERATION_STATUS_NOT_FOUND
2619 # Update status for a sub-operation given its index
2620 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2621 # Update DB for HA tasks
2622 q_filter
= {'_id': db_nslcmop
['_id']}
2623 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2624 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2625 self
.db
.set_one("nslcmops",
2627 update_dict
=update_dict
,
2628 fail_on_empty
=False)
2630 # Add sub-operation, return the index of the added sub-operation
2631 # Optionally, set operationState, detailed-status, and operationType
2632 # Status and type are currently set for 'scale' sub-operations:
2633 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2634 # 'detailed-status' : status message
2635 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2636 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2637 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2638 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2639 RO_nsr_id
=None, RO_scaling_info
=None):
2641 return self
.SUBOPERATION_STATUS_NOT_FOUND
2642 # Get the "_admin.operations" list, if it exists
2643 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2644 op_list
= db_nslcmop_admin
.get('operations')
2645 # Create or append to the "_admin.operations" list
2646 new_op
= {'member_vnf_index': vnf_index
,
2648 'vdu_count_index': vdu_count_index
,
2649 'primitive': primitive
,
2650 'primitive_params': mapped_primitive_params
}
2652 new_op
['operationState'] = operationState
2654 new_op
['detailed-status'] = detailed_status
2656 new_op
['lcmOperationType'] = operationType
2658 new_op
['RO_nsr_id'] = RO_nsr_id
2660 new_op
['RO_scaling_info'] = RO_scaling_info
2662 # No existing operations, create key 'operations' with current operation as first list element
2663 db_nslcmop_admin
.update({'operations': [new_op
]})
2664 op_list
= db_nslcmop_admin
.get('operations')
2666 # Existing operations, append operation to list
2667 op_list
.append(new_op
)
2669 db_nslcmop_update
= {'_admin.operations': op_list
}
2670 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2671 op_index
= len(op_list
) - 1
2674 # Helper methods for scale() sub-operations
2676 # pre-scale/post-scale:
2677 # Check for 3 different cases:
2678 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2679 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2680 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2681 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2682 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2683 # Find this sub-operation
2684 if RO_nsr_id
and RO_scaling_info
:
2685 operationType
= 'SCALE-RO'
2687 'member_vnf_index': vnf_index
,
2688 'RO_nsr_id': RO_nsr_id
,
2689 'RO_scaling_info': RO_scaling_info
,
2693 'member_vnf_index': vnf_index
,
2694 'primitive': vnf_config_primitive
,
2695 'primitive_params': primitive_params
,
2696 'lcmOperationType': operationType
2698 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2699 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2700 # a. New sub-operation
2701 # The sub-operation does not exist, add it.
2702 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2703 # The following parameters are set to None for all kind of scaling:
2705 vdu_count_index
= None
2707 if RO_nsr_id
and RO_scaling_info
:
2708 vnf_config_primitive
= None
2709 primitive_params
= None
2712 RO_scaling_info
= None
2713 # Initial status for sub-operation
2714 operationState
= 'PROCESSING'
2715 detailed_status
= 'In progress'
2716 # Add sub-operation for pre/post-scaling (zero or more operations)
2717 self
._add
_suboperation
(db_nslcmop
,
2722 vnf_config_primitive
,
2729 return self
.SUBOPERATION_STATUS_NEW
2731 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2732 # or op_index (operationState != 'COMPLETED')
2733 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2735 # Function to return execution_environment id
2737 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2738 # TODO vdu_index_count
2739 for vca
in vca_deployed_list
:
2740 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
2743 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
2744 vca_index
, destroy_ee
=True, exec_primitives
=True):
2746 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2747 :param logging_text:
2749 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2750 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2751 :param vca_index: index in the database _admin.deployed.VCA
2752 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2753 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2754 not executed properly
2755 :return: None or exception
2759 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2760 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
2764 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
2766 # execute terminate_primitives
2768 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
2769 config_descriptor
.get("terminate-config-primitive"), vca_deployed
.get("ee_descriptor_id"))
2770 vdu_id
= vca_deployed
.get("vdu_id")
2771 vdu_count_index
= vca_deployed
.get("vdu_count_index")
2772 vdu_name
= vca_deployed
.get("vdu_name")
2773 vnf_index
= vca_deployed
.get("member-vnf-index")
2774 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
2775 for seq
in terminate_primitives
:
2776 # For each sequence in list, get primitive and call _ns_execute_primitive()
2777 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
2778 vnf_index
, seq
.get("name"))
2779 self
.logger
.debug(logging_text
+ step
)
2780 # Create the primitive for each sequence, i.e. "primitive": "touch"
2781 primitive
= seq
.get('name')
2782 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
2785 self
._add
_suboperation
(db_nslcmop
,
2791 mapped_primitive_params
)
2792 # Sub-operations: Call _ns_execute_primitive() instead of action()
2794 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
2795 mapped_primitive_params
,
2797 except LcmException
:
2798 # this happens when VCA is not deployed. In this case it is not needed to terminate
2800 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
2801 if result
not in result_ok
:
2802 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2803 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
2804 # set that this VCA do not need terminated
2805 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
2806 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
2808 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
2809 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
2812 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"])
2814 async def _delete_all_N2VC(self
, db_nsr
: dict):
2815 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2816 namespace
= "." + db_nsr
["_id"]
2818 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2819 except N2VCNotFound
: # already deleted. Skip
2821 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
2823 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
2825 Terminates a deployment from RO
2826 :param logging_text:
2827 :param nsr_deployed: db_nsr._admin.deployed
2830 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2831 this method will update only the index 2, but it will write on database the concatenated content of the list
2836 ro_nsr_id
= ro_delete_action
= None
2837 if nsr_deployed
and nsr_deployed
.get("RO"):
2838 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
2839 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
2842 stage
[2] = "Deleting ns from VIM."
2843 db_nsr_update
["detailed-status"] = " ".join(stage
)
2844 self
._write
_op
_status
(nslcmop_id
, stage
)
2845 self
.logger
.debug(logging_text
+ stage
[2])
2846 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2847 self
._write
_op
_status
(nslcmop_id
, stage
)
2848 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
2849 ro_delete_action
= desc
["action_id"]
2850 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2851 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2852 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2853 if ro_delete_action
:
2854 # wait until NS is deleted from VIM
2855 stage
[2] = "Waiting ns deleted from VIM."
2856 detailed_status_old
= None
2857 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
2859 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2860 self
._write
_op
_status
(nslcmop_id
, stage
)
2862 delete_timeout
= 20 * 60 # 20 minutes
2863 while delete_timeout
> 0:
2864 desc
= await self
.RO
.show(
2866 item_id_name
=ro_nsr_id
,
2867 extra_item
="action",
2868 extra_item_id
=ro_delete_action
)
2871 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
2873 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
2874 if ns_status
== "ERROR":
2875 raise ROclient
.ROClientException(ns_status_info
)
2876 elif ns_status
== "BUILD":
2877 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
2878 elif ns_status
== "ACTIVE":
2879 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2880 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2883 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
2884 if stage
[2] != detailed_status_old
:
2885 detailed_status_old
= stage
[2]
2886 db_nsr_update
["detailed-status"] = " ".join(stage
)
2887 self
._write
_op
_status
(nslcmop_id
, stage
)
2888 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2889 await asyncio
.sleep(5, loop
=self
.loop
)
2891 else: # delete_timeout <= 0:
2892 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
2894 except Exception as e
:
2895 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2896 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2897 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2898 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2899 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2900 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
2901 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2902 failed_detail
.append("delete conflict: {}".format(e
))
2903 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
2905 failed_detail
.append("delete error: {}".format(e
))
2906 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
2909 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
2910 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
2912 stage
[2] = "Deleting nsd from RO."
2913 db_nsr_update
["detailed-status"] = " ".join(stage
)
2914 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2915 self
._write
_op
_status
(nslcmop_id
, stage
)
2916 await self
.RO
.delete("nsd", ro_nsd_id
)
2917 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
2918 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2919 except Exception as e
:
2920 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2921 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2922 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
2923 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2924 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
2925 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2927 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
2928 self
.logger
.error(logging_text
+ failed_detail
[-1])
2930 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
2931 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
2932 if not vnf_deployed
or not vnf_deployed
["id"]:
2935 ro_vnfd_id
= vnf_deployed
["id"]
2936 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2937 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
2938 db_nsr_update
["detailed-status"] = " ".join(stage
)
2939 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2940 self
._write
_op
_status
(nslcmop_id
, stage
)
2941 await self
.RO
.delete("vnfd", ro_vnfd_id
)
2942 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
2943 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2944 except Exception as e
:
2945 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2946 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2947 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
2948 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2949 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
2950 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2952 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
2953 self
.logger
.error(logging_text
+ failed_detail
[-1])
2956 stage
[2] = "Error deleting from VIM"
2958 stage
[2] = "Deleted from VIM"
2959 db_nsr_update
["detailed-status"] = " ".join(stage
)
2960 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2961 self
._write
_op
_status
(nslcmop_id
, stage
)
2964 raise LcmException("; ".join(failed_detail
))
2966 async def terminate(self
, nsr_id
, nslcmop_id
):
2967 # Try to lock HA task here
2968 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
2969 if not task_is_locked_by_me
:
2972 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
2973 self
.logger
.debug(logging_text
+ "Enter")
2974 timeout_ns_terminate
= self
.timeout_ns_terminate
2977 operation_params
= None
2979 error_list
= [] # annotates all failed error messages
2980 db_nslcmop_update
= {}
2981 autoremove
= False # autoremove after terminated
2982 tasks_dict_info
= {}
2984 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
2985 # ^ contains [stage, step, VIM-status]
2987 # wait for any previous tasks in process
2988 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
2990 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
2991 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
2992 operation_params
= db_nslcmop
.get("operationParams") or {}
2993 if operation_params
.get("timeout_ns_terminate"):
2994 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
2995 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
2996 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2998 db_nsr_update
["operational-status"] = "terminating"
2999 db_nsr_update
["config-status"] = "terminating"
3000 self
._write
_ns
_status
(
3002 ns_state
="TERMINATING",
3003 current_operation
="TERMINATING",
3004 current_operation_id
=nslcmop_id
,
3005 other_update
=db_nsr_update
3007 self
._write
_op
_status
(
3012 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3013 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3016 stage
[1] = "Getting vnf descriptors from db."
3017 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3018 db_vnfds_from_id
= {}
3019 db_vnfds_from_member_index
= {}
3021 for vnfr
in db_vnfrs_list
:
3022 vnfd_id
= vnfr
["vnfd-id"]
3023 if vnfd_id
not in db_vnfds_from_id
:
3024 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3025 db_vnfds_from_id
[vnfd_id
] = vnfd
3026 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3028 # Destroy individual execution environments when there are terminating primitives.
3029 # Rest of EE will be deleted at once
3030 # TODO - check before calling _destroy_N2VC
3031 # if not operation_params.get("skip_terminate_primitives"):#
3032 # or not vca.get("needed_terminate"):
3033 stage
[0] = "Stage 2/3 execute terminating primitives."
3034 self
.logger
.debug(logging_text
+ stage
[0])
3035 stage
[1] = "Looking execution environment that needs terminate."
3036 self
.logger
.debug(logging_text
+ stage
[1])
3038 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3039 config_descriptor
= None
3040 if not vca
or not vca
.get("ee_id"):
3042 if not vca
.get("member-vnf-index"):
3044 config_descriptor
= db_nsr
.get("ns-configuration")
3045 elif vca
.get("vdu_id"):
3046 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3047 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
3048 elif vca
.get("kdu_name"):
3049 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3050 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
3052 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3053 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
3054 vca_type
= vca
.get("type")
3055 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3056 vca
.get("needed_terminate"))
3057 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3058 # pending native charms
3059 destroy_ee
= True if vca_type
in ("helm", "helm-v3", "native_charm") else False
3060 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3061 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3062 task
= asyncio
.ensure_future(
3063 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3064 destroy_ee
, exec_terminate_primitives
))
3065 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3067 # wait for pending tasks of terminate primitives
3069 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3070 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3071 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3073 tasks_dict_info
.clear()
3075 return # raise LcmException("; ".join(error_list))
3077 # remove All execution environments at once
3078 stage
[0] = "Stage 3/3 delete all."
3080 if nsr_deployed
.get("VCA"):
3081 stage
[1] = "Deleting all execution environments."
3082 self
.logger
.debug(logging_text
+ stage
[1])
3083 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3084 timeout
=self
.timeout_charm_delete
))
3085 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3086 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3088 # Delete from k8scluster
3089 stage
[1] = "Deleting KDUs."
3090 self
.logger
.debug(logging_text
+ stage
[1])
3091 # print(nsr_deployed)
3092 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3093 if not kdu
or not kdu
.get("kdu-instance"):
3095 kdu_instance
= kdu
.get("kdu-instance")
3096 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3097 task_delete_kdu_instance
= asyncio
.ensure_future(
3098 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3099 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3100 kdu_instance
=kdu_instance
))
3102 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3103 format(kdu
.get("k8scluster-type")))
3105 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3108 stage
[1] = "Deleting ns from VIM."
3110 task_delete_ro
= asyncio
.ensure_future(
3111 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3113 task_delete_ro
= asyncio
.ensure_future(
3114 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3115 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3117 # rest of staff will be done at finally
3119 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3120 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3122 except asyncio
.CancelledError
:
3123 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3124 exc
= "Operation was cancelled"
3125 except Exception as e
:
3126 exc
= traceback
.format_exc()
3127 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3130 error_list
.append(str(exc
))
3132 # wait for pending tasks
3134 stage
[1] = "Waiting for terminate pending tasks."
3135 self
.logger
.debug(logging_text
+ stage
[1])
3136 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3138 stage
[1] = stage
[2] = ""
3139 except asyncio
.CancelledError
:
3140 error_list
.append("Cancelled")
3141 # TODO cancell all tasks
3142 except Exception as exc
:
3143 error_list
.append(str(exc
))
3144 # update status at database
3146 error_detail
= "; ".join(error_list
)
3147 # self.logger.error(logging_text + error_detail)
3148 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3149 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3151 db_nsr_update
["operational-status"] = "failed"
3152 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3153 db_nslcmop_update
["detailed-status"] = error_detail
3154 nslcmop_operation_state
= "FAILED"
3158 error_description_nsr
= error_description_nslcmop
= None
3159 ns_state
= "NOT_INSTANTIATED"
3160 db_nsr_update
["operational-status"] = "terminated"
3161 db_nsr_update
["detailed-status"] = "Done"
3162 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3163 db_nslcmop_update
["detailed-status"] = "Done"
3164 nslcmop_operation_state
= "COMPLETED"
3167 self
._write
_ns
_status
(
3170 current_operation
="IDLE",
3171 current_operation_id
=None,
3172 error_description
=error_description_nsr
,
3173 error_detail
=error_detail
,
3174 other_update
=db_nsr_update
3176 self
._write
_op
_status
(
3179 error_message
=error_description_nslcmop
,
3180 operation_state
=nslcmop_operation_state
,
3181 other_update
=db_nslcmop_update
,
3183 if ns_state
== "NOT_INSTANTIATED":
3185 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3186 except DbException
as e
:
3187 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3189 if operation_params
:
3190 autoremove
= operation_params
.get("autoremove", False)
3191 if nslcmop_operation_state
:
3193 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3194 "operationState": nslcmop_operation_state
,
3195 "autoremove": autoremove
},
3197 except Exception as e
:
3198 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3200 self
.logger
.debug(logging_text
+ "Exit")
3201 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3203 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3205 error_detail_list
= []
3207 pending_tasks
= list(created_tasks_info
.keys())
3208 num_tasks
= len(pending_tasks
)
3210 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3211 self
._write
_op
_status
(nslcmop_id
, stage
)
3212 while pending_tasks
:
3214 _timeout
= timeout
+ time_start
- time()
3215 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3216 return_when
=asyncio
.FIRST_COMPLETED
)
3217 num_done
+= len(done
)
3218 if not done
: # Timeout
3219 for task
in pending_tasks
:
3220 new_error
= created_tasks_info
[task
] + ": Timeout"
3221 error_detail_list
.append(new_error
)
3222 error_list
.append(new_error
)
3225 if task
.cancelled():
3228 exc
= task
.exception()
3230 if isinstance(exc
, asyncio
.TimeoutError
):
3232 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3233 error_list
.append(created_tasks_info
[task
])
3234 error_detail_list
.append(new_error
)
3235 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3236 K8sException
, NgRoException
)):
3237 self
.logger
.error(logging_text
+ new_error
)
3239 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3240 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + " " + exc_traceback
)
3242 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3243 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3245 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3246 if nsr_id
: # update also nsr
3247 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3248 "errorDetail": ". ".join(error_detail_list
)})
3249 self
._write
_op
_status
(nslcmop_id
, stage
)
3250 return error_detail_list
3253 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3255 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3256 The default-value is used. If it is between < > it look for a value at instantiation_params
3257 :param primitive_desc: portion of VNFD/NSD that describes primitive
3258 :param params: Params provided by user
3259 :param instantiation_params: Instantiation params provided by user
3260 :return: a dictionary with the calculated params
3262 calculated_params
= {}
3263 for parameter
in primitive_desc
.get("parameter", ()):
3264 param_name
= parameter
["name"]
3265 if param_name
in params
:
3266 calculated_params
[param_name
] = params
[param_name
]
3267 elif "default-value" in parameter
or "value" in parameter
:
3268 if "value" in parameter
:
3269 calculated_params
[param_name
] = parameter
["value"]
3271 calculated_params
[param_name
] = parameter
["default-value"]
3272 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3273 and calculated_params
[param_name
].endswith(">"):
3274 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3275 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3277 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3278 format(calculated_params
[param_name
], primitive_desc
["name"]))
3280 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3281 format(param_name
, primitive_desc
["name"]))
3283 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3284 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3285 default_flow_style
=True, width
=256)
3286 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3287 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3288 if parameter
.get("data-type") == "INTEGER":
3290 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3291 except ValueError: # error converting string to int
3293 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3294 elif parameter
.get("data-type") == "BOOLEAN":
3295 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3297 # add always ns_config_info if primitive name is config
3298 if primitive_desc
["name"] == "config":
3299 if "ns_config_info" in instantiation_params
:
3300 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3301 return calculated_params
3303 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3304 ee_descriptor_id
=None):
3305 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3306 for vca
in deployed_vca
:
3309 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3311 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3313 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3315 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3319 # vca_deployed not found
3320 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3321 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3324 ee_id
= vca
.get("ee_id")
3325 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3327 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3328 "execution environment"
3329 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3330 return ee_id
, vca_type
3332 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0, retries_interval
=30,
3333 timeout
=None, vca_type
=None, db_dict
=None) -> (str, str):
3335 if primitive
== "config":
3336 primitive_params
= {"params": primitive_params
}
3338 vca_type
= vca_type
or "lxc_proxy_charm"
3342 output
= await asyncio
.wait_for(
3343 self
.vca_map
[vca_type
].exec_primitive(
3345 primitive_name
=primitive
,
3346 params_dict
=primitive_params
,
3347 progress_timeout
=self
.timeout_progress_primitive
,
3348 total_timeout
=self
.timeout_primitive
,
3350 timeout
=timeout
or self
.timeout_primitive
)
3353 except asyncio
.CancelledError
:
3355 except Exception as e
: # asyncio.TimeoutError
3356 if isinstance(e
, asyncio
.TimeoutError
):
3360 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3362 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3364 return 'FAILED', str(e
)
3366 return 'COMPLETED', output
3368 except (LcmException
, asyncio
.CancelledError
):
3370 except Exception as e
:
3371 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3373 async def action(self
, nsr_id
, nslcmop_id
):
3374 # Try to lock HA task here
3375 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3376 if not task_is_locked_by_me
:
3379 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3380 self
.logger
.debug(logging_text
+ "Enter")
3381 # get all needed from database
3385 db_nslcmop_update
= {}
3386 nslcmop_operation_state
= None
3387 error_description_nslcmop
= None
3390 # wait for any previous tasks in process
3391 step
= "Waiting for previous operations to terminate"
3392 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3394 self
._write
_ns
_status
(
3397 current_operation
="RUNNING ACTION",
3398 current_operation_id
=nslcmop_id
3401 step
= "Getting information from database"
3402 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3403 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3405 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3406 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3407 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3408 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3409 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3410 primitive
= db_nslcmop
["operationParams"]["primitive"]
3411 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3412 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3415 step
= "Getting vnfr from database"
3416 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3417 step
= "Getting vnfd from database"
3418 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3420 step
= "Getting nsd from database"
3421 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3423 # for backward compatibility
3424 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3425 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3426 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3427 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3429 # look for primitive
3430 config_primitive_desc
= descriptor_configuration
= None
3432 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
3434 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
3436 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
3438 descriptor_configuration
= db_nsd
.get("ns-configuration")
3440 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3441 for config_primitive
in descriptor_configuration
["config-primitive"]:
3442 if config_primitive
["name"] == primitive
:
3443 config_primitive_desc
= config_primitive
3446 if not config_primitive_desc
:
3447 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3448 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3450 primitive_name
= primitive
3451 ee_descriptor_id
= None
3453 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3454 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3458 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3459 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
3461 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3462 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3464 desc_params
= parse_yaml_strings(db_vnfr
.get("additionalParamsForVnf"))
3466 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
3467 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
3468 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
3470 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
3471 actions
.add(primitive
["name"])
3472 for primitive
in kdu_configuration
.get("config-primitive", []):
3473 actions
.add(primitive
["name"])
3474 kdu_action
= True if primitive_name
in actions
else False
3476 # TODO check if ns is in a proper status
3477 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3478 # kdur and desc_params already set from before
3479 if primitive_params
:
3480 desc_params
.update(primitive_params
)
3481 # TODO Check if we will need something at vnf level
3482 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3483 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3486 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3488 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3489 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3490 raise LcmException(msg
)
3492 db_dict
= {"collection": "nsrs",
3493 "filter": {"_id": nsr_id
},
3494 "path": "_admin.deployed.K8s.{}".format(index
)}
3495 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3496 step
= "Executing kdu {}".format(primitive_name
)
3497 if primitive_name
== "upgrade":
3498 if desc_params
.get("kdu_model"):
3499 kdu_model
= desc_params
.get("kdu_model")
3500 del desc_params
["kdu_model"]
3502 kdu_model
= kdu
.get("kdu-model")
3503 parts
= kdu_model
.split(sep
=":")
3505 kdu_model
= parts
[0]
3507 detailed_status
= await asyncio
.wait_for(
3508 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3509 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3510 kdu_instance
=kdu
.get("kdu-instance"),
3511 atomic
=True, kdu_model
=kdu_model
,
3512 params
=desc_params
, db_dict
=db_dict
,
3513 timeout
=timeout_ns_action
),
3514 timeout
=timeout_ns_action
+ 10)
3515 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3516 elif primitive_name
== "rollback":
3517 detailed_status
= await asyncio
.wait_for(
3518 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3519 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3520 kdu_instance
=kdu
.get("kdu-instance"),
3522 timeout
=timeout_ns_action
)
3523 elif primitive_name
== "status":
3524 detailed_status
= await asyncio
.wait_for(
3525 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3526 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3527 kdu_instance
=kdu
.get("kdu-instance")),
3528 timeout
=timeout_ns_action
)
3530 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3531 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3533 detailed_status
= await asyncio
.wait_for(
3534 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3535 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3536 kdu_instance
=kdu_instance
,
3537 primitive_name
=primitive_name
,
3538 params
=params
, db_dict
=db_dict
,
3539 timeout
=timeout_ns_action
),
3540 timeout
=timeout_ns_action
)
3543 nslcmop_operation_state
= 'COMPLETED'
3545 detailed_status
= ''
3546 nslcmop_operation_state
= 'FAILED'
3548 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"], member_vnf_index
=vnf_index
,
3549 vdu_id
=vdu_id
, vdu_count_index
=vdu_count_index
,
3550 ee_descriptor_id
=ee_descriptor_id
)
3551 db_nslcmop_notif
= {"collection": "nslcmops",
3552 "filter": {"_id": nslcmop_id
},
3553 "path": "admin.VCA"}
3554 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3556 primitive
=primitive_name
,
3557 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3558 timeout
=timeout_ns_action
,
3560 db_dict
=db_nslcmop_notif
)
3562 db_nslcmop_update
["detailed-status"] = detailed_status
3563 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3564 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3566 return # database update is called inside finally
3568 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3569 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3571 except asyncio
.CancelledError
:
3572 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3573 exc
= "Operation was cancelled"
3574 except asyncio
.TimeoutError
:
3575 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3577 except Exception as e
:
3578 exc
= traceback
.format_exc()
3579 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3582 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3583 "FAILED {}: {}".format(step
, exc
)
3584 nslcmop_operation_state
= "FAILED"
3586 self
._write
_ns
_status
(
3588 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3589 current_operation
="IDLE",
3590 current_operation_id
=None,
3591 # error_description=error_description_nsr,
3592 # error_detail=error_detail,
3593 other_update
=db_nsr_update
3596 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3597 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3599 if nslcmop_operation_state
:
3601 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3602 "operationState": nslcmop_operation_state
},
3604 except Exception as e
:
3605 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3606 self
.logger
.debug(logging_text
+ "Exit")
3607 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3608 return nslcmop_operation_state
, detailed_status
3610 async def scale(self
, nsr_id
, nslcmop_id
):
3611 # Try to lock HA task here
3612 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3613 if not task_is_locked_by_me
:
3616 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3617 stage
= ['', '', '']
3618 # ^ stage, step, VIM progress
3619 self
.logger
.debug(logging_text
+ "Enter")
3620 # get all needed from database
3622 db_nslcmop_update
= {}
3625 # in case of error, indicates what part of scale was failed to put nsr at error status
3626 scale_process
= None
3627 old_operational_status
= ""
3628 old_config_status
= ""
3630 # wait for any previous tasks in process
3631 step
= "Waiting for previous operations to terminate"
3632 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3633 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3634 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3636 step
= "Getting nslcmop from database"
3637 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3638 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3640 step
= "Getting nsr from database"
3641 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3642 old_operational_status
= db_nsr
["operational-status"]
3643 old_config_status
= db_nsr
["config-status"]
3645 step
= "Parsing scaling parameters"
3646 db_nsr_update
["operational-status"] = "scaling"
3647 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3648 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3651 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3652 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3653 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3654 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3655 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3658 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3659 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3660 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3661 # for backward compatibility
3662 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3663 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3664 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3665 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3667 step
= "Getting vnfr from database"
3668 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3670 step
= "Getting vnfd from database"
3671 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3673 step
= "Getting scaling-group-descriptor"
3674 scaling_descriptor
= find_in_list(
3678 lambda scale_desc
: scale_desc
["name"] == scaling_group
3680 if not scaling_descriptor
:
3681 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3682 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3684 step
= "Sending scale order to VIM"
3685 # TODO check if ns is in a proper status
3687 if not db_nsr
["_admin"].get("scaling-group"):
3688 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3689 admin_scale_index
= 0
3691 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3692 if admin_scale_info
["name"] == scaling_group
:
3693 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3695 else: # not found, set index one plus last element and add new entry with the name
3696 admin_scale_index
+= 1
3697 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3698 RO_scaling_info
= []
3699 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3700 if scaling_type
== "SCALE_OUT":
3701 if "aspect-delta-details" not in scaling_descriptor
:
3703 "Aspect delta details not fount in scaling descriptor {}".format(
3704 scaling_descriptor
["name"]
3707 # count if max-instance-count is reached
3708 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3710 vdu_scaling_info
["scaling_direction"] = "OUT"
3711 vdu_scaling_info
["vdu-create"] = {}
3712 for delta
in deltas
:
3713 for vdu_delta
in delta
["vdu-delta"]:
3714 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
3715 vdu_index
= get_vdu_index(db_vnfr
, vdu_delta
["id"])
3716 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3718 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3719 cloud_init_list
= []
3721 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3722 max_instance_count
= 10
3723 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
3724 max_instance_count
= vdu_profile
.get("max-number-of-instances", 10)
3726 deafult_instance_num
= get_number_of_instances(db_vnfd
, vdud
["id"])
3728 nb_scale_op
+= vdu_delta
.get("number-of-instances", 1)
3730 if nb_scale_op
+ deafult_instance_num
> max_instance_count
:
3732 "reached the limit of {} (max-instance-count) "
3733 "scaling-out operations for the "
3734 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3736 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3738 # TODO Information of its own ip is not available because db_vnfr is not updated.
3739 additional_params
["OSM"] = get_osm_params(
3744 cloud_init_list
.append(
3745 self
._parse
_cloud
_init
(
3752 RO_scaling_info
.append(
3754 "osm_vdu_id": vdu_delta
["id"],
3755 "member-vnf-index": vnf_index
,
3757 "count": vdu_delta
.get("number-of-instances", 1)
3761 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3762 vdu_scaling_info
["vdu-create"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3764 elif scaling_type
== "SCALE_IN":
3765 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3766 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3768 vdu_scaling_info
["scaling_direction"] = "IN"
3769 vdu_scaling_info
["vdu-delete"] = {}
3770 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3771 for delta
in deltas
:
3772 for vdu_delta
in delta
["vdu-delta"]:
3773 min_instance_count
= 0
3774 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3775 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
3776 min_instance_count
= vdu_profile
["min-number-of-instances"]
3778 deafult_instance_num
= get_number_of_instances(db_vnfd
, vdu_delta
["id"])
3780 nb_scale_op
-= vdu_delta
.get("number-of-instances", 1)
3781 if nb_scale_op
+ deafult_instance_num
< min_instance_count
:
3783 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3784 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3786 RO_scaling_info
.append({"osm_vdu_id": vdu_delta
["id"], "member-vnf-index": vnf_index
,
3787 "type": "delete", "count": vdu_delta
.get("number-of-instances", 1)})
3788 vdu_scaling_info
["vdu-delete"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3790 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3791 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3792 if vdu_scaling_info
["scaling_direction"] == "IN":
3793 for vdur
in reversed(db_vnfr
["vdur"]):
3794 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3795 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3796 vdu_scaling_info
["vdu"].append({
3797 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3798 "vdu_id": vdur
["vdu-id-ref"],
3801 for interface
in vdur
["interfaces"]:
3802 vdu_scaling_info
["vdu"][-1]["interface"].append({
3803 "name": interface
["name"],
3804 "ip_address": interface
["ip-address"],
3805 "mac_address": interface
.get("mac-address"),
3807 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3810 step
= "Executing pre-scale vnf-config-primitive"
3811 if scaling_descriptor
.get("scaling-config-action"):
3812 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3813 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3814 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3815 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3816 step
= db_nslcmop_update
["detailed-status"] = \
3817 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3819 # look for primitive
3820 for config_primitive
in (get_configuration(
3821 db_vnfd
, db_vnfd
["id"]
3822 ) or {}).get("config-primitive", ()):
3823 if config_primitive
["name"] == vnf_config_primitive
:
3827 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3828 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3829 "primitive".format(scaling_group
, vnf_config_primitive
))
3831 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3832 if db_vnfr
.get("additionalParamsForVnf"):
3833 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3835 scale_process
= "VCA"
3836 db_nsr_update
["config-status"] = "configuring pre-scaling"
3837 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3839 # Pre-scale retry check: Check if this sub-operation has been executed before
3840 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3841 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3842 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3843 # Skip sub-operation
3844 result
= 'COMPLETED'
3845 result_detail
= 'Done'
3846 self
.logger
.debug(logging_text
+
3847 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3848 vnf_config_primitive
, result
, result_detail
))
3850 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3851 # New sub-operation: Get index of this sub-operation
3852 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3853 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3854 format(vnf_config_primitive
))
3856 # retry: Get registered params for this existing sub-operation
3857 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3858 vnf_index
= op
.get('member_vnf_index')
3859 vnf_config_primitive
= op
.get('primitive')
3860 primitive_params
= op
.get('primitive_params')
3861 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3862 format(vnf_config_primitive
))
3863 # Execute the primitive, either with new (first-time) or registered (reintent) args
3864 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3865 primitive_name
= config_primitive
.get("execution-environment-primitive",
3866 vnf_config_primitive
)
3867 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3868 member_vnf_index
=vnf_index
,
3870 vdu_count_index
=None,
3871 ee_descriptor_id
=ee_descriptor_id
)
3872 result
, result_detail
= await self
._ns
_execute
_primitive
(
3873 ee_id
, primitive_name
, primitive_params
, vca_type
)
3874 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3875 vnf_config_primitive
, result
, result_detail
))
3876 # Update operationState = COMPLETED | FAILED
3877 self
._update
_suboperation
_status
(
3878 db_nslcmop
, op_index
, result
, result_detail
)
3880 if result
== "FAILED":
3881 raise LcmException(result_detail
)
3882 db_nsr_update
["config-status"] = old_config_status
3883 scale_process
= None
3886 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3887 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3891 scale_process
= "RO"
3892 if self
.ro_config
.get("ng"):
3893 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
3894 vdu_scaling_info
.pop("vdu-create", None)
3895 vdu_scaling_info
.pop("vdu-delete", None)
3897 scale_process
= None
3899 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3902 # execute primitive service POST-SCALING
3903 step
= "Executing post-scale vnf-config-primitive"
3904 if scaling_descriptor
.get("scaling-config-action"):
3905 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3906 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
3907 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
3908 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3909 step
= db_nslcmop_update
["detailed-status"] = \
3910 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3912 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3913 if db_vnfr
.get("additionalParamsForVnf"):
3914 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3916 # look for primitive
3917 for config_primitive
in (
3918 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
3919 ).get("config-primitive", ()):
3920 if config_primitive
["name"] == vnf_config_primitive
:
3924 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
3925 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
3926 "config-primitive".format(scaling_group
, vnf_config_primitive
))
3927 scale_process
= "VCA"
3928 db_nsr_update
["config-status"] = "configuring post-scaling"
3929 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3931 # Post-scale retry check: Check if this sub-operation has been executed before
3932 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3933 db_nslcmop
, nslcmop_id
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
3934 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3935 # Skip sub-operation
3936 result
= 'COMPLETED'
3937 result_detail
= 'Done'
3938 self
.logger
.debug(logging_text
+
3939 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
3940 format(vnf_config_primitive
, result
, result_detail
))
3942 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3943 # New sub-operation: Get index of this sub-operation
3944 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3945 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3946 format(vnf_config_primitive
))
3948 # retry: Get registered params for this existing sub-operation
3949 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3950 vnf_index
= op
.get('member_vnf_index')
3951 vnf_config_primitive
= op
.get('primitive')
3952 primitive_params
= op
.get('primitive_params')
3953 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3954 format(vnf_config_primitive
))
3955 # Execute the primitive, either with new (first-time) or registered (reintent) args
3956 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3957 primitive_name
= config_primitive
.get("execution-environment-primitive",
3958 vnf_config_primitive
)
3959 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3960 member_vnf_index
=vnf_index
,
3962 vdu_count_index
=None,
3963 ee_descriptor_id
=ee_descriptor_id
)
3964 result
, result_detail
= await self
._ns
_execute
_primitive
(
3965 ee_id
, primitive_name
, primitive_params
, vca_type
)
3966 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3967 vnf_config_primitive
, result
, result_detail
))
3968 # Update operationState = COMPLETED | FAILED
3969 self
._update
_suboperation
_status
(
3970 db_nslcmop
, op_index
, result
, result_detail
)
3972 if result
== "FAILED":
3973 raise LcmException(result_detail
)
3974 db_nsr_update
["config-status"] = old_config_status
3975 scale_process
= None
3978 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
3979 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
3980 else old_operational_status
3981 db_nsr_update
["config-status"] = old_config_status
3983 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
3984 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3986 except asyncio
.CancelledError
:
3987 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3988 exc
= "Operation was cancelled"
3989 except Exception as e
:
3990 exc
= traceback
.format_exc()
3991 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3993 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
3995 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
3996 nslcmop_operation_state
= "FAILED"
3998 db_nsr_update
["operational-status"] = old_operational_status
3999 db_nsr_update
["config-status"] = old_config_status
4000 db_nsr_update
["detailed-status"] = ""
4002 if "VCA" in scale_process
:
4003 db_nsr_update
["config-status"] = "failed"
4004 if "RO" in scale_process
:
4005 db_nsr_update
["operational-status"] = "failed"
4006 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4009 error_description_nslcmop
= None
4010 nslcmop_operation_state
= "COMPLETED"
4011 db_nslcmop_update
["detailed-status"] = "Done"
4013 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
4014 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
4016 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
4017 current_operation_id
=None, other_update
=db_nsr_update
)
4019 if nslcmop_operation_state
:
4021 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
4022 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
4023 except Exception as e
:
4024 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4025 self
.logger
.debug(logging_text
+ "Exit")
4026 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4028 async def _scale_ng_ro(self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
):
4029 nsr_id
= db_nslcmop
["nsInstanceId"]
4030 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4033 # read from db: vnfd's for every vnf
4036 # for each vnf in ns, read vnfd
4037 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
4038 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
4039 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
4040 # if we haven't this vnfd, read it from db
4041 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
4043 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4044 db_vnfds
.append(vnfd
)
4045 n2vc_key
= self
.n2vc
.get_public_key()
4046 n2vc_key_list
= [n2vc_key
]
4047 self
.scale_vnfr(db_vnfr
, vdu_scaling_info
.get("vdu-create"), vdu_scaling_info
.get("vdu-delete"),
4049 # db_vnfr has been updated, update db_vnfrs to use it
4050 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
4051 await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, db_nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
4052 db_vnfds
, n2vc_key_list
, stage
=stage
, start_deploy
=time(),
4053 timeout_ns_deploy
=self
.timeout_ns_deploy
)
4054 if vdu_scaling_info
.get("vdu-delete"):
4055 self
.scale_vnfr(db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False)
4057 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4058 if not self
.prometheus
:
4060 # look if exist a file called 'prometheus*.j2' and
4061 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4062 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4065 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4069 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4070 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4072 vnfr_id
= vnfr_id
.replace("-", "")
4074 "JOB_NAME": vnfr_id
,
4075 "TARGET_IP": target_ip
,
4076 "EXPORTER_POD_IP": host_name
,
4077 "EXPORTER_POD_PORT": host_port
,
4079 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4080 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4081 for job
in job_list
:
4082 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4083 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4084 job
["nsr_id"] = nsr_id
4085 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4086 if await self
.prometheus
.update(job_dict
):
4087 return list(job_dict
.keys())
4089 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4091 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4093 :param: vim_account_id: VIM Account ID
4095 :return: (cloud_name, cloud_credential)
4097 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4098 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
4100 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4102 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4104 :param: vim_account_id: VIM Account ID
4106 :return: (cloud_name, cloud_credential)
4108 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4109 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")