1 # -*- coding: utf-8 -*-
4 # Copyright 2020 Telefonica Investigacion y Desarrollo, S.A.U.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
21 from traceback
import format_exc
as traceback_format_exc
22 from osm_ng_ro
.ns_thread
import NsWorker
23 from osm_ng_ro
.validation
import validate_input
, deploy_schema
24 from osm_common
import dbmongo
, dbmemory
, fslocal
, fsmongo
, msglocal
, msgkafka
, version
as common_version
25 from osm_common
.dbbase
import DbException
26 from osm_common
.fsbase
import FsException
27 from osm_common
.msgbase
import MsgException
28 from http
import HTTPStatus
29 from uuid
import uuid4
30 from threading
import Lock
31 from random
import choice
as random_choice
33 from jinja2
import Environment
, Template
, meta
, TemplateError
, TemplateNotFound
, TemplateSyntaxError
34 from cryptography
.hazmat
.primitives
import serialization
as crypto_serialization
35 from cryptography
.hazmat
.primitives
.asymmetric
import rsa
36 from cryptography
.hazmat
.backends
import default_backend
as crypto_default_backend
38 __author__
= "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
39 min_common_version
= "0.1.16"
class NsException(Exception):
    """Error raised by the NS engine, carrying the HTTP status to report to the caller.

    :param message: human readable description of the failure
    :param http_code: HTTP status associated with the error, 400 Bad Request by default
    """

    def __init__(self, message, http_code=HTTPStatus.BAD_REQUEST):
        self.http_code = http_code
        # super(Exception, self) named the wrong class: it starts the MRO lookup after
        # Exception. The zero-argument form initialises the direct parent as intended.
        super().__init__(message)
51 Obtain a unique ID for this process. If running from inside docker, it will get docker ID. If not it
52 will provide a random one
55 # Try getting docker id. If fails, get pid
57 with
open("/proc/self/cgroup", "r") as f
:
58 text_id_
= f
.readline()
59 _
, _
, text_id
= text_id_
.rpartition("/")
60 text_id
= text_id
.replace("\n", "")[:12]
66 return "".join(random_choice("0123456789abcdef") for _
in range(12))
70 """utility for compare dot separate versions. Fills with zeros to proper number comparison"""
72 for point
in v
.split("."):
73 filled
.append(point
.zfill(8))
84 # self.operations = None
85 self
.logger
= logging
.getLogger("ro.ns")
87 self
.write_lock
= None
93 def init_db(self
, target_version
):
96 def start(self
, config
):
98 Connect to database, filesystem storage, and messaging
99 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
100 :param config: Configuration of db, storage, etc
104 self
.config
["process_id"] = get_process_id() # used for HA identity
105 # check right version of common
106 if versiontuple(common_version
) < versiontuple(min_common_version
):
107 raise NsException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
108 common_version
, min_common_version
))
112 if config
["database"]["driver"] == "mongo":
113 self
.db
= dbmongo
.DbMongo()
114 self
.db
.db_connect(config
["database"])
115 elif config
["database"]["driver"] == "memory":
116 self
.db
= dbmemory
.DbMemory()
117 self
.db
.db_connect(config
["database"])
119 raise NsException("Invalid configuration param '{}' at '[database]':'driver'".format(
120 config
["database"]["driver"]))
122 if config
["storage"]["driver"] == "local":
123 self
.fs
= fslocal
.FsLocal()
124 self
.fs
.fs_connect(config
["storage"])
125 elif config
["storage"]["driver"] == "mongo":
126 self
.fs
= fsmongo
.FsMongo()
127 self
.fs
.fs_connect(config
["storage"])
129 raise NsException("Invalid configuration param '{}' at '[storage]':'driver'".format(
130 config
["storage"]["driver"]))
132 if config
["message"]["driver"] == "local":
133 self
.msg
= msglocal
.MsgLocal()
134 self
.msg
.connect(config
["message"])
135 elif config
["message"]["driver"] == "kafka":
136 self
.msg
= msgkafka
.MsgKafka()
137 self
.msg
.connect(config
["message"])
139 raise NsException("Invalid configuration param '{}' at '[message]':'driver'".format(
140 config
["message"]["driver"]))
142 # TODO load workers to deal with exising database tasks
144 self
.write_lock
= Lock()
145 except (DbException
, FsException
, MsgException
) as e
:
146 raise NsException(str(e
), http_code
=e
.http_code
)
151 self
.db
.db_disconnect()
153 self
.fs
.fs_disconnect()
155 self
.msg
.disconnect()
156 self
.write_lock
= None
157 except (DbException
, FsException
, MsgException
) as e
:
158 raise NsException(str(e
), http_code
=e
.http_code
)
159 for worker
in self
.workers
:
160 worker
.insert_task(("terminate",))
def _create_worker(self, vim_account_id):
    """Select a worker for a VIM account and queue its "load_vim" task.

    The slot of the first worker in self.workers that is not alive is reused. When every
    worker is alive a new NsWorker is appended and started.

    :param vim_account_id: VIM account identifier sent to the worker in the "load_vim" task
    :return: index in self.workers of the worker that received the task
    """
    # TODO make use of the limit self.config["global"]["server.ns_threads"]
    worker_id = next((index for index, worker in enumerate(self.workers) if not worker.is_alive()), None)
    if worker_id is None:
        worker_id = len(self.workers)
        self.workers.append(NsWorker(worker_id, self.config, self.plugins, self.db))
        self.workers[worker_id].start()
    # NOTE(review): a reused slot holds a worker that is not alive and it is not restarted here;
    # confirm that NsWorker still processes tasks inserted in that state
    self.workers[worker_id].insert_task(("load_vim", vim_account_id))
    # the caller (_assign_vim) records this value as the worker serving the VIM account
    return worker_id
