1 # -*- coding: utf-8 -*-
4 # Copyright 2018 Telefonica S.A.
6 # Licensed under the Apache License, Version 2.0 (the "License"); you may
7 # not use this file except in compliance with the License. You may obtain
8 # a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15 # License for the specific language governing permissions and limitations
22 import logging
.handlers
25 from jinja2
import Environment
, TemplateError
, TemplateNotFound
, StrictUndefined
, UndefinedError
27 from osm_lcm
import ROclient
28 from osm_lcm
.ng_ro
import NgRoClient
, NgRoException
29 from osm_lcm
.lcm_utils
import LcmException
, LcmExceptionNoMgmtIP
, LcmBase
, deep_get
, get_iterable
, populate_dict
30 from osm_lcm
.data_utils
.nsd
import get_vnf_profiles
31 from osm_lcm
.data_utils
.vnfd
import get_vdu_list
, get_vdu_profile
, \
32 get_ee_sorted_initial_config_primitive_list
, get_ee_sorted_terminate_config_primitive_list
, \
33 get_kdu_list
, get_virtual_link_profiles
, get_vdu
, get_configuration
, \
34 get_vdu_index
, get_scaling_aspect
, get_number_of_instances
, get_juju_ee_ref
35 from osm_lcm
.data_utils
.list_utils
import find_in_list
36 from osm_lcm
.data_utils
.vnfr
import get_osm_params
, get_vdur_index
37 from osm_lcm
.data_utils
.dict_utils
import parse_yaml_strings
38 from osm_lcm
.data_utils
.database
.vim_account
import VimAccountDB
39 from n2vc
.k8s_helm_conn
import K8sHelmConnector
40 from n2vc
.k8s_helm3_conn
import K8sHelm3Connector
41 from n2vc
.k8s_juju_conn
import K8sJujuConnector
43 from osm_common
.dbbase
import DbException
44 from osm_common
.fsbase
import FsException
46 from osm_lcm
.data_utils
.database
.database
import Database
47 from osm_lcm
.data_utils
.filesystem
.filesystem
import Filesystem
49 from n2vc
.n2vc_juju_conn
import N2VCJujuConnector
50 from n2vc
.exceptions
import N2VCException
, N2VCNotFound
, K8sException
52 from osm_lcm
.lcm_helm_conn
import LCMHelmConn
54 from copy
import copy
, deepcopy
56 from uuid
import uuid4
58 from random
import randint
60 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
64 timeout_vca_on_error
= 5 * 60 # Time for charm from first time at blocked,error status to mark as failed
65 timeout_ns_deploy
= 2 * 3600 # default global timeout for deployment a ns
66 timeout_ns_terminate
= 1800 # default global timeout for un deployment a ns
67 timeout_charm_delete
= 10 * 60
68 timeout_primitive
= 30 * 60 # timeout for primitive execution
69 timeout_progress_primitive
= 10 * 60 # timeout for some progress in a primitive execution
71 SUBOPERATION_STATUS_NOT_FOUND
= -1
72 SUBOPERATION_STATUS_NEW
= -2
73 SUBOPERATION_STATUS_SKIP
= -3
74 task_name_deploy_vca
= "Deploying VCA"
76 def __init__(self
, msg
, lcm_tasks
, config
, loop
, prometheus
=None):
78 Init, Connect to database, filesystem storage, and messaging
79 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
84 logger
=logging
.getLogger('lcm.ns')
87 self
.db
= Database().instance
.db
88 self
.fs
= Filesystem().instance
.fs
90 self
.lcm_tasks
= lcm_tasks
91 self
.timeout
= config
["timeout"]
92 self
.ro_config
= config
["ro_config"]
93 self
.ng_ro
= config
["ro_config"].get("ng")
94 self
.vca_config
= config
["VCA"].copy()
96 # create N2VC connector
97 self
.n2vc
= N2VCJujuConnector(
100 url
='{}:{}'.format(self
.vca_config
['host'], self
.vca_config
['port']),
101 username
=self
.vca_config
.get('user', None),
102 vca_config
=self
.vca_config
,
103 on_update_db
=self
._on
_update
_n
2vc
_db
,
108 self
.conn_helm_ee
= LCMHelmConn(
113 vca_config
=self
.vca_config
,
114 on_update_db
=self
._on
_update
_n
2vc
_db
117 self
.k8sclusterhelm2
= K8sHelmConnector(
118 kubectl_command
=self
.vca_config
.get("kubectlpath"),
119 helm_command
=self
.vca_config
.get("helmpath"),
126 self
.k8sclusterhelm3
= K8sHelm3Connector(
127 kubectl_command
=self
.vca_config
.get("kubectlpath"),
128 helm_command
=self
.vca_config
.get("helm3path"),
135 self
.k8sclusterjuju
= K8sJujuConnector(
136 kubectl_command
=self
.vca_config
.get("kubectlpath"),
137 juju_command
=self
.vca_config
.get("jujupath"),
141 vca_config
=self
.vca_config
,
146 self
.k8scluster_map
= {
147 "helm-chart": self
.k8sclusterhelm2
,
148 "helm-chart-v3": self
.k8sclusterhelm3
,
149 "chart": self
.k8sclusterhelm3
,
150 "juju-bundle": self
.k8sclusterjuju
,
151 "juju": self
.k8sclusterjuju
,
155 "lxc_proxy_charm": self
.n2vc
,
156 "native_charm": self
.n2vc
,
157 "k8s_proxy_charm": self
.n2vc
,
158 "helm": self
.conn_helm_ee
,
159 "helm-v3": self
.conn_helm_ee
162 self
.prometheus
= prometheus
165 self
.RO
= NgRoClient(self
.loop
, **self
.ro_config
)
168 def increment_ip_mac(ip_mac
, vm_index
=1):
169 if not isinstance(ip_mac
, str):
172 # try with ipv4 look for last dot
173 i
= ip_mac
.rfind(".")
176 return "{}{}".format(ip_mac
[:i
], int(ip_mac
[i
:]) + vm_index
)
177 # try with ipv6 or mac look for last colon. Operate in hex
178 i
= ip_mac
.rfind(":")
181 # format in hex, len can be 2 for mac or 4 for ipv6
182 return ("{}{:0" + str(len(ip_mac
) - i
) + "x}").format(ip_mac
[:i
], int(ip_mac
[i
:], 16) + vm_index
)
187 def _on_update_ro_db(self
, nsrs_id
, ro_descriptor
):
189 # self.logger.debug('_on_update_ro_db(nsrs_id={}'.format(nsrs_id))
192 # TODO filter RO descriptor fields...
196 # db_dict['deploymentStatus'] = yaml.dump(ro_descriptor, default_flow_style=False, indent=2)
197 db_dict
['deploymentStatus'] = ro_descriptor
198 self
.update_db_2("nsrs", nsrs_id
, db_dict
)
200 except Exception as e
:
201 self
.logger
.warn('Cannot write database RO deployment for ns={} -> {}'.format(nsrs_id
, e
))
203 async def _on_update_n2vc_db(self
, table
, filter, path
, updated_data
):
205 # remove last dot from path (if exists)
206 if path
.endswith('.'):
209 # self.logger.debug('_on_update_n2vc_db(table={}, filter={}, path={}, updated_data={}'
210 # .format(table, filter, path, updated_data))
214 nsr_id
= filter.get('_id')
216 # read ns record from database
217 nsr
= self
.db
.get_one(table
='nsrs', q_filter
=filter)
218 current_ns_status
= nsr
.get('nsState')
220 # get vca status for NS
221 status_dict
= await self
.n2vc
.get_status(namespace
='.' + nsr_id
, yaml_format
=False)
225 db_dict
['vcaStatus'] = status_dict
227 # update configurationStatus for this VCA
229 vca_index
= int(path
[path
.rfind(".")+1:])
231 vca_list
= deep_get(target_dict
=nsr
, key_list
=('_admin', 'deployed', 'VCA'))
232 vca_status
= vca_list
[vca_index
].get('status')
234 configuration_status_list
= nsr
.get('configurationStatus')
235 config_status
= configuration_status_list
[vca_index
].get('status')
237 if config_status
== 'BROKEN' and vca_status
!= 'failed':
238 db_dict
['configurationStatus'][vca_index
] = 'READY'
239 elif config_status
!= 'BROKEN' and vca_status
== 'failed':
240 db_dict
['configurationStatus'][vca_index
] = 'BROKEN'
241 except Exception as e
:
242 # not update configurationStatus
243 self
.logger
.debug('Error updating vca_index (ignore): {}'.format(e
))
245 # if nsState = 'READY' check if juju is reporting some error => nsState = 'DEGRADED'
246 # if nsState = 'DEGRADED' check if all is OK
248 if current_ns_status
in ('READY', 'DEGRADED'):
249 error_description
= ''
251 if status_dict
.get('machines'):
252 for machine_id
in status_dict
.get('machines'):
253 machine
= status_dict
.get('machines').get(machine_id
)
254 # check machine agent-status
255 if machine
.get('agent-status'):
256 s
= machine
.get('agent-status').get('status')
259 error_description
+= 'machine {} agent-status={} ; '.format(machine_id
, s
)
260 # check machine instance status
261 if machine
.get('instance-status'):
262 s
= machine
.get('instance-status').get('status')
265 error_description
+= 'machine {} instance-status={} ; '.format(machine_id
, s
)
267 if status_dict
.get('applications'):
268 for app_id
in status_dict
.get('applications'):
269 app
= status_dict
.get('applications').get(app_id
)
270 # check application status
271 if app
.get('status'):
272 s
= app
.get('status').get('status')
275 error_description
+= 'application {} status={} ; '.format(app_id
, s
)
277 if error_description
:
278 db_dict
['errorDescription'] = error_description
279 if current_ns_status
== 'READY' and is_degraded
:
280 db_dict
['nsState'] = 'DEGRADED'
281 if current_ns_status
== 'DEGRADED' and not is_degraded
:
282 db_dict
['nsState'] = 'READY'
285 self
.update_db_2("nsrs", nsr_id
, db_dict
)
287 except (asyncio
.CancelledError
, asyncio
.TimeoutError
):
289 except Exception as e
:
290 self
.logger
.warn('Error updating NS state for ns={}: {}'.format(nsr_id
, e
))
293 def _parse_cloud_init(cloud_init_text
, additional_params
, vnfd_id
, vdu_id
):
295 env
= Environment(undefined
=StrictUndefined
)
296 template
= env
.from_string(cloud_init_text
)
297 return template
.render(additional_params
or {})
298 except UndefinedError
as e
:
299 raise LcmException("Variable {} at vnfd[id={}]:vdu[id={}]:cloud-init/cloud-init-"
300 "file, must be provided in the instantiation parameters inside the "
301 "'additionalParamsForVnf/Vdu' block".format(e
, vnfd_id
, vdu_id
))
302 except (TemplateError
, TemplateNotFound
) as e
:
303 raise LcmException("Error parsing Jinja2 to cloud-init content at vnfd[id={}]:vdu[id={}]: {}".
304 format(vnfd_id
, vdu_id
, e
))
306 def _get_vdu_cloud_init_content(self
, vdu
, vnfd
):
307 cloud_init_content
= cloud_init_file
= None
309 if vdu
.get("cloud-init-file"):
310 base_folder
= vnfd
["_admin"]["storage"]
311 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
312 vdu
["cloud-init-file"])
313 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
314 cloud_init_content
= ci_file
.read()
315 elif vdu
.get("cloud-init"):
316 cloud_init_content
= vdu
["cloud-init"]
318 return cloud_init_content
319 except FsException
as e
:
320 raise LcmException("Error reading vnfd[id={}]:vdu[id={}]:cloud-init-file={}: {}".
321 format(vnfd
["id"], vdu
["id"], cloud_init_file
, e
))
323 def _get_vdu_additional_params(self
, db_vnfr
, vdu_id
):
324 vdur
= next(vdur
for vdur
in db_vnfr
.get("vdur") if vdu_id
== vdur
["vdu-id-ref"])
325 additional_params
= vdur
.get("additionalParams")
326 return parse_yaml_strings(additional_params
)
328 def vnfd2RO(self
, vnfd
, new_id
=None, additionalParams
=None, nsrId
=None):
330 Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
331 :param vnfd: input vnfd
332 :param new_id: overrides vnf id if provided
333 :param additionalParams: Instantiation params for VNFs provided
334 :param nsrId: Id of the NSR
335 :return: copy of vnfd
337 vnfd_RO
= deepcopy(vnfd
)
338 # remove unused by RO configuration, monitoring, scaling and internal keys
339 vnfd_RO
.pop("_id", None)
340 vnfd_RO
.pop("_admin", None)
341 vnfd_RO
.pop("monitoring-param", None)
342 vnfd_RO
.pop("scaling-group-descriptor", None)
343 vnfd_RO
.pop("kdu", None)
344 vnfd_RO
.pop("k8s-cluster", None)
346 vnfd_RO
["id"] = new_id
348 # parse cloud-init or cloud-init-file with the provided variables using Jinja2
349 for vdu
in get_iterable(vnfd_RO
, "vdu"):
350 vdu
.pop("cloud-init-file", None)
351 vdu
.pop("cloud-init", None)
355 def ip_profile_2_RO(ip_profile
):
356 RO_ip_profile
= deepcopy(ip_profile
)
357 if "dns-server" in RO_ip_profile
:
358 if isinstance(RO_ip_profile
["dns-server"], list):
359 RO_ip_profile
["dns-address"] = []
360 for ds
in RO_ip_profile
.pop("dns-server"):
361 RO_ip_profile
["dns-address"].append(ds
['address'])
363 RO_ip_profile
["dns-address"] = RO_ip_profile
.pop("dns-server")
364 if RO_ip_profile
.get("ip-version") == "ipv4":
365 RO_ip_profile
["ip-version"] = "IPv4"
366 if RO_ip_profile
.get("ip-version") == "ipv6":
367 RO_ip_profile
["ip-version"] = "IPv6"
368 if "dhcp-params" in RO_ip_profile
:
369 RO_ip_profile
["dhcp"] = RO_ip_profile
.pop("dhcp-params")
372 def _get_ro_vim_id_for_vim_account(self
, vim_account
):
373 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account
})
374 if db_vim
["_admin"]["operationalState"] != "ENABLED":
375 raise LcmException("VIM={} is not available. operationalState={}".format(
376 vim_account
, db_vim
["_admin"]["operationalState"]))
377 RO_vim_id
= db_vim
["_admin"]["deployed"]["RO"]
380 def get_ro_wim_id_for_wim_account(self
, wim_account
):
381 if isinstance(wim_account
, str):
382 db_wim
= self
.db
.get_one("wim_accounts", {"_id": wim_account
})
383 if db_wim
["_admin"]["operationalState"] != "ENABLED":
384 raise LcmException("WIM={} is not available. operationalState={}".format(
385 wim_account
, db_wim
["_admin"]["operationalState"]))
386 RO_wim_id
= db_wim
["_admin"]["deployed"]["RO-account"]
391 def scale_vnfr(self
, db_vnfr
, vdu_create
=None, vdu_delete
=None, mark_delete
=False):
393 db_vdu_push_list
= []
394 db_update
= {"_admin.modified": time()}
396 for vdu_id
, vdu_count
in vdu_create
.items():
397 vdur
= next((vdur
for vdur
in reversed(db_vnfr
["vdur"]) if vdur
["vdu-id-ref"] == vdu_id
), None)
399 raise LcmException("Error scaling OUT VNFR for {}. There is not any existing vnfr. Scaled to 0?".
402 for count
in range(vdu_count
):
403 vdur_copy
= deepcopy(vdur
)
404 vdur_copy
["status"] = "BUILD"
405 vdur_copy
["status-detailed"] = None
406 vdur_copy
["ip-address"]: None
407 vdur_copy
["_id"] = str(uuid4())
408 vdur_copy
["count-index"] += count
+ 1
409 vdur_copy
["id"] = "{}-{}".format(vdur_copy
["vdu-id-ref"], vdur_copy
["count-index"])
410 vdur_copy
.pop("vim_info", None)
411 for iface
in vdur_copy
["interfaces"]:
412 if iface
.get("fixed-ip"):
413 iface
["ip-address"] = self
.increment_ip_mac(iface
["ip-address"], count
+1)
415 iface
.pop("ip-address", None)
416 if iface
.get("fixed-mac"):
417 iface
["mac-address"] = self
.increment_ip_mac(iface
["mac-address"], count
+1)
419 iface
.pop("mac-address", None)
420 iface
.pop("mgmt_vnf", None) # only first vdu can be managment of vnf
421 db_vdu_push_list
.append(vdur_copy
)
422 # self.logger.debug("scale out, adding vdu={}".format(vdur_copy))
424 for vdu_id
, vdu_count
in vdu_delete
.items():
426 indexes_to_delete
= [iv
[0] for iv
in enumerate(db_vnfr
["vdur"]) if iv
[1]["vdu-id-ref"] == vdu_id
]
427 db_update
.update({"vdur.{}.status".format(i
): "DELETING" for i
in indexes_to_delete
[-vdu_count
:]})
429 # it must be deleted one by one because common.db does not allow otherwise
430 vdus_to_delete
= [v
for v
in reversed(db_vnfr
["vdur"]) if v
["vdu-id-ref"] == vdu_id
]
431 for vdu
in vdus_to_delete
[:vdu_count
]:
432 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, None, pull
={"vdur": {"_id": vdu
["_id"]}})
433 db_push
= {"vdur": db_vdu_push_list
} if db_vdu_push_list
else None
434 self
.db
.set_one("vnfrs", {"_id": db_vnfr
["_id"]}, db_update
, push_list
=db_push
)
435 # modify passed dictionary db_vnfr
436 db_vnfr_
= self
.db
.get_one("vnfrs", {"_id": db_vnfr
["_id"]})
437 db_vnfr
["vdur"] = db_vnfr_
["vdur"]
439 def ns_update_nsr(self
, ns_update_nsr
, db_nsr
, nsr_desc_RO
):
441 Updates database nsr with the RO info for the created vld
442 :param ns_update_nsr: dictionary to be filled with the updated info
443 :param db_nsr: content of db_nsr. This is also modified
444 :param nsr_desc_RO: nsr descriptor from RO
445 :return: Nothing, LcmException is raised on errors
448 for vld_index
, vld
in enumerate(get_iterable(db_nsr
, "vld")):
449 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
450 if vld
["id"] != net_RO
.get("ns_net_osm_id"):
452 vld
["vim-id"] = net_RO
.get("vim_net_id")
453 vld
["name"] = net_RO
.get("vim_name")
454 vld
["status"] = net_RO
.get("status")
455 vld
["status-detailed"] = net_RO
.get("error_msg")
456 ns_update_nsr
["vld.{}".format(vld_index
)] = vld
459 raise LcmException("ns_update_nsr: Not found vld={} at RO info".format(vld
["id"]))
461 def set_vnfr_at_error(self
, db_vnfrs
, error_text
):
463 for db_vnfr
in db_vnfrs
.values():
464 vnfr_update
= {"status": "ERROR"}
465 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
466 if "status" not in vdur
:
467 vdur
["status"] = "ERROR"
468 vnfr_update
["vdur.{}.status".format(vdu_index
)] = "ERROR"
470 vdur
["status-detailed"] = str(error_text
)
471 vnfr_update
["vdur.{}.status-detailed".format(vdu_index
)] = "ERROR"
472 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
473 except DbException
as e
:
474 self
.logger
.error("Cannot update vnf. {}".format(e
))
476 def ns_update_vnfr(self
, db_vnfrs
, nsr_desc_RO
):
478 Updates database vnfr with the RO info, e.g. ip_address, vim_id... Descriptor db_vnfrs is also updated
479 :param db_vnfrs: dictionary with member-vnf-index: vnfr-content
480 :param nsr_desc_RO: nsr descriptor from RO
481 :return: Nothing, LcmException is raised on errors
483 for vnf_index
, db_vnfr
in db_vnfrs
.items():
484 for vnf_RO
in nsr_desc_RO
["vnfs"]:
485 if vnf_RO
["member_vnf_index"] != vnf_index
:
488 if vnf_RO
.get("ip_address"):
489 db_vnfr
["ip-address"] = vnfr_update
["ip-address"] = vnf_RO
["ip_address"].split(";")[0]
490 elif not db_vnfr
.get("ip-address"):
491 if db_vnfr
.get("vdur"): # if not VDUs, there is not ip_address
492 raise LcmExceptionNoMgmtIP("ns member_vnf_index '{}' has no IP address".format(vnf_index
))
494 for vdu_index
, vdur
in enumerate(get_iterable(db_vnfr
, "vdur")):
495 vdur_RO_count_index
= 0
496 if vdur
.get("pdu-type"):
498 for vdur_RO
in get_iterable(vnf_RO
, "vms"):
499 if vdur
["vdu-id-ref"] != vdur_RO
["vdu_osm_id"]:
501 if vdur
["count-index"] != vdur_RO_count_index
:
502 vdur_RO_count_index
+= 1
504 vdur
["vim-id"] = vdur_RO
.get("vim_vm_id")
505 if vdur_RO
.get("ip_address"):
506 vdur
["ip-address"] = vdur_RO
["ip_address"].split(";")[0]
508 vdur
["ip-address"] = None
509 vdur
["vdu-id-ref"] = vdur_RO
.get("vdu_osm_id")
510 vdur
["name"] = vdur_RO
.get("vim_name")
511 vdur
["status"] = vdur_RO
.get("status")
512 vdur
["status-detailed"] = vdur_RO
.get("error_msg")
513 for ifacer
in get_iterable(vdur
, "interfaces"):
514 for interface_RO
in get_iterable(vdur_RO
, "interfaces"):
515 if ifacer
["name"] == interface_RO
.get("internal_name"):
516 ifacer
["ip-address"] = interface_RO
.get("ip_address")
517 ifacer
["mac-address"] = interface_RO
.get("mac_address")
520 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} interface={} "
522 .format(vnf_index
, vdur
["vdu-id-ref"], ifacer
["name"]))
523 vnfr_update
["vdur.{}".format(vdu_index
)] = vdur
526 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vdur={} count_index={} from "
527 "VIM info".format(vnf_index
, vdur
["vdu-id-ref"], vdur
["count-index"]))
529 for vld_index
, vld
in enumerate(get_iterable(db_vnfr
, "vld")):
530 for net_RO
in get_iterable(nsr_desc_RO
, "nets"):
531 if vld
["id"] != net_RO
.get("vnf_net_osm_id"):
533 vld
["vim-id"] = net_RO
.get("vim_net_id")
534 vld
["name"] = net_RO
.get("vim_name")
535 vld
["status"] = net_RO
.get("status")
536 vld
["status-detailed"] = net_RO
.get("error_msg")
537 vnfr_update
["vld.{}".format(vld_index
)] = vld
540 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} vld={} from VIM info".format(
541 vnf_index
, vld
["id"]))
543 self
.update_db_2("vnfrs", db_vnfr
["_id"], vnfr_update
)
547 raise LcmException("ns_update_vnfr: Not found member_vnf_index={} from VIM info".format(vnf_index
))
549 def _get_ns_config_info(self
, nsr_id
):
551 Generates a mapping between vnf,vdu elements and the N2VC id
552 :param nsr_id: id of nsr to get last database _admin.deployed.VCA that contains this list
553 :return: a dictionary with {osm-config-mapping: {}} where its element contains:
554 "<member-vnf-index>": <N2VC-id> for a vnf configuration, or
555 "<member-vnf-index>.<vdu.id>.<vdu replica(0, 1,..)>": <N2VC-id> for a vdu configuration
557 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
558 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
560 ns_config_info
= {"osm-config-mapping": mapping
}
561 for vca
in vca_deployed_list
:
562 if not vca
["member-vnf-index"]:
564 if not vca
["vdu_id"]:
565 mapping
[vca
["member-vnf-index"]] = vca
["application"]
567 mapping
["{}.{}.{}".format(vca
["member-vnf-index"], vca
["vdu_id"], vca
["vdu_count_index"])] =\
569 return ns_config_info
571 async def _instantiate_ng_ro(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
572 n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
):
576 def get_vim_account(vim_account_id
):
578 if vim_account_id
in db_vims
:
579 return db_vims
[vim_account_id
]
580 db_vim
= self
.db
.get_one("vim_accounts", {"_id": vim_account_id
})
581 db_vims
[vim_account_id
] = db_vim
584 # modify target_vld info with instantiation parameters
585 def parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, target_sdn
):
586 if vld_params
.get("ip-profile"):
587 target_vld
["vim_info"][target_vim
]["ip_profile"] = vld_params
["ip-profile"]
588 if vld_params
.get("provider-network"):
589 target_vld
["vim_info"][target_vim
]["provider_network"] = vld_params
["provider-network"]
590 if "sdn-ports" in vld_params
["provider-network"] and target_sdn
:
591 target_vld
["vim_info"][target_sdn
]["sdn-ports"] = vld_params
["provider-network"]["sdn-ports"]
592 if vld_params
.get("wimAccountId"):
593 target_wim
= "wim:{}".format(vld_params
["wimAccountId"])
594 target_vld
["vim_info"][target_wim
] = {}
595 for param
in ("vim-network-name", "vim-network-id"):
596 if vld_params
.get(param
):
597 if isinstance(vld_params
[param
], dict):
598 for vim
, vim_net
in vld_params
[param
].items():
599 other_target_vim
= "vim:" + vim
600 populate_dict(target_vld
["vim_info"], (other_target_vim
, param
.replace("-", "_")), vim_net
)
601 else: # isinstance str
602 target_vld
["vim_info"][target_vim
][param
.replace("-", "_")] = vld_params
[param
]
603 if vld_params
.get("common_id"):
604 target_vld
["common_id"] = vld_params
.get("common_id")
606 nslcmop_id
= db_nslcmop
["_id"]
608 "name": db_nsr
["name"],
611 "image": deepcopy(db_nsr
["image"]),
612 "flavor": deepcopy(db_nsr
["flavor"]),
613 "action_id": nslcmop_id
,
614 "cloud_init_content": {},
616 for image
in target
["image"]:
617 image
["vim_info"] = {}
618 for flavor
in target
["flavor"]:
619 flavor
["vim_info"] = {}
621 if db_nslcmop
.get("lcmOperationType") != "instantiate":
622 # get parameters of instantiation:
623 db_nslcmop_instantiate
= self
.db
.get_list("nslcmops", {"nsInstanceId": db_nslcmop
["nsInstanceId"],
624 "lcmOperationType": "instantiate"})[-1]
625 ns_params
= db_nslcmop_instantiate
.get("operationParams")
627 ns_params
= db_nslcmop
.get("operationParams")
628 ssh_keys_instantiation
= ns_params
.get("ssh_keys") or []
629 ssh_keys_all
= ssh_keys_instantiation
+ (n2vc_key_list
or [])
632 for vld_index
, vld
in enumerate(db_nsr
.get("vld")):
633 target_vim
= "vim:{}".format(ns_params
["vimAccountId"])
637 "mgmt-network": vld
.get("mgmt-network", False),
638 "type": vld
.get("type"),
641 "vim_network_name": vld
.get("vim-network-name"),
642 "vim_account_id": ns_params
["vimAccountId"]
646 # check if this network needs SDN assist
647 if vld
.get("pci-interfaces"):
648 db_vim
= get_vim_account(ns_params
["vimAccountId"])
649 sdnc_id
= db_vim
["config"].get("sdn-controller")
651 sdn_vld
= "nsrs:{}:vld.{}".format(nsr_id
, vld
["id"])
652 target_sdn
= "sdn:{}".format(sdnc_id
)
653 target_vld
["vim_info"][target_sdn
] = {
654 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
656 nsd_vnf_profiles
= get_vnf_profiles(nsd
)
657 for nsd_vnf_profile
in nsd_vnf_profiles
:
658 for cp
in nsd_vnf_profile
["virtual-link-connectivity"]:
659 if cp
["virtual-link-profile-id"] == vld
["id"]:
660 cp2target
["member_vnf:{}.{}".format(
661 cp
["constituent-cpd-id"][0]["constituent-base-element-id"],
662 cp
["constituent-cpd-id"][0]["constituent-cpd-id"]
663 )] = "nsrs:{}:vld.{}".format(nsr_id
, vld_index
)
665 # check at nsd descriptor, if there is an ip-profile
667 nsd_vlp
= find_in_list(
668 get_virtual_link_profiles(nsd
),
669 lambda a_link_profile
: a_link_profile
["virtual-link-desc-id"] == vld
["id"])
670 if nsd_vlp
and nsd_vlp
.get("virtual-link-protocol-data") and \
671 nsd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
672 ip_profile_source_data
= nsd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
673 ip_profile_dest_data
= {}
674 if "ip-version" in ip_profile_source_data
:
675 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
676 if "cidr" in ip_profile_source_data
:
677 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
678 if "gateway-ip" in ip_profile_source_data
:
679 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
680 if "dhcp-enabled" in ip_profile_source_data
:
681 ip_profile_dest_data
["dhcp-params"] = {
682 "enabled": ip_profile_source_data
["dhcp-enabled"]
684 vld_params
["ip-profile"] = ip_profile_dest_data
686 # update vld_params with instantiation params
687 vld_instantiation_params
= find_in_list(get_iterable(ns_params
, "vld"),
688 lambda a_vld
: a_vld
["name"] in (vld
["name"], vld
["id"]))
689 if vld_instantiation_params
:
690 vld_params
.update(vld_instantiation_params
)
691 parse_vld_instantiation_params(target_vim
, target_vld
, vld_params
, None)
692 target
["ns"]["vld"].append(target_vld
)
694 for vnfr
in db_vnfrs
.values():
695 vnfd
= find_in_list(db_vnfds
, lambda db_vnf
: db_vnf
["id"] == vnfr
["vnfd-ref"])
696 vnf_params
= find_in_list(get_iterable(ns_params
, "vnf"),
697 lambda a_vnf
: a_vnf
["member-vnf-index"] == vnfr
["member-vnf-index-ref"])
698 target_vnf
= deepcopy(vnfr
)
699 target_vim
= "vim:{}".format(vnfr
["vim-account-id"])
700 for vld
in target_vnf
.get("vld", ()):
701 # check if connected to a ns.vld, to fill target'
702 vnf_cp
= find_in_list(vnfd
.get("int-virtual-link-desc", ()),
703 lambda cpd
: cpd
.get("id") == vld
["id"])
705 ns_cp
= "member_vnf:{}.{}".format(vnfr
["member-vnf-index-ref"], vnf_cp
["id"])
706 if cp2target
.get(ns_cp
):
707 vld
["target"] = cp2target
[ns_cp
]
709 vld
["vim_info"] = {target_vim
: {"vim_network_name": vld
.get("vim-network-name")}}
710 # check if this network needs SDN assist
712 if vld
.get("pci-interfaces"):
713 db_vim
= get_vim_account(vnfr
["vim-account-id"])
714 sdnc_id
= db_vim
["config"].get("sdn-controller")
716 sdn_vld
= "vnfrs:{}:vld.{}".format(target_vnf
["_id"], vld
["id"])
717 target_sdn
= "sdn:{}".format(sdnc_id
)
718 vld
["vim_info"][target_sdn
] = {
719 "sdn": True, "target_vim": target_vim
, "vlds": [sdn_vld
], "type": vld
.get("type")}
721 # check at vnfd descriptor, if there is an ip-profile
723 vnfd_vlp
= find_in_list(
724 get_virtual_link_profiles(vnfd
),
725 lambda a_link_profile
: a_link_profile
["id"] == vld
["id"]
727 if vnfd_vlp
and vnfd_vlp
.get("virtual-link-protocol-data") and \
728 vnfd_vlp
["virtual-link-protocol-data"].get("l3-protocol-data"):
729 ip_profile_source_data
= vnfd_vlp
["virtual-link-protocol-data"]["l3-protocol-data"]
730 ip_profile_dest_data
= {}
731 if "ip-version" in ip_profile_source_data
:
732 ip_profile_dest_data
["ip-version"] = ip_profile_source_data
["ip-version"]
733 if "cidr" in ip_profile_source_data
:
734 ip_profile_dest_data
["subnet-address"] = ip_profile_source_data
["cidr"]
735 if "gateway-ip" in ip_profile_source_data
:
736 ip_profile_dest_data
["gateway-address"] = ip_profile_source_data
["gateway-ip"]
737 if "dhcp-enabled" in ip_profile_source_data
:
738 ip_profile_dest_data
["dhcp-params"] = {
739 "enabled": ip_profile_source_data
["dhcp-enabled"]
742 vld_params
["ip-profile"] = ip_profile_dest_data
743 # update vld_params with instantiation params
745 vld_instantiation_params
= find_in_list(get_iterable(vnf_params
, "internal-vld"),
746 lambda i_vld
: i_vld
["name"] == vld
["id"])
747 if vld_instantiation_params
:
748 vld_params
.update(vld_instantiation_params
)
749 parse_vld_instantiation_params(target_vim
, vld
, vld_params
, target_sdn
)
752 for vdur
in target_vnf
.get("vdur", ()):
753 if vdur
.get("status") == "DELETING" or vdur
.get("pdu-type"):
754 continue # This vdu must not be created
755 vdur
["vim_info"] = {"vim_account_id": vnfr
["vim-account-id"]}
757 self
.logger
.debug("NS > ssh_keys > {}".format(ssh_keys_all
))
760 vdu_configuration
= get_configuration(vnfd
, vdur
["vdu-id-ref"])
761 vnf_configuration
= get_configuration(vnfd
, vnfd
["id"])
762 if vdu_configuration
and vdu_configuration
.get("config-access") and \
763 vdu_configuration
.get("config-access").get("ssh-access"):
764 vdur
["ssh-keys"] = ssh_keys_all
765 vdur
["ssh-access-required"] = vdu_configuration
["config-access"]["ssh-access"]["required"]
766 elif vnf_configuration
and vnf_configuration
.get("config-access") and \
767 vnf_configuration
.get("config-access").get("ssh-access") and \
768 any(iface
.get("mgmt-vnf") for iface
in vdur
["interfaces"]):
769 vdur
["ssh-keys"] = ssh_keys_all
770 vdur
["ssh-access-required"] = vnf_configuration
["config-access"]["ssh-access"]["required"]
771 elif ssh_keys_instantiation
and \
772 find_in_list(vdur
["interfaces"], lambda iface
: iface
.get("mgmt-vnf")):
773 vdur
["ssh-keys"] = ssh_keys_instantiation
775 self
.logger
.debug("NS > vdur > {}".format(vdur
))
777 vdud
= get_vdu(vnfd
, vdur
["vdu-id-ref"])
779 if vdud
.get("cloud-init-file"):
780 vdur
["cloud-init"] = "{}:file:{}".format(vnfd
["_id"], vdud
.get("cloud-init-file"))
781 # read file and put content at target.cloul_init_content. Avoid ng_ro to use shared package system
782 if vdur
["cloud-init"] not in target
["cloud_init_content"]:
783 base_folder
= vnfd
["_admin"]["storage"]
784 cloud_init_file
= "{}/{}/cloud_init/{}".format(base_folder
["folder"], base_folder
["pkg-dir"],
785 vdud
.get("cloud-init-file"))
786 with self
.fs
.file_open(cloud_init_file
, "r") as ci_file
:
787 target
["cloud_init_content"][vdur
["cloud-init"]] = ci_file
.read()
788 elif vdud
.get("cloud-init"):
789 vdur
["cloud-init"] = "{}:vdu:{}".format(vnfd
["_id"], get_vdu_index(vnfd
, vdur
["vdu-id-ref"]))
790 # put content at target.cloul_init_content. Avoid ng_ro read vnfd descriptor
791 target
["cloud_init_content"][vdur
["cloud-init"]] = vdud
["cloud-init"]
792 vdur
["additionalParams"] = vdur
.get("additionalParams") or {}
793 deploy_params_vdu
= self
._format
_additional
_params
(vdur
.get("additionalParams") or {})
794 deploy_params_vdu
["OSM"] = get_osm_params(vnfr
, vdur
["vdu-id-ref"], vdur
["count-index"])
795 vdur
["additionalParams"] = deploy_params_vdu
798 ns_flavor
= target
["flavor"][int(vdur
["ns-flavor-id"])]
799 if target_vim
not in ns_flavor
["vim_info"]:
800 ns_flavor
["vim_info"][target_vim
] = {}
803 # in case alternative images are provided we must check if they should be applied
804 # for the vim_type, modify the vim_type taking into account
805 ns_image_id
= int(vdur
["ns-image-id"])
806 if vdur
.get("alt-image-ids"):
807 db_vim
= get_vim_account(vnfr
["vim-account-id"])
808 vim_type
= db_vim
["vim_type"]
809 for alt_image_id
in vdur
.get("alt-image-ids"):
810 ns_alt_image
= target
["image"][int(alt_image_id
)]
811 if vim_type
== ns_alt_image
.get("vim-type"):
812 # must use alternative image
813 self
.logger
.debug("use alternative image id: {}".format(alt_image_id
))
814 ns_image_id
= alt_image_id
815 vdur
["ns-image-id"] = ns_image_id
817 ns_image
= target
["image"][int(ns_image_id
)]
818 if target_vim
not in ns_image
["vim_info"]:
819 ns_image
["vim_info"][target_vim
] = {}
821 vdur
["vim_info"] = {target_vim
: {}}
822 # instantiation parameters
824 # vdu_instantiation_params = next((v for v in get_iterable(vnf_params, "vdu") if v["id"] ==
826 vdur_list
.append(vdur
)
827 target_vnf
["vdur"] = vdur_list
828 target
["vnf"].append(target_vnf
)
830 desc
= await self
.RO
.deploy(nsr_id
, target
)
831 self
.logger
.debug("RO return > {}".format(desc
))
832 action_id
= desc
["action_id"]
833 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, timeout_ns_deploy
, stage
)
837 "_admin.deployed.RO.operational-status": "running",
838 "detailed-status": " ".join(stage
)
840 # db_nsr["_admin.deployed.RO.detailed-status"] = "Deployed at VIM"
841 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
842 self
._write
_op
_status
(nslcmop_id
, stage
)
843 self
.logger
.debug(logging_text
+ "ns deployed at RO. RO_id={}".format(action_id
))
846 async def _wait_ng_ro(self
, nsr_id
, action_id
, nslcmop_id
=None, start_time
=None, timeout
=600, stage
=None):
847 detailed_status_old
= None
849 start_time
= start_time
or time()
850 while time() <= start_time
+ timeout
:
851 desc_status
= await self
.RO
.status(nsr_id
, action_id
)
852 self
.logger
.debug("Wait NG RO > {}".format(desc_status
))
853 if desc_status
["status"] == "FAILED":
854 raise NgRoException(desc_status
["details"])
855 elif desc_status
["status"] == "BUILD":
857 stage
[2] = "VIM: ({})".format(desc_status
["details"])
858 elif desc_status
["status"] == "DONE":
860 stage
[2] = "Deployed at VIM"
863 assert False, "ROclient.check_ns_status returns unknown {}".format(desc_status
["status"])
864 if stage
and nslcmop_id
and stage
[2] != detailed_status_old
:
865 detailed_status_old
= stage
[2]
866 db_nsr_update
["detailed-status"] = " ".join(stage
)
867 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
868 self
._write
_op
_status
(nslcmop_id
, stage
)
869 await asyncio
.sleep(15, loop
=self
.loop
)
870 else: # timeout_ns_deploy
871 raise NgRoException("Timeout waiting ns to deploy")
873 async def _terminate_ng_ro(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
877 start_deploy
= time()
884 "action_id": nslcmop_id
886 desc
= await self
.RO
.deploy(nsr_id
, target
)
887 action_id
= desc
["action_id"]
888 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = action_id
889 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETING"
890 self
.logger
.debug(logging_text
+ "ns terminate action at RO. action_id={}".format(action_id
))
893 delete_timeout
= 20 * 60 # 20 minutes
894 await self
._wait
_ng
_ro
(nsr_id
, action_id
, nslcmop_id
, start_deploy
, delete_timeout
, stage
)
896 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
897 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
899 await self
.RO
.delete(nsr_id
)
900 except Exception as e
:
901 if isinstance(e
, NgRoException
) and e
.http_code
== 404: # not found
902 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
903 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
904 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
905 self
.logger
.debug(logging_text
+ "RO_action_id={} already deleted".format(action_id
))
906 elif isinstance(e
, NgRoException
) and e
.http_code
== 409: # conflict
907 failed_detail
.append("delete conflict: {}".format(e
))
908 self
.logger
.debug(logging_text
+ "RO_action_id={} delete conflict: {}".format(action_id
, e
))
910 failed_detail
.append("delete error: {}".format(e
))
911 self
.logger
.error(logging_text
+ "RO_action_id={} delete error: {}".format(action_id
, e
))
914 stage
[2] = "Error deleting from VIM"
916 stage
[2] = "Deleted from VIM"
917 db_nsr_update
["detailed-status"] = " ".join(stage
)
918 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
919 self
._write
_op
_status
(nslcmop_id
, stage
)
922 raise LcmException("; ".join(failed_detail
))
925 async def instantiate_RO(self
, logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
, db_vnfds
,
926 n2vc_key_list
, stage
):
929 :param logging_text: preffix text to use at logging
930 :param nsr_id: nsr identity
931 :param nsd: database content of ns descriptor
932 :param db_nsr: database content of ns record
933 :param db_nslcmop: database content of ns operation, in this case, 'instantiate'
935 :param db_vnfds: database content of vnfds, indexed by id (not _id). {id: {vnfd_object}, ...}
936 :param n2vc_key_list: ssh-public-key list to be inserted to management vdus via cloud-init
937 :param stage: list with 3 items: [general stage, tasks, vim_specific]. This task will write over vim_specific
938 :return: None or exception
941 start_deploy
= time()
942 ns_params
= db_nslcmop
.get("operationParams")
943 if ns_params
and ns_params
.get("timeout_ns_deploy"):
944 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
946 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
948 # Check for and optionally request placement optimization. Database will be updated if placement activated
949 stage
[2] = "Waiting for Placement."
950 if await self
._do
_placement
(logging_text
, db_nslcmop
, db_vnfrs
):
951 # in case of placement change ns_params[vimAcountId) if not present at any vnfrs
952 for vnfr
in db_vnfrs
.values():
953 if ns_params
["vimAccountId"] == vnfr
["vim-account-id"]:
956 ns_params
["vimAccountId"] == vnfr
["vim-account-id"]
958 return await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
959 db_vnfds
, n2vc_key_list
, stage
, start_deploy
, timeout_ns_deploy
)
960 except Exception as e
:
961 stage
[2] = "ERROR deploying at VIM"
962 self
.set_vnfr_at_error(db_vnfrs
, str(e
))
963 self
.logger
.error("Error deploying at VIM {}".format(e
),
964 exc_info
=not isinstance(e
, (ROclient
.ROClientException
, LcmException
, DbException
,
968 async def wait_kdu_up(self
, logging_text
, nsr_id
, vnfr_id
, kdu_name
):
970 Wait for kdu to be up, get ip address
971 :param logging_text: prefix use for logging
978 # self.logger.debug(logging_text + "Starting wait_kdu_up")
981 while nb_tries
< 360:
982 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
983 kdur
= next((x
for x
in get_iterable(db_vnfr
, "kdur") if x
.get("kdu-name") == kdu_name
), None)
985 raise LcmException("Not found vnfr_id={}, kdu_name={}".format(vnfr_id
, kdu_name
))
986 if kdur
.get("status"):
987 if kdur
["status"] in ("READY", "ENABLED"):
988 return kdur
.get("ip-address")
990 raise LcmException("target KDU={} is in error state".format(kdu_name
))
992 await asyncio
.sleep(10, loop
=self
.loop
)
994 raise LcmException("Timeout waiting KDU={} instantiated".format(kdu_name
))
996 async def wait_vm_up_insert_key_ro(self
, logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
, pub_key
=None, user
=None):
998 Wait for ip addres at RO, and optionally, insert public key in virtual machine
999 :param logging_text: prefix use for logging
1004 :param pub_key: public ssh key to inject, None to skip
1005 :param user: user to apply the public ssh key
1009 self
.logger
.debug(logging_text
+ "Starting wait_vm_up_insert_key_ro")
1013 target_vdu_id
= None
1019 if ro_retries
>= 360: # 1 hour
1020 raise LcmException("Not found _admin.deployed.RO.nsr_id for nsr_id: {}".format(nsr_id
))
1022 await asyncio
.sleep(10, loop
=self
.loop
)
1025 if not target_vdu_id
:
1026 db_vnfr
= self
.db
.get_one("vnfrs", {"_id": vnfr_id
})
1028 if not vdu_id
: # for the VNF case
1029 if db_vnfr
.get("status") == "ERROR":
1030 raise LcmException("Cannot inject ssh-key because target VNF is in error state")
1031 ip_address
= db_vnfr
.get("ip-address")
1034 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur") if x
.get("ip-address") == ip_address
), None)
1036 vdur
= next((x
for x
in get_iterable(db_vnfr
, "vdur")
1037 if x
.get("vdu-id-ref") == vdu_id
and x
.get("count-index") == vdu_index
), None)
1039 if not vdur
and len(db_vnfr
.get("vdur", ())) == 1: # If only one, this should be the target vdu
1040 vdur
= db_vnfr
["vdur"][0]
1042 raise LcmException("Not found vnfr_id={}, vdu_id={}, vdu_index={}".format(vnfr_id
, vdu_id
,
1044 # New generation RO stores information at "vim_info"
1047 if vdur
.get("vim_info"):
1048 target_vim
= next(t
for t
in vdur
["vim_info"]) # there should be only one key
1049 ng_ro_status
= vdur
["vim_info"][target_vim
].get("vim_status")
1050 if vdur
.get("pdu-type") or vdur
.get("status") == "ACTIVE" or ng_ro_status
== "ACTIVE":
1051 ip_address
= vdur
.get("ip-address")
1054 target_vdu_id
= vdur
["vdu-id-ref"]
1055 elif vdur
.get("status") == "ERROR" or ng_ro_status
== "ERROR":
1056 raise LcmException("Cannot inject ssh-key because target VM is in error state")
1058 if not target_vdu_id
:
1061 # inject public key into machine
1062 if pub_key
and user
:
1063 self
.logger
.debug(logging_text
+ "Inserting RO key")
1064 self
.logger
.debug("SSH > PubKey > {}".format(pub_key
))
1065 if vdur
.get("pdu-type"):
1066 self
.logger
.error(logging_text
+ "Cannot inject ssh-ky to a PDU")
1069 ro_vm_id
= "{}-{}".format(db_vnfr
["member-vnf-index-ref"], target_vdu_id
) # TODO add vdu_index
1071 target
= {"action": {"action": "inject_ssh_key", "key": pub_key
, "user": user
},
1072 "vnf": [{"_id": vnfr_id
, "vdur": [{"id": vdur
["id"]}]}],
1074 desc
= await self
.RO
.deploy(nsr_id
, target
)
1075 action_id
= desc
["action_id"]
1076 await self
._wait
_ng
_ro
(nsr_id
, action_id
, timeout
=600)
1079 # wait until NS is deployed at RO
1081 db_nsrs
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1082 ro_nsr_id
= deep_get(db_nsrs
, ("_admin", "deployed", "RO", "nsr_id"))
1085 result_dict
= await self
.RO
.create_action(
1087 item_id_name
=ro_nsr_id
,
1088 descriptor
={"add_public_key": pub_key
, "vms": [ro_vm_id
], "user": user
}
1090 # result_dict contains the format {VM-id: {vim_result: 200, description: text}}
1091 if not result_dict
or not isinstance(result_dict
, dict):
1092 raise LcmException("Unknown response from RO when injecting key")
1093 for result
in result_dict
.values():
1094 if result
.get("vim_result") == 200:
1097 raise ROclient
.ROClientException("error injecting key: {}".format(
1098 result
.get("description")))
1100 except NgRoException
as e
:
1101 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1102 except ROclient
.ROClientException
as e
:
1104 self
.logger
.debug(logging_text
+ "error injecting key: {}. Retrying until {} seconds".
1108 raise LcmException("Reaching max tries injecting key. Error: {}".format(e
))
1114 async def _wait_dependent_n2vc(self
, nsr_id
, vca_deployed_list
, vca_index
):
1116 Wait until dependent VCA deployments have been finished. NS wait for VNFs and VDUs. VNFs for VDUs
1118 my_vca
= vca_deployed_list
[vca_index
]
1119 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1120 # vdu or kdu: no dependencies
1124 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1125 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1126 configuration_status_list
= db_nsr
["configurationStatus"]
1127 for index
, vca_deployed
in enumerate(configuration_status_list
):
1128 if index
== vca_index
:
1131 if not my_vca
.get("member-vnf-index") or \
1132 (vca_deployed
.get("member-vnf-index") == my_vca
.get("member-vnf-index")):
1133 internal_status
= configuration_status_list
[index
].get("status")
1134 if internal_status
== 'READY':
1136 elif internal_status
== 'BROKEN':
1137 raise LcmException("Configuration aborted because dependent charm/s has failed")
1141 # no dependencies, return
1143 await asyncio
.sleep(10)
1146 raise LcmException("Configuration aborted because dependent charm/s timeout")
1148 async def instantiate_N2VC(self
, logging_text
, vca_index
, nsi_id
, db_nsr
, db_vnfr
, vdu_id
, kdu_name
, vdu_index
,
1149 config_descriptor
, deploy_params
, base_folder
, nslcmop_id
, stage
, vca_type
, vca_name
,
1150 ee_config_descriptor
):
1151 nsr_id
= db_nsr
["_id"]
1152 db_update_entry
= "_admin.deployed.VCA.{}.".format(vca_index
)
1153 vca_deployed_list
= db_nsr
["_admin"]["deployed"]["VCA"]
1154 vca_deployed
= db_nsr
["_admin"]["deployed"]["VCA"][vca_index
]
1155 osm_config
= {"osm": {"ns_id": db_nsr
["_id"]}}
1157 'collection': 'nsrs',
1158 'filter': {'_id': nsr_id
},
1159 'path': db_update_entry
1165 element_under_configuration
= nsr_id
1169 vnfr_id
= db_vnfr
["_id"]
1170 osm_config
["osm"]["vnf_id"] = vnfr_id
1172 namespace
= "{nsi}.{ns}".format(
1173 nsi
=nsi_id
if nsi_id
else "",
1177 element_type
= 'VNF'
1178 element_under_configuration
= vnfr_id
1179 namespace
+= ".{}-{}".format(vnfr_id
, vdu_index
or 0)
1181 namespace
+= ".{}-{}".format(vdu_id
, vdu_index
or 0)
1182 element_type
= 'VDU'
1183 element_under_configuration
= "{}-{}".format(vdu_id
, vdu_index
or 0)
1184 osm_config
["osm"]["vdu_id"] = vdu_id
1186 namespace
+= ".{}.{}".format(kdu_name
, vdu_index
or 0)
1187 element_type
= 'KDU'
1188 element_under_configuration
= kdu_name
1189 osm_config
["osm"]["kdu_name"] = kdu_name
1192 artifact_path
= "{}/{}/{}/{}".format(
1193 base_folder
["folder"],
1194 base_folder
["pkg-dir"],
1195 "charms" if vca_type
in ("native_charm", "lxc_proxy_charm", "k8s_proxy_charm") else "helm-charts",
1199 self
.logger
.debug("Artifact path > {}".format(artifact_path
))
1201 # get initial_config_primitive_list that applies to this element
1202 initial_config_primitive_list
= config_descriptor
.get('initial-config-primitive')
1204 self
.logger
.debug("Initial config primitive list > {}".format(initial_config_primitive_list
))
1206 # add config if not present for NS charm
1207 ee_descriptor_id
= ee_config_descriptor
.get("id")
1208 self
.logger
.debug("EE Descriptor > {}".format(ee_descriptor_id
))
1209 initial_config_primitive_list
= get_ee_sorted_initial_config_primitive_list(initial_config_primitive_list
,
1210 vca_deployed
, ee_descriptor_id
)
1212 self
.logger
.debug("Initial config primitive list #2 > {}".format(initial_config_primitive_list
))
1213 # n2vc_redesign STEP 3.1
1214 # find old ee_id if exists
1215 ee_id
= vca_deployed
.get("ee_id")
1218 deep_get(db_vnfr
, ("vim-account-id",)) or
1219 deep_get(deploy_params
, ("OSM", "vim_account_id"))
1221 vca_cloud
, vca_cloud_credential
= self
.get_vca_cloud_and_credentials(vim_account_id
)
1222 vca_k8s_cloud
, vca_k8s_cloud_credential
= self
.get_vca_k8s_cloud_and_credentials(vim_account_id
)
1223 # create or register execution environment in VCA
1224 if vca_type
in ("lxc_proxy_charm", "k8s_proxy_charm", "helm", "helm-v3"):
1226 self
._write
_configuration
_status
(
1228 vca_index
=vca_index
,
1230 element_under_configuration
=element_under_configuration
,
1231 element_type
=element_type
1234 step
= "create execution environment"
1235 self
.logger
.debug(logging_text
+ step
)
1239 if vca_type
== "k8s_proxy_charm":
1240 ee_id
= await self
.vca_map
[vca_type
].install_k8s_proxy_charm(
1241 charm_name
=artifact_path
[artifact_path
.rfind("/") + 1:],
1242 namespace
=namespace
,
1243 artifact_path
=artifact_path
,
1245 cloud_name
=vca_k8s_cloud
,
1246 credential_name
=vca_k8s_cloud_credential
,
1248 elif vca_type
== "helm" or vca_type
== "helm-v3":
1249 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1250 namespace
=namespace
,
1254 artifact_path
=artifact_path
,
1258 ee_id
, credentials
= await self
.vca_map
[vca_type
].create_execution_environment(
1259 namespace
=namespace
,
1262 cloud_name
=vca_cloud
,
1263 credential_name
=vca_cloud_credential
,
1266 elif vca_type
== "native_charm":
1267 step
= "Waiting to VM being up and getting IP address"
1268 self
.logger
.debug(logging_text
+ step
)
1269 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
, vdu_index
,
1270 user
=None, pub_key
=None)
1271 credentials
= {"hostname": rw_mgmt_ip
}
1273 username
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1274 # TODO remove this when changes on IM regarding config-access:ssh-access:default-user were
1275 # merged. Meanwhile let's get username from initial-config-primitive
1276 if not username
and initial_config_primitive_list
:
1277 for config_primitive
in initial_config_primitive_list
:
1278 for param
in config_primitive
.get("parameter", ()):
1279 if param
["name"] == "ssh-username":
1280 username
= param
["value"]
1283 raise LcmException("Cannot determine the username neither with 'initial-config-primitive' nor with "
1284 "'config-access.ssh-access.default-user'")
1285 credentials
["username"] = username
1286 # n2vc_redesign STEP 3.2
1288 self
._write
_configuration
_status
(
1290 vca_index
=vca_index
,
1291 status
='REGISTERING',
1292 element_under_configuration
=element_under_configuration
,
1293 element_type
=element_type
1296 step
= "register execution environment {}".format(credentials
)
1297 self
.logger
.debug(logging_text
+ step
)
1298 ee_id
= await self
.vca_map
[vca_type
].register_execution_environment(
1299 credentials
=credentials
,
1300 namespace
=namespace
,
1302 cloud_name
=vca_cloud
,
1303 credential_name
=vca_cloud_credential
,
1306 # for compatibility with MON/POL modules, the need model and application name at database
1307 # TODO ask MON/POL if needed to not assuming anymore the format "model_name.application_name"
1308 ee_id_parts
= ee_id
.split('.')
1309 db_nsr_update
= {db_update_entry
+ "ee_id": ee_id
}
1310 if len(ee_id_parts
) >= 2:
1311 model_name
= ee_id_parts
[0]
1312 application_name
= ee_id_parts
[1]
1313 db_nsr_update
[db_update_entry
+ "model"] = model_name
1314 db_nsr_update
[db_update_entry
+ "application"] = application_name
1316 # n2vc_redesign STEP 3.3
1317 step
= "Install configuration Software"
1319 self
._write
_configuration
_status
(
1321 vca_index
=vca_index
,
1322 status
='INSTALLING SW',
1323 element_under_configuration
=element_under_configuration
,
1324 element_type
=element_type
,
1325 other_update
=db_nsr_update
1328 # TODO check if already done
1329 self
.logger
.debug(logging_text
+ step
)
1331 if vca_type
== "native_charm":
1332 config_primitive
= next((p
for p
in initial_config_primitive_list
if p
["name"] == "config"), None)
1333 if config_primitive
:
1334 config
= self
._map
_primitive
_params
(
1340 if vca_type
== "lxc_proxy_charm":
1341 if element_type
== "NS":
1342 num_units
= db_nsr
.get("config-units") or 1
1343 elif element_type
== "VNF":
1344 num_units
= db_vnfr
.get("config-units") or 1
1345 elif element_type
== "VDU":
1346 for v
in db_vnfr
["vdur"]:
1347 if vdu_id
== v
["vdu-id-ref"]:
1348 num_units
= v
.get("config-units") or 1
1350 if vca_type
!= "k8s_proxy_charm":
1351 await self
.vca_map
[vca_type
].install_configuration_sw(
1353 artifact_path
=artifact_path
,
1356 num_units
=num_units
,
1359 # write in db flag of configuration_sw already installed
1360 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "config_sw_installed": True})
1362 # add relations for this VCA (wait for other peers related with this VCA)
1363 await self
._add
_vca
_relations
(logging_text
=logging_text
, nsr_id
=nsr_id
,
1364 vca_index
=vca_index
, vca_type
=vca_type
)
1366 # if SSH access is required, then get execution environment SSH public
1367 # if native charm we have waited already to VM be UP
1368 if vca_type
in ("k8s_proxy_charm", "lxc_proxy_charm", "helm", "helm-v3"):
1371 # self.logger.debug("get ssh key block")
1372 if deep_get(config_descriptor
, ("config-access", "ssh-access", "required")):
1373 # self.logger.debug("ssh key needed")
1374 # Needed to inject a ssh key
1375 user
= deep_get(config_descriptor
, ("config-access", "ssh-access", "default-user"))
1376 step
= "Install configuration Software, getting public ssh key"
1377 pub_key
= await self
.vca_map
[vca_type
].get_ee_ssh_public__key(ee_id
=ee_id
, db_dict
=db_dict
)
1379 step
= "Insert public key into VM user={} ssh_key={}".format(user
, pub_key
)
1381 # self.logger.debug("no need to get ssh key")
1382 step
= "Waiting to VM being up and getting IP address"
1383 self
.logger
.debug(logging_text
+ step
)
1385 # n2vc_redesign STEP 5.1
1386 # wait for RO (ip-address) Insert pub_key into VM
1389 rw_mgmt_ip
= await self
.wait_kdu_up(logging_text
, nsr_id
, vnfr_id
, kdu_name
)
1391 rw_mgmt_ip
= await self
.wait_vm_up_insert_key_ro(logging_text
, nsr_id
, vnfr_id
, vdu_id
,
1392 vdu_index
, user
=user
, pub_key
=pub_key
)
1394 rw_mgmt_ip
= None # This is for a NS configuration
1396 self
.logger
.debug(logging_text
+ ' VM_ip_address={}'.format(rw_mgmt_ip
))
1398 # store rw_mgmt_ip in deploy params for later replacement
1399 deploy_params
["rw_mgmt_ip"] = rw_mgmt_ip
1401 # n2vc_redesign STEP 6 Execute initial config primitive
1402 step
= 'execute initial config primitive'
1404 # wait for dependent primitives execution (NS -> VNF -> VDU)
1405 if initial_config_primitive_list
:
1406 await self
._wait
_dependent
_n
2vc
(nsr_id
, vca_deployed_list
, vca_index
)
1408 # stage, in function of element type: vdu, kdu, vnf or ns
1409 my_vca
= vca_deployed_list
[vca_index
]
1410 if my_vca
.get("vdu_id") or my_vca
.get("kdu_name"):
1412 stage
[0] = 'Stage 3/5: running Day-1 primitives for VDU.'
1413 elif my_vca
.get("member-vnf-index"):
1415 stage
[0] = 'Stage 4/5: running Day-1 primitives for VNF.'
1418 stage
[0] = 'Stage 5/5: running Day-1 primitives for NS.'
1420 self
._write
_configuration
_status
(
1422 vca_index
=vca_index
,
1423 status
='EXECUTING PRIMITIVE'
1426 self
._write
_op
_status
(
1431 check_if_terminated_needed
= True
1432 for initial_config_primitive
in initial_config_primitive_list
:
1433 # adding information on the vca_deployed if it is a NS execution environment
1434 if not vca_deployed
["member-vnf-index"]:
1435 deploy_params
["ns_config_info"] = json
.dumps(self
._get
_ns
_config
_info
(nsr_id
))
1436 # TODO check if already done
1437 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, deploy_params
)
1439 step
= "execute primitive '{}' params '{}'".format(initial_config_primitive
["name"], primitive_params_
)
1440 self
.logger
.debug(logging_text
+ step
)
1441 await self
.vca_map
[vca_type
].exec_primitive(
1443 primitive_name
=initial_config_primitive
["name"],
1444 params_dict
=primitive_params_
,
1447 # Once some primitive has been exec, check and write at db if it needs to exec terminated primitives
1448 if check_if_terminated_needed
:
1449 if config_descriptor
.get('terminate-config-primitive'):
1450 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "needed_terminate": True})
1451 check_if_terminated_needed
= False
1453 # TODO register in database that primitive is done
1455 # STEP 7 Configure metrics
1456 if vca_type
== "helm" or vca_type
== "helm-v3":
1457 prometheus_jobs
= await self
.add_prometheus_metrics(
1459 artifact_path
=artifact_path
,
1460 ee_config_descriptor
=ee_config_descriptor
,
1463 target_ip
=rw_mgmt_ip
,
1466 self
.update_db_2("nsrs", nsr_id
, {db_update_entry
+ "prometheus_jobs": prometheus_jobs
})
1468 step
= "instantiated at VCA"
1469 self
.logger
.debug(logging_text
+ step
)
1471 self
._write
_configuration
_status
(
1473 vca_index
=vca_index
,
1477 except Exception as e
: # TODO not use Exception but N2VC exception
1478 # self.update_db_2("nsrs", nsr_id, {db_update_entry + "instantiation": "FAILED"})
1479 if not isinstance(e
, (DbException
, N2VCException
, LcmException
, asyncio
.CancelledError
)):
1480 self
.logger
.error("Exception while {} : {}".format(step
, e
), exc_info
=True)
1481 self
._write
_configuration
_status
(
1483 vca_index
=vca_index
,
1486 raise LcmException("{} {}".format(step
, e
)) from e
1488 def _write_ns_status(self
, nsr_id
: str, ns_state
: str, current_operation
: str, current_operation_id
: str,
1489 error_description
: str = None, error_detail
: str = None, other_update
: dict = None):
1491 Update db_nsr fields.
1494 :param current_operation:
1495 :param current_operation_id:
1496 :param error_description:
1497 :param error_detail:
1498 :param other_update: Other required changes at database if provided, will be cleared
1502 db_dict
= other_update
or {}
1503 db_dict
["_admin.nslcmop"] = current_operation_id
# for backward compatibility
1504 db_dict
["_admin.current-operation"] = current_operation_id
1505 db_dict
["_admin.operation-type"] = current_operation
if current_operation
!= "IDLE" else None
1506 db_dict
["currentOperation"] = current_operation
1507 db_dict
["currentOperationID"] = current_operation_id
1508 db_dict
["errorDescription"] = error_description
1509 db_dict
["errorDetail"] = error_detail
1512 db_dict
["nsState"] = ns_state
1513 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1514 except DbException
as e
:
1515 self
.logger
.warn('Error writing NS status, ns={}: {}'.format(nsr_id
, e
))
1517 def _write_op_status(self
, op_id
: str, stage
: list = None, error_message
: str = None, queuePosition
: int = 0,
1518 operation_state
: str = None, other_update
: dict = None):
1520 db_dict
= other_update
or {}
1521 db_dict
['queuePosition'] = queuePosition
1522 if isinstance(stage
, list):
1523 db_dict
['stage'] = stage
[0]
1524 db_dict
['detailed-status'] = " ".join(stage
)
1525 elif stage
is not None:
1526 db_dict
['stage'] = str(stage
)
1528 if error_message
is not None:
1529 db_dict
['errorMessage'] = error_message
1530 if operation_state
is not None:
1531 db_dict
['operationState'] = operation_state
1532 db_dict
["statusEnteredTime"] = time()
1533 self
.update_db_2("nslcmops", op_id
, db_dict
)
1534 except DbException
as e
:
1535 self
.logger
.warn('Error writing OPERATION status for op_id: {} -> {}'.format(op_id
, e
))
1537 def _write_all_config_status(self
, db_nsr
: dict, status
: str):
1539 nsr_id
= db_nsr
["_id"]
1540 # configurationStatus
1541 config_status
= db_nsr
.get('configurationStatus')
1543 db_nsr_update
= {"configurationStatus.{}.status".format(index
): status
for index
, v
in
1544 enumerate(config_status
) if v
}
1546 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1548 except DbException
as e
:
1549 self
.logger
.warn('Error writing all configuration status, ns={}: {}'.format(nsr_id
, e
))
1551 def _write_configuration_status(self
, nsr_id
: str, vca_index
: int, status
: str = None,
1552 element_under_configuration
: str = None, element_type
: str = None,
1553 other_update
: dict = None):
1555 # self.logger.debug('_write_configuration_status(): vca_index={}, status={}'
1556 # .format(vca_index, status))
1559 db_path
= 'configurationStatus.{}.'.format(vca_index
)
1560 db_dict
= other_update
or {}
1562 db_dict
[db_path
+ 'status'] = status
1563 if element_under_configuration
:
1564 db_dict
[db_path
+ 'elementUnderConfiguration'] = element_under_configuration
1566 db_dict
[db_path
+ 'elementType'] = element_type
1567 self
.update_db_2("nsrs", nsr_id
, db_dict
)
1568 except DbException
as e
:
1569 self
.logger
.warn('Error writing configuration status={}, ns={}, vca_index={}: {}'
1570 .format(status
, nsr_id
, vca_index
, e
))
1572 async def _do_placement(self
, logging_text
, db_nslcmop
, db_vnfrs
):
1574 Check and computes the placement, (vim account where to deploy). If it is decided by an external tool, it
1575 sends the request via kafka and wait until the result is wrote at database (nslcmops _admin.plca).
1576 Database is used because the result can be obtained from a different LCM worker in case of HA.
1577 :param logging_text: contains the prefix for logging, with the ns and nslcmop identifiers
1578 :param db_nslcmop: database content of nslcmop
1579 :param db_vnfrs: database content of vnfrs, indexed by member-vnf-index.
1580 :return: True if some modification is done. Modifies database vnfrs and parameter db_vnfr with the
1581 computed 'vim-account-id'
1584 nslcmop_id
= db_nslcmop
['_id']
1585 placement_engine
= deep_get(db_nslcmop
, ('operationParams', 'placement-engine'))
1586 if placement_engine
== "PLA":
1587 self
.logger
.debug(logging_text
+ "Invoke and wait for placement optimization")
1588 await self
.msg
.aiowrite("pla", "get_placement", {'nslcmopId': nslcmop_id
}, loop
=self
.loop
)
1589 db_poll_interval
= 5
1590 wait
= db_poll_interval
* 10
1592 while not pla_result
and wait
>= 0:
1593 await asyncio
.sleep(db_poll_interval
)
1594 wait
-= db_poll_interval
1595 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1596 pla_result
= deep_get(db_nslcmop
, ('_admin', 'pla'))
1599 raise LcmException("Placement timeout for nslcmopId={}".format(nslcmop_id
))
1601 for pla_vnf
in pla_result
['vnf']:
1602 vnfr
= db_vnfrs
.get(pla_vnf
['member-vnf-index'])
1603 if not pla_vnf
.get('vimAccountId') or not vnfr
:
1606 self
.db
.set_one("vnfrs", {"_id": vnfr
["_id"]}, {"vim-account-id": pla_vnf
['vimAccountId']})
1608 vnfr
["vim-account-id"] = pla_vnf
['vimAccountId']
1611 def update_nsrs_with_pla_result(self
, params
):
1613 nslcmop_id
= deep_get(params
, ('placement', 'nslcmopId'))
1614 self
.update_db_2("nslcmops", nslcmop_id
, {"_admin.pla": params
.get('placement')})
1615 except Exception as e
:
1616 self
.logger
.warn('Update failed for nslcmop_id={}:{}'.format(nslcmop_id
, e
))
1618 async def instantiate(self
, nsr_id
, nslcmop_id
):
1621 :param nsr_id: ns instance to deploy
1622 :param nslcmop_id: operation to run
1626 # Try to lock HA task here
1627 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
1628 if not task_is_locked_by_me
:
1629 self
.logger
.debug('instantiate() task is not locked by me, ns={}'.format(nsr_id
))
1632 logging_text
= "Task ns={} instantiate={} ".format(nsr_id
, nslcmop_id
)
1633 self
.logger
.debug(logging_text
+ "Enter")
1635 # get all needed from database
1637 # database nsrs record
1640 # database nslcmops record
1643 # update operation on nsrs
1645 # update operation on nslcmops
1646 db_nslcmop_update
= {}
1648 nslcmop_operation_state
= None
1649 db_vnfrs
= {} # vnf's info indexed by member-index
1651 tasks_dict_info
= {} # from task to info text
1654 stage
= ['Stage 1/5: preparation of the environment.', "Waiting for previous operations to terminate.", ""]
1655 # ^ stage, step, VIM progress
1657 # wait for any previous tasks in process
1658 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
1660 # STEP 0: Reading database (nslcmops, nsrs, nsds, vnfrs, vnfds)
1661 stage
[1] = "Reading from database."
1662 # nsState="BUILDING", currentOperation="INSTANTIATING", currentOperationID=nslcmop_id
1663 db_nsr_update
["detailed-status"] = "creating"
1664 db_nsr_update
["operational-status"] = "init"
1665 self
._write
_ns
_status
(
1667 ns_state
="BUILDING",
1668 current_operation
="INSTANTIATING",
1669 current_operation_id
=nslcmop_id
,
1670 other_update
=db_nsr_update
1672 self
._write
_op
_status
(
1678 # read from db: operation
1679 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
1680 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
1681 ns_params
= db_nslcmop
.get("operationParams")
1682 if ns_params
and ns_params
.get("timeout_ns_deploy"):
1683 timeout_ns_deploy
= ns_params
["timeout_ns_deploy"]
1685 timeout_ns_deploy
= self
.timeout
.get("ns_deploy", self
.timeout_ns_deploy
)
1688 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
1689 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
1690 stage
[1] = "Getting nsd={} from db.".format(db_nsr
["nsd-id"])
1691 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
1692 self
.fs
.sync(db_nsr
["nsd-id"])
1694 # nsr_name = db_nsr["name"] # TODO short-name??
1696 # read from db: vnf's of this ns
1697 stage
[1] = "Getting vnfrs from db."
1698 self
.logger
.debug(logging_text
+ stage
[1])
1699 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
1701 # read from db: vnfd's for every vnf
1702 db_vnfds
= [] # every vnfd data
1704 # for each vnf in ns, read vnfd
1705 for vnfr
in db_vnfrs_list
:
1706 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
1707 vnfd_id
= vnfr
["vnfd-id"]
1708 vnfd_ref
= vnfr
["vnfd-ref"]
1709 self
.fs
.sync(vnfd_id
)
1711 # if we haven't this vnfd, read it from db
1712 if vnfd_id
not in db_vnfds
:
1714 stage
[1] = "Getting vnfd={} id='{}' from db.".format(vnfd_id
, vnfd_ref
)
1715 self
.logger
.debug(logging_text
+ stage
[1])
1716 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
1719 db_vnfds
.append(vnfd
)
1721 # Get or generates the _admin.deployed.VCA list
1722 vca_deployed_list
= None
1723 if db_nsr
["_admin"].get("deployed"):
1724 vca_deployed_list
= db_nsr
["_admin"]["deployed"].get("VCA")
1725 if vca_deployed_list
is None:
1726 vca_deployed_list
= []
1727 configuration_status_list
= []
1728 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1729 db_nsr_update
["configurationStatus"] = configuration_status_list
1730 # add _admin.deployed.VCA to db_nsr dictionary, value=vca_deployed_list
1731 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1732 elif isinstance(vca_deployed_list
, dict):
1733 # maintain backward compatibility. Change a dict to list at database
1734 vca_deployed_list
= list(vca_deployed_list
.values())
1735 db_nsr_update
["_admin.deployed.VCA"] = vca_deployed_list
1736 populate_dict(db_nsr
, ("_admin", "deployed", "VCA"), vca_deployed_list
)
1738 if not isinstance(deep_get(db_nsr
, ("_admin", "deployed", "RO", "vnfd")), list):
1739 populate_dict(db_nsr
, ("_admin", "deployed", "RO", "vnfd"), [])
1740 db_nsr_update
["_admin.deployed.RO.vnfd"] = []
1742 # set state to INSTANTIATED. When instantiated NBI will not delete directly
1743 db_nsr_update
["_admin.nsState"] = "INSTANTIATED"
1744 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
1745 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "INSTANTIATED"})
1747 # n2vc_redesign STEP 2 Deploy Network Scenario
1748 stage
[0] = 'Stage 2/5: deployment of KDUs, VMs and execution environments.'
1749 self
._write
_op
_status
(
1754 stage
[1] = "Deploying KDUs."
1755 # self.logger.debug(logging_text + "Before deploy_kdus")
1756 # Call to deploy_kdus in case exists the "vdu:kdu" param
1757 await self
.deploy_kdus(
1758 logging_text
=logging_text
,
1760 nslcmop_id
=nslcmop_id
,
1763 task_instantiation_info
=tasks_dict_info
,
1766 stage
[1] = "Getting VCA public key."
1767 # n2vc_redesign STEP 1 Get VCA public ssh-key
1768 # feature 1429. Add n2vc public key to needed VMs
1769 n2vc_key
= self
.n2vc
.get_public_key()
1770 n2vc_key_list
= [n2vc_key
]
1771 if self
.vca_config
.get("public_key"):
1772 n2vc_key_list
.append(self
.vca_config
["public_key"])
1774 stage
[1] = "Deploying NS at VIM."
1775 task_ro
= asyncio
.ensure_future(
1776 self
.instantiate_RO(
1777 logging_text
=logging_text
,
1781 db_nslcmop
=db_nslcmop
,
1784 n2vc_key_list
=n2vc_key_list
,
1788 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_RO", task_ro
)
1789 tasks_dict_info
[task_ro
] = "Deploying at VIM"
1791 # n2vc_redesign STEP 3 to 6 Deploy N2VC
1792 stage
[1] = "Deploying Execution Environments."
1793 self
.logger
.debug(logging_text
+ stage
[1])
1795 nsi_id
= None # TODO put nsi_id when this nsr belongs to a NSI
1796 for vnf_profile
in get_vnf_profiles(nsd
):
1797 vnfd_id
= vnf_profile
["vnfd-id"]
1798 vnfd
= find_in_list(db_vnfds
, lambda a_vnf
: a_vnf
["id"] == vnfd_id
)
1799 member_vnf_index
= str(vnf_profile
["id"])
1800 db_vnfr
= db_vnfrs
[member_vnf_index
]
1801 base_folder
= vnfd
["_admin"]["storage"]
1807 # Get additional parameters
1808 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
1809 if db_vnfr
.get("additionalParamsForVnf"):
1810 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
1812 descriptor_config
= get_configuration(vnfd
, vnfd
["id"])
1813 if descriptor_config
:
1815 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
1818 nslcmop_id
=nslcmop_id
,
1824 member_vnf_index
=member_vnf_index
,
1825 vdu_index
=vdu_index
,
1827 deploy_params
=deploy_params
,
1828 descriptor_config
=descriptor_config
,
1829 base_folder
=base_folder
,
1830 task_instantiation_info
=tasks_dict_info
,
1834 # Deploy charms for each VDU that supports one.
1835 for vdud
in get_vdu_list(vnfd
):
1837 descriptor_config
= get_configuration(vnfd
, vdu_id
)
1838 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
1840 if vdur
.get("additionalParams"):
1841 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
1843 deploy_params_vdu
= deploy_params
1844 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=0)
1845 vdud_count
= get_vdu_profile(vnfd
, vdu_id
).get("max-number-of-instances", 1)
1847 self
.logger
.debug("VDUD > {}".format(vdud
))
1848 self
.logger
.debug("Descriptor config > {}".format(descriptor_config
))
1849 if descriptor_config
:
1852 for vdu_index
in range(vdud_count
):
1853 # TODO vnfr_params["rw_mgmt_ip"] = vdur["ip-address"]
1855 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
1856 member_vnf_index
, vdu_id
, vdu_index
),
1859 nslcmop_id
=nslcmop_id
,
1865 member_vnf_index
=member_vnf_index
,
1866 vdu_index
=vdu_index
,
1868 deploy_params
=deploy_params_vdu
,
1869 descriptor_config
=descriptor_config
,
1870 base_folder
=base_folder
,
1871 task_instantiation_info
=tasks_dict_info
,
1874 for kdud
in get_kdu_list(vnfd
):
1875 kdu_name
= kdud
["name"]
1876 descriptor_config
= get_configuration(vnfd
, kdu_name
)
1877 if descriptor_config
:
1881 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
1882 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
1883 if kdur
.get("additionalParams"):
1884 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
1887 logging_text
=logging_text
,
1890 nslcmop_id
=nslcmop_id
,
1896 member_vnf_index
=member_vnf_index
,
1897 vdu_index
=vdu_index
,
1899 deploy_params
=deploy_params_kdu
,
1900 descriptor_config
=descriptor_config
,
1901 base_folder
=base_folder
,
1902 task_instantiation_info
=tasks_dict_info
,
1906 # Check if this NS has a charm configuration
1907 descriptor_config
= nsd
.get("ns-configuration")
1908 if descriptor_config
and descriptor_config
.get("juju"):
1911 member_vnf_index
= None
1917 # Get additional parameters
1918 deploy_params
= {"OSM": {"vim_account_id": ns_params
["vimAccountId"]}}
1919 if db_nsr
.get("additionalParamsForNs"):
1920 deploy_params
.update(parse_yaml_strings(db_nsr
["additionalParamsForNs"].copy()))
1921 base_folder
= nsd
["_admin"]["storage"]
1923 logging_text
=logging_text
,
1926 nslcmop_id
=nslcmop_id
,
1932 member_vnf_index
=member_vnf_index
,
1933 vdu_index
=vdu_index
,
1935 deploy_params
=deploy_params
,
1936 descriptor_config
=descriptor_config
,
1937 base_folder
=base_folder
,
1938 task_instantiation_info
=tasks_dict_info
,
1942 # rest of staff will be done at finally
1944 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
1945 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
))
1947 except asyncio
.CancelledError
:
1948 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
1949 exc
= "Operation was cancelled"
1950 except Exception as e
:
1951 exc
= traceback
.format_exc()
1952 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
1955 error_list
.append(str(exc
))
1957 # wait for pending tasks
1959 stage
[1] = "Waiting for instantiate pending tasks."
1960 self
.logger
.debug(logging_text
+ stage
[1])
1961 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_deploy
,
1962 stage
, nslcmop_id
, nsr_id
=nsr_id
)
1963 stage
[1] = stage
[2] = ""
1964 except asyncio
.CancelledError
:
1965 error_list
.append("Cancelled")
1966 # TODO cancel all tasks
1967 except Exception as exc
:
1968 error_list
.append(str(exc
))
1970 # update operation-status
1971 db_nsr_update
["operational-status"] = "running"
1972 # let's begin with VCA 'configured' status (later we can change it)
1973 db_nsr_update
["config-status"] = "configured"
1974 for task
, task_name
in tasks_dict_info
.items():
1975 if not task
.done() or task
.cancelled() or task
.exception():
1976 if task_name
.startswith(self
.task_name_deploy_vca
):
1977 # A N2VC task is pending
1978 db_nsr_update
["config-status"] = "failed"
1980 # RO or KDU task is pending
1981 db_nsr_update
["operational-status"] = "failed"
1983 # update status at database
1985 error_detail
= ". ".join(error_list
)
1986 self
.logger
.error(logging_text
+ error_detail
)
1987 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
1988 error_description_nsr
= 'Operation: INSTANTIATING.{}, {}'.format(nslcmop_id
, stage
[0])
1990 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
1991 db_nslcmop_update
["detailed-status"] = error_detail
1992 nslcmop_operation_state
= "FAILED"
1996 error_description_nsr
= error_description_nslcmop
= None
1998 db_nsr_update
["detailed-status"] = "Done"
1999 db_nslcmop_update
["detailed-status"] = "Done"
2000 nslcmop_operation_state
= "COMPLETED"
2003 self
._write
_ns
_status
(
2006 current_operation
="IDLE",
2007 current_operation_id
=None,
2008 error_description
=error_description_nsr
,
2009 error_detail
=error_detail
,
2010 other_update
=db_nsr_update
2012 self
._write
_op
_status
(
2015 error_message
=error_description_nslcmop
,
2016 operation_state
=nslcmop_operation_state
,
2017 other_update
=db_nslcmop_update
,
2020 if nslcmop_operation_state
:
2022 await self
.msg
.aiowrite("ns", "instantiated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
2023 "operationState": nslcmop_operation_state
},
2025 except Exception as e
:
2026 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
2028 self
.logger
.debug(logging_text
+ "Exit")
2029 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_instantiate")
2031 async def _add_vca_relations(self
, logging_text
, nsr_id
, vca_index
: int,
2032 timeout
: int = 3600, vca_type
: str = None) -> bool:
2035 # 1. find all relations for this VCA
2036 # 2. wait for other peers related
2040 vca_type
= vca_type
or "lxc_proxy_charm"
2042 # STEP 1: find all relations for this VCA
2045 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2046 nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
2049 my_vca
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))[vca_index
]
2051 # read all ns-configuration relations
2052 ns_relations
= list()
2053 db_ns_relations
= deep_get(nsd
, ('ns-configuration', 'relation'))
2055 for r
in db_ns_relations
:
2056 # check if this VCA is in the relation
2057 if my_vca
.get('member-vnf-index') in\
2058 (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2059 ns_relations
.append(r
)
2061 # read all vnf-configuration relations
2062 vnf_relations
= list()
2063 db_vnfd_list
= db_nsr
.get('vnfd-id')
2065 for vnfd
in db_vnfd_list
:
2066 db_vnf_relations
= None
2067 db_vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd
})
2068 db_vnf_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
2069 if db_vnf_configuration
:
2070 db_vnf_relations
= db_vnf_configuration
.get("relation", [])
2071 if db_vnf_relations
:
2072 for r
in db_vnf_relations
:
2073 # check if this VCA is in the relation
2074 if my_vca
.get('vdu_id') in (r
.get('entities')[0].get('id'), r
.get('entities')[1].get('id')):
2075 vnf_relations
.append(r
)
2077 # if no relations, terminate
2078 if not ns_relations
and not vnf_relations
:
2079 self
.logger
.debug(logging_text
+ ' No relations')
2082 self
.logger
.debug(logging_text
+ ' adding relations\n {}\n {}'.format(ns_relations
, vnf_relations
))
2089 if now
- start
>= timeout
:
2090 self
.logger
.error(logging_text
+ ' : timeout adding relations')
2093 # reload nsr from database (we need to update record: _admin.deloyed.VCA)
2094 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
2096 # for each defined NS relation, find the VCA's related
2097 for r
in ns_relations
.copy():
2098 from_vca_ee_id
= None
2100 from_vca_endpoint
= None
2101 to_vca_endpoint
= None
2102 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2103 for vca
in vca_list
:
2104 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id') \
2105 and vca
.get('config_sw_installed'):
2106 from_vca_ee_id
= vca
.get('ee_id')
2107 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2108 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id') \
2109 and vca
.get('config_sw_installed'):
2110 to_vca_ee_id
= vca
.get('ee_id')
2111 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2112 if from_vca_ee_id
and to_vca_ee_id
:
2114 await self
.vca_map
[vca_type
].add_relation(
2115 ee_id_1
=from_vca_ee_id
,
2116 ee_id_2
=to_vca_ee_id
,
2117 endpoint_1
=from_vca_endpoint
,
2118 endpoint_2
=to_vca_endpoint
)
2119 # remove entry from relations list
2120 ns_relations
.remove(r
)
2122 # check failed peers
2124 vca_status_list
= db_nsr
.get('configurationStatus')
2126 for i
in range(len(vca_list
)):
2128 vca_status
= vca_status_list
[i
]
2129 if vca
.get('member-vnf-index') == r
.get('entities')[0].get('id'):
2130 if vca_status
.get('status') == 'BROKEN':
2131 # peer broken: remove relation from list
2132 ns_relations
.remove(r
)
2133 if vca
.get('member-vnf-index') == r
.get('entities')[1].get('id'):
2134 if vca_status
.get('status') == 'BROKEN':
2135 # peer broken: remove relation from list
2136 ns_relations
.remove(r
)
2141 # for each defined VNF relation, find the VCA's related
2142 for r
in vnf_relations
.copy():
2143 from_vca_ee_id
= None
2145 from_vca_endpoint
= None
2146 to_vca_endpoint
= None
2147 vca_list
= deep_get(db_nsr
, ('_admin', 'deployed', 'VCA'))
2148 for vca
in vca_list
:
2149 key_to_check
= "vdu_id"
2150 if vca
.get("vdu_id") is None:
2151 key_to_check
= "vnfd_id"
2152 if vca
.get(key_to_check
) == r
.get('entities')[0].get('id') and vca
.get('config_sw_installed'):
2153 from_vca_ee_id
= vca
.get('ee_id')
2154 from_vca_endpoint
= r
.get('entities')[0].get('endpoint')
2155 if vca
.get(key_to_check
) == r
.get('entities')[1].get('id') and vca
.get('config_sw_installed'):
2156 to_vca_ee_id
= vca
.get('ee_id')
2157 to_vca_endpoint
= r
.get('entities')[1].get('endpoint')
2158 if from_vca_ee_id
and to_vca_ee_id
:
2160 await self
.vca_map
[vca_type
].add_relation(
2161 ee_id_1
=from_vca_ee_id
,
2162 ee_id_2
=to_vca_ee_id
,
2163 endpoint_1
=from_vca_endpoint
,
2164 endpoint_2
=to_vca_endpoint
)
2165 # remove entry from relations list
2166 vnf_relations
.remove(r
)
2168 # check failed peers
2170 vca_status_list
= db_nsr
.get('configurationStatus')
2172 for i
in range(len(vca_list
)):
2174 vca_status
= vca_status_list
[i
]
2175 if vca
.get('vdu_id') == r
.get('entities')[0].get('id'):
2176 if vca_status
.get('status') == 'BROKEN':
2177 # peer broken: remove relation from list
2178 vnf_relations
.remove(r
)
2179 if vca
.get('vdu_id') == r
.get('entities')[1].get('id'):
2180 if vca_status
.get('status') == 'BROKEN':
2181 # peer broken: remove relation from list
2182 vnf_relations
.remove(r
)
2188 await asyncio
.sleep(5.0)
2190 if not ns_relations
and not vnf_relations
:
2191 self
.logger
.debug('Relations added')
2196 except Exception as e
:
2197 self
.logger
.warn(logging_text
+ ' ERROR adding relations: {}'.format(e
))
2200 async def _install_kdu(self
, nsr_id
: str, nsr_db_path
: str, vnfr_data
: dict, kdu_index
: int, kdud
: dict,
2201 vnfd
: dict, k8s_instance_info
: dict, k8params
: dict = None, timeout
: int = 600):
2204 k8sclustertype
= k8s_instance_info
["k8scluster-type"]
2206 db_dict_install
= {"collection": "nsrs",
2207 "filter": {"_id": nsr_id
},
2208 "path": nsr_db_path
}
2210 if k8s_instance_info
.get("kdu-deployment-name"):
2211 kdu_instance
= k8s_instance_info
.get("kdu-deployment-name")
2213 kdu_instance
= self
.k8scluster_map
[k8sclustertype
].generate_kdu_instance_name(
2214 db_dict
=db_dict_install
,
2215 kdu_model
=k8s_instance_info
["kdu-model"],
2216 kdu_name
=k8s_instance_info
["kdu-name"],
2218 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2219 await self
.k8scluster_map
[k8sclustertype
].install(
2220 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2221 kdu_model
=k8s_instance_info
["kdu-model"],
2224 db_dict
=db_dict_install
,
2226 kdu_name
=k8s_instance_info
["kdu-name"],
2227 namespace
=k8s_instance_info
["namespace"],
2228 kdu_instance
=kdu_instance
,
2230 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".kdu-instance": kdu_instance
})
2232 # Obtain services to obtain management service ip
2233 services
= await self
.k8scluster_map
[k8sclustertype
].get_services(
2234 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2235 kdu_instance
=kdu_instance
,
2236 namespace
=k8s_instance_info
["namespace"])
2238 # Obtain management service info (if exists)
2239 vnfr_update_dict
= {}
2240 kdu_config
= get_configuration(vnfd
, kdud
["name"])
2242 target_ee_list
= kdu_config
.get("execution-environment-list", [])
2247 vnfr_update_dict
["kdur.{}.services".format(kdu_index
)] = services
2248 mgmt_services
= [service
for service
in kdud
.get("service", []) if service
.get("mgmt-service")]
2249 for mgmt_service
in mgmt_services
:
2250 for service
in services
:
2251 if service
["name"].startswith(mgmt_service
["name"]):
2252 # Mgmt service found, Obtain service ip
2253 ip
= service
.get("external_ip", service
.get("cluster_ip"))
2254 if isinstance(ip
, list) and len(ip
) == 1:
2257 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2259 # Check if must update also mgmt ip at the vnf
2260 service_external_cp
= mgmt_service
.get("external-connection-point-ref")
2261 if service_external_cp
:
2262 if deep_get(vnfd
, ("mgmt-interface", "cp")) == service_external_cp
:
2263 vnfr_update_dict
["ip-address"] = ip
2267 lambda ee
: ee
.get("external-connection-point-ref", "") == service_external_cp
2269 vnfr_update_dict
["kdur.{}.ip-address".format(kdu_index
)] = ip
2272 self
.logger
.warn("Mgmt service name: {} not found".format(mgmt_service
["name"]))
2274 vnfr_update_dict
["kdur.{}.status".format(kdu_index
)] = "READY"
2275 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), vnfr_update_dict
)
2277 kdu_config
= get_configuration(vnfd
, k8s_instance_info
["kdu-name"])
2278 if kdu_config
and kdu_config
.get("initial-config-primitive") and \
2279 get_juju_ee_ref(vnfd
, k8s_instance_info
["kdu-name"]) is None:
2280 initial_config_primitive_list
= kdu_config
.get("initial-config-primitive")
2281 initial_config_primitive_list
.sort(key
=lambda val
: int(val
["seq"]))
2283 for initial_config_primitive
in initial_config_primitive_list
:
2284 primitive_params_
= self
._map
_primitive
_params
(initial_config_primitive
, {}, {})
2286 await asyncio
.wait_for(
2287 self
.k8scluster_map
[k8sclustertype
].exec_primitive(
2288 cluster_uuid
=k8s_instance_info
["k8scluster-uuid"],
2289 kdu_instance
=kdu_instance
,
2290 primitive_name
=initial_config_primitive
["name"],
2291 params
=primitive_params_
, db_dict
={}),
2294 except Exception as e
:
2295 # Prepare update db with error and raise exception
2297 self
.update_db_2("nsrs", nsr_id
, {nsr_db_path
+ ".detailed-status": str(e
)})
2298 self
.update_db_2("vnfrs", vnfr_data
.get("_id"), {"kdur.{}.status".format(kdu_index
): "ERROR"})
2300 # ignore to keep original exception
2302 # reraise original error
2307 async def deploy_kdus(self
, logging_text
, nsr_id
, nslcmop_id
, db_vnfrs
, db_vnfds
, task_instantiation_info
):
2308 # Launch kdus if present in the descriptor
2310 k8scluster_id_2_uuic
= {"helm-chart-v3": {}, "helm-chart": {}, "juju-bundle": {}}
2312 async def _get_cluster_id(cluster_id
, cluster_type
):
2313 nonlocal k8scluster_id_2_uuic
2314 if cluster_id
in k8scluster_id_2_uuic
[cluster_type
]:
2315 return k8scluster_id_2_uuic
[cluster_type
][cluster_id
]
2317 # check if K8scluster is creating and wait look if previous tasks in process
2318 task_name
, task_dependency
= self
.lcm_tasks
.lookfor_related("k8scluster", cluster_id
)
2320 text
= "Waiting for related tasks '{}' on k8scluster {} to be completed".format(task_name
, cluster_id
)
2321 self
.logger
.debug(logging_text
+ text
)
2322 await asyncio
.wait(task_dependency
, timeout
=3600)
2324 db_k8scluster
= self
.db
.get_one("k8sclusters", {"_id": cluster_id
}, fail_on_empty
=False)
2325 if not db_k8scluster
:
2326 raise LcmException("K8s cluster {} cannot be found".format(cluster_id
))
2328 k8s_id
= deep_get(db_k8scluster
, ("_admin", cluster_type
, "id"))
2330 if cluster_type
== "helm-chart-v3":
2332 # backward compatibility for existing clusters that have not been initialized for helm v3
2333 k8s_credentials
= yaml
.safe_dump(db_k8scluster
.get("credentials"))
2334 k8s_id
, uninstall_sw
= await self
.k8sclusterhelm3
.init_env(k8s_credentials
,
2335 reuse_cluster_uuid
=cluster_id
)
2336 db_k8scluster_update
= {}
2337 db_k8scluster_update
["_admin.helm-chart-v3.error_msg"] = None
2338 db_k8scluster_update
["_admin.helm-chart-v3.id"] = k8s_id
2339 db_k8scluster_update
["_admin.helm-chart-v3.created"] = uninstall_sw
2340 db_k8scluster_update
["_admin.helm-chart-v3.operationalState"] = "ENABLED"
2341 self
.update_db_2("k8sclusters", cluster_id
, db_k8scluster_update
)
2342 except Exception as e
:
2343 self
.logger
.error(logging_text
+ "error initializing helm-v3 cluster: {}".format(str(e
)))
2344 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".format(cluster_id
,
2347 raise LcmException("K8s cluster '{}' has not been initialized for '{}'".
2348 format(cluster_id
, cluster_type
))
2349 k8scluster_id_2_uuic
[cluster_type
][cluster_id
] = k8s_id
2352 logging_text
+= "Deploy kdus: "
2355 db_nsr_update
= {"_admin.deployed.K8s": []}
2356 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2359 updated_cluster_list
= []
2360 updated_v3_cluster_list
= []
2362 for vnfr_data
in db_vnfrs
.values():
2363 for kdu_index
, kdur
in enumerate(get_iterable(vnfr_data
, "kdur")):
2364 # Step 0: Prepare and set parameters
2365 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
2366 vnfd_id
= vnfr_data
.get('vnfd-id')
2367 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2368 kdud
= next(kdud
for kdud
in vnfd_with_id
["kdu"] if kdud
["name"] == kdur
["kdu-name"])
2369 namespace
= kdur
.get("k8s-namespace")
2370 kdu_deployment_name
= kdur
.get("kdu-deployment-name")
2371 if kdur
.get("helm-chart"):
2372 kdumodel
= kdur
["helm-chart"]
2373 # Default version: helm3, if helm-version is v2 assign v2
2374 k8sclustertype
= "helm-chart-v3"
2375 self
.logger
.debug("kdur: {}".format(kdur
))
2376 if kdur
.get("helm-version") and kdur
.get("helm-version") == "v2":
2377 k8sclustertype
= "helm-chart"
2378 elif kdur
.get("juju-bundle"):
2379 kdumodel
= kdur
["juju-bundle"]
2380 k8sclustertype
= "juju-bundle"
2382 raise LcmException("kdu type for kdu='{}.{}' is neither helm-chart nor "
2383 "juju-bundle. Maybe an old NBI version is running".
2384 format(vnfr_data
["member-vnf-index-ref"], kdur
["kdu-name"]))
2385 # check if kdumodel is a file and exists
2387 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnfd
: vnfd
["_id"] == vnfd_id
)
2388 storage
= deep_get(vnfd_with_id
, ('_admin', 'storage'))
2389 if storage
and storage
.get('pkg-dir'): # may be not present if vnfd has not artifacts
2390 # path format: /vnfdid/pkkdir/helm-charts|juju-bundles/kdumodel
2391 filename
= '{}/{}/{}s/{}'.format(storage
["folder"], storage
["pkg-dir"], k8sclustertype
,
2393 if self
.fs
.file_exists(filename
, mode
='file') or self
.fs
.file_exists(filename
, mode
='dir'):
2394 kdumodel
= self
.fs
.path
+ filename
2395 except (asyncio
.TimeoutError
, asyncio
.CancelledError
):
2397 except Exception: # it is not a file
2400 k8s_cluster_id
= kdur
["k8s-cluster"]["id"]
2401 step
= "Synchronize repos for k8s cluster '{}'".format(k8s_cluster_id
)
2402 cluster_uuid
= await _get_cluster_id(k8s_cluster_id
, k8sclustertype
)
2405 if (k8sclustertype
== "helm-chart" and cluster_uuid
not in updated_cluster_list
)\
2406 or (k8sclustertype
== "helm-chart-v3" and cluster_uuid
not in updated_v3_cluster_list
):
2407 del_repo_list
, added_repo_dict
= await asyncio
.ensure_future(
2408 self
.k8scluster_map
[k8sclustertype
].synchronize_repos(cluster_uuid
=cluster_uuid
))
2409 if del_repo_list
or added_repo_dict
:
2410 if k8sclustertype
== "helm-chart":
2411 unset
= {'_admin.helm_charts_added.' + item
: None for item
in del_repo_list
}
2412 updated
= {'_admin.helm_charts_added.' +
2413 item
: name
for item
, name
in added_repo_dict
.items()}
2414 updated_cluster_list
.append(cluster_uuid
)
2415 elif k8sclustertype
== "helm-chart-v3":
2416 unset
= {'_admin.helm_charts_v3_added.' + item
: None for item
in del_repo_list
}
2417 updated
= {'_admin.helm_charts_v3_added.' +
2418 item
: name
for item
, name
in added_repo_dict
.items()}
2419 updated_v3_cluster_list
.append(cluster_uuid
)
2420 self
.logger
.debug(logging_text
+ "repos synchronized on k8s cluster "
2421 "'{}' to_delete: {}, to_add: {}".
2422 format(k8s_cluster_id
, del_repo_list
, added_repo_dict
))
2423 self
.db
.set_one("k8sclusters", {"_id": k8s_cluster_id
}, updated
, unset
=unset
)
2426 step
= "Instantiating KDU {}.{} in k8s cluster {}".format(vnfr_data
["member-vnf-index-ref"],
2427 kdur
["kdu-name"], k8s_cluster_id
)
2428 k8s_instance_info
= {"kdu-instance": None,
2429 "k8scluster-uuid": cluster_uuid
,
2430 "k8scluster-type": k8sclustertype
,
2431 "member-vnf-index": vnfr_data
["member-vnf-index-ref"],
2432 "kdu-name": kdur
["kdu-name"],
2433 "kdu-model": kdumodel
,
2434 "namespace": namespace
,
2435 "kdu-deployment-name": kdu_deployment_name
}
2436 db_path
= "_admin.deployed.K8s.{}".format(index
)
2437 db_nsr_update
[db_path
] = k8s_instance_info
2438 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2439 vnfd_with_id
= find_in_list(db_vnfds
, lambda vnf
: vnf
["_id"] == vnfd_id
)
2440 task
= asyncio
.ensure_future(
2441 self
._install
_kdu
(nsr_id
, db_path
, vnfr_data
, kdu_index
, kdud
, vnfd_with_id
,
2442 k8s_instance_info
, k8params
=desc_params
, timeout
=600))
2443 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_KDU-{}".format(index
), task
)
2444 task_instantiation_info
[task
] = "Deploying KDU {}".format(kdur
["kdu-name"])
2448 except (LcmException
, asyncio
.CancelledError
):
2450 except Exception as e
:
2451 msg
= "Exception {} while {}: {}".format(type(e
).__name
__, step
, e
)
2452 if isinstance(e
, (N2VCException
, DbException
)):
2453 self
.logger
.error(logging_text
+ msg
)
2455 self
.logger
.critical(logging_text
+ msg
, exc_info
=True)
2456 raise LcmException(msg
)
2459 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2461 def _deploy_n2vc(self
, logging_text
, db_nsr
, db_vnfr
, nslcmop_id
, nsr_id
, nsi_id
, vnfd_id
, vdu_id
,
2462 kdu_name
, member_vnf_index
, vdu_index
, vdu_name
, deploy_params
, descriptor_config
,
2463 base_folder
, task_instantiation_info
, stage
):
2464 # launch instantiate_N2VC in a asyncio task and register task object
2465 # Look where information of this charm is at database <nsrs>._admin.deployed.VCA
2466 # if not found, create one entry and update database
2467 # fill db_nsr._admin.deployed.VCA.<index>
2469 self
.logger
.debug(logging_text
+ "_deploy_n2vc vnfd_id={}, vdu_id={}".format(vnfd_id
, vdu_id
))
2470 if "execution-environment-list" in descriptor_config
:
2471 ee_list
= descriptor_config
.get("execution-environment-list", [])
2472 elif "juju" in descriptor_config
:
2473 ee_list
= [descriptor_config
] # ns charms
2474 else: # other types as script are not supported
2477 for ee_item
in ee_list
:
2478 self
.logger
.debug(logging_text
+ "_deploy_n2vc ee_item juju={}, helm={}".format(ee_item
.get('juju'),
2479 ee_item
.get("helm-chart")))
2480 ee_descriptor_id
= ee_item
.get("id")
2481 if ee_item
.get("juju"):
2482 vca_name
= ee_item
['juju'].get('charm')
2483 vca_type
= "lxc_proxy_charm" if ee_item
['juju'].get('charm') is not None else "native_charm"
2484 if ee_item
['juju'].get('cloud') == "k8s":
2485 vca_type
= "k8s_proxy_charm"
2486 elif ee_item
['juju'].get('proxy') is False:
2487 vca_type
= "native_charm"
2488 elif ee_item
.get("helm-chart"):
2489 vca_name
= ee_item
['helm-chart']
2490 if ee_item
.get("helm-version") and ee_item
.get("helm-version") == "v2":
2493 vca_type
= "helm-v3"
2495 self
.logger
.debug(logging_text
+ "skipping non juju neither charm configuration")
2499 for vca_index
, vca_deployed
in enumerate(db_nsr
["_admin"]["deployed"]["VCA"]):
2500 if not vca_deployed
:
2502 if vca_deployed
.get("member-vnf-index") == member_vnf_index
and \
2503 vca_deployed
.get("vdu_id") == vdu_id
and \
2504 vca_deployed
.get("kdu_name") == kdu_name
and \
2505 vca_deployed
.get("vdu_count_index", 0) == vdu_index
and \
2506 vca_deployed
.get("ee_descriptor_id") == ee_descriptor_id
:
2509 # not found, create one.
2510 target
= "ns" if not member_vnf_index
else "vnf/{}".format(member_vnf_index
)
2512 target
+= "/vdu/{}/{}".format(vdu_id
, vdu_index
or 0)
2514 target
+= "/kdu/{}".format(kdu_name
)
2516 "target_element": target
,
2517 # ^ target_element will replace member-vnf-index, kdu_name, vdu_id ... in a single string
2518 "member-vnf-index": member_vnf_index
,
2520 "kdu_name": kdu_name
,
2521 "vdu_count_index": vdu_index
,
2522 "operational-status": "init", # TODO revise
2523 "detailed-status": "", # TODO revise
2524 "step": "initial-deploy", # TODO revise
2526 "vdu_name": vdu_name
,
2528 "ee_descriptor_id": ee_descriptor_id
2532 # create VCA and configurationStatus in db
2534 "_admin.deployed.VCA.{}".format(vca_index
): vca_deployed
,
2535 "configurationStatus.{}".format(vca_index
): dict()
2537 self
.update_db_2("nsrs", nsr_id
, db_dict
)
2539 db_nsr
["_admin"]["deployed"]["VCA"].append(vca_deployed
)
2541 self
.logger
.debug("N2VC > NSR_ID > {}".format(nsr_id
))
2542 self
.logger
.debug("N2VC > DB_NSR > {}".format(db_nsr
))
2543 self
.logger
.debug("N2VC > VCA_DEPLOYED > {}".format(vca_deployed
))
2546 task_n2vc
= asyncio
.ensure_future(
2547 self
.instantiate_N2VC(
2548 logging_text
=logging_text
,
2549 vca_index
=vca_index
,
2555 vdu_index
=vdu_index
,
2556 deploy_params
=deploy_params
,
2557 config_descriptor
=descriptor_config
,
2558 base_folder
=base_folder
,
2559 nslcmop_id
=nslcmop_id
,
2563 ee_config_descriptor
=ee_item
2566 self
.lcm_tasks
.register("ns", nsr_id
, nslcmop_id
, "instantiate_N2VC-{}".format(vca_index
), task_n2vc
)
2567 task_instantiation_info
[task_n2vc
] = self
.task_name_deploy_vca
+ " {}.{}".format(
2568 member_vnf_index
or "", vdu_id
or "")
2571 def _create_nslcmop(nsr_id
, operation
, params
):
2573 Creates a ns-lcm-opp content to be stored at database.
2574 :param nsr_id: internal id of the instance
2575 :param operation: instantiate, terminate, scale, action, ...
2576 :param params: user parameters for the operation
2577 :return: dictionary following SOL005 format
2579 # Raise exception if invalid arguments
2580 if not (nsr_id
and operation
and params
):
2582 "Parameters 'nsr_id', 'operation' and 'params' needed to create primitive not provided")
2588 # COMPLETED,PARTIALLY_COMPLETED,FAILED_TEMP,FAILED,ROLLING_BACK,ROLLED_BACK
2589 "operationState": "PROCESSING",
2590 "statusEnteredTime": now
,
2591 "nsInstanceId": nsr_id
,
2592 "lcmOperationType": operation
,
2594 "isAutomaticInvocation": False,
2595 "operationParams": params
,
2596 "isCancelPending": False,
2598 "self": "/osm/nslcm/v1/ns_lcm_op_occs/" + _id
,
2599 "nsInstance": "/osm/nslcm/v1/ns_instances/" + nsr_id
,
2604 def _format_additional_params(self
, params
):
2605 params
= params
or {}
2606 for key
, value
in params
.items():
2607 if str(value
).startswith("!!yaml "):
2608 params
[key
] = yaml
.safe_load(value
[7:])
2611 def _get_terminate_primitive_params(self
, seq
, vnf_index
):
2612 primitive
= seq
.get('name')
2613 primitive_params
= {}
2615 "member_vnf_index": vnf_index
,
2616 "primitive": primitive
,
2617 "primitive_params": primitive_params
,
2620 return self
._map
_primitive
_params
(seq
, params
, desc_params
)
2624 def _retry_or_skip_suboperation(self
, db_nslcmop
, op_index
):
2625 op
= deep_get(db_nslcmop
, ('_admin', 'operations'), [])[op_index
]
2626 if op
.get('operationState') == 'COMPLETED':
2627 # b. Skip sub-operation
2628 # _ns_execute_primitive() or RO.create_action() will NOT be executed
2629 return self
.SUBOPERATION_STATUS_SKIP
2631 # c. retry executing sub-operation
2632 # The sub-operation exists, and operationState != 'COMPLETED'
2633 # Update operationState = 'PROCESSING' to indicate a retry.
2634 operationState
= 'PROCESSING'
2635 detailed_status
= 'In progress'
2636 self
._update
_suboperation
_status
(
2637 db_nslcmop
, op_index
, operationState
, detailed_status
)
2638 # Return the sub-operation index
2639 # _ns_execute_primitive() or RO.create_action() will be called from scale()
2640 # with arguments extracted from the sub-operation
2643 # Find a sub-operation where all keys in a matching dictionary must match
2644 # Returns the index of the matching sub-operation, or SUBOPERATION_STATUS_NOT_FOUND if no match
2645 def _find_suboperation(self
, db_nslcmop
, match
):
2646 if db_nslcmop
and match
:
2647 op_list
= db_nslcmop
.get('_admin', {}).get('operations', [])
2648 for i
, op
in enumerate(op_list
):
2649 if all(op
.get(k
) == match
[k
] for k
in match
):
2651 return self
.SUBOPERATION_STATUS_NOT_FOUND
2653 # Update status for a sub-operation given its index
2654 def _update_suboperation_status(self
, db_nslcmop
, op_index
, operationState
, detailed_status
):
2655 # Update DB for HA tasks
2656 q_filter
= {'_id': db_nslcmop
['_id']}
2657 update_dict
= {'_admin.operations.{}.operationState'.format(op_index
): operationState
,
2658 '_admin.operations.{}.detailed-status'.format(op_index
): detailed_status
}
2659 self
.db
.set_one("nslcmops",
2661 update_dict
=update_dict
,
2662 fail_on_empty
=False)
2664 # Add sub-operation, return the index of the added sub-operation
2665 # Optionally, set operationState, detailed-status, and operationType
2666 # Status and type are currently set for 'scale' sub-operations:
2667 # 'operationState' : 'PROCESSING' | 'COMPLETED' | 'FAILED'
2668 # 'detailed-status' : status message
2669 # 'operationType': may be any type, in the case of scaling: 'PRE-SCALE' | 'POST-SCALE'
2670 # Status and operation type are currently only used for 'scale', but NOT for 'terminate' sub-operations.
2671 def _add_suboperation(self
, db_nslcmop
, vnf_index
, vdu_id
, vdu_count_index
, vdu_name
, primitive
,
2672 mapped_primitive_params
, operationState
=None, detailed_status
=None, operationType
=None,
2673 RO_nsr_id
=None, RO_scaling_info
=None):
2675 return self
.SUBOPERATION_STATUS_NOT_FOUND
2676 # Get the "_admin.operations" list, if it exists
2677 db_nslcmop_admin
= db_nslcmop
.get('_admin', {})
2678 op_list
= db_nslcmop_admin
.get('operations')
2679 # Create or append to the "_admin.operations" list
2680 new_op
= {'member_vnf_index': vnf_index
,
2682 'vdu_count_index': vdu_count_index
,
2683 'primitive': primitive
,
2684 'primitive_params': mapped_primitive_params
}
2686 new_op
['operationState'] = operationState
2688 new_op
['detailed-status'] = detailed_status
2690 new_op
['lcmOperationType'] = operationType
2692 new_op
['RO_nsr_id'] = RO_nsr_id
2694 new_op
['RO_scaling_info'] = RO_scaling_info
2696 # No existing operations, create key 'operations' with current operation as first list element
2697 db_nslcmop_admin
.update({'operations': [new_op
]})
2698 op_list
= db_nslcmop_admin
.get('operations')
2700 # Existing operations, append operation to list
2701 op_list
.append(new_op
)
2703 db_nslcmop_update
= {'_admin.operations': op_list
}
2704 self
.update_db_2("nslcmops", db_nslcmop
['_id'], db_nslcmop_update
)
2705 op_index
= len(op_list
) - 1
2708 # Helper methods for scale() sub-operations
2710 # pre-scale/post-scale:
2711 # Check for 3 different cases:
2712 # a. New: First time execution, return SUBOPERATION_STATUS_NEW
2713 # b. Skip: Existing sub-operation exists, operationState == 'COMPLETED', return SUBOPERATION_STATUS_SKIP
2714 # c. retry: Existing sub-operation exists, operationState != 'COMPLETED', return op_index to re-execute
2715 def _check_or_add_scale_suboperation(self
, db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
,
2716 operationType
, RO_nsr_id
=None, RO_scaling_info
=None):
2717 # Find this sub-operation
2718 if RO_nsr_id
and RO_scaling_info
:
2719 operationType
= 'SCALE-RO'
2721 'member_vnf_index': vnf_index
,
2722 'RO_nsr_id': RO_nsr_id
,
2723 'RO_scaling_info': RO_scaling_info
,
2727 'member_vnf_index': vnf_index
,
2728 'primitive': vnf_config_primitive
,
2729 'primitive_params': primitive_params
,
2730 'lcmOperationType': operationType
2732 op_index
= self
._find
_suboperation
(db_nslcmop
, match
)
2733 if op_index
== self
.SUBOPERATION_STATUS_NOT_FOUND
:
2734 # a. New sub-operation
2735 # The sub-operation does not exist, add it.
2736 # _ns_execute_primitive() will be called from scale() as usual, with non-modified arguments
2737 # The following parameters are set to None for all kind of scaling:
2739 vdu_count_index
= None
2741 if RO_nsr_id
and RO_scaling_info
:
2742 vnf_config_primitive
= None
2743 primitive_params
= None
2746 RO_scaling_info
= None
2747 # Initial status for sub-operation
2748 operationState
= 'PROCESSING'
2749 detailed_status
= 'In progress'
2750 # Add sub-operation for pre/post-scaling (zero or more operations)
2751 self
._add
_suboperation
(db_nslcmop
,
2756 vnf_config_primitive
,
2763 return self
.SUBOPERATION_STATUS_NEW
2765 # Return either SUBOPERATION_STATUS_SKIP (operationState == 'COMPLETED'),
2766 # or op_index (operationState != 'COMPLETED')
2767 return self
._retry
_or
_skip
_suboperation
(db_nslcmop
, op_index
)
2769 # Function to return execution_environment id
2771 def _get_ee_id(self
, vnf_index
, vdu_id
, vca_deployed_list
):
2772 # TODO vdu_index_count
2773 for vca
in vca_deployed_list
:
2774 if vca
["member-vnf-index"] == vnf_index
and vca
["vdu_id"] == vdu_id
:
2777 async def destroy_N2VC(self
, logging_text
, db_nslcmop
, vca_deployed
, config_descriptor
,
2778 vca_index
, destroy_ee
=True, exec_primitives
=True, scaling_in
=False):
2780 Execute the terminate primitives and destroy the execution environment (if destroy_ee=False
2781 :param logging_text:
2783 :param vca_deployed: Dictionary of deployment info at db_nsr._admin.depoloyed.VCA.<INDEX>
2784 :param config_descriptor: Configuration descriptor of the NSD, VNFD, VNFD.vdu or VNFD.kdu
2785 :param vca_index: index in the database _admin.deployed.VCA
2786 :param destroy_ee: False to do not destroy, because it will be destroyed all of then at once
2787 :param exec_primitives: False to do not execute terminate primitives, because the config is not completed or has
2788 not executed properly
2789 :param scaling_in: True destroys the application, False destroys the model
2790 :return: None or exception
2794 logging_text
+ " vca_index: {}, vca_deployed: {}, config_descriptor: {}, destroy_ee: {}".format(
2795 vca_index
, vca_deployed
, config_descriptor
, destroy_ee
2799 vca_type
= vca_deployed
.get("type", "lxc_proxy_charm")
2801 # execute terminate_primitives
2803 terminate_primitives
= get_ee_sorted_terminate_config_primitive_list(
2804 config_descriptor
.get("terminate-config-primitive"), vca_deployed
.get("ee_descriptor_id"))
2805 vdu_id
= vca_deployed
.get("vdu_id")
2806 vdu_count_index
= vca_deployed
.get("vdu_count_index")
2807 vdu_name
= vca_deployed
.get("vdu_name")
2808 vnf_index
= vca_deployed
.get("member-vnf-index")
2809 if terminate_primitives
and vca_deployed
.get("needed_terminate"):
2810 for seq
in terminate_primitives
:
2811 # For each sequence in list, get primitive and call _ns_execute_primitive()
2812 step
= "Calling terminate action for vnf_member_index={} primitive={}".format(
2813 vnf_index
, seq
.get("name"))
2814 self
.logger
.debug(logging_text
+ step
)
2815 # Create the primitive for each sequence, i.e. "primitive": "touch"
2816 primitive
= seq
.get('name')
2817 mapped_primitive_params
= self
._get
_terminate
_primitive
_params
(seq
, vnf_index
)
2820 self
._add
_suboperation
(db_nslcmop
,
2826 mapped_primitive_params
)
2827 # Sub-operations: Call _ns_execute_primitive() instead of action()
2829 result
, result_detail
= await self
._ns
_execute
_primitive
(vca_deployed
["ee_id"], primitive
,
2830 mapped_primitive_params
,
2832 except LcmException
:
2833 # this happens when VCA is not deployed. In this case it is not needed to terminate
2835 result_ok
= ['COMPLETED', 'PARTIALLY_COMPLETED']
2836 if result
not in result_ok
:
2837 raise LcmException("terminate_primitive {} for vnf_member_index={} fails with "
2838 "error {}".format(seq
.get("name"), vnf_index
, result_detail
))
2839 # set that this VCA do not need terminated
2840 db_update_entry
= "_admin.deployed.VCA.{}.needed_terminate".format(vca_index
)
2841 self
.update_db_2("nsrs", db_nslcmop
["nsInstanceId"], {db_update_entry
: False})
2843 if vca_deployed
.get("prometheus_jobs") and self
.prometheus
:
2844 await self
.prometheus
.update(remove_jobs
=vca_deployed
["prometheus_jobs"])
2847 await self
.vca_map
[vca_type
].delete_execution_environment(vca_deployed
["ee_id"], scaling_in
=scaling_in
)
2849 async def _delete_all_N2VC(self
, db_nsr
: dict):
2850 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='TERMINATING')
2851 namespace
= "." + db_nsr
["_id"]
2853 await self
.n2vc
.delete_namespace(namespace
=namespace
, total_timeout
=self
.timeout_charm_delete
)
2854 except N2VCNotFound
: # already deleted. Skip
2856 self
._write
_all
_config
_status
(db_nsr
=db_nsr
, status
='DELETED')
2858 async def _terminate_RO(self
, logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
):
2860 Terminates a deployment from RO
2861 :param logging_text:
2862 :param nsr_deployed: db_nsr._admin.deployed
2865 :param stage: list of string with the content to write on db_nslcmop.detailed-status.
2866 this method will update only the index 2, but it will write on database the concatenated content of the list
2871 ro_nsr_id
= ro_delete_action
= None
2872 if nsr_deployed
and nsr_deployed
.get("RO"):
2873 ro_nsr_id
= nsr_deployed
["RO"].get("nsr_id")
2874 ro_delete_action
= nsr_deployed
["RO"].get("nsr_delete_action_id")
2877 stage
[2] = "Deleting ns from VIM."
2878 db_nsr_update
["detailed-status"] = " ".join(stage
)
2879 self
._write
_op
_status
(nslcmop_id
, stage
)
2880 self
.logger
.debug(logging_text
+ stage
[2])
2881 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2882 self
._write
_op
_status
(nslcmop_id
, stage
)
2883 desc
= await self
.RO
.delete("ns", ro_nsr_id
)
2884 ro_delete_action
= desc
["action_id"]
2885 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = ro_delete_action
2886 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2887 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2888 if ro_delete_action
:
2889 # wait until NS is deleted from VIM
2890 stage
[2] = "Waiting ns deleted from VIM."
2891 detailed_status_old
= None
2892 self
.logger
.debug(logging_text
+ stage
[2] + " RO_id={} ro_delete_action={}".format(ro_nsr_id
,
2894 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2895 self
._write
_op
_status
(nslcmop_id
, stage
)
2897 delete_timeout
= 20 * 60 # 20 minutes
2898 while delete_timeout
> 0:
2899 desc
= await self
.RO
.show(
2901 item_id_name
=ro_nsr_id
,
2902 extra_item
="action",
2903 extra_item_id
=ro_delete_action
)
2906 self
._on
_update
_ro
_db
(nsrs_id
=nsr_id
, ro_descriptor
=desc
)
2908 ns_status
, ns_status_info
= self
.RO
.check_action_status(desc
)
2909 if ns_status
== "ERROR":
2910 raise ROclient
.ROClientException(ns_status_info
)
2911 elif ns_status
== "BUILD":
2912 stage
[2] = "Deleting from VIM {}".format(ns_status_info
)
2913 elif ns_status
== "ACTIVE":
2914 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2915 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2918 assert False, "ROclient.check_action_status returns unknown {}".format(ns_status
)
2919 if stage
[2] != detailed_status_old
:
2920 detailed_status_old
= stage
[2]
2921 db_nsr_update
["detailed-status"] = " ".join(stage
)
2922 self
._write
_op
_status
(nslcmop_id
, stage
)
2923 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2924 await asyncio
.sleep(5, loop
=self
.loop
)
2926 else: # delete_timeout <= 0:
2927 raise ROclient
.ROClientException("Timeout waiting ns deleted from VIM")
2929 except Exception as e
:
2930 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2931 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2932 db_nsr_update
["_admin.deployed.RO.nsr_id"] = None
2933 db_nsr_update
["_admin.deployed.RO.nsr_status"] = "DELETED"
2934 db_nsr_update
["_admin.deployed.RO.nsr_delete_action_id"] = None
2935 self
.logger
.debug(logging_text
+ "RO_ns_id={} already deleted".format(ro_nsr_id
))
2936 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2937 failed_detail
.append("delete conflict: {}".format(e
))
2938 self
.logger
.debug(logging_text
+ "RO_ns_id={} delete conflict: {}".format(ro_nsr_id
, e
))
2940 failed_detail
.append("delete error: {}".format(e
))
2941 self
.logger
.error(logging_text
+ "RO_ns_id={} delete error: {}".format(ro_nsr_id
, e
))
2944 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "nsd_id")):
2945 ro_nsd_id
= nsr_deployed
["RO"]["nsd_id"]
2947 stage
[2] = "Deleting nsd from RO."
2948 db_nsr_update
["detailed-status"] = " ".join(stage
)
2949 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2950 self
._write
_op
_status
(nslcmop_id
, stage
)
2951 await self
.RO
.delete("nsd", ro_nsd_id
)
2952 self
.logger
.debug(logging_text
+ "ro_nsd_id={} deleted".format(ro_nsd_id
))
2953 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2954 except Exception as e
:
2955 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2956 db_nsr_update
["_admin.deployed.RO.nsd_id"] = None
2957 self
.logger
.debug(logging_text
+ "ro_nsd_id={} already deleted".format(ro_nsd_id
))
2958 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2959 failed_detail
.append("ro_nsd_id={} delete conflict: {}".format(ro_nsd_id
, e
))
2960 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2962 failed_detail
.append("ro_nsd_id={} delete error: {}".format(ro_nsd_id
, e
))
2963 self
.logger
.error(logging_text
+ failed_detail
[-1])
2965 if not failed_detail
and deep_get(nsr_deployed
, ("RO", "vnfd")):
2966 for index
, vnf_deployed
in enumerate(nsr_deployed
["RO"]["vnfd"]):
2967 if not vnf_deployed
or not vnf_deployed
["id"]:
2970 ro_vnfd_id
= vnf_deployed
["id"]
2971 stage
[2] = "Deleting member_vnf_index={} ro_vnfd_id={} from RO.".format(
2972 vnf_deployed
["member-vnf-index"], ro_vnfd_id
)
2973 db_nsr_update
["detailed-status"] = " ".join(stage
)
2974 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2975 self
._write
_op
_status
(nslcmop_id
, stage
)
2976 await self
.RO
.delete("vnfd", ro_vnfd_id
)
2977 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} deleted".format(ro_vnfd_id
))
2978 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2979 except Exception as e
:
2980 if isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 404: # not found
2981 db_nsr_update
["_admin.deployed.RO.vnfd.{}.id".format(index
)] = None
2982 self
.logger
.debug(logging_text
+ "ro_vnfd_id={} already deleted ".format(ro_vnfd_id
))
2983 elif isinstance(e
, ROclient
.ROClientException
) and e
.http_code
== 409: # conflict
2984 failed_detail
.append("ro_vnfd_id={} delete conflict: {}".format(ro_vnfd_id
, e
))
2985 self
.logger
.debug(logging_text
+ failed_detail
[-1])
2987 failed_detail
.append("ro_vnfd_id={} delete error: {}".format(ro_vnfd_id
, e
))
2988 self
.logger
.error(logging_text
+ failed_detail
[-1])
2991 stage
[2] = "Error deleting from VIM"
2993 stage
[2] = "Deleted from VIM"
2994 db_nsr_update
["detailed-status"] = " ".join(stage
)
2995 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
2996 self
._write
_op
_status
(nslcmop_id
, stage
)
2999 raise LcmException("; ".join(failed_detail
))
3001 async def terminate(self
, nsr_id
, nslcmop_id
):
3002 # Try to lock HA task here
3003 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3004 if not task_is_locked_by_me
:
3007 logging_text
= "Task ns={} terminate={} ".format(nsr_id
, nslcmop_id
)
3008 self
.logger
.debug(logging_text
+ "Enter")
3009 timeout_ns_terminate
= self
.timeout_ns_terminate
3012 operation_params
= None
3014 error_list
= [] # annotates all failed error messages
3015 db_nslcmop_update
= {}
3016 autoremove
= False # autoremove after terminated
3017 tasks_dict_info
= {}
3019 stage
= ["Stage 1/3: Preparing task.", "Waiting for previous operations to terminate.", ""]
3020 # ^ contains [stage, step, VIM-status]
3022 # wait for any previous tasks in process
3023 await self
.lcm_tasks
.waitfor_related_HA("ns", 'nslcmops', nslcmop_id
)
3025 stage
[1] = "Getting nslcmop={} from db.".format(nslcmop_id
)
3026 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3027 operation_params
= db_nslcmop
.get("operationParams") or {}
3028 if operation_params
.get("timeout_ns_terminate"):
3029 timeout_ns_terminate
= operation_params
["timeout_ns_terminate"]
3030 stage
[1] = "Getting nsr={} from db.".format(nsr_id
)
3031 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3033 db_nsr_update
["operational-status"] = "terminating"
3034 db_nsr_update
["config-status"] = "terminating"
3035 self
._write
_ns
_status
(
3037 ns_state
="TERMINATING",
3038 current_operation
="TERMINATING",
3039 current_operation_id
=nslcmop_id
,
3040 other_update
=db_nsr_update
3042 self
._write
_op
_status
(
3047 nsr_deployed
= deepcopy(db_nsr
["_admin"].get("deployed")) or {}
3048 if db_nsr
["_admin"]["nsState"] == "NOT_INSTANTIATED":
3051 stage
[1] = "Getting vnf descriptors from db."
3052 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
3053 db_vnfds_from_id
= {}
3054 db_vnfds_from_member_index
= {}
3056 for vnfr
in db_vnfrs_list
:
3057 vnfd_id
= vnfr
["vnfd-id"]
3058 if vnfd_id
not in db_vnfds_from_id
:
3059 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
3060 db_vnfds_from_id
[vnfd_id
] = vnfd
3061 db_vnfds_from_member_index
[vnfr
["member-vnf-index-ref"]] = db_vnfds_from_id
[vnfd_id
]
3063 # Destroy individual execution environments when there are terminating primitives.
3064 # Rest of EE will be deleted at once
3065 # TODO - check before calling _destroy_N2VC
3066 # if not operation_params.get("skip_terminate_primitives"):#
3067 # or not vca.get("needed_terminate"):
3068 stage
[0] = "Stage 2/3 execute terminating primitives."
3069 self
.logger
.debug(logging_text
+ stage
[0])
3070 stage
[1] = "Looking execution environment that needs terminate."
3071 self
.logger
.debug(logging_text
+ stage
[1])
3073 for vca_index
, vca
in enumerate(get_iterable(nsr_deployed
, "VCA")):
3074 config_descriptor
= None
3075 if not vca
or not vca
.get("ee_id"):
3077 if not vca
.get("member-vnf-index"):
3079 config_descriptor
= db_nsr
.get("ns-configuration")
3080 elif vca
.get("vdu_id"):
3081 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3082 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
3083 elif vca
.get("kdu_name"):
3084 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3085 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
3087 db_vnfd
= db_vnfds_from_member_index
[vca
["member-vnf-index"]]
3088 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
3089 vca_type
= vca
.get("type")
3090 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3091 vca
.get("needed_terminate"))
3092 # For helm we must destroy_ee. Also for native_charm, as juju_model cannot be deleted if there are
3093 # pending native charms
3094 destroy_ee
= True if vca_type
in ("helm", "helm-v3", "native_charm") else False
3095 # self.logger.debug(logging_text + "vca_index: {}, ee_id: {}, vca_type: {} destroy_ee: {}".format(
3096 # vca_index, vca.get("ee_id"), vca_type, destroy_ee))
3097 task
= asyncio
.ensure_future(
3098 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
, vca_index
,
3099 destroy_ee
, exec_terminate_primitives
))
3100 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3102 # wait for pending tasks of terminate primitives
3104 self
.logger
.debug(logging_text
+ 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3105 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3106 min(self
.timeout_charm_delete
, timeout_ns_terminate
),
3108 tasks_dict_info
.clear()
3110 return # raise LcmException("; ".join(error_list))
3112 # remove All execution environments at once
3113 stage
[0] = "Stage 3/3 delete all."
3115 if nsr_deployed
.get("VCA"):
3116 stage
[1] = "Deleting all execution environments."
3117 self
.logger
.debug(logging_text
+ stage
[1])
3118 task_delete_ee
= asyncio
.ensure_future(asyncio
.wait_for(self
._delete
_all
_N
2VC
(db_nsr
=db_nsr
),
3119 timeout
=self
.timeout_charm_delete
))
3120 # task_delete_ee = asyncio.ensure_future(self.n2vc.delete_namespace(namespace="." + nsr_id))
3121 tasks_dict_info
[task_delete_ee
] = "Terminating all VCA"
3123 # Delete from k8scluster
3124 stage
[1] = "Deleting KDUs."
3125 self
.logger
.debug(logging_text
+ stage
[1])
3126 # print(nsr_deployed)
3127 for kdu
in get_iterable(nsr_deployed
, "K8s"):
3128 if not kdu
or not kdu
.get("kdu-instance"):
3130 kdu_instance
= kdu
.get("kdu-instance")
3131 if kdu
.get("k8scluster-type") in self
.k8scluster_map
:
3132 task_delete_kdu_instance
= asyncio
.ensure_future(
3133 self
.k8scluster_map
[kdu
["k8scluster-type"]].uninstall(
3134 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3135 kdu_instance
=kdu_instance
))
3137 self
.logger
.error(logging_text
+ "Unknown k8s deployment type {}".
3138 format(kdu
.get("k8scluster-type")))
3140 tasks_dict_info
[task_delete_kdu_instance
] = "Terminating KDU '{}'".format(kdu
.get("kdu-name"))
3143 stage
[1] = "Deleting ns from VIM."
3145 task_delete_ro
= asyncio
.ensure_future(
3146 self
._terminate
_ng
_ro
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3148 task_delete_ro
= asyncio
.ensure_future(
3149 self
._terminate
_RO
(logging_text
, nsr_deployed
, nsr_id
, nslcmop_id
, stage
))
3150 tasks_dict_info
[task_delete_ro
] = "Removing deployment from VIM"
3152 # rest of staff will be done at finally
3154 except (ROclient
.ROClientException
, DbException
, LcmException
, N2VCException
) as e
:
3155 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3157 except asyncio
.CancelledError
:
3158 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(stage
[1]))
3159 exc
= "Operation was cancelled"
3160 except Exception as e
:
3161 exc
= traceback
.format_exc()
3162 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(stage
[1], e
), exc_info
=True)
3165 error_list
.append(str(exc
))
3167 # wait for pending tasks
3169 stage
[1] = "Waiting for terminate pending tasks."
3170 self
.logger
.debug(logging_text
+ stage
[1])
3171 error_list
+= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, timeout_ns_terminate
,
3173 stage
[1] = stage
[2] = ""
3174 except asyncio
.CancelledError
:
3175 error_list
.append("Cancelled")
3176 # TODO cancell all tasks
3177 except Exception as exc
:
3178 error_list
.append(str(exc
))
3179 # update status at database
3181 error_detail
= "; ".join(error_list
)
3182 # self.logger.error(logging_text + error_detail)
3183 error_description_nslcmop
= '{} Detail: {}'.format(stage
[0], error_detail
)
3184 error_description_nsr
= 'Operation: TERMINATING.{}, {}.'.format(nslcmop_id
, stage
[0])
3186 db_nsr_update
["operational-status"] = "failed"
3187 db_nsr_update
["detailed-status"] = error_description_nsr
+ " Detail: " + error_detail
3188 db_nslcmop_update
["detailed-status"] = error_detail
3189 nslcmop_operation_state
= "FAILED"
3193 error_description_nsr
= error_description_nslcmop
= None
3194 ns_state
= "NOT_INSTANTIATED"
3195 db_nsr_update
["operational-status"] = "terminated"
3196 db_nsr_update
["detailed-status"] = "Done"
3197 db_nsr_update
["_admin.nsState"] = "NOT_INSTANTIATED"
3198 db_nslcmop_update
["detailed-status"] = "Done"
3199 nslcmop_operation_state
= "COMPLETED"
3202 self
._write
_ns
_status
(
3205 current_operation
="IDLE",
3206 current_operation_id
=None,
3207 error_description
=error_description_nsr
,
3208 error_detail
=error_detail
,
3209 other_update
=db_nsr_update
3211 self
._write
_op
_status
(
3214 error_message
=error_description_nslcmop
,
3215 operation_state
=nslcmop_operation_state
,
3216 other_update
=db_nslcmop_update
,
3218 if ns_state
== "NOT_INSTANTIATED":
3220 self
.db
.set_list("vnfrs", {"nsr-id-ref": nsr_id
}, {"_admin.nsState": "NOT_INSTANTIATED"})
3221 except DbException
as e
:
3222 self
.logger
.warn(logging_text
+ 'Error writing VNFR status for nsr-id-ref: {} -> {}'.
3224 if operation_params
:
3225 autoremove
= operation_params
.get("autoremove", False)
3226 if nslcmop_operation_state
:
3228 await self
.msg
.aiowrite("ns", "terminated", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3229 "operationState": nslcmop_operation_state
,
3230 "autoremove": autoremove
},
3232 except Exception as e
:
3233 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3235 self
.logger
.debug(logging_text
+ "Exit")
3236 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_terminate")
3238 async def _wait_for_tasks(self
, logging_text
, created_tasks_info
, timeout
, stage
, nslcmop_id
, nsr_id
=None):
3240 error_detail_list
= []
3242 pending_tasks
= list(created_tasks_info
.keys())
3243 num_tasks
= len(pending_tasks
)
3245 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3246 self
._write
_op
_status
(nslcmop_id
, stage
)
3247 while pending_tasks
:
3249 _timeout
= timeout
+ time_start
- time()
3250 done
, pending_tasks
= await asyncio
.wait(pending_tasks
, timeout
=_timeout
,
3251 return_when
=asyncio
.FIRST_COMPLETED
)
3252 num_done
+= len(done
)
3253 if not done
: # Timeout
3254 for task
in pending_tasks
:
3255 new_error
= created_tasks_info
[task
] + ": Timeout"
3256 error_detail_list
.append(new_error
)
3257 error_list
.append(new_error
)
3260 if task
.cancelled():
3263 exc
= task
.exception()
3265 if isinstance(exc
, asyncio
.TimeoutError
):
3267 new_error
= created_tasks_info
[task
] + ": {}".format(exc
)
3268 error_list
.append(created_tasks_info
[task
])
3269 error_detail_list
.append(new_error
)
3270 if isinstance(exc
, (str, DbException
, N2VCException
, ROclient
.ROClientException
, LcmException
,
3271 K8sException
, NgRoException
)):
3272 self
.logger
.error(logging_text
+ new_error
)
3274 exc_traceback
= "".join(traceback
.format_exception(None, exc
, exc
.__traceback
__))
3275 self
.logger
.error(logging_text
+ created_tasks_info
[task
] + " " + exc_traceback
)
3277 self
.logger
.debug(logging_text
+ created_tasks_info
[task
] + ": Done")
3278 stage
[1] = "{}/{}.".format(num_done
, num_tasks
)
3280 stage
[1] += " Errors: " + ". ".join(error_detail_list
) + "."
3281 if nsr_id
: # update also nsr
3282 self
.update_db_2("nsrs", nsr_id
, {"errorDescription": "Error at: " + ", ".join(error_list
),
3283 "errorDetail": ". ".join(error_detail_list
)})
3284 self
._write
_op
_status
(nslcmop_id
, stage
)
3285 return error_detail_list
3288 def _map_primitive_params(primitive_desc
, params
, instantiation_params
):
3290 Generates the params to be provided to charm before executing primitive. If user does not provide a parameter,
3291 The default-value is used. If it is between < > it look for a value at instantiation_params
3292 :param primitive_desc: portion of VNFD/NSD that describes primitive
3293 :param params: Params provided by user
3294 :param instantiation_params: Instantiation params provided by user
3295 :return: a dictionary with the calculated params
3297 calculated_params
= {}
3298 for parameter
in primitive_desc
.get("parameter", ()):
3299 param_name
= parameter
["name"]
3300 if param_name
in params
:
3301 calculated_params
[param_name
] = params
[param_name
]
3302 elif "default-value" in parameter
or "value" in parameter
:
3303 if "value" in parameter
:
3304 calculated_params
[param_name
] = parameter
["value"]
3306 calculated_params
[param_name
] = parameter
["default-value"]
3307 if isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("<") \
3308 and calculated_params
[param_name
].endswith(">"):
3309 if calculated_params
[param_name
][1:-1] in instantiation_params
:
3310 calculated_params
[param_name
] = instantiation_params
[calculated_params
[param_name
][1:-1]]
3312 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3313 format(calculated_params
[param_name
], primitive_desc
["name"]))
3315 raise LcmException("Parameter {} needed to execute primitive {} not provided".
3316 format(param_name
, primitive_desc
["name"]))
3318 if isinstance(calculated_params
[param_name
], (dict, list, tuple)):
3319 calculated_params
[param_name
] = yaml
.safe_dump(calculated_params
[param_name
],
3320 default_flow_style
=True, width
=256)
3321 elif isinstance(calculated_params
[param_name
], str) and calculated_params
[param_name
].startswith("!!yaml "):
3322 calculated_params
[param_name
] = calculated_params
[param_name
][7:]
3323 if parameter
.get("data-type") == "INTEGER":
3325 calculated_params
[param_name
] = int(calculated_params
[param_name
])
3326 except ValueError: # error converting string to int
3328 "Parameter {} of primitive {} must be integer".format(param_name
, primitive_desc
["name"]))
3329 elif parameter
.get("data-type") == "BOOLEAN":
3330 calculated_params
[param_name
] = not ((str(calculated_params
[param_name
])).lower() == 'false')
3332 # add always ns_config_info if primitive name is config
3333 if primitive_desc
["name"] == "config":
3334 if "ns_config_info" in instantiation_params
:
3335 calculated_params
["ns_config_info"] = instantiation_params
["ns_config_info"]
3336 return calculated_params
3338 def _look_for_deployed_vca(self
, deployed_vca
, member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
=None,
3339 ee_descriptor_id
=None):
3340 # find vca_deployed record for this action. Raise LcmException if not found or there is not any id.
3341 for vca
in deployed_vca
:
3344 if member_vnf_index
!= vca
["member-vnf-index"] or vdu_id
!= vca
["vdu_id"]:
3346 if vdu_count_index
is not None and vdu_count_index
!= vca
["vdu_count_index"]:
3348 if kdu_name
and kdu_name
!= vca
["kdu_name"]:
3350 if ee_descriptor_id
and ee_descriptor_id
!= vca
["ee_descriptor_id"]:
3354 # vca_deployed not found
3355 raise LcmException("charm for member_vnf_index={} vdu_id={}.{} kdu_name={} execution-environment-list.id={}"
3356 " is not deployed".format(member_vnf_index
, vdu_id
, vdu_count_index
, kdu_name
,
3359 ee_id
= vca
.get("ee_id")
3360 vca_type
= vca
.get("type", "lxc_proxy_charm") # default value for backward compatibility - proxy charm
3362 raise LcmException("charm for member_vnf_index={} vdu_id={} kdu_name={} vdu_count_index={} has not "
3363 "execution environment"
3364 .format(member_vnf_index
, vdu_id
, kdu_name
, vdu_count_index
))
3365 return ee_id
, vca_type
3367 async def _ns_execute_primitive(self
, ee_id
, primitive
, primitive_params
, retries
=0, retries_interval
=30,
3368 timeout
=None, vca_type
=None, db_dict
=None) -> (str, str):
3370 if primitive
== "config":
3371 primitive_params
= {"params": primitive_params
}
3373 vca_type
= vca_type
or "lxc_proxy_charm"
3377 output
= await asyncio
.wait_for(
3378 self
.vca_map
[vca_type
].exec_primitive(
3380 primitive_name
=primitive
,
3381 params_dict
=primitive_params
,
3382 progress_timeout
=self
.timeout_progress_primitive
,
3383 total_timeout
=self
.timeout_primitive
,
3385 timeout
=timeout
or self
.timeout_primitive
)
3388 except asyncio
.CancelledError
:
3390 except Exception as e
: # asyncio.TimeoutError
3391 if isinstance(e
, asyncio
.TimeoutError
):
3395 self
.logger
.debug('Error executing action {} on {} -> {}'.format(primitive
, ee_id
, e
))
3397 await asyncio
.sleep(retries_interval
, loop
=self
.loop
)
3399 return 'FAILED', str(e
)
3401 return 'COMPLETED', output
3403 except (LcmException
, asyncio
.CancelledError
):
3405 except Exception as e
:
3406 return 'FAIL', 'Error executing action {}: {}'.format(primitive
, e
)
3408 async def action(self
, nsr_id
, nslcmop_id
):
3409 # Try to lock HA task here
3410 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3411 if not task_is_locked_by_me
:
3414 logging_text
= "Task ns={} action={} ".format(nsr_id
, nslcmop_id
)
3415 self
.logger
.debug(logging_text
+ "Enter")
3416 # get all needed from database
3420 db_nslcmop_update
= {}
3421 nslcmop_operation_state
= None
3422 error_description_nslcmop
= None
3425 # wait for any previous tasks in process
3426 step
= "Waiting for previous operations to terminate"
3427 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3429 self
._write
_ns
_status
(
3432 current_operation
="RUNNING ACTION",
3433 current_operation_id
=nslcmop_id
3436 step
= "Getting information from database"
3437 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3438 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3440 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3441 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3442 vdu_id
= db_nslcmop
["operationParams"].get("vdu_id")
3443 kdu_name
= db_nslcmop
["operationParams"].get("kdu_name")
3444 vdu_count_index
= db_nslcmop
["operationParams"].get("vdu_count_index")
3445 primitive
= db_nslcmop
["operationParams"]["primitive"]
3446 primitive_params
= db_nslcmop
["operationParams"]["primitive_params"]
3447 timeout_ns_action
= db_nslcmop
["operationParams"].get("timeout_ns_action", self
.timeout_primitive
)
3450 step
= "Getting vnfr from database"
3451 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3452 step
= "Getting vnfd from database"
3453 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3455 step
= "Getting nsd from database"
3456 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
3458 # for backward compatibility
3459 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3460 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3461 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3462 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3464 # look for primitive
3465 config_primitive_desc
= descriptor_configuration
= None
3467 descriptor_configuration
= get_configuration(db_vnfd
, vdu_id
)
3469 descriptor_configuration
= get_configuration(db_vnfd
, kdu_name
)
3471 descriptor_configuration
= get_configuration(db_vnfd
, db_vnfd
["id"])
3473 descriptor_configuration
= db_nsd
.get("ns-configuration")
3475 if descriptor_configuration
and descriptor_configuration
.get("config-primitive"):
3476 for config_primitive
in descriptor_configuration
["config-primitive"]:
3477 if config_primitive
["name"] == primitive
:
3478 config_primitive_desc
= config_primitive
3481 if not config_primitive_desc
:
3482 if not (kdu_name
and primitive
in ("upgrade", "rollback", "status")):
3483 raise LcmException("Primitive {} not found at [ns|vnf|vdu]-configuration:config-primitive ".
3485 primitive_name
= primitive
3486 ee_descriptor_id
= None
3488 primitive_name
= config_primitive_desc
.get("execution-environment-primitive", primitive
)
3489 ee_descriptor_id
= config_primitive_desc
.get("execution-environment-ref")
3493 vdur
= next((x
for x
in db_vnfr
["vdur"] if x
["vdu-id-ref"] == vdu_id
), None)
3494 desc_params
= parse_yaml_strings(vdur
.get("additionalParams"))
3496 kdur
= next((x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
), None)
3497 desc_params
= parse_yaml_strings(kdur
.get("additionalParams"))
3499 desc_params
= parse_yaml_strings(db_vnfr
.get("additionalParamsForVnf"))
3501 desc_params
= parse_yaml_strings(db_nsr
.get("additionalParamsForNs"))
3502 if kdu_name
and get_configuration(db_vnfd
, kdu_name
):
3503 kdu_configuration
= get_configuration(db_vnfd
, kdu_name
)
3505 for primitive
in kdu_configuration
.get("initial-config-primitive", []):
3506 actions
.add(primitive
["name"])
3507 for primitive
in kdu_configuration
.get("config-primitive", []):
3508 actions
.add(primitive
["name"])
3509 kdu_action
= True if primitive_name
in actions
else False
3511 # TODO check if ns is in a proper status
3512 if kdu_name
and (primitive_name
in ("upgrade", "rollback", "status") or kdu_action
):
3513 # kdur and desc_params already set from before
3514 if primitive_params
:
3515 desc_params
.update(primitive_params
)
3516 # TODO Check if we will need something at vnf level
3517 for index
, kdu
in enumerate(get_iterable(nsr_deployed
, "K8s")):
3518 if kdu_name
== kdu
["kdu-name"] and kdu
["member-vnf-index"] == vnf_index
:
3521 raise LcmException("KDU '{}' for vnf '{}' not deployed".format(kdu_name
, vnf_index
))
3523 if kdu
.get("k8scluster-type") not in self
.k8scluster_map
:
3524 msg
= "unknown k8scluster-type '{}'".format(kdu
.get("k8scluster-type"))
3525 raise LcmException(msg
)
3527 db_dict
= {"collection": "nsrs",
3528 "filter": {"_id": nsr_id
},
3529 "path": "_admin.deployed.K8s.{}".format(index
)}
3530 self
.logger
.debug(logging_text
+ "Exec k8s {} on {}.{}".format(primitive_name
, vnf_index
, kdu_name
))
3531 step
= "Executing kdu {}".format(primitive_name
)
3532 if primitive_name
== "upgrade":
3533 if desc_params
.get("kdu_model"):
3534 kdu_model
= desc_params
.get("kdu_model")
3535 del desc_params
["kdu_model"]
3537 kdu_model
= kdu
.get("kdu-model")
3538 parts
= kdu_model
.split(sep
=":")
3540 kdu_model
= parts
[0]
3542 detailed_status
= await asyncio
.wait_for(
3543 self
.k8scluster_map
[kdu
["k8scluster-type"]].upgrade(
3544 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3545 kdu_instance
=kdu
.get("kdu-instance"),
3546 atomic
=True, kdu_model
=kdu_model
,
3547 params
=desc_params
, db_dict
=db_dict
,
3548 timeout
=timeout_ns_action
),
3549 timeout
=timeout_ns_action
+ 10)
3550 self
.logger
.debug(logging_text
+ " Upgrade of kdu {} done".format(detailed_status
))
3551 elif primitive_name
== "rollback":
3552 detailed_status
= await asyncio
.wait_for(
3553 self
.k8scluster_map
[kdu
["k8scluster-type"]].rollback(
3554 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3555 kdu_instance
=kdu
.get("kdu-instance"),
3557 timeout
=timeout_ns_action
)
3558 elif primitive_name
== "status":
3559 detailed_status
= await asyncio
.wait_for(
3560 self
.k8scluster_map
[kdu
["k8scluster-type"]].status_kdu(
3561 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3562 kdu_instance
=kdu
.get("kdu-instance")),
3563 timeout
=timeout_ns_action
)
3565 kdu_instance
= kdu
.get("kdu-instance") or "{}-{}".format(kdu
["kdu-name"], nsr_id
)
3566 params
= self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
)
3568 detailed_status
= await asyncio
.wait_for(
3569 self
.k8scluster_map
[kdu
["k8scluster-type"]].exec_primitive(
3570 cluster_uuid
=kdu
.get("k8scluster-uuid"),
3571 kdu_instance
=kdu_instance
,
3572 primitive_name
=primitive_name
,
3573 params
=params
, db_dict
=db_dict
,
3574 timeout
=timeout_ns_action
),
3575 timeout
=timeout_ns_action
)
3578 nslcmop_operation_state
= 'COMPLETED'
3580 detailed_status
= ''
3581 nslcmop_operation_state
= 'FAILED'
3583 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"], member_vnf_index
=vnf_index
,
3584 vdu_id
=vdu_id
, vdu_count_index
=vdu_count_index
,
3585 ee_descriptor_id
=ee_descriptor_id
)
3586 db_nslcmop_notif
= {"collection": "nslcmops",
3587 "filter": {"_id": nslcmop_id
},
3588 "path": "admin.VCA"}
3589 nslcmop_operation_state
, detailed_status
= await self
._ns
_execute
_primitive
(
3591 primitive
=primitive_name
,
3592 primitive_params
=self
._map
_primitive
_params
(config_primitive_desc
, primitive_params
, desc_params
),
3593 timeout
=timeout_ns_action
,
3595 db_dict
=db_nslcmop_notif
)
3597 db_nslcmop_update
["detailed-status"] = detailed_status
3598 error_description_nslcmop
= detailed_status
if nslcmop_operation_state
== "FAILED" else ""
3599 self
.logger
.debug(logging_text
+ " task Done with result {} {}".format(nslcmop_operation_state
,
3601 return # database update is called inside finally
3603 except (DbException
, LcmException
, N2VCException
, K8sException
) as e
:
3604 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
3606 except asyncio
.CancelledError
:
3607 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
3608 exc
= "Operation was cancelled"
3609 except asyncio
.TimeoutError
:
3610 self
.logger
.error(logging_text
+ "Timeout while '{}'".format(step
))
3612 except Exception as e
:
3613 exc
= traceback
.format_exc()
3614 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
3617 db_nslcmop_update
["detailed-status"] = detailed_status
= error_description_nslcmop
= \
3618 "FAILED {}: {}".format(step
, exc
)
3619 nslcmop_operation_state
= "FAILED"
3621 self
._write
_ns
_status
(
3623 ns_state
=db_nsr
["nsState"], # TODO check if degraded. For the moment use previous status
3624 current_operation
="IDLE",
3625 current_operation_id
=None,
3626 # error_description=error_description_nsr,
3627 # error_detail=error_detail,
3628 other_update
=db_nsr_update
3631 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
3632 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
3634 if nslcmop_operation_state
:
3636 await self
.msg
.aiowrite("ns", "actioned", {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
,
3637 "operationState": nslcmop_operation_state
},
3639 except Exception as e
:
3640 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
3641 self
.logger
.debug(logging_text
+ "Exit")
3642 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_action")
3643 return nslcmop_operation_state
, detailed_status
3645 async def scale(self
, nsr_id
, nslcmop_id
):
3646 # Try to lock HA task here
3647 task_is_locked_by_me
= self
.lcm_tasks
.lock_HA('ns', 'nslcmops', nslcmop_id
)
3648 if not task_is_locked_by_me
:
3651 logging_text
= "Task ns={} scale={} ".format(nsr_id
, nslcmop_id
)
3652 stage
= ['', '', '']
3653 tasks_dict_info
= {}
3654 # ^ stage, step, VIM progress
3655 self
.logger
.debug(logging_text
+ "Enter")
3656 # get all needed from database
3658 db_nslcmop_update
= {}
3661 # in case of error, indicates what part of scale was failed to put nsr at error status
3662 scale_process
= None
3663 old_operational_status
= ""
3664 old_config_status
= ""
3667 # wait for any previous tasks in process
3668 step
= "Waiting for previous operations to terminate"
3669 await self
.lcm_tasks
.waitfor_related_HA('ns', 'nslcmops', nslcmop_id
)
3670 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None,
3671 current_operation
="SCALING", current_operation_id
=nslcmop_id
)
3673 step
= "Getting nslcmop from database"
3674 self
.logger
.debug(step
+ " after having waited for previous tasks to be completed")
3675 db_nslcmop
= self
.db
.get_one("nslcmops", {"_id": nslcmop_id
})
3677 step
= "Getting nsr from database"
3678 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
3679 old_operational_status
= db_nsr
["operational-status"]
3680 old_config_status
= db_nsr
["config-status"]
3682 step
= "Parsing scaling parameters"
3683 db_nsr_update
["operational-status"] = "scaling"
3684 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3685 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3688 nsr_deployed
= db_nsr
["_admin"].get("deployed")
3689 vnf_index
= db_nslcmop
["operationParams"].get("member_vnf_index")
3690 # vdu_id = db_nslcmop["operationParams"].get("vdu_id")
3691 # vdu_count_index = db_nslcmop["operationParams"].get("vdu_count_index")
3692 # vdu_name = db_nslcmop["operationParams"].get("vdu_name")
3695 vnf_index
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["member-vnf-index"]
3696 scaling_group
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleByStepData"]["scaling-group-descriptor"]
3697 scaling_type
= db_nslcmop
["operationParams"]["scaleVnfData"]["scaleVnfType"]
3698 # for backward compatibility
3699 if nsr_deployed
and isinstance(nsr_deployed
.get("VCA"), dict):
3700 nsr_deployed
["VCA"] = list(nsr_deployed
["VCA"].values())
3701 db_nsr_update
["_admin.deployed.VCA"] = nsr_deployed
["VCA"]
3702 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
3704 step
= "Getting vnfr from database"
3705 db_vnfr
= self
.db
.get_one("vnfrs", {"member-vnf-index-ref": vnf_index
, "nsr-id-ref": nsr_id
})
3707 step
= "Getting vnfd from database"
3708 db_vnfd
= self
.db
.get_one("vnfds", {"_id": db_vnfr
["vnfd-id"]})
3710 base_folder
= db_vnfd
["_admin"]["storage"]
3712 step
= "Getting scaling-group-descriptor"
3713 scaling_descriptor
= find_in_list(
3717 lambda scale_desc
: scale_desc
["name"] == scaling_group
3719 if not scaling_descriptor
:
3720 raise LcmException("input parameter 'scaleByStepData':'scaling-group-descriptor':'{}' is not present "
3721 "at vnfd:scaling-group-descriptor".format(scaling_group
))
3723 step
= "Sending scale order to VIM"
3724 # TODO check if ns is in a proper status
3726 if not db_nsr
["_admin"].get("scaling-group"):
3727 self
.update_db_2("nsrs", nsr_id
, {"_admin.scaling-group": [{"name": scaling_group
, "nb-scale-op": 0}]})
3728 admin_scale_index
= 0
3730 for admin_scale_index
, admin_scale_info
in enumerate(db_nsr
["_admin"]["scaling-group"]):
3731 if admin_scale_info
["name"] == scaling_group
:
3732 nb_scale_op
= admin_scale_info
.get("nb-scale-op", 0)
3734 else: # not found, set index one plus last element and add new entry with the name
3735 admin_scale_index
+= 1
3736 db_nsr_update
["_admin.scaling-group.{}.name".format(admin_scale_index
)] = scaling_group
3737 RO_scaling_info
= []
3738 VCA_scaling_info
= []
3739 vdu_scaling_info
= {"scaling_group_name": scaling_group
, "vdu": []}
3740 if scaling_type
== "SCALE_OUT":
3741 if "aspect-delta-details" not in scaling_descriptor
:
3743 "Aspect delta details not fount in scaling descriptor {}".format(
3744 scaling_descriptor
["name"]
3747 # count if max-instance-count is reached
3748 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3750 vdu_scaling_info
["scaling_direction"] = "OUT"
3751 vdu_scaling_info
["vdu-create"] = {}
3752 for delta
in deltas
:
3753 for vdu_delta
in delta
["vdu-delta"]:
3754 vdud
= get_vdu(db_vnfd
, vdu_delta
["id"])
3755 vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
3756 cloud_init_text
= self
._get
_vdu
_cloud
_init
_content
(vdud
, db_vnfd
)
3758 additional_params
= self
._get
_vdu
_additional
_params
(db_vnfr
, vdud
["id"]) or {}
3759 cloud_init_list
= []
3761 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3762 max_instance_count
= 10
3763 if vdu_profile
and "max-number-of-instances" in vdu_profile
:
3764 max_instance_count
= vdu_profile
.get("max-number-of-instances", 10)
3766 default_instance_num
= get_number_of_instances(db_vnfd
, vdud
["id"])
3768 nb_scale_op
+= vdu_delta
.get("number-of-instances", 1)
3770 if nb_scale_op
+ default_instance_num
> max_instance_count
:
3772 "reached the limit of {} (max-instance-count) "
3773 "scaling-out operations for the "
3774 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3776 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3778 # TODO Information of its own ip is not available because db_vnfr is not updated.
3779 additional_params
["OSM"] = get_osm_params(
3784 cloud_init_list
.append(
3785 self
._parse
_cloud
_init
(
3792 VCA_scaling_info
.append(
3794 "osm_vdu_id": vdu_delta
["id"],
3795 "member-vnf-index": vnf_index
,
3797 "vdu_index": vdu_index
+ x
3800 RO_scaling_info
.append(
3802 "osm_vdu_id": vdu_delta
["id"],
3803 "member-vnf-index": vnf_index
,
3805 "count": vdu_delta
.get("number-of-instances", 1)
3809 RO_scaling_info
[-1]["cloud_init"] = cloud_init_list
3810 vdu_scaling_info
["vdu-create"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3812 elif scaling_type
== "SCALE_IN":
3813 if "min-instance-count" in scaling_descriptor
and scaling_descriptor
["min-instance-count"] is not None:
3814 min_instance_count
= int(scaling_descriptor
["min-instance-count"])
3816 vdu_scaling_info
["scaling_direction"] = "IN"
3817 vdu_scaling_info
["vdu-delete"] = {}
3818 deltas
= scaling_descriptor
.get("aspect-delta-details")["deltas"]
3819 for delta
in deltas
:
3820 for vdu_delta
in delta
["vdu-delta"]:
3821 vdu_index
= get_vdur_index(db_vnfr
, vdu_delta
)
3822 min_instance_count
= 0
3823 vdu_profile
= get_vdu_profile(db_vnfd
, vdu_delta
["id"])
3824 if vdu_profile
and "min-number-of-instances" in vdu_profile
:
3825 min_instance_count
= vdu_profile
["min-number-of-instances"]
3827 default_instance_num
= get_number_of_instances(db_vnfd
, vdu_delta
["id"])
3829 nb_scale_op
-= vdu_delta
.get("number-of-instances", 1)
3830 if nb_scale_op
+ default_instance_num
< min_instance_count
:
3832 "reached the limit of {} (min-instance-count) scaling-in operations for the "
3833 "scaling-group-descriptor '{}'".format(nb_scale_op
, scaling_group
)
3835 RO_scaling_info
.append({"osm_vdu_id": vdu_delta
["id"], "member-vnf-index": vnf_index
,
3836 "type": "delete", "count": vdu_delta
.get("number-of-instances", 1),
3837 "vdu_index": vdu_index
- 1})
3838 for x
in range(vdu_delta
.get("number-of-instances", 1)):
3839 VCA_scaling_info
.append(
3841 "osm_vdu_id": vdu_delta
["id"],
3842 "member-vnf-index": vnf_index
,
3844 "vdu_index": vdu_index
- 1 - x
3847 vdu_scaling_info
["vdu-delete"][vdu_delta
["id"]] = vdu_delta
.get("number-of-instances", 1)
3849 # update VDU_SCALING_INFO with the VDUs to delete ip_addresses
3850 vdu_delete
= copy(vdu_scaling_info
.get("vdu-delete"))
3851 if vdu_scaling_info
["scaling_direction"] == "IN":
3852 for vdur
in reversed(db_vnfr
["vdur"]):
3853 if vdu_delete
.get(vdur
["vdu-id-ref"]):
3854 vdu_delete
[vdur
["vdu-id-ref"]] -= 1
3855 vdu_scaling_info
["vdu"].append({
3856 "name": vdur
.get("name") or vdur
.get("vdu-name"),
3857 "vdu_id": vdur
["vdu-id-ref"],
3860 for interface
in vdur
["interfaces"]:
3861 vdu_scaling_info
["vdu"][-1]["interface"].append({
3862 "name": interface
["name"],
3863 "ip_address": interface
["ip-address"],
3864 "mac_address": interface
.get("mac-address"),
3866 # vdu_delete = vdu_scaling_info.pop("vdu-delete")
3869 step
= "Executing pre-scale vnf-config-primitive"
3870 if scaling_descriptor
.get("scaling-config-action"):
3871 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
3872 if (scaling_config_action
.get("trigger") == "pre-scale-in" and scaling_type
== "SCALE_IN") \
3873 or (scaling_config_action
.get("trigger") == "pre-scale-out" and scaling_type
== "SCALE_OUT"):
3874 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
3875 step
= db_nslcmop_update
["detailed-status"] = \
3876 "executing pre-scale scaling-config-action '{}'".format(vnf_config_primitive
)
3878 # look for primitive
3879 for config_primitive
in (get_configuration(
3880 db_vnfd
, db_vnfd
["id"]
3881 ) or {}).get("config-primitive", ()):
3882 if config_primitive
["name"] == vnf_config_primitive
:
3886 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-action"
3887 "[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:config-"
3888 "primitive".format(scaling_group
, vnf_config_primitive
))
3890 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
3891 if db_vnfr
.get("additionalParamsForVnf"):
3892 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
3894 scale_process
= "VCA"
3895 db_nsr_update
["config-status"] = "configuring pre-scaling"
3896 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
3898 # Pre-scale retry check: Check if this sub-operation has been executed before
3899 op_index
= self
._check
_or
_add
_scale
_suboperation
(
3900 db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
, 'PRE-SCALE')
3901 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
3902 # Skip sub-operation
3903 result
= 'COMPLETED'
3904 result_detail
= 'Done'
3905 self
.logger
.debug(logging_text
+
3906 "vnf_config_primitive={} Skipped sub-operation, result {} {}".format(
3907 vnf_config_primitive
, result
, result_detail
))
3909 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
3910 # New sub-operation: Get index of this sub-operation
3911 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
3912 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
3913 format(vnf_config_primitive
))
3915 # retry: Get registered params for this existing sub-operation
3916 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
3917 vnf_index
= op
.get('member_vnf_index')
3918 vnf_config_primitive
= op
.get('primitive')
3919 primitive_params
= op
.get('primitive_params')
3920 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
3921 format(vnf_config_primitive
))
3922 # Execute the primitive, either with new (first-time) or registered (reintent) args
3923 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
3924 primitive_name
= config_primitive
.get("execution-environment-primitive",
3925 vnf_config_primitive
)
3926 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
3927 member_vnf_index
=vnf_index
,
3929 vdu_count_index
=None,
3930 ee_descriptor_id
=ee_descriptor_id
)
3931 result
, result_detail
= await self
._ns
_execute
_primitive
(
3932 ee_id
, primitive_name
, primitive_params
, vca_type
=vca_type
)
3933 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
3934 vnf_config_primitive
, result
, result_detail
))
3935 # Update operationState = COMPLETED | FAILED
3936 self
._update
_suboperation
_status
(
3937 db_nslcmop
, op_index
, result
, result_detail
)
3939 if result
== "FAILED":
3940 raise LcmException(result_detail
)
3941 db_nsr_update
["config-status"] = old_config_status
3942 scale_process
= None
3945 db_nsr_update
["_admin.scaling-group.{}.nb-scale-op".format(admin_scale_index
)] = nb_scale_op
3946 db_nsr_update
["_admin.scaling-group.{}.time".format(admin_scale_index
)] = time()
3948 # SCALE-IN VCA - BEGIN
3949 if VCA_scaling_info
:
3950 step
= db_nslcmop_update
["detailed-status"] = \
3951 "Deleting the execution environments"
3952 scale_process
= "VCA"
3953 for vdu_info
in VCA_scaling_info
:
3954 if vdu_info
["type"] == "delete":
3955 member_vnf_index
= str(vdu_info
["member-vnf-index"])
3956 self
.logger
.debug(logging_text
+ "vdu info: {}".format(vdu_info
))
3957 vdu_id
= vdu_info
["osm_vdu_id"]
3958 vdu_index
= int(vdu_info
["vdu_index"])
3959 stage
[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
3960 member_vnf_index
, vdu_id
, vdu_index
)
3961 stage
[2] = step
= "Scaling in VCA"
3962 self
._write
_op
_status
(
3966 vca_update
= db_nsr
["_admin"]["deployed"]["VCA"]
3967 config_update
= db_nsr
["configurationStatus"]
3968 for vca_index
, vca
in enumerate(vca_update
):
3969 if (vca
or vca
.get("ee_id")) and vca
["member-vnf-index"] == member_vnf_index
and \
3970 vca
["vdu_count_index"] == vdu_index
:
3971 if vca
.get("vdu_id"):
3972 config_descriptor
= get_configuration(db_vnfd
, vca
.get("vdu_id"))
3973 elif vca
.get("kdu_name"):
3974 config_descriptor
= get_configuration(db_vnfd
, vca
.get("kdu_name"))
3976 config_descriptor
= get_configuration(db_vnfd
, db_vnfd
["id"])
3977 operation_params
= db_nslcmop
.get("operationParams") or {}
3978 exec_terminate_primitives
= (not operation_params
.get("skip_terminate_primitives") and
3979 vca
.get("needed_terminate"))
3980 task
= asyncio
.ensure_future(asyncio
.wait_for(
3981 self
.destroy_N2VC(logging_text
, db_nslcmop
, vca
, config_descriptor
,
3982 vca_index
, destroy_ee
=True,
3983 exec_primitives
=exec_terminate_primitives
,
3984 scaling_in
=True), timeout
=self
.timeout_charm_delete
))
3985 # wait before next removal
3986 await asyncio
.sleep(30)
3987 tasks_dict_info
[task
] = "Terminating VCA {}".format(vca
.get("ee_id"))
3988 del vca_update
[vca_index
]
3989 del config_update
[vca_index
]
3990 # wait for pending tasks of terminate primitives
3992 self
.logger
.debug(logging_text
+
3993 'Waiting for tasks {}'.format(list(tasks_dict_info
.keys())))
3994 error_list
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
,
3995 min(self
.timeout_charm_delete
,
3996 self
.timeout_ns_terminate
),
3998 tasks_dict_info
.clear()
4000 raise LcmException("; ".join(error_list
))
4002 db_vca_and_config_update
= {
4003 "_admin.deployed.VCA": vca_update
,
4004 "configurationStatus": config_update
4006 self
.update_db_2("nsrs", db_nsr
["_id"], db_vca_and_config_update
)
4007 scale_process
= None
4008 # SCALE-IN VCA - END
4012 scale_process
= "RO"
4013 if self
.ro_config
.get("ng"):
4014 await self
._scale
_ng
_ro
(logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
)
4015 vdu_scaling_info
.pop("vdu-create", None)
4016 vdu_scaling_info
.pop("vdu-delete", None)
4018 scale_process
= None
4020 self
.update_db_2("nsrs", nsr_id
, db_nsr_update
)
4023 # SCALE-UP VCA - BEGIN
4024 if VCA_scaling_info
:
4025 step
= db_nslcmop_update
["detailed-status"] = \
4026 "Creating new execution environments"
4027 scale_process
= "VCA"
4028 for vdu_info
in VCA_scaling_info
:
4029 if vdu_info
["type"] == "create":
4030 member_vnf_index
= str(vdu_info
["member-vnf-index"])
4031 self
.logger
.debug(logging_text
+ "vdu info: {}".format(vdu_info
))
4032 vnfd_id
= db_vnfr
["vnfd-ref"]
4033 vdu_index
= int(vdu_info
["vdu_index"])
4034 deploy_params
= {"OSM": get_osm_params(db_vnfr
)}
4035 if db_vnfr
.get("additionalParamsForVnf"):
4036 deploy_params
.update(parse_yaml_strings(db_vnfr
["additionalParamsForVnf"].copy()))
4037 descriptor_config
= get_configuration(db_vnfd
, db_vnfd
["id"])
4038 if descriptor_config
:
4043 logging_text
=logging_text
+ "member_vnf_index={} ".format(member_vnf_index
),
4046 nslcmop_id
=nslcmop_id
,
4052 member_vnf_index
=member_vnf_index
,
4053 vdu_index
=vdu_index
,
4055 deploy_params
=deploy_params
,
4056 descriptor_config
=descriptor_config
,
4057 base_folder
=base_folder
,
4058 task_instantiation_info
=tasks_dict_info
,
4061 vdu_id
= vdu_info
["osm_vdu_id"]
4062 vdur
= find_in_list(db_vnfr
["vdur"], lambda vdu
: vdu
["vdu-id-ref"] == vdu_id
)
4063 descriptor_config
= get_configuration(db_vnfd
, vdu_id
)
4064 if vdur
.get("additionalParams"):
4065 deploy_params_vdu
= parse_yaml_strings(vdur
["additionalParams"])
4067 deploy_params_vdu
= deploy_params
4068 deploy_params_vdu
["OSM"] = get_osm_params(db_vnfr
, vdu_id
, vdu_count_index
=vdu_index
)
4069 if descriptor_config
:
4072 stage
[1] = "Scaling member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4073 member_vnf_index
, vdu_id
, vdu_index
)
4074 stage
[2] = step
= "Scaling out VCA"
4075 self
._write
_op
_status
(
4080 logging_text
=logging_text
+ "member_vnf_index={}, vdu_id={}, vdu_index={} ".format(
4081 member_vnf_index
, vdu_id
, vdu_index
),
4084 nslcmop_id
=nslcmop_id
,
4090 member_vnf_index
=member_vnf_index
,
4091 vdu_index
=vdu_index
,
4093 deploy_params
=deploy_params_vdu
,
4094 descriptor_config
=descriptor_config
,
4095 base_folder
=base_folder
,
4096 task_instantiation_info
=tasks_dict_info
,
4099 # TODO: scaling for kdu is not implemented yet.
4100 kdu_name
= vdu_info
["osm_vdu_id"]
4101 descriptor_config
= get_configuration(db_vnfd
, kdu_name
)
4102 if descriptor_config
:
4104 vdu_index
= vdu_index
4106 kdur
= next(x
for x
in db_vnfr
["kdur"] if x
["kdu-name"] == kdu_name
)
4107 deploy_params_kdu
= {"OSM": get_osm_params(db_vnfr
)}
4108 if kdur
.get("additionalParams"):
4109 deploy_params_kdu
= parse_yaml_strings(kdur
["additionalParams"])
4112 logging_text
=logging_text
,
4115 nslcmop_id
=nslcmop_id
,
4121 member_vnf_index
=member_vnf_index
,
4122 vdu_index
=vdu_index
,
4124 deploy_params
=deploy_params_kdu
,
4125 descriptor_config
=descriptor_config
,
4126 base_folder
=base_folder
,
4127 task_instantiation_info
=tasks_dict_info
,
4130 # SCALE-UP VCA - END
4131 scale_process
= None
4134 # execute primitive service POST-SCALING
4135 step
= "Executing post-scale vnf-config-primitive"
4136 if scaling_descriptor
.get("scaling-config-action"):
4137 for scaling_config_action
in scaling_descriptor
["scaling-config-action"]:
4138 if (scaling_config_action
.get("trigger") == "post-scale-in" and scaling_type
== "SCALE_IN") \
4139 or (scaling_config_action
.get("trigger") == "post-scale-out" and scaling_type
== "SCALE_OUT"):
4140 vnf_config_primitive
= scaling_config_action
["vnf-config-primitive-name-ref"]
4141 step
= db_nslcmop_update
["detailed-status"] = \
4142 "executing post-scale scaling-config-action '{}'".format(vnf_config_primitive
)
4144 vnfr_params
= {"VDU_SCALE_INFO": vdu_scaling_info
}
4145 if db_vnfr
.get("additionalParamsForVnf"):
4146 vnfr_params
.update(db_vnfr
["additionalParamsForVnf"])
4148 # look for primitive
4149 for config_primitive
in (
4150 get_configuration(db_vnfd
, db_vnfd
["id"]) or {}
4151 ).get("config-primitive", ()):
4152 if config_primitive
["name"] == vnf_config_primitive
:
4156 "Invalid vnfd descriptor at scaling-group-descriptor[name='{}']:scaling-config-"
4157 "action[vnf-config-primitive-name-ref='{}'] does not match any vnf-configuration:"
4158 "config-primitive".format(scaling_group
, vnf_config_primitive
))
4159 scale_process
= "VCA"
4160 db_nsr_update
["config-status"] = "configuring post-scaling"
4161 primitive_params
= self
._map
_primitive
_params
(config_primitive
, {}, vnfr_params
)
4163 # Post-scale retry check: Check if this sub-operation has been executed before
4164 op_index
= self
._check
_or
_add
_scale
_suboperation
(
4165 db_nslcmop
, vnf_index
, vnf_config_primitive
, primitive_params
, 'POST-SCALE')
4166 if op_index
== self
.SUBOPERATION_STATUS_SKIP
:
4167 # Skip sub-operation
4168 result
= 'COMPLETED'
4169 result_detail
= 'Done'
4170 self
.logger
.debug(logging_text
+
4171 "vnf_config_primitive={} Skipped sub-operation, result {} {}".
4172 format(vnf_config_primitive
, result
, result_detail
))
4174 if op_index
== self
.SUBOPERATION_STATUS_NEW
:
4175 # New sub-operation: Get index of this sub-operation
4176 op_index
= len(db_nslcmop
.get('_admin', {}).get('operations')) - 1
4177 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} New sub-operation".
4178 format(vnf_config_primitive
))
4180 # retry: Get registered params for this existing sub-operation
4181 op
= db_nslcmop
.get('_admin', {}).get('operations', [])[op_index
]
4182 vnf_index
= op
.get('member_vnf_index')
4183 vnf_config_primitive
= op
.get('primitive')
4184 primitive_params
= op
.get('primitive_params')
4185 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Sub-operation retry".
4186 format(vnf_config_primitive
))
4187 # Execute the primitive, either with new (first-time) or registered (reintent) args
4188 ee_descriptor_id
= config_primitive
.get("execution-environment-ref")
4189 primitive_name
= config_primitive
.get("execution-environment-primitive",
4190 vnf_config_primitive
)
4191 ee_id
, vca_type
= self
._look
_for
_deployed
_vca
(nsr_deployed
["VCA"],
4192 member_vnf_index
=vnf_index
,
4194 vdu_count_index
=None,
4195 ee_descriptor_id
=ee_descriptor_id
)
4196 result
, result_detail
= await self
._ns
_execute
_primitive
(
4197 ee_id
, primitive_name
, primitive_params
, vca_type
=vca_type
)
4198 self
.logger
.debug(logging_text
+ "vnf_config_primitive={} Done with result {} {}".format(
4199 vnf_config_primitive
, result
, result_detail
))
4200 # Update operationState = COMPLETED | FAILED
4201 self
._update
_suboperation
_status
(
4202 db_nslcmop
, op_index
, result
, result_detail
)
4204 if result
== "FAILED":
4205 raise LcmException(result_detail
)
4206 db_nsr_update
["config-status"] = old_config_status
4207 scale_process
= None
4210 db_nsr_update
["detailed-status"] = "" # "scaled {} {}".format(scaling_group, scaling_type)
4211 db_nsr_update
["operational-status"] = "running" if old_operational_status
== "failed" \
4212 else old_operational_status
4213 db_nsr_update
["config-status"] = old_config_status
4215 except (ROclient
.ROClientException
, DbException
, LcmException
, NgRoException
) as e
:
4216 self
.logger
.error(logging_text
+ "Exit Exception {}".format(e
))
4218 except asyncio
.CancelledError
:
4219 self
.logger
.error(logging_text
+ "Cancelled Exception while '{}'".format(step
))
4220 exc
= "Operation was cancelled"
4221 except Exception as e
:
4222 exc
= traceback
.format_exc()
4223 self
.logger
.critical(logging_text
+ "Exit Exception {} {}".format(type(e
).__name
__, e
), exc_info
=True)
4225 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE", current_operation_id
=None)
4227 stage
[1] = "Waiting for instantiate pending tasks."
4228 self
.logger
.debug(logging_text
+ stage
[1])
4229 exc
= await self
._wait
_for
_tasks
(logging_text
, tasks_dict_info
, self
.timeout_ns_deploy
,
4230 stage
, nslcmop_id
, nsr_id
=nsr_id
)
4232 db_nslcmop_update
["detailed-status"] = error_description_nslcmop
= "FAILED {}: {}".format(step
, exc
)
4233 nslcmop_operation_state
= "FAILED"
4235 db_nsr_update
["operational-status"] = old_operational_status
4236 db_nsr_update
["config-status"] = old_config_status
4237 db_nsr_update
["detailed-status"] = ""
4239 if "VCA" in scale_process
:
4240 db_nsr_update
["config-status"] = "failed"
4241 if "RO" in scale_process
:
4242 db_nsr_update
["operational-status"] = "failed"
4243 db_nsr_update
["detailed-status"] = "FAILED scaling nslcmop={} {}: {}".format(nslcmop_id
, step
,
4246 error_description_nslcmop
= None
4247 nslcmop_operation_state
= "COMPLETED"
4248 db_nslcmop_update
["detailed-status"] = "Done"
4250 self
._write
_op
_status
(op_id
=nslcmop_id
, stage
="", error_message
=error_description_nslcmop
,
4251 operation_state
=nslcmop_operation_state
, other_update
=db_nslcmop_update
)
4253 self
._write
_ns
_status
(nsr_id
=nsr_id
, ns_state
=None, current_operation
="IDLE",
4254 current_operation_id
=None, other_update
=db_nsr_update
)
4256 if nslcmop_operation_state
:
4258 msg
= {"nsr_id": nsr_id
, "nslcmop_id": nslcmop_id
, "operationState": nslcmop_operation_state
}
4259 await self
.msg
.aiowrite("ns", "scaled", msg
, loop
=self
.loop
)
4260 except Exception as e
:
4261 self
.logger
.error(logging_text
+ "kafka_write notification Exception {}".format(e
))
4262 self
.logger
.debug(logging_text
+ "Exit")
4263 self
.lcm_tasks
.remove("ns", nsr_id
, nslcmop_id
, "ns_scale")
4265 async def _scale_ng_ro(self
, logging_text
, db_nsr
, db_nslcmop
, db_vnfr
, vdu_scaling_info
, stage
):
4266 nsr_id
= db_nslcmop
["nsInstanceId"]
4267 db_nsd
= self
.db
.get_one("nsds", {"_id": db_nsr
["nsd-id"]})
4270 # read from db: vnfd's for every vnf
4273 # for each vnf in ns, read vnfd
4274 for vnfr
in self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
}):
4275 db_vnfrs
[vnfr
["member-vnf-index-ref"]] = vnfr
4276 vnfd_id
= vnfr
["vnfd-id"] # vnfd uuid for this vnf
4277 # if we haven't this vnfd, read it from db
4278 if not find_in_list(db_vnfds
, lambda a_vnfd
: a_vnfd
["id"] == vnfd_id
):
4280 vnfd
= self
.db
.get_one("vnfds", {"_id": vnfd_id
})
4281 db_vnfds
.append(vnfd
)
4282 n2vc_key
= self
.n2vc
.get_public_key()
4283 n2vc_key_list
= [n2vc_key
]
4284 self
.scale_vnfr(db_vnfr
, vdu_scaling_info
.get("vdu-create"), vdu_scaling_info
.get("vdu-delete"),
4286 # db_vnfr has been updated, update db_vnfrs to use it
4287 db_vnfrs
[db_vnfr
["member-vnf-index-ref"]] = db_vnfr
4288 await self
._instantiate
_ng
_ro
(logging_text
, nsr_id
, db_nsd
, db_nsr
, db_nslcmop
, db_vnfrs
,
4289 db_vnfds
, n2vc_key_list
, stage
=stage
, start_deploy
=time(),
4290 timeout_ns_deploy
=self
.timeout_ns_deploy
)
4291 if vdu_scaling_info
.get("vdu-delete"):
4292 self
.scale_vnfr(db_vnfr
, None, vdu_scaling_info
["vdu-delete"], mark_delete
=False)
4294 async def add_prometheus_metrics(self
, ee_id
, artifact_path
, ee_config_descriptor
, vnfr_id
, nsr_id
, target_ip
):
4295 if not self
.prometheus
:
4297 # look if exist a file called 'prometheus*.j2' and
4298 artifact_content
= self
.fs
.dir_ls(artifact_path
)
4299 job_file
= next((f
for f
in artifact_content
if f
.startswith("prometheus") and f
.endswith(".j2")), None)
4302 with self
.fs
.file_open((artifact_path
, job_file
), "r") as f
:
4306 _
, _
, service
= ee_id
.partition(".") # remove prefix "namespace."
4307 host_name
= "{}-{}".format(service
, ee_config_descriptor
["metric-service"])
4309 vnfr_id
= vnfr_id
.replace("-", "")
4311 "JOB_NAME": vnfr_id
,
4312 "TARGET_IP": target_ip
,
4313 "EXPORTER_POD_IP": host_name
,
4314 "EXPORTER_POD_PORT": host_port
,
4316 job_list
= self
.prometheus
.parse_job(job_data
, variables
)
4317 # ensure job_name is using the vnfr_id. Adding the metadata nsr_id
4318 for job
in job_list
:
4319 if not isinstance(job
.get("job_name"), str) or vnfr_id
not in job
["job_name"]:
4320 job
["job_name"] = vnfr_id
+ "_" + str(randint(1, 10000))
4321 job
["nsr_id"] = nsr_id
4322 job_dict
= {jl
["job_name"]: jl
for jl
in job_list
}
4323 if await self
.prometheus
.update(job_dict
):
4324 return list(job_dict
.keys())
4326 def get_vca_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4328 Get VCA Cloud and VCA Cloud Credentials for the VIM account
4330 :param: vim_account_id: VIM Account ID
4332 :return: (cloud_name, cloud_credential)
4334 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4335 return config
.get("vca_cloud"), config
.get("vca_cloud_credential")
4337 def get_vca_k8s_cloud_and_credentials(self
, vim_account_id
: str) -> (str, str):
4339 Get VCA K8s Cloud and VCA K8s Cloud Credentials for the VIM account
4341 :param: vim_account_id: VIM Account ID
4343 :return: (cloud_name, cloud_credential)
4345 config
= VimAccountDB
.get_vim_account_with_id(vim_account_id
).get("config", {})
4346 return config
.get("vca_k8s_cloud"), config
.get("vca_k8s_cloud_credential")