def _assign_vim(self, vim_account_id):
    """Make sure a worker is assigned to the VIM account, creating one on first use.

    :param vim_account_id: VIM account identifier, used as key of self.assignment
    """
    if vim_account_id in self.assignment:
        return  # already served by a worker, nothing to do
    self.assignment[vim_account_id] = self._create_worker(vim_account_id)
def _get_cloud_init(self, where):
    """Fetch the raw cloud-init text referenced by a descriptor locator.

    The first field of the locator is used as the "_id" of the record read from "vnfds".

    :param where: can be 'vnfd_id:file:file_name' or 'vnfd_id:vdu:vdu_index'
    :return: the cloud-init content, read from the package storage ('file') or taken from
        the "cloud-init" entry of the indexed vdu ('vdu')
    :raises NsException: if the locator type is neither 'file' nor 'vdu', or the vdu index
        does not lead to a "cloud-init" entry
    """
    vnfd_id, _, other = where.partition(":")
    _type, _, name = other.partition(":")
    vnfd = self.db.get_one("vnfds", {"_id": vnfd_id})

    if _type == "file":
        base_folder = vnfd["_admin"]["storage"]
        cloud_init_file = "{}/{}/cloud_init/{}".format(base_folder["folder"], base_folder["pkg-dir"], name)
        with self.fs.file_open(cloud_init_file, "r") as ci_file:
            return ci_file.read()

    if _type == "vdu":
        try:
            return vnfd["vdu"][int(name)]["cloud-init"]
        except (ValueError, IndexError, KeyError) as e:
            # a non numeric or out of range index is a malformed locator, not an internal error
            raise NsException("Mismatch descriptor for cloud init: {}".format(where)) from e

    raise NsException("Mismatch descriptor for cloud init: {}".format(where))
def _parse_jinja2(self, cloud_init_content, params, context):
    """Render a cloud-init Jinja2 template with the instantiation parameters.

    :param cloud_init_content: template text
    :param params: dictionary with the values for the template variables; may be None or empty
    :param context: text identifying where the template comes from, used only in error messages
    :return: the rendered text
    :raises NsException: if a variable used by the template is not present in params, or the
        template cannot be parsed or rendered
    """
    try:
        env = Environment()
        ast = env.parse(cloud_init_content)
        mandatory_vars = meta.find_undeclared_variables(ast)
        for var in mandatory_vars:
            if not params or var not in params:
                # the two literals are concatenated: the trailing space keeps the words apart
                raise NsException(
                    "Variable '{}' defined at vnfd='{}' must be provided in the instantiation parameters "
                    "inside the 'additionalParamsForVnf' block".format(var, context))
        template = Template(cloud_init_content)
        return template.render(params or {})

    except (TemplateError, TemplateNotFound, TemplateSyntaxError) as e:
        raise NsException("Error parsing Jinja2 to cloud-init content at vnfd='{}': {}".format(context, e)) from e
213 def _create_db_ro_nsrs(self
, nsr_id
, now
):
215 key
= rsa
.generate_private_key(
216 backend
=crypto_default_backend(),
217 public_exponent
=65537,
220 private_key
= key
.private_bytes(
221 crypto_serialization
.Encoding
.PEM
,
222 crypto_serialization
.PrivateFormat
.PKCS8
,
223 crypto_serialization
.NoEncryption())
224 public_key
= key
.public_key().public_bytes(
225 crypto_serialization
.Encoding
.OpenSSH
,
226 crypto_serialization
.PublicFormat
.OpenSSH
228 private_key
= private_key
.decode('utf8')
229 public_key
= public_key
.decode('utf8')
230 except Exception as e
:
231 raise NsException("Cannot create ssh-keys: {}".format(e
))
233 schema_version
= "1.1"
234 private_key_encrypted
= self
.db
.encrypt(private_key
, schema_version
=schema_version
, salt
=nsr_id
)
240 "schema_version": schema_version
242 "public_key": public_key
,
243 "private_key": private_key_encrypted
,
246 self
.db
.create("ro_nsrs", db_content
)
249 def deploy(self
, session
, indata
, version
, nsr_id
, *args
, **kwargs
):
250 print("ns.deploy session={} indata={} version={} nsr_id={}".format(session
, indata
, version
, nsr_id
))
251 validate_input(indata
, deploy_schema
)
252 action_id
= indata
.get("action_id", str(uuid4()))
254 # get current deployment
257 db_nsr_update
= {} # update operation on nsrs
259 # db_nslcmop_update = {} # update operation on nslcmops
260 db_vnfrs
= {} # vnf's info indexed by _id
263 logging_text
= "Task deploy nsr_id={} action_id={} ".format(nsr_id
, action_id
)
264 self
.logger
.debug(logging_text
+ "Enter")
266 step
= "Getting ns and vnfr record from db"
267 # db_nslcmop = self.db.get_one("nslcmops", {"_id": nslcmop_id})
268 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
271 # read from db: vnf's of this ns
272 step
= "Getting vnfrs from db"
273 db_vnfrs_list
= self
.db
.get_list("vnfrs", {"nsr-id-ref": nsr_id
})
274 if not db_vnfrs_list
:
275 raise NsException("Cannot obtain associated VNF for ns")
276 for vnfr
in db_vnfrs_list
:
277 db_vnfrs
[vnfr
["_id"]] = vnfr
278 db_vnfrs_update
[vnfr
["_id"]] = {}
280 db_ro_nsr
= self
.db
.get_one("ro_nsrs", {"_id": nsr_id
}, fail_on_empty
=False)
282 db_ro_nsr
= self
._create
_db
_ro
_nsrs
(nsr_id
, now
)
283 ro_nsr_public_key
= db_ro_nsr
["public_key"]
285 # check that action_id is not in the list of actions. Suffixed with :index
286 if action_id
in db_ro_nsr
["actions"]:
289 new_action_id
= "{}:{}".format(action_id
, index
)
290 if new_action_id
not in db_ro_nsr
["actions"]:
291 action_id
= new_action_id
292 self
.logger
.debug(logging_text
+ "Changing action_id in use to {}".format(action_id
))
296 def _create_task(item
, action
, target_record
, target_record_id
, extra_dict
=None):
302 "action_id": action_id
,
304 "task_id": "{}:{}".format(action_id
, task_index
),
305 "status": "SCHEDULED",
308 "target_record": target_record
,
309 "target_record_id": target_record_id
,
312 task
.update(extra_dict
) # params, find_params, depends_on
316 def _create_ro_task(vim_account_id
, item
, action
, target_record
, target_record_id
, extra_dict
=None):
321 _id
= action_id
+ ":" + str(task_index
)
326 "target_id": "vim:" + vim_account_id
,
329 "created_items": None,
339 "tasks": [_create_task(item
, action
, target_record
, target_record_id
, extra_dict
)],
def _process_image_params(target_image, vim_info):
    """Build the parameters used to look an image up.

    A single filter is produced. When several fields are present the last one checked wins:
    "image_checksum" over "vim_image_id" over "image".

    :param target_image: dictionary that may contain "image", "vim_image_id", "image_checksum"
    :param vim_info: not used by this function
    :return: {"find_params": {...}}; the inner dictionary is empty when no field is set
    """
    find_params = {}
    # order matters: each match replaces the whole filter set by the previous one
    for source_key, filter_key in (("image", "name"), ("vim_image_id", "id"), ("image_checksum", "checksum")):
        value = target_image.get(source_key)
        if value:
            find_params["filter_dict"] = {filter_key: value}
    return {"find_params": find_params}
def _process_flavor_params(target_flavor, vim_info):
    """Translate a flavor descriptor into the parameters used to find or create it.

    :param target_flavor: dictionary with "storage-gb", "memory-mb", "vcpu-count", "name" and
        optionally "guest-epa" (numa-node-policy, mempage-size, cpu pinning and quotas)
    :param vim_info: not used by this function
    :return: {"find_params": {"flavor_data": ...}, "params": {"flavor_data": ...}}; the "params"
        copy additionally carries the flavor "name"
    """

    def _get_resource_allocation_params(quota_descriptor):
        """
        read the quota_descriptor from vnfd and fetch the resource allocation properties from the
        descriptor object
        :param quota_descriptor: cpu/mem/vif/disk-io quota descriptor
        :return: quota params for limit, reserve, shares from the descriptor object
        """
        return {
            field: int(quota_descriptor[field])
            for field in ("limit", "reserve", "shares")
            if quota_descriptor.get(field)
        }

    flavor_data = {
        "disk": int(target_flavor["storage-gb"]),
        # "ram": max(int(target_flavor["memory-mb"]) // 1024, 1),
        # ^ TODO manage at vim_connectors MB instead of GB
        "ram": int(target_flavor["memory-mb"]),
        "vcpus": target_flavor["vcpu-count"],
    }

    guest_epa = target_flavor.get("guest-epa")
    if guest_epa:
        extended = {}
        numa = {}
        # becomes True once an explicit vcpu layout is chosen; it then disables the
        # cpu-pinning-policy and cpu-quota handling further down
        epa_vcpu_set = False

        numa_node_policy = guest_epa.get("numa-node-policy")
        if numa_node_policy and numa_node_policy.get("node"):
            numa_node = numa_node_policy["node"][0]  # only the first node is considered
            if numa_node.get("num-cores"):
                numa["cores"] = numa_node["num-cores"]
                epa_vcpu_set = True
            paired_threads = numa_node.get("paired-threads")
            if paired_threads:
                if paired_threads.get("num-paired-threads"):
                    numa["paired-threads"] = int(paired_threads["num-paired-threads"])
                    epa_vcpu_set = True
                # "paired-thread-ids" is optional: test for truthiness, len() of a missing key fails
                thread_ids = paired_threads.get("paired-thread-ids")
                if thread_ids:
                    numa["paired-threads-id"] = [
                        (str(pair["thread-a"]), str(pair["thread-b"])) for pair in thread_ids
                    ]
            if numa_node.get("num-threads"):
                numa["threads"] = int(numa_node["num-threads"])
                epa_vcpu_set = True
            if numa_node.get("memory-mb"):
                # convert before dividing so that textual values are accepted like the other fields
                numa["memory"] = max(int(numa_node["memory-mb"]) // 1024, 1)

        if guest_epa.get("mempage-size"):
            extended["mempage-size"] = guest_epa.get("mempage-size")

        if guest_epa.get("cpu-pinning-policy") and not epa_vcpu_set:
            if guest_epa["cpu-pinning-policy"] == "DEDICATED":
                thread_policy = guest_epa.get("cpu-thread-pinning-policy")
                if thread_policy and thread_policy != "PREFER":
                    numa["cores"] = max(flavor_data["vcpus"], 1)
                else:
                    numa["threads"] = max(flavor_data["vcpus"], 1)
                epa_vcpu_set = True

        if guest_epa.get("cpu-quota") and not epa_vcpu_set:
            cpuquota = _get_resource_allocation_params(guest_epa.get("cpu-quota"))
            if cpuquota:
                extended["cpu-quota"] = cpuquota
        for quota_name in ("mem-quota", "disk-io-quota", "vif-quota"):
            if guest_epa.get(quota_name):
                quota = _get_resource_allocation_params(guest_epa.get(quota_name))
                if quota:
                    extended[quota_name] = quota

        if numa:
            extended["numas"] = [numa]
        if extended:
            flavor_data["extended"] = extended

    extra_dict = {"find_params": {"flavor_data": flavor_data}}
    flavor_data_name = flavor_data.copy()
    flavor_data_name["name"] = target_flavor["name"]
    extra_dict["params"] = {"flavor_data": flavor_data_name}
    return extra_dict
441 def _process_net_params(target_vld
, vim_info
):
444 if vim_info
.get("vim_network_name"):
445 extra_dict
["find_params"] = {"filter_dict": {"name": vim_info
.get("vim_network_name")}}
446 elif vim_info
.get("vim_network_id"):
447 extra_dict
["find_params"] = {"filter_dict": {"id": vim_info
.get("vim_network_id")}}
448 elif target_vld
.get("mgmt-network"):
449 extra_dict
["find_params"] = {"mgmt": True, "name": target_vld
["id"]}
452 extra_dict
["params"] = {
453 "net_name": "{}-{}".format(indata
["name"][:16], target_vld
.get("name", target_vld
["id"])[:16]),
454 "ip_profile": vim_info
.get('ip_profile'),
455 "provider_network_profile": vim_info
.get('provider_network'),
457 if not target_vld
.get("underlay"):
458 extra_dict
["params"]["net_type"] = "bridge"
460 extra_dict
["params"]["net_type"] = "ptp" if target_vld
.get("type") == "ELINE" else "data"
463 def _process_vdu_params(target_vdu
, vim_info
):
468 nonlocal vdu2cloud_init
469 vnf_preffix
= "vnfrs:{}".format(vnfr_id
)
470 ns_preffix
= "nsrs:{}".format(nsr_id
)
471 image_text
= ns_preffix
+ ":image." + target_vdu
["ns-image-id"]
472 flavor_text
= ns_preffix
+ ":flavor." + target_vdu
["ns-flavor-id"]
473 extra_dict
= {"depends_on": [image_text
, flavor_text
]}
475 for iface_index
, interface
in enumerate(target_vdu
["interfaces"]):
476 if interface
.get("ns-vld-id"):
477 net_text
= ns_preffix
+ ":vld." + interface
["ns-vld-id"]
479 net_text
= vnf_preffix
+ ":vld." + interface
["vnf-vld-id"]
480 extra_dict
["depends_on"].append(net_text
)
482 "name": interface
["name"],
483 "net_id": "TASK-" + net_text
,
484 "vpci": interface
.get("vpci"),
486 # TODO mac_address: used for SR-IOV ifaces #TODO for other types
487 # TODO floating_ip: True/False (or it can be None)
489 if interface
.get("type") in ("SR-IOV", "PCI-PASSTHROUGH"):
490 net_item
["use"] = "data"
491 net_item
["model"] = interface
["type"]
492 net_item
["type"] = interface
["type"]
493 elif interface
.get("type") == "OM-MGMT" or interface
.get("mgmt-interface") or \
494 interface
.get("mgmt-vnf"):
495 net_item
["use"] = "mgmt"
496 else: # if interface.get("type") in ("VIRTIO", "E1000", "PARAVIRT"):
497 net_item
["use"] = "bridge"
498 net_item
["model"] = interface
.get("type")
499 net_list
.append(net_item
)
500 if interface
.get("mgmt-vnf"):
501 extra_dict
["mgmt_vnf_interface"] = iface_index
502 elif interface
.get("mgmt-interface"):
503 extra_dict
["mgmt_vdu_interface"] = iface_index
507 if target_vdu
.get("cloud-init"):
508 if target_vdu
["cloud-init"] not in vdu2cloud_init
:
509 vdu2cloud_init
[target_vdu
["cloud-init"]] = self
._get
_cloud
_init
(target_vdu
["cloud-init"])
510 cloud_content_
= vdu2cloud_init
[target_vdu
["cloud-init"]]
511 cloud_config
["user-data"] = self
._parse
_jinja
2(cloud_content_
, target_vdu
.get("additionalParams"),
512 target_vdu
["cloud-init"])
513 if target_vdu
.get("boot-data-drive"):
514 cloud_config
["boot-data-drive"] = target_vdu
.get("boot-data-drive")
516 if target_vdu
.get("ssh-keys"):
517 ssh_keys
+= target_vdu
.get("ssh-keys")
518 if target_vdu
.get("ssh-access-required"):
519 ssh_keys
.append(ro_nsr_public_key
)
521 cloud_config
["key-pairs"] = ssh_keys
523 extra_dict
["params"] = {
524 "name": "{}-{}-{}-{}".format(indata
["name"][:16], vnfr
["member-vnf-index-ref"][:16],
525 target_vdu
["vdu-name"][:32], target_vdu
.get("count-index") or 0),
526 "description": target_vdu
["vdu-name"],
528 "image_id": "TASK-" + image_text
,
529 "flavor_id": "TASK-" + flavor_text
,
530 "net_list": net_list
,
531 "cloud_config": cloud_config
or None,
532 "disk_list": None, # TODO
533 "availability_zone_index": None, # TODO
534 "availability_zone_list": None, # TODO
538 def _process_items(target_list
, existing_list
, db_record
, db_update
, db_path
, item
, process_params
):
540 nonlocal db_new_tasks
543 # ensure all the target_list elements has an "id". If not assign the index
544 for target_index
, tl
in enumerate(target_list
):
545 if tl
and not tl
.get("id"):
546 tl
["id"] = str(target_index
)
548 # step 1 networks to be deleted/updated
549 for vld_index
, existing_vld
in enumerate(existing_list
):
550 target_vld
= next((vld
for vld
in target_list
if vld
["id"] == existing_vld
["id"]), None)
551 for existing_vim_index
, existing_vim_info
in enumerate(existing_vld
.get("vim_info", ())):
552 if not existing_vim_info
:
555 target_viminfo
= next((target_viminfo
for target_viminfo
in target_vld
.get("vim_info", ())
556 if existing_vim_info
["vim_account_id"] == target_viminfo
[
557 "vim_account_id"]), None)
559 target_viminfo
= None
560 if not target_viminfo
:
562 self
._assign
_vim
(existing_vim_info
["vim_account_id"])
563 db_new_tasks
.append(_create_task(
565 target_record
="{}.{}.vim_info.{}".format(db_record
, vld_index
, existing_vim_index
),
566 target_record_id
="{}.{}".format(db_record
, existing_vld
["id"])))
568 # TODO check one by one the vims to be created/deleted
570 # step 2 networks to be created
571 for target_vld
in target_list
:
573 for vld_index
, existing_vld
in enumerate(existing_list
):
574 if existing_vld
["id"] == target_vld
["id"]:
578 db_update
[db_path
+ ".{}".format(vld_index
)] = target_vld
579 existing_list
.append(target_vld
)
582 for vim_index
, vim_info
in enumerate(target_vld
["vim_info"]):
583 existing_viminfo
= None
585 existing_viminfo
= next(
586 (existing_viminfo
for existing_viminfo
in existing_vld
.get("vim_info", ())
587 if vim_info
["vim_account_id"] == existing_viminfo
["vim_account_id"]), None)
588 # TODO check if different. Delete and create???
589 # TODO delete if not exist
593 extra_dict
= process_params(target_vld
, vim_info
)
595 self
._assign
_vim
(vim_info
["vim_account_id"])
596 db_ro_tasks
.append(_create_ro_task(
597 vim_info
["vim_account_id"], item
, "CREATE",
598 target_record
="{}.{}.vim_info.{}".format(db_record
, vld_index
, vim_index
),
599 target_record_id
="{}.{}".format(db_record
, target_vld
["id"]),
600 extra_dict
=extra_dict
))
602 db_update
[db_path
+ ".{}".format(vld_index
)] = target_vld
def _process_action(indata):
    """Schedule the tasks needed by an explicit action request.

    Only "inject_ssh_key" is handled; any other action is ignored. For every requested vdu an
    "EXEC" ro_task is appended to db_ro_tasks, depending on the vdu record and carrying the
    target address, the credentials and the stored key material.

    Reads self, db_vnfrs, db_ro_nsr, db_ro_tasks and _create_ro_task from the enclosing scope.
    They are only read or mutated in place, never rebound, so no nonlocal declaration is needed.

    :param indata: request content with "action" and, for inject_ssh_key, optional "key",
        "user", "password" and a "vnf" list whose items carry "_id" and "vdur"
    :raises NsException: if a vnf or vdu of the request is not found among the stored records
    """
    if indata["action"] != "inject_ssh_key":
        return

    key = indata.get("key")
    user = indata.get("user")
    password = indata.get("password")
    for vnf in indata.get("vnf", ()):
        if vnf.get("_id") not in db_vnfrs:
            raise NsException("Invalid vnf={}".format(vnf.get("_id")))
        db_vnfr = db_vnfrs[vnf["_id"]]
        for target_vdu in vnf.get("vdur", ()):
            vdu_index, vdur = next((i_v for i_v in enumerate(db_vnfr["vdur"]) if
                                    i_v[1]["id"] == target_vdu["id"]), (None, None))
            if not vdur:
                raise NsException("Invalid vdu vnf={}.{}".format(vnf["_id"], target_vdu["id"]))
            vim_info = vdur["vim_info"][0]
            self._assign_vim(vim_info["vim_account_id"])
            target_record = "vnfrs:{}:vdur.{}.ssh_keys".format(vnf["_id"], vdu_index)
            extra_dict = {
                "depends_on": ["vnfrs:{}:vdur.{}".format(vnf["_id"], vdur["id"])],
                "params": {
                    # was vdur.gt(...): dictionaries have no such method, the call always failed
                    "ip_address": vdur.get("ip_address"),
                    "user": user,
                    "key": key,
                    "password": password,
                    "private_key": db_ro_nsr["private_key"],
                    "salt": db_ro_nsr["_id"],
                    "schema_version": db_ro_nsr["_admin"]["schema_version"]
                }
            }
            db_ro_tasks.append(_create_ro_task(vim_info["vim_account_id"], "vdu", "EXEC",
                                               target_record=target_record,
                                               target_record_id=None,
                                               extra_dict=extra_dict))
644 with self
.write_lock
:
645 if indata
.get("action"):
646 _process_action(indata
)
648 # compute network differences
650 step
= "process NS VLDs"
651 _process_items(target_list
=indata
["ns"]["vld"] or [], existing_list
=db_nsr
.get("vld") or [],
652 db_record
="nsrs:{}:vld".format(nsr_id
), db_update
=db_nsr_update
,
653 db_path
="vld", item
="net", process_params
=_process_net_params
)
655 step
= "process NS images"
656 _process_items(target_list
=indata
["image"] or [], existing_list
=db_nsr
.get("image") or [],
657 db_record
="nsrs:{}:image".format(nsr_id
),
658 db_update
=db_nsr_update
, db_path
="image", item
="image",
659 process_params
=_process_image_params
)
661 step
= "process NS flavors"
662 _process_items(target_list
=indata
["flavor"] or [], existing_list
=db_nsr
.get("flavor") or [],
663 db_record
="nsrs:{}:flavor".format(nsr_id
),
664 db_update
=db_nsr_update
, db_path
="flavor", item
="flavor",
665 process_params
=_process_flavor_params
)
668 for vnfr_id
, vnfr
in db_vnfrs
.items():
669 # vnfr_id need to be set as global variable for among others nested method _process_vdu_params
670 step
= "process VNF={} VLDs".format(vnfr_id
)
671 target_vnf
= next((vnf
for vnf
in indata
.get("vnf", ()) if vnf
["_id"] == vnfr_id
), None)
672 target_list
= target_vnf
.get("vld") if target_vnf
else None
673 _process_items(target_list
=target_list
or [], existing_list
=vnfr
.get("vld") or [],
674 db_record
="vnfrs:{}:vld".format(vnfr_id
), db_update
=db_vnfrs_update
[vnfr
["_id"]],
675 db_path
="vld", item
="net", process_params
=_process_net_params
)
677 target_list
= target_vnf
.get("vdur") if target_vnf
else None
678 step
= "process VNF={} VDUs".format(vnfr_id
)
679 _process_items(target_list
=target_list
or [], existing_list
=vnfr
.get("vdur") or [],
680 db_record
="vnfrs:{}:vdur".format(vnfr_id
),
681 db_update
=db_vnfrs_update
[vnfr
["_id"]], db_path
="vdur", item
="vdu",
682 process_params
=_process_vdu_params
)
684 step
= "Updating database, Creating ro_tasks"
686 self
.db
.create_list("ro_tasks", db_ro_tasks
)
687 step
= "Updating database, Appending tasks to ro_tasks"
688 for task
in db_new_tasks
:
689 if not self
.db
.set_one("ro_tasks", q_filter
={"tasks.target_record": task
["target_record"]},
690 update_dict
={"to_check_at": now
, "modified_at": now
},
691 push
={"tasks": task
}, fail_on_empty
=False):
692 self
.logger
.error(logging_text
+ "Cannot find task for target_record={}".
693 format(task
["target_record"]))
694 # TODO something else appart from logging?
695 step
= "Updating database, nsrs"
697 self
.db
.set_one("nsrs", {"_id": nsr_id
}, db_nsr_update
)
698 for vnfr_id
, db_vnfr_update
in db_vnfrs_update
.items():
700 step
= "Updating database, vnfrs={}".format(vnfr_id
)
701 self
.db
.set_one("vnfrs", {"_id": vnfr_id
}, db_vnfr_update
)
703 self
.logger
.debug(logging_text
+ "Exit")
704 return {"status": "ok", "nsr_id": nsr_id
, "action_id": action_id
}, action_id
, True
706 except Exception as e
:
707 if isinstance(e
, (DbException
, NsException
)):
708 self
.logger
.error(logging_text
+ "Exit Exception while '{}': {}".format(step
, e
))
710 e
= traceback_format_exc()
711 self
.logger
.critical(logging_text
+ "Exit Exception while '{}': {}".format(step
, e
), exc_info
=True)
714 def delete(self
, session
, indata
, version
, nsr_id
, *args
, **kwargs
):
715 print("ns.delete session={} indata={} version={} nsr_id={}".format(session
, indata
, version
, nsr_id
))
716 # TODO del when ALL "tasks.nsr_id" are None of nsr_id
717 # self.db.del_list({"_id": ro_task["_id"], "tasks.nsr_id.ne": nsr_id})
719 for retry
in range(retries
):
720 with self
.write_lock
:
721 ro_tasks
= self
.db
.get_list("ro_tasks", {"tasks.nsr_id": nsr_id
})
726 for ro_task
in ro_tasks
:
729 for index
, task
in enumerate(ro_task
["tasks"]):
732 elif task
["nsr_id"] == nsr_id
:
733 db_update
["tasks.{}".format(index
)] = None
735 to_delete
= False # used by other nsr, cannot be deleted
736 # delete or update if nobody has changed ro_task meanwhile. Used modified_at for known if changed
738 if not self
.db
.del_one("ro_tasks",
739 q_filter
={"_id": ro_task
["_id"], "modified_at": ro_task
["modified_at"]},
740 fail_on_empty
=False):
743 db_update
["modified_at"] = now
744 if not self
.db
.set_one("ro_tasks",
745 q_filter
={"_id": ro_task
["_id"], "modified_at": ro_task
["modified_at"]},
746 update_dict
=db_update
,
747 fail_on_empty
=False):
752 raise NsException("Exceeded {} retries".format(retries
))
754 return None, None, True
756 def status(self
, session
, indata
, version
, nsr_id
, action_id
, *args
, **kwargs
):
757 print("ns.status session={} indata={} version={} nsr_id={}, action_id={}".format(session
, indata
, version
,
762 ro_tasks
= self
.db
.get_list("ro_tasks", {"tasks.action_id": action_id
})
763 global_status
= "DONE"
765 for ro_task
in ro_tasks
:
766 for task
in ro_task
["tasks"]:
767 if task
["action_id"] == action_id
:
768 task_list
.append(task
)
770 if task
["status"] == "FAILED":
771 global_status
= "FAILED"
772 details
.append(ro_task
.get("vim_details", ''))
773 elif task
["status"] in ("SCHEDULED", "BUILD"):
774 if global_status
!= "FAILED":
775 global_status
= "BUILD"
779 "status": global_status
,
780 "details": ". ".join(details
) if details
else "progress {}/{}".format(done
, total
),
782 "action_id": action_id
,
785 return return_data
, None, True
def cancel(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
    """Trace a cancel request; no cancellation is carried out.

    :return: tuple (None, None, True)
    """
    trace = "ns.cancel session={} indata={} version={} nsr_id={}, action_id={}".format(
        session, indata, version, nsr_id, action_id)
    print(trace)
    return None, None, True
def get_deploy(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
    """List the "_id" and "name" of every record of the "nsrs" collection.

    Only self.db is used; the remaining arguments are accepted but not read.

    :return: tuple (list of {"_id": ..., "name": ...}, None, True)
    """
    nsrs = self.db.get_list("nsrs", {})
    return_data = [{"_id": ns["_id"], "name": ns["name"]} for ns in nsrs]
    return return_data, None, True
def get_actions(self, session, indata, version, nsr_id, action_id, *args, **kwargs):
    """List the distinct action ids found in the ro_tasks that reference a network service.

    :param nsr_id: network service record id used to select the ro_tasks
    :return: tuple (list of action ids in order of first appearance, None, True)
    """
    ro_tasks = self.db.get_list("ro_tasks", {"tasks.nsr_id": nsr_id})
    return_data = []
    for ro_task in ro_tasks:
        for task in ro_task["tasks"]:
            if not task:
                # delete() blanks the slots of removed tasks with None, nothing to report
                continue
            if task["action_id"] not in return_data:
                return_data.append(task["action_id"])
    return return_data, None, True