FROM ubuntu:16.04
# Set the working directory to /app
-WORKDIR /app/osm_nbi
+WORKDIR /app/NBI/osm_nbi
# Copy the current directory contents into the container at /app
-ADD . /app
+ADD . /app/NBI
RUN apt-get update && apt-get install -y git python3 python3-jsonschema \
python3-pymongo python3-yaml python3-pip python3-keystoneclient \
# OSM_IM
RUN pip3 install pyang && mkdir -p /app && cd /app \
- && git clone https://github.com/robshakir/pyangbind \
- && pip3 install -e pyangbind \
- && git clone https://osm.etsi.org/gerrit/osm/IM \
+ && git -C /app clone https://github.com/robshakir/pyangbind \
+ && pip3 install -e /app/pyangbind \
+ && git -C /app clone https://osm.etsi.org/gerrit/osm/IM \
&& cd /app/IM/models/yang \
- && pyang --plugindir /app/pyangbind/pyangbind/plugin -f pybind -o /app/osm_nbi/vnfd_catalog.py vnfd.yang \
- && pyang --plugindir /app/pyangbind/pyangbind/plugin -f pybind -o /app/osm_nbi/nsd_catalog.py nsd.yang
+ && mkdir /app/IM/osm_im \
+ && pyang --plugindir /app/pyangbind/pyangbind/plugin -f pybind -o /app/IM/osm_im/vnfd.py vnfd.yang \
+ && pyang --plugindir /app/pyangbind/pyangbind/plugin -f pybind -o /app/IM/osm_im/nsd.py nsd.yang \
+ && pip3 install -e /app/IM
EXPOSE 9999
ENV OSMNBI_DATABASE_DRIVER mongo
ENV OSMNBI_DATABASE_HOST mongo
ENV OSMNBI_DATABASE_PORT 27017
+# ENV OSMNBI_DATABASE_USER xxx
+# ENV OSMNBI_DATABASE_PASSWORD xxx
+# ENV OSMNBI_DATABASE_MASTERPASSWORD xxx
# web
ENV OSMNBI_STATIC_DIR /app/osm_nbi/html_public
# logs
clean:
- rm -rf dist deb_dist .build osm_nbi-*.tar.gz osm_nbi.egg-info eggs
+ rm -rf dist deb_dist .build osm_nbi-*.tar.gz osm_nbi.egg-info .eggs
package:
python3 setup.py --command-packages=stdeb.command sdist_dsc
rm -rf /var/lib/apt/lists/* && \
chmod +x start.sh
-ENV DB_HOST keystone-db # DB Hostname
-ENV DB_PORT 3306 # DB Port
-ENV ROOT_DB_USER root # DB Root User
-ENV ROOT_DB_PASSWORD admin # DB Root Password
-ENV KEYSTONE_DB_PASSWORD admin # Keystone user password
-ENV ADMIN_PASSWORD admin # Admin password
-ENV NBI_PASSWORD nbi # NBI password
+# database
+ENV DB_HOST keystone-db
+ENV DB_PORT 3306
+ENV ROOT_DB_USER root
+ENV ROOT_DB_PASSWORD admin
+# keystone
+ENV KEYSTONE_DB_PASSWORD admin
+ENV ADMIN_PASSWORD admin
+ENV NBI_PASSWORD nbi
ENTRYPOINT ./start.sh
except DbException as e:
raise AuthException(str(e), http_code=e.http_code)
- def init_db(self, target_version='1.0'):
+ def init_db(self, target_version='1.1'):
"""
- Check if the database has been initialized. If not, create the required tables
+ Check if the database has been initialized, with at least one user. If not, create an adthe required tables
and insert the predefined mappings between roles and permissions.
:param target_version: schema version that should be present in the database.
from osm_common.dbbase import DbException, deep_update_rfc7396
from http import HTTPStatus
from validation import ValidationError, pdu_new_schema, pdu_edit_schema
-from base_topic import BaseTopic, EngineException
+from base_topic import BaseTopic, EngineException, get_iterable
+from osm_im.vnfd import vnfd as vnfd_im
+from osm_im.nsd import nsd as nsd_im
+from pyangbind.lib.serialise import pybindJSONDecoder
+import pyangbind.lib.pybindJSON as pybindJSON
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
"""
Return the file content of a vnfd or nsd
:param session: contains the used login username and working project
- :param _id: Identity of the vnfd, ndsd
+ :param _id: Identity of the vnfd, nsd
:param path: artifact path or "$DESCRIPTOR" or None
:param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
- :return: opened file or raises an exception
+ :return: opened file plus Accept format or raises an exception
"""
accept_text = accept_zip = False
if accept_header:
if 'text/plain' in accept_header or '*/*' in accept_header:
accept_text = True
if 'application/zip' in accept_header or '*/*' in accept_header:
- accept_zip = True
+ accept_zip = 'application/zip'
+ elif 'application/gzip' in accept_header:
+ accept_zip = 'application/gzip'
+
if not accept_text and not accept_zip:
raise EngineException("provide request header 'Accept' with 'application/zip' or 'text/plain'",
http_code=HTTPStatus.NOT_ACCEPTABLE)
# TODO generate zipfile if not present
raise EngineException("Only allowed 'text/plain' Accept header for this descriptor. To be solved in "
"future versions", http_code=HTTPStatus.NOT_ACCEPTABLE)
- return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), "application/zip"
+ return self.fs.file_open((storage['folder'], storage['zipfile']), "rb"), accept_zip
+
+ def pyangbind_validation(self, item, data, force=False):
+ try:
+ if item == "vnfds":
+ myvnfd = vnfd_im()
+ pybindJSONDecoder.load_ietf_json({'vnfd:vnfd-catalog': {'vnfd': [data]}}, None, None, obj=myvnfd,
+ path_helper=True, skip_unknown=force)
+ out = pybindJSON.dumps(myvnfd, mode="ietf")
+ elif item == "nsds":
+ mynsd = nsd_im()
+ pybindJSONDecoder.load_ietf_json({'nsd:nsd-catalog': {'nsd': [data]}}, None, None, obj=mynsd,
+ path_helper=True, skip_unknown=force)
+ out = pybindJSON.dumps(mynsd, mode="ietf")
+ else:
+ raise EngineException("Not possible to validate '{}' item".format(item),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ desc_out = self._remove_envelop(yaml.safe_load(out))
+ return desc_out
+
+ except Exception as e:
+ raise EngineException("Error in pyangbind validation: {}".format(str(e)),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
class VnfdTopic(DescriptorTopic):
clean_indata = clean_indata['vnfd-catalog']
if clean_indata.get('vnfd'):
if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
- raise EngineException("'vnfd' must be a list only one element")
+ raise EngineException("'vnfd' must be a list of only one element")
clean_indata = clean_indata['vnfd'][0]
+ elif clean_indata.get('vnfd:vnfd'):
+ if not isinstance(clean_indata['vnfd:vnfd'], list) or len(clean_indata['vnfd:vnfd']) != 1:
+ raise EngineException("'vnfd:vnfd' must be a list of only one element")
+ clean_indata = clean_indata['vnfd:vnfd'][0]
return clean_indata
def check_conflict_on_del(self, session, _id, force=False):
def _validate_input_new(self, indata, force=False):
# TODO validate with pyangbind, serialize
+ indata = self.pyangbind_validation("vnfds", indata, force)
+ # Cross references validation in the descriptor
+ if not indata.get("mgmt-interface"):
+ raise EngineException("'mgmt-interface' is a mandatory field and it is not defined",
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ if indata["mgmt-interface"].get("cp"):
+ for cp in get_iterable(indata.get("connection-point")):
+ if cp["name"] == indata["mgmt-interface"]["cp"]:
+ break
+ else:
+ raise EngineException("mgmt-interface:cp='{}' must match an existing connection-point"
+ .format(indata["mgmt-interface"]["cp"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ for vdu in get_iterable(indata.get("vdu")):
+ for interface in get_iterable(vdu.get("interface")):
+ if interface.get("external-connection-point-ref"):
+ for cp in get_iterable(indata.get("connection-point")):
+ if cp["name"] == interface["external-connection-point-ref"]:
+ break
+ else:
+ raise EngineException("vdu[id='{}']:interface[name='{}']:external-connection-point-ref='{}' "
+ "must match an existing connection-point"
+ .format(vdu["id"], interface["name"],
+ interface["external-connection-point-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ elif interface.get("internal-connection-point-ref"):
+ for internal_cp in get_iterable(vdu.get("internal-connection-point")):
+ if interface["internal-connection-point-ref"] == internal_cp.get("id"):
+ break
+ else:
+ raise EngineException("vdu[id='{}']:interface[name='{}']:internal-connection-point-ref='{}' "
+ "must match an existing vdu:internal-connection-point"
+ .format(vdu["id"], interface["name"],
+ interface["internal-connection-point-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ for ivld in get_iterable(indata.get("internal-vld")):
+ for icp in get_iterable(ivld.get("internal-connection-point")):
+ icp_mark = False
+ for vdu in get_iterable(indata.get("vdu")):
+ for internal_cp in get_iterable(vdu.get("internal-connection-point")):
+ if icp["id-ref"] == internal_cp["id"]:
+ icp_mark = True
+ break
+ if icp_mark:
+ break
+ else:
+ raise EngineException("internal-vld[id='{}']:internal-connection-point='{}' must match an existing "
+ "vdu:internal-connection-point".format(ivld["id"], icp["id-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ if ivld.get("ip-profile-ref"):
+ for ip_prof in get_iterable(indata.get("ip-profiles")):
+ if ip_prof["name"] == get_iterable(ivld.get("ip-profile-ref")):
+ break
+ else:
+ raise EngineException("internal-vld[id='{}']:ip-profile-ref='{}' does not exist".format(
+ ivld["id"], ivld["ip-profile-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ for mp in get_iterable(indata.get("monitoring-param")):
+ if mp.get("vdu-monitoring-param"):
+ mp_vmp_mark = False
+ for vdu in get_iterable(indata.get("vdu")):
+ for vmp in get_iterable(vdu.get("monitoring-param")):
+ if vmp["id"] == mp["vdu-monitoring-param"].get("vdu-monitoring-param-ref") and vdu["id"] ==\
+ mp["vdu-monitoring-param"]["vdu-ref"]:
+ mp_vmp_mark = True
+ break
+ if mp_vmp_mark:
+ break
+ else:
+ raise EngineException("monitoring-param:vdu-monitoring-param:vdu-monitoring-param-ref='{}' not "
+ "defined at vdu[id='{}'] or vdu does not exist"
+ .format(mp["vdu-monitoring-param"]["vdu-monitoring-param-ref"],
+ mp["vdu-monitoring-param"]["vdu-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ elif mp.get("vdu-metric"):
+ mp_vm_mark = False
+ for vdu in get_iterable(indata.get("vdu")):
+ if vdu.get("vdu-configuration"):
+ for metric in get_iterable(vdu["vdu-configuration"].get("metrics")):
+ if metric["name"] == mp["vdu-metric"]["vdu-metric-name-ref"] and vdu["id"] == \
+ mp["vdu-metric"]["vdu-ref"]:
+ mp_vm_mark = True
+ break
+ if mp_vm_mark:
+ break
+ else:
+ raise EngineException("monitoring-param:vdu-metric:vdu-metric-name-ref='{}' not defined at "
+ "vdu[id='{}'] or vdu does not exist"
+ .format(mp["vdu-metric"]["vdu-metric-name-ref"],
+ mp["vdu-metric"]["vdu-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+
+ for sgd in get_iterable(indata.get("scaling-group-descriptor")):
+ for sp in get_iterable(sgd.get("scaling-policy")):
+ for sc in get_iterable(sp.get("scaling-criteria")):
+ for mp in get_iterable(indata.get("monitoring-param")):
+ if mp["id"] == get_iterable(sc.get("vnf-monitoring-param-ref")):
+ break
+ else:
+ raise EngineException("scaling-group-descriptor[name='{}']:scaling-criteria[name='{}']:"
+ "vnf-monitoring-param-ref='{}' not defined in any monitoring-param"
+ .format(sgd["name"], sc["name"], sc["vnf-monitoring-param-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ for sgd_vdu in get_iterable(sgd.get("vdu")):
+ sgd_vdu_mark = False
+ for vdu in get_iterable(indata.get("vdu")):
+ if vdu["id"] == sgd_vdu["vdu-id-ref"]:
+ sgd_vdu_mark = True
+ break
+ if sgd_vdu_mark:
+ break
+ else:
+ raise EngineException("scaling-group-descriptor[name='{}']:vdu-id-ref={} does not match any vdu"
+ .format(sgd["name"], sgd_vdu["vdu-id-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ for sca in get_iterable(sgd.get("scaling-config-action")):
+ if not indata.get("vnf-configuration"):
+ raise EngineException("'vnf-configuration' not defined in the descriptor but it is referenced by "
+ "scaling-group-descriptor[name='{}']:scaling-config-action"
+ .format(sgd["name"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
+ for primitive in get_iterable(indata["vnf-configuration"].get("config-primitive")):
+ if primitive["name"] == sca["vnf-config-primitive-name-ref"]:
+ break
+ else:
+ raise EngineException("scaling-group-descriptor[name='{}']:scaling-config-action:vnf-config-"
+ "primitive-name-ref='{}' does not match any "
+ "vnf-configuration:config-primitive:name"
+ .format(sgd["name"], sca["vnf-config-primitive-name-ref"]),
+ http_code=HTTPStatus.UNPROCESSABLE_ENTITY)
return indata
def _validate_input_edit(self, indata, force=False):
clean_indata = clean_indata['nsd-catalog']
if clean_indata.get('nsd'):
if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
- raise EngineException("'nsd' must be a list only one element")
+ raise EngineException("'nsd' must be a list of only one element")
clean_indata = clean_indata['nsd'][0]
+ elif clean_indata.get('nsd:nsd'):
+ if not isinstance(clean_indata['nsd:nsd'], list) or len(clean_indata['nsd:nsd']) != 1:
+ raise EngineException("'nsd:nsd' must be a list of only one element")
+ clean_indata = clean_indata['nsd:nsd'][0]
return clean_indata
def _validate_input_new(self, indata, force=False):
- # transform constituent-vnfd:member-vnf-index to string
- if indata.get("constituent-vnfd"):
- for constituent_vnfd in indata["constituent-vnfd"]:
- if "member-vnf-index" in constituent_vnfd:
- constituent_vnfd["member-vnf-index"] = str(constituent_vnfd["member-vnf-index"])
# TODO validate with pyangbind, serialize
+ indata = self.pyangbind_validation("nsds", indata, force)
return indata
def _validate_input_edit(self, indata, force=False):
from admin_topics import UserTopic, ProjectTopic, VimAccountTopic, SdnTopic
from descriptor_topics import VnfdTopic, NsdTopic, PduTopic
from instance_topics import NsrTopic, VnfrTopic, NsLcmOpTopic
+from base64 import b64encode
+from os import urandom
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
min_common_version = "0.1.8"
raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
return self.map_topic[topic].show(session, _id)
+ def get_file(self, session, topic, _id, path=None, accept_header=None):
+ """
+ Get descriptor package or artifact file content
+ :param session: contains the used login username and working project
+ :param topic: it can be: users, projects, vnfds, nsds,
+ :param _id: server id of the item
+ :param path: artifact path or "$DESCRIPTOR" or None
+ :param accept_header: Content of Accept header. Must contain applition/zip or/and text/plain
+ :return: opened file plus Accept format or raises an exception
+ """
+ if topic not in self.map_topic:
+ raise EngineException("Unknown topic {}!!!".format(topic), HTTPStatus.INTERNAL_SERVER_ERROR)
+ return self.map_topic[topic].get_file(session, _id, path, accept_header)
+
def del_item_list(self, session, topic, _filter=None):
"""
Delete a list of items
_id = self.map_topic["users"].new(roolback_list, fake_session, user_desc, force=True)
return _id
+ def upgrade_db(self, current_version, target_version):
+ if not target_version or current_version == target_version:
+ return
+ if target_version == '1.0':
+ if not current_version:
+ # create database version
+ serial = urandom(32)
+ version_data = {
+ "_id": 'version', # Always 'version'
+ "version_int": 1000, # version number
+ "version": '1.0', # version text
+ "date": "2018-10-25", # version date
+ "description": "added serial", # changes in this version
+ 'status': 'ENABLED', # ENABLED, DISABLED (migration in process), ERROR,
+ 'serial': b64encode(serial)
+ }
+ self.db.create("admin", version_data)
+ self.db.set_secret_key(serial)
+ # TODO add future migrations here
+
+ raise EngineException("Wrong database version '{}'. Expected '{}'"
+ ". It cannot be up/down-grade".format(current_version, target_version),
+ http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
+
def init_db(self, target_version='1.0'):
"""
- Init database if empty. If not empty it checks that database version is ok.
+ Init database if empty. If not empty it checks that database version and migrates if needed
If empty, it creates a new user admin/admin at 'users' and a new entry at 'version'
+ :param target_version: check desired database version. Migrate to it if possible or raises exception
:return: None if ok, exception if error or if the version is different.
"""
- version = self.db.get_one("version", fail_on_empty=False, fail_on_more=False)
- if not version:
- # create user admin
- self.create_admin()
- # create database version
- version_data = {
- "_id": '1.0', # version text
- "version": 1000, # version number
- "date": "2018-04-12", # version date
- "description": "initial design", # changes in this version
- 'status': 'ENABLED' # ENABLED, DISABLED (migration in process), ERROR,
- }
- self.db.create("version", version_data)
- elif version["_id"] != target_version:
- # TODO implement migration process
- raise EngineException("Wrong database version '{}'. Expected '{}'".format(
- version["_id"], target_version), HTTPStatus.INTERNAL_SERVER_ERROR)
- elif version["status"] != 'ENABLED':
+
+ version_data = self.db.get_one("admin", {"_id": "version"}, fail_on_empty=False, fail_on_more=True)
+ # check database status is ok
+ if version_data and version_data.get("status") != 'ENABLED':
raise EngineException("Wrong database status '{}'".format(
- version["status"]), HTTPStatus.INTERNAL_SERVER_ERROR)
+ version_data["status"]), HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ # check version
+ db_version = None if not version_data else version_data.get("version")
+ if db_version != target_version:
+ self.upgrade_db(db_version, target_version)
+
+ # create user admin if not exist
+ self.create_admin()
return
import yaml
from http import HTTPStatus
+from html import escape as html_escape
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
data_id = k.pop("_id", None)
elif isinstance(k, str):
data_id = k
- body += '<p> <a href="/osm/{url}/{id}">{id}</a>: {t} </p>'.format(url=request.path_info, id=data_id, t=k)
+ body += '<p> <a href="/osm/{url}/{id}">{id}</a>: {t} </p>'.format(url=request.path_info, id=data_id,
+ t=html_escape(str(k)))
elif isinstance(data, dict):
if "Location" in response.headers:
body += '<a href="{}"> show </a>'.format(response.headers["Location"])
request.path_info.startswith("/nslcm/v1/ns_instances/"):
_id = request.path_info[request.path_info.rfind("/")+1:]
body += html_nslcmop_body.format(id=_id)
- body += "<pre>" + yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False) + "</pre>"
+ body += "<pre>" + html_escape(yaml.safe_dump(data, explicit_start=True, indent=4, default_flow_style=False)) + \
+ "</pre>"
elif data is None:
if request.method == "DELETE" or "METHOD=DELETE" in request.query_string:
body += "<pre> deleted </pre>"
else:
- body = str(data)
+ body = html_escape(str(data))
user_text = " "
if session:
if session.get("username"):
-0.1.21
-2018-10-09
+0.1.24
+2018-11-02
from uuid import uuid4
from http import HTTPStatus
from time import time
-from copy import copy
+from copy import copy, deepcopy
from validation import validate_input, ValidationError, ns_instantiate, ns_action, ns_scale
from base_topic import BaseTopic, EngineException, get_iterable
from descriptor_topics import DescriptorTopic
}
vnfr_descriptor["connection-point"].append(vnf_cp)
for vdu in vnfd["vdu"]:
- vdur_id = str(uuid4())
vdur = {
- "id": vdur_id,
"vdu-id-ref": vdu["id"],
# TODO "name": "" Name of the VDU in the VIM
"ip-address": None, # mgmt-interface filled by LCM
"internal-connection-point": [],
"interfaces": [],
}
+ if vdu.get("pdu-type"):
+ vdur["pdu-type"] = vdu["pdu-type"]
# TODO volumes: name, volume-id
for icp in vdu.get("internal-connection-point", ()):
vdu_icp = {
# "ip-address", "mac-address" # filled by LCM
# vim-id # TODO it would be nice having a vim port id
}
+ if iface.get("mgmt-interface"):
+ vdu_iface["mgmt-interface"] = True
+
vdur["interfaces"].append(vdu_iface)
- vnfr_descriptor["vdur"].append(vdur)
+ count = vdu.get("count", 1)
+ if count is None:
+ count = 1
+ count = int(count) # TODO remove when descriptor serialized with payngbind
+ for index in range(0, count):
+ if index:
+ vdur = deepcopy(vdur)
+ vdur["_id"] = str(uuid4())
+ vdur["count-index"] = index
+ vnfr_descriptor["vdur"].append(vdur)
step = "creating vnfr vnfd-id='{}' constituent-vnfd='{}' at database".format(
member_vnf["vnfd-id-ref"], member_vnf["member-vnf-index"])
raise EngineException("Invalid parameter member_vnf_index='{}' is not one of the "
"nsd:constituent-vnfd".format(member_vnf_index))
+ def _check_vnf_instantiation_params(in_vnfd, vnfd):
+
+ for in_vdu in get_iterable(in_vnfd.get("vdu")):
+ for vdu in get_iterable(vnfd.get("vdu")):
+ if in_vdu["id"] == vdu["id"]:
+ for volume in get_iterable(in_vdu.get("volume")):
+ for volumed in get_iterable(vdu.get("volumes")):
+ if volumed["name"] == volume["name"]:
+ break
+ else:
+ raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu[id='{}']:"
+ "volume:name='{}' is not present at vnfd:vdu:volumes list".
+ format(in_vnf["member-vnf-index"], in_vdu["id"],
+ volume["name"]))
+ for in_iface in get_iterable(in_vdu["interface"]):
+ for iface in get_iterable(vdu.get("interface")):
+ if in_iface["name"] == iface["name"]:
+ break
+ else:
+ raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu[id='{}']:"
+ "interface[name='{}'] is not present at vnfd:vdu:interface"
+ .format(in_vnf["member-vnf-index"], in_vdu["id"],
+ in_iface["name"]))
+ break
+ else:
+ raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu[id='{}'] is is not present "
+ "at vnfd:vdu".format(in_vnf["member-vnf-index"], in_vdu["id"]))
+
+ for in_ivld in get_iterable(in_vnfd.get("internal-vld")):
+ for ivld in get_iterable(vnfd.get("internal-vld")):
+ if in_ivld["name"] == ivld["name"] or in_ivld["name"] == ivld["id"]:
+ for in_icp in get_iterable(in_ivld["internal-connection-point"]):
+ for icp in ivld["internal-connection-point"]:
+ if in_icp["id-ref"] == icp["id-ref"]:
+ break
+ else:
+ raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:internal-vld[name"
+ "='{}']:internal-connection-point[id-ref:'{}'] is not present at "
+ "vnfd:internal-vld:name/id:internal-connection-point"
+ .format(in_vnf["member-vnf-index"], in_ivld["name"],
+ in_icp["id-ref"], vnfd["id"]))
+ break
+ else:
+ raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:internal-vld:name='{}'"
+ " is not present at vnfd '{}'".format(in_vnf["member-vnf-index"],
+ in_ivld["name"], vnfd["id"]))
+
def check_valid_vim_account(vim_account):
if vim_account in vim_accounts:
return
try:
- # TODO add _get_project_filter
- self.db.get_one("vim_accounts", {"_id": vim_account})
+ db_filter = self._get_project_filter(session, write=False, show_all=True)
+ db_filter["_id"] = vim_account
+ self.db.get_one("vim_accounts", db_filter)
except Exception:
- raise EngineException("Invalid vimAccountId='{}' not present".format(vim_account))
+ raise EngineException("Invalid vimAccountId='{}' not present for the project".format(vim_account))
vim_accounts.append(vim_account)
if operation == "action":
check_valid_vim_account(indata["vimAccountId"])
for in_vnf in get_iterable(indata.get("vnf")):
vnfd = check_valid_vnf_member_index(in_vnf["member-vnf-index"])
+ _check_vnf_instantiation_params(in_vnf, vnfd)
if in_vnf.get("vimAccountId"):
check_valid_vim_account(in_vnf["vimAccountId"])
- for in_vdu in get_iterable(in_vnf.get("vdu")):
- for vdud in get_iterable(vnfd.get("vdu")):
- if vdud["id"] == in_vdu["id"]:
- for volume in get_iterable(in_vdu.get("volume")):
- for volumed in get_iterable(vdud.get("volumes")):
- if volumed["name"] == volume["name"]:
- break
- else:
- raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu[id='{}']:"
- "volume:name='{}' is not present at vnfd:vdu:volumes list".
- format(in_vnf["member-vnf-index"], in_vdu["id"],
- volume["name"]))
- break
- else:
- raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:vdu:id='{}' is not "
- "present at vnfd".format(in_vnf["member-vnf-index"], in_vdu["id"]))
- for in_internal_vld in get_iterable(in_vnf.get("internal-vld")):
- for internal_vldd in get_iterable(vnfd.get("internal-vld")):
- if in_internal_vld["name"] == internal_vldd["name"] or \
- in_internal_vld["name"] == internal_vldd["id"]:
- break
- else:
- raise EngineException("Invalid parameter vnf[member-vnf-index='{}']:internal-vld:name='{}'"
- " is not present at vnfd '{}'".format(in_vnf["member-vnf-index"],
- in_internal_vld["name"],
- vnfd["id"]))
for in_vld in get_iterable(indata.get("vld")):
for vldd in get_iterable(nsd.get("vld")):
if in_vld["name"] == vldd["name"] or in_vld["name"] == vldd["id"]:
raise EngineException("Invalid parameter vld:name='{}' is not present at nsd:vld".format(
in_vld["name"]))
+ def _look_for_pdu(self, session, rollback, vnfr, vim_account):
+ """
+ Look for a free PDU in the catalog matching vdur type and interfaces. Fills vdur with ip_address information
+ :param vdur: vnfr:vdur descriptor. It is modified with pdu interface info if pdu is found
+ :param member_vnf_index: used just for logging. Target vnfd of nsd
+ :param vim_account:
+ :return: vder_update: dictionary to update vnfr:vdur with pdu info. In addition it modified choosen pdu to set
+ at status IN_USE
+ """
+ vnfr_update = {}
+ rollback_vnfr = {}
+ for vdur_index, vdur in enumerate(get_iterable(vnfr.get("vdur"))):
+ if not vdur.get("pdu-type"):
+ continue
+ pdu_type = vdur.get("pdu-type")
+ pdu_filter = self._get_project_filter(session, write=True, show_all=True)
+ pdu_filter["vim.vim_accounts"] = vim_account
+ pdu_filter["type"] = pdu_type
+ pdu_filter["_admin.operationalState"] = "ENABLED"
+ pdu_filter["_admin.usageSate"] = "NOT_IN_USE",
+ # TODO feature 1417: "shared": True,
+
+ available_pdus = self.db.get_list("pdus", pdu_filter)
+ for pdu in available_pdus:
+ # step 1 check if this pdu contains needed interfaces:
+ match_interfaces = True
+ for vdur_interface in vdur["interfaces"]:
+ for pdu_interface in pdu["interfaces"]:
+ if pdu_interface["name"] == vdur_interface["name"]:
+ # TODO feature 1417: match per mgmt type
+ break
+ else: # no interface found for name
+ match_interfaces = False
+ break
+ if match_interfaces:
+ break
+ else:
+ raise EngineException(
+ "No PDU of type={} found for member_vnf_index={} at vim_account={} matching interface "
+ "names".format(vdur["vdu-id-ref"], vnfr["member-vnf-index-ref"], pdu_type))
+
+ # step 2. Update pdu
+ rollback_pdu = {
+ "_admin.usageState": pdu["_admin"]["usageState"],
+ "_admin.usage.vnfr_id": None,
+ "_admin.usage.nsr_id": None,
+ "_admin.usage.vdur": None,
+ }
+ self.db.set_one("pdus", {"_id": pdu["_id"]},
+ {"_admin.usageSate": "IN_USE",
+ "_admin.usage.vnfr_id": vnfr["_id"],
+ "_admin.usage.nsr_id": vnfr["nsr-id-ref"],
+ "_admin.usage.vdur": vdur["vdu-id-ref"]}
+ )
+ rollback.append({"topic": "pdus", "_id": pdu["_id"], "operation": "set", "content": rollback_pdu})
+
+ # step 3. Fill vnfr info by filling vdur
+ vdu_text = "vdur.{}".format(vdur_index)
+ rollback_vnfr[vdu_text + ".pdu-id"] = None
+ vnfr_update[vdu_text + ".pdu-id"] = pdu["_id"]
+ for iface_index, vdur_interface in enumerate(vdur["interfaces"]):
+ for pdu_interface in pdu["interfaces"]:
+ if pdu_interface["name"] == vdur_interface["name"]:
+ iface_text = vdu_text + ".interfaces.{}".format(iface_index)
+ for k, v in pdu_interface.items():
+ vnfr_update[iface_text + ".{}".format(k)] = v
+ rollback_vnfr[iface_text + ".{}".format(k)] = vdur_interface.get(v)
+ break
+
+ if vnfr_update:
+ self.db.set_one("vnfrs", {"_id": vnfr["_id"]}, vnfr_update)
+ rollback.append({"topic": "vnfrs", "_id": vnfr["_id"], "operation": "set", "content": rollback_vnfr})
+ return
+
+ def _update_vnfrs(self, session, rollback, nsr, indata):
+ vnfrs = None
+ # get vnfr
+ nsr_id = nsr["_id"]
+ vnfrs = self.db.get_list("vnfrs", {"nsr-id-ref": nsr_id})
+
+ for vnfr in vnfrs:
+ vnfr_update = {}
+ vnfr_update_rollback = {}
+ member_vnf_index = vnfr["member-vnf-index-ref"]
+ # update vim-account-id
+
+ vim_account = indata["vimAccountId"]
+ # check instantiate parameters
+ for vnf_inst_params in get_iterable(indata.get("vnf")):
+ if vnf_inst_params["member-vnf-index"] != member_vnf_index:
+ continue
+ if vnf_inst_params.get("vimAccountId"):
+ vim_account = vnf_inst_params.get("vimAccountId")
+
+ vnfr_update["vim-account-id"] = vim_account
+ vnfr_update_rollback["vim-account-id"] = vnfr.get("vim-account-id")
+
+ # get pdu
+ self._look_for_pdu(session, rollback, vnfr, vim_account)
+ # TODO change instantiation parameters to set network
+
def _create_nslcmop(self, session, nsInstanceId, operation, params):
now = time()
_id = str(uuid4())
raise EngineException("ns_instance '{}' cannot be '{}' because it is already instantiated".format(
nsInstanceId, operation), HTTPStatus.CONFLICT)
self._check_ns_operation(session, nsr, operation, indata)
+ if operation == "instantiate":
+ self._update_vnfrs(session, rollback, nsr, indata)
nslcmop_desc = self._create_nslcmop(session, nsInstanceId, operation, indata)
self.format_on_new(nslcmop_desc, session["project_id"], make_public=make_public)
_id = self.db.create("nslcmops", nslcmop_desc)
host: "mongo" # hostname or IP
port: 27017
name: "osm"
-user: "user"
-password: "password"
+# user: "user"
+# password: "password"
+# materpassword: "mpasswd"
loglevel: "DEBUG"
#logfile: /var/log/osm/nbi-database.log
if "application/json" in cherrypy.request.headers["Content-Type"]:
error_text = "Invalid json format "
indata = json.load(self.reader(cherrypy.request.body))
+ cherrypy.request.headers.pop("Content-File-MD5", None)
elif "application/yaml" in cherrypy.request.headers["Content-Type"]:
error_text = "Invalid yaml format "
indata = yaml.load(cherrypy.request.body)
+ cherrypy.request.headers.pop("Content-File-MD5", None)
elif "application/binary" in cherrypy.request.headers["Content-Type"] or \
"application/gzip" in cherrypy.request.headers["Content-Type"] or \
"application/zip" in cherrypy.request.headers["Content-Type"] or \
# 'application/yaml' for input format are available")
error_text = "Invalid yaml format "
indata = yaml.load(cherrypy.request.body)
+ cherrypy.request.headers.pop("Content-File-MD5", None)
else:
error_text = "Invalid yaml format "
indata = yaml.load(cherrypy.request.body)
+ cherrypy.request.headers.pop("Content-File-MD5", None)
if not indata:
indata = {}
rollback.reverse()
for rollback_item in rollback:
try:
- self.engine.del_item(**rollback_item, session=session, force=True)
+ if rollback_item.get("operation") == "set":
+ self.engine.db.set_one(rollback_item["topic"], {"_id": rollback_item["_id"]},
+ rollback_item["content"], fail_on_empty=False)
+ else:
+ self.engine.del_item(**rollback_item, session=session, force=True)
except Exception as e2:
rollback_error_text = "Rollback Exception {}: {}".format(rollback_item, e2)
cherrypy.log(rollback_error_text)
# TODO add more entries, e.g.: storage
cherrypy.tree.apps['/osm'].root.engine.start(engine_config)
cherrypy.tree.apps['/osm'].root.authenticator.start(engine_config)
- try:
- cherrypy.tree.apps['/osm'].root.engine.init_db(target_version=database_version)
- cherrypy.tree.apps['/osm'].root.authenticator.init_db(target_version=auth_database_version)
- except (EngineException, AuthException):
- pass
+ cherrypy.tree.apps['/osm'].root.engine.init_db(target_version=database_version)
+ cherrypy.tree.apps['/osm'].root.authenticator.init_db(target_version=auth_database_version)
# getenv('OSMOPENMANO_TENANT', None)
__author__ = "Alfonso Tierno, alfonso.tiernosepulveda@telefonica.com"
__date__ = "$2018-03-01$"
-__version__ = "0.2"
-version_date = "Jul 2018"
+__version__ = "0.3"
+version_date = "Oct 2018"
def usage():
del self.s.headers[key]
def test(self, name, description, method, url, headers, payload, expected_codes, expected_headers,
- expected_payload):
+ expected_payload, store_file=None):
"""
Performs an http request and check http code response. Exit if different than allowed. It get the returned id
that can be used by following test in the URL with {name} where name is the name of the test
:param payload: Can be a dict, transformed to json, a text or a file if starts with '@'
:param expected_codes: expected response codes, can be int, int tuple or int range
:param expected_headers: expected response headers, dict with key values
- :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip'
+ :param expected_payload: expected payload, 0 if empty, 'yaml', 'json', 'text', 'zip', 'octet-stream'
+ :param store_file: filename to store content
:return: requests response
"""
r = None
self.old_test_description = test_description
logger.warning(test_description)
stream = False
- # if expected_payload == "zip":
- # stream = True
+ if expected_payload in ("zip", "octet-string") or store_file:
+ stream = True
r = getattr(self.s, method.lower())(url, data=payload, headers=headers, verify=self.verify, stream=stream)
- logger.debug("RX {}: {}".format(r.status_code, r.text))
+ if expected_payload in ("zip", "octet-string") or store_file:
+ logger.debug("RX {}".format(r.status_code))
+ else:
+ logger.debug("RX {}: {}".format(r.status_code, r.text))
# check response
if expected_codes:
yaml.safe_load(r.text)
except Exception as e:
raise TestException("Expected yaml response payload, but got Exception {}".format(e))
- elif expected_payload == "zip":
+ elif expected_payload in ("zip", "octet-string"):
if len(r.content) == 0:
raise TestException("Expected some response payload, but got empty")
# try:
if len(r.content) == 0:
raise TestException("Expected some response payload, but got empty")
# r.text
+ if store_file:
+ with open(store_file, 'wb') as fd:
+ for chunk in r.iter_content(chunk_size=128):
+ fd.write(chunk)
+
location = r.headers.get("Location")
if location:
_id = location[location.rfind("/") + 1:]
self.vim_id = None
self.nsd_test = None
self.ns_test = None
+ self.ns_id = None
self.vnfds_test = []
+ self.vnfds_id = []
self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/"
self.vnfd_filenames = ("cirros_vnf.tar.gz",)
self.nsd_filename = "cirros_2vnf_ns.tar.gz"
self.uses_configuration = False
+ self.uss = {}
+ self.passwds = {}
+ self.cmds = {}
+ self.keys = {}
+ self.timeout = 120
+ self.passed_tests = 0
+ self.total_tests = 0
def create_descriptors(self, engine):
- temp_dir = os.path.dirname(__file__) + "/temp/"
+ temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/"
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
for vnfd_filename in self.vnfd_filenames:
headers = headers_zip_yaml
if self.step % 2 == 0:
# vnfd CREATE AND UPLOAD in one step:
- engine.test("DEPLOY{}".format(self.step), "Onboard VNFD in one step", "POST",
+ test_name = "DEPLOY{}".format(self.step)
+ engine.test(test_name, "Onboard VNFD in one step", "POST",
"/vnfpkgm/v1/vnf_packages_content", headers, "@b" + vnfd_filename_path, 201,
{"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, yaml)
- self.vnfds_test.append("DEPLOY" + str(self.step))
+ self.vnfds_test.append(test_name)
+ self.vnfds_id.append(engine.test_ids[test_name])
self.step += 1
else:
# vnfd CREATE AND UPLOAD ZIP
- engine.test("DEPLOY{}".format(self.step), "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages",
+ test_name = "DEPLOY{}".format(self.step)
+ engine.test(test_name, "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages",
headers_json, None, 201,
{"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
- self.vnfds_test.append("DEPLOY" + str(self.step))
+ self.vnfds_test.append(test_name)
+ self.vnfds_id.append(engine.test_ids[test_name])
self.step += 1
# location = r.headers["Location"]
# vnfd_id = location[location.rfind("/")+1:]
headers_yaml, ns_data_text, 201,
{"Location": "nslcm/v1/ns_instances/", "Content-Type": "application/yaml"}, "yaml")
self.ns_test = "DEPLOY{}".format(self.step)
+ self.ns_id = engine.test_ids[self.ns_test]
engine.test_ids[self.ns_test]
self.step += 1
r = engine.test("DEPLOY{}".format(self.step), "Instantiate NS step 2", "POST",
raise TestException("NS instantiate is not done after {} seconds".format(timeout_deploy))
self.step += 1
- def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy):
+ def _wait_nslcmop_ready(self, engine, nslcmop_test, timeout_deploy, expected_fail=False):
wait = timeout
while wait >= 0:
r = engine.test("DEPLOY{}".format(self.step), "Wait to ns lcm operation complete", "GET",
200, r_header_json, "json")
nslcmop = r.json()
if "COMPLETED" in nslcmop["operationState"]:
+ if expected_fail:
+ raise TestException("NS terminate has success, expecting failing: {}".format(
+ nslcmop["detailed-status"]))
break
elif "FAILED" in nslcmop["operationState"]:
- raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"]))
+ if not expected_fail:
+ raise TestException("NS terminate has failed: {}".format(nslcmop["detailed-status"]))
+ break
wait -= 5
sleep(5)
else:
if not isinstance(nslcmops, list) or nslcmops:
raise TestException("NS {} deleted but with ns_lcm_op_occ active: {}".format(self.ns_test, nslcmops))
- def test_ns(self, engine, test_osm):
- pass
+ def test_ns(self, engine, test_osm, commands=None, users=None, passwds=None, keys=None, timeout=0):
+
+ n = 0
+ r = engine.test("TEST_NS{}".format(n), "GET VNFR_IDs", "GET",
+ "/nslcm/v1/ns_instances/{}".format(self.ns_id), headers_json, None,
+ 200, r_header_json, "json")
+ n += 1
+ ns_data = r.json()
+
+ vnfr_list = ns_data['constituent-vnfr-ref']
+ time = 0
+
+ for vnfr_id in vnfr_list:
+ self.total_tests += 1
+ r = engine.test("TEST_NS{}".format(n), "GET IP_ADDRESS OF VNFR", "GET",
+ "/nslcm/v1/vnfrs/{}".format(vnfr_id), headers_json, None,
+ 200, r_header_json, "json")
+ n += 1
+ vnfr_data = r.json()
+
+ if vnfr_data.get("ip-address"):
+ name = "TEST_NS{}".format(n)
+ description = "Run tests in VNFR with IP {}".format(vnfr_data['ip-address'])
+ n += 1
+ test_description = "Test {} {}".format(name, description)
+ logger.warning(test_description)
+ vnf_index = str(vnfr_data["member-vnf-index-ref"])
+ while timeout >= time:
+ result, message = self.do_checks([vnfr_data["ip-address"]],
+ vnf_index=vnfr_data["member-vnf-index-ref"],
+ commands=commands.get(vnf_index), user=users.get(vnf_index),
+ passwd=passwds.get(vnf_index), key=keys.get(vnf_index))
+ if result == 1:
+ logger.warning(message)
+ break
+ elif result == 0:
+ time += 20
+ sleep(20)
+ elif result == -1:
+ logger.critical(message)
+ break
+ else:
+ time -= 20
+ logger.critical(message)
+ else:
+ logger.critical("VNFR {} has not mgmt address. Check failed".format(vnfr_id))
+
+ def do_checks(self, ip, vnf_index, commands=[], user=None, passwd=None, key=None):
+ try:
+ import urllib3
+ from pssh.clients import ParallelSSHClient
+ from pssh.utils import load_private_key
+ from ssh2 import exceptions as ssh2Exception
+ except ImportError as e:
+ logger.critical("package <pssh> or/and <urllib3> is not installed. Please add it with 'pip3 install "
+ "parallel-ssh' and/or 'pip3 install urllib3': {}".format(e))
+ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+ try:
+ p_host = os.environ.get("PROXY_HOST")
+ p_user = os.environ.get("PROXY_USER")
+ p_password = os.environ.get("PROXY_PASSWD")
+
+ if key:
+ pkey = load_private_key(key)
+ else:
+ pkey = None
+
+ client = ParallelSSHClient(ip, user=user, password=passwd, pkey=pkey, proxy_host=p_host,
+ proxy_user=p_user, proxy_password=p_password, timeout=10, num_retries=0)
+ for cmd in commands:
+ output = client.run_command(cmd)
+ client.join(output)
+ if output[ip[0]].exit_code:
+ return -1, " VNFR {} could not be checked: {}".format(ip[0], output[ip[0]].stderr)
+ else:
+ self.passed_tests += 1
+ return 1, " Test successful"
+ except (ssh2Exception.ChannelFailure, ssh2Exception.SocketDisconnectError, ssh2Exception.SocketTimeout,
+ ssh2Exception.SocketRecvError) as e:
+ return 0, "Timeout accessing the VNFR {}: {}".format(ip[0], str(e))
+ except Exception as e:
+ return -1, "ERROR checking the VNFR {}: {}".format(ip[0], str(e))
def aditional_operations(self, engine, test_osm, manual_check):
pass
if manual_check:
input('NS has been deployed. Perform manual check and press enter to resume')
else:
- self.test_ns(engine, test_osm)
+ self.test_ns(engine, test_osm, self.cmds, self.uss, self.pss, self.keys, self.timeout)
self.aditional_operations(engine, test_osm, manual_check)
self.terminate(engine)
self.delete_descriptors(engine)
+ self.print_results()
+
+ def print_results(self):
+ print("\n\n\n--------------------------------------------")
+ print("TEST RESULTS:\n PASSED TESTS: {} - TOTAL TESTS: {}".format(self.total_tests, self.passed_tests))
+ print("--------------------------------------------")
class TestDeployHackfestCirros(TestDeploy):
super().__init__()
self.vnfd_filenames = ("cirros_vnf.tar.gz",)
self.nsd_filename = "cirros_2vnf_ns.tar.gz"
+ self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
+ self.uss = {'1': "cirros", '2': "cirros"}
+ self.pss = {'1': "cubswin:)", '2': "cubswin:)"}
- def run(self, engine, test_osm, manual_check, test_params=None):
- super().run(engine, test_osm, manual_check, test_params)
+
+class TestDeployHackfest1(TestDeploy):
+ description = "Load and deploy Hackfest_1_vnfd example"
+
+ def __init__(self):
+ super().__init__()
+ self.vnfd_filenames = ("hackfest_1_vnfd.tar.gz",)
+ self.nsd_filename = "hackfest_1_nsd.tar.gz"
+ # self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
+ # self.uss = {'1': "cirros", '2': "cirros"}
+ # self.pss = {'1': "cubswin:)", '2': "cubswin:)"}
+
+
+class TestDeployHackfestCirrosScaling(TestDeploy):
+ description = "Load and deploy Hackfest cirros_2vnf_ns example with scaling modifications"
+
+ def __init__(self):
+ super().__init__()
+ self.vnfd_filenames = ("cirros_vnf.tar.gz",)
+ self.nsd_filename = "cirros_2vnf_ns.tar.gz"
+
+ def create_descriptors(self, engine):
+ super().create_descriptors(engine)
+ # Modify VNFD to add scaling and count=2
+ payload = """
+ vdu:
+ "$id: 'cirros_vnfd-VM'":
+ count: 2
+ scaling-group-descriptor:
+ - name: "scale_cirros"
+ max-instance-count: 2
+ vdu:
+ - vdu-id-ref: cirros_vnfd-VM
+ count: 2
+ """
+ engine.test("DEPLOY{}".format(self.step), "Edit VNFD ", "PATCH",
+ "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfds_id[0]),
+ headers_yaml, payload, 204, None, None)
+ self.step += 1
+
+ def aditional_operations(self, engine, test_osm, manual_check):
+ if not test_osm:
+ return
+ # 2 perform scale out twice
+ payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \
+ '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}'
+ for i in range(0, 2):
+ engine.test("DEPLOY{}".format(self.step), "Execute scale action over NS", "POST",
+ "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload,
+ 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
+ nslcmop2_scale_out = "DEPLOY{}".format(self.step)
+ self._wait_nslcmop_ready(engine, nslcmop2_scale_out, timeout_deploy)
+ if manual_check:
+ input('NS scale out done. Check that two more vdus are there')
+ # TODO check automatic
+
+ # 2 perform scale in
+ payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_IN, scaleByStepData: ' \
+ '{scaling-group-descriptor: scale_cirros, member-vnf-index: "1"}}}'
+ for i in range(0, 2):
+ engine.test("DEPLOY{}".format(self.step), "Execute scale IN action over NS", "POST",
+ "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload,
+ 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
+ nslcmop2_scale_in = "DEPLOY{}".format(self.step)
+ self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy)
+ if manual_check:
+ input('NS scale in done. Check that two less vdus are there')
+ # TODO check automatic
+
+ # perform scale in that must fail as reached limit
+ engine.test("DEPLOY{}".format(self.step), "Execute scale IN out of limit action over NS", "POST",
+ "/nslcm/v1/ns_instances/<{}>/scale".format(self.ns_test), headers_yaml, payload,
+ 201, {"Location": "nslcm/v1/ns_lcm_op_occs/", "Content-Type": "application/yaml"}, "yaml")
+ nslcmop2_scale_in = "DEPLOY{}".format(self.step)
+ self._wait_nslcmop_ready(engine, nslcmop2_scale_in, timeout_deploy, expected_fail=True)
class TestDeployIpMac(TestDeploy):
self.nsd_filename = "scenario_2vdu_set_ip_mac.yaml"
self.descriptor_url = \
"https://osm.etsi.org/gitweb/?p=osm/RO.git;a=blob_plain;f=test/RO_tests/v3_2vdu_set_ip_mac/"
+ self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
+ self.uss = {'1': "osm", '2': "osm"}
+ self.pss = {'1': "osm4u", '2': "osm4u"}
+ self.timeout = 360
def run(self, engine, test_osm, manual_check, test_params=None):
# super().run(engine, test_osm, manual_check, test_params)
"interface": [
{
"name": "iface21",
- "ip-address": "10.31.31.21",
+ "ip-address": "10.31.31.22",
"mac-address": "52:33:44:55:66:21"
},
],
},
]
}
+
super().run(engine, test_osm, manual_check, test_params={"ns-config": instantiation_params})
self.vnfd_filenames = ("hackfest_4_vnfd.tar.gz",)
self.nsd_filename = "hackfest_4_nsd.tar.gz"
self.uses_configuration = True
+ self.cmds = {'1': ['ls -lrt', ], '2': ['ls -lrt', ]}
+ self.uss = {'1': "ubuntu", '2': "ubuntu"}
+ self.pss = {'1': "osm4u", '2': "osm4u"}
def create_descriptors(self, engine):
super().create_descriptors(engine)
"/vnfpkgm/v1/vnf_packages/<{}>".format(self.vnfds_test[0]), headers_yaml, payload, 204, None, None)
self.step += 1
- def run(self, engine, test_osm, manual_check, test_params=None):
- super().run(engine, test_osm, manual_check, test_params)
-
class TestDeployHackfest3Charmed(TestDeploy):
description = "Load and deploy Hackfest 3charmed_ns example. Modifies it for adding scaling and performs " \
self.vnfd_filenames = ("hackfest_3charmed_vnfd.tar.gz",)
self.nsd_filename = "hackfest_3charmed_nsd.tar.gz"
self.uses_configuration = True
+ self.cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/first-touch', ]}
+ self.uss = {'1': "ubuntu", '2': "ubuntu"}
+ self.pss = {'1': "osm4u", '2': "osm4u"}
# def create_descriptors(self, engine):
# super().create_descriptors(engine)
if manual_check:
input('NS service primitive has been executed. Check that file /home/ubuntu/OSMTESTNBI is present at '
'TODO_PUT_IP')
- # TODO check automatic
+ else:
+ cmds = {'1': [''], '2': ['ls -lrt /home/ubuntu/OSMTESTNBI', ]}
+ uss = {'1': "ubuntu", '2': "ubuntu"}
+ pss = {'1': "osm4u", '2': "osm4u"}
+ self.test_ns(engine, test_osm, cmds, uss, pss, self.keys, self.timeout)
# # 2 perform scale out
# payload = '{scaleType: SCALE_VNF, scaleVnfData: {scaleVnfType: SCALE_OUT, scaleByStepData: ' \
# input('NS scale in done. Check that file /home/ubuntu/touched is updated and new VM is deleted')
# # TODO check automatic
+
+class TestDescriptors:
+ description = "Test VNFD, NSD, PDU descriptors CRUD and dependencies"
+
+ def __init__(self):
+ self.step = 0
+ self.vnfd_filename = "hackfest_3charmed_vnfd.tar.gz"
+ self.nsd_filename = "hackfest_3charmed_nsd.tar.gz"
+ self.descriptor_url = "https://osm-download.etsi.org/ftp/osm-3.0-three/2nd-hackfest/packages/"
+ self.vnfd_id = None
+ self.nsd_id = None
+
def run(self, engine, test_osm, manual_check, test_params=None):
- super().run(engine, test_osm, manual_check, test_params)
+ engine.get_autorization()
+ temp_dir = os.path.dirname(os.path.abspath(__file__)) + "/temp/"
+ if not os.path.exists(temp_dir):
+ os.makedirs(temp_dir)
+
+ # download files
+ for filename in (self.vnfd_filename, self.nsd_filename):
+ filename_path = temp_dir + filename
+ if not os.path.exists(filename_path):
+ with open(filename_path, "wb") as file:
+ response = requests.get(self.descriptor_url + filename)
+ if response.status_code >= 300:
+ raise TestException("Error downloading descriptor from '{}': {}".format(
+ self.descriptor_url + filename, response.status_code))
+ file.write(response.content)
+
+ vnfd_filename_path = temp_dir + self.vnfd_filename
+ nsd_filename_path = temp_dir + self.nsd_filename
+
+ # vnfd CREATE AND UPLOAD in one step:
+ test_name = "DESCRIPTOR{}".format(self.step)
+ engine.test(test_name, "Onboard VNFD in one step", "POST",
+ "/vnfpkgm/v1/vnf_packages_content", headers_zip_yaml, "@b" + vnfd_filename_path, 201,
+ {"Location": "/vnfpkgm/v1/vnf_packages_content/", "Content-Type": "application/yaml"}, "yaml")
+ self.vnfd_id = engine.test_ids[test_name]
+ self.step += 1
+
+ # get vnfd descriptor
+ engine.test("DESCRIPTOR" + str(self.step), "Get VNFD descriptor", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 200, r_header_yaml, "yaml")
+ self.step += 1
+
+ # get vnfd file descriptor
+ engine.test("DESCRIPTOR" + str(self.step), "Get VNFD file descriptor", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(self.vnfd_id), headers_text, None, 200,
+ r_header_text, "text", temp_dir+"vnfd-yaml")
+ self.step += 1
+ # TODO compare files: diff vnfd-yaml hackfest_3charmed_vnfd/hackfest_3charmed_vnfd.yaml
+
+ # get vnfd zip file package
+ engine.test("DESCRIPTOR" + str(self.step), "Get VNFD zip package", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/package_content".format(self.vnfd_id), headers_zip, None, 200,
+ r_header_zip, "zip", temp_dir+"vnfd-zip")
+ self.step += 1
+ # TODO compare files: diff vnfd-zip hackfest_3charmed_vnfd.tar.gz
+
+ # get vnfd artifact
+ engine.test("DESCRIPTOR" + str(self.step), "Get VNFD artifact package", "GET",
+ "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/osm.png".format(self.vnfd_id), headers_zip, None, 200,
+ r_header_octect, "octet-string", temp_dir+"vnfd-icon")
+ self.step += 1
+ # TODO compare files: diff vnfd-icon hackfest_3charmed_vnfd/icons/osm.png
+
+ # nsd CREATE AND UPLOAD in one step:
+ test_name = "DESCRIPTOR{}".format(self.step)
+ engine.test(test_name, "Onboard NSD in one step", "POST",
+ "/nsd/v1/ns_descriptors_content", headers_zip_yaml, "@b" + nsd_filename_path, 201,
+ {"Location": "/nsd/v1/ns_descriptors_content/", "Content-Type": "application/yaml"}, "yaml")
+ self.nsd_id = engine.test_ids[test_name]
+ self.step += 1
+
+ # get nsd descriptor
+ engine.test("DESCRIPTOR" + str(self.step), "Get NSD descriptor", "GET",
+ "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 200, r_header_yaml, "yaml")
+ self.step += 1
+
+ # get nsd file descriptor
+ engine.test("DESCRIPTOR" + str(self.step), "Get NSD file descriptor", "GET",
+ "/nsd/v1/ns_descriptors/{}/nsd".format(self.nsd_id), headers_text, None, 200,
+ r_header_text, "text", temp_dir+"nsd-yaml")
+ self.step += 1
+ # TODO compare files: diff nsd-yaml hackfest_3charmed_nsd/hackfest_3charmed_nsd.yaml
+
+ # get nsd zip file package
+ engine.test("DESCRIPTOR" + str(self.step), "Get NSD zip package", "GET",
+ "/nsd/v1/ns_descriptors/{}/nsd_content".format(self.nsd_id), headers_zip, None, 200,
+ r_header_zip, "zip", temp_dir+"nsd-zip")
+ self.step += 1
+ # TODO compare files: diff nsd-zip hackfest_3charmed_nsd.tar.gz
+
+ # get nsd artifact
+ engine.test("DESCRIPTOR" + str(self.step), "Get NSD artifact package", "GET",
+ "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm.png".format(self.nsd_id), headers_zip, None, 200,
+ r_header_octect, "octet-string", temp_dir+"nsd-icon")
+ self.step += 1
+ # TODO compare files: diff nsd-icon hackfest_3charmed_nsd/icons/osm.png
+
+ # vnfd DELETE
+ test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD conflict", "DELETE",
+ "/vnfpkgm/v1/vnf_packages/{}".format(self.vnfd_id), headers_yaml, None, 409, None, None)
+ self.step += 1
+
+ test_rest.test("DESCRIPTOR" + str(self.step), "Delete VNFD force", "DELETE",
+ "/vnfpkgm/v1/vnf_packages/{}?FORCE=TRUE".format(self.vnfd_id), headers_yaml, None, 204, None, 0)
+ self.step += 1
+
+ # nsd DELETE
+ test_rest.test("DESCRIPTOR" + str(self.step), "Delete NSD", "DELETE",
+ "/nsd/v1/ns_descriptors/{}".format(self.nsd_id), headers_yaml, None, 204, None, 0)
+ self.step += 1
if __name__ == "__main__":
"VIM-SDN": TestVIMSDN,
"Deploy-Custom": TestDeploy,
"Deploy-Hackfest-Cirros": TestDeployHackfestCirros,
+ "Deploy-Hackfest-Cirros-Scaling": TestDeployHackfestCirrosScaling,
"Deploy-Hackfest-3Charmed": TestDeployHackfest3Charmed,
"Deploy-Hackfest-4": TestDeployHackfest4,
"Deploy-CirrosMacIp": TestDeployIpMac,
+ "TestDescriptors": TestDescriptors,
+ "TestDeployHackfest1": TestDeployHackfest1,
+ # "Deploy-MultiVIM": TestDeployMultiVIM,
}
test_to_do = []
test_params = {}
exit(0)
# get token
-
- # # tests once authorized
- # for t in test_authorized_list:
- # test_rest.test(*t)
- #
- # # tests admin
- # for t in test_admin_list1:
- # test_rest.test(*t)
- #
- # # vnfd CREATE
- # r = test_rest.test("VNFD1", "Onboard VNFD step 1", "POST", "/vnfpkgm/v1/vnf_packages", headers_json, None,
- # 201, {"Location": "/vnfpkgm/v1/vnf_packages/", "Content-Type": "application/json"}, "json")
- # location = r.headers["Location"]
- # vnfd_id = location[location.rfind("/")+1:]
- # # print(location, vnfd_id)
- #
- # # vnfd UPLOAD test
- # r = test_rest.test("VNFD2", "Onboard VNFD step 2 as TEXT", "PUT",
- # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- # r_header_text, "@./cirros_vnf/cirros_vnfd.yaml", 204, None, 0)
- #
- # # vnfd SHOW OSM format
- # r = test_rest.test("VNFD3", "Show VNFD OSM format", "GET",
- # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
- # headers_json, None, 200, r_header_json, "json")
- #
- # # vnfd SHOW text
- # r = test_rest.test("VNFD4", "Show VNFD SOL005 text", "GET",
- # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- # headers_text, None, 200, r_header_text, "text")
- #
- # # vnfd UPLOAD ZIP
- # makedirs("temp", exist_ok=True)
- # tar = tarfile.open("temp/cirros_vnf.tar.gz", "w:gz")
- # tar.add("cirros_vnf")
- # tar.close()
- # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
- # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
- #
- # # vnfd SHOW OSM format
- # r = test_rest.test("VNFD6", "Show VNFD OSM format", "GET",
- # "/vnfpkgm/v1/vnf_packages_content/{}".format(vnfd_id),
- # headers_json, None, 200, r_header_json, "json")
- #
- # # vnfd SHOW zip
- # r = test_rest.test("VNFD7", "Show VNFD SOL005 zip", "GET",
- # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- # headers_zip, None, 200, r_header_zip, "zip")
- # # vnfd SHOW descriptor
- # r = test_rest.test("VNFD8", "Show VNFD descriptor", "GET",
- # "/vnfpkgm/v1/vnf_packages/{}/vnfd".format(vnfd_id),
- # headers_text, None, 200, r_header_text, "text")
- # # vnfd SHOW actifact
- # r = test_rest.test("VNFD9", "Show VNFD artifact", "GET",
- # "/vnfpkgm/v1/vnf_packages/{}/artifacts/icons/cirros-64.png".format(vnfd_id),
- # headers_text, None, 200, r_header_octect, "text")
- #
- # # # vnfd DELETE
- # # r = test_rest.test("VNFD10", "Delete VNFD SOL005 text", "DELETE",
- # # "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
- # # headers_yaml, None, 204, None, 0)
- #
- # # nsd CREATE
- # r = test_rest.test("NSD1", "Onboard NSD step 1", "POST", "/nsd/v1/ns_descriptors", headers_json, None,
- # 201, {"Location": "/nsd/v1/ns_descriptors/", "Content-Type": "application/json"}, "json")
- # location = r.headers["Location"]
- # nsd_id = location[location.rfind("/")+1:]
- # # print(location, nsd_id)
- #
- # # nsd UPLOAD test
- # r = test_rest.test("NSD2", "Onboard NSD with missing vnfd", "PUT",
- # "/nsd/v1/ns_descriptors/<>/nsd_content?constituent-vnfd.0.vnfd-id-ref"
- # "=NONEXISTING-VNFD".format(nsd_id),
- # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 409, r_header_yaml, "yaml")
- #
- # # # VNF_CREATE
- # # r = test_rest.test("VNFD5", "Onboard VNFD step 3 replace with ZIP", "PUT",
- # # "/vnfpkgm/v1/vnf_packages/{}/package_content".format(vnfd_id),
- # # r_header_zip, "@b./temp/cirros_vnf.tar.gz", 204, None, 0)
- #
- # r = test_rest.test("NSD2", "Onboard NSD step 2 as TEXT", "PUT",
- # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- # r_header_text, "@./cirros_ns/cirros_nsd.yaml", 204, None, 0)
- #
- # # nsd SHOW OSM format
- # r = test_rest.test("NSD3", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
- # headers_json, None, 200, r_header_json, "json")
- #
- # # nsd SHOW text
- # r = test_rest.test("NSD4", "Show NSD SOL005 text", "GET",
- # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- # headers_text, None, 200, r_header_text, "text")
- #
- # # nsd UPLOAD ZIP
- # makedirs("temp", exist_ok=True)
- # tar = tarfile.open("temp/cirros_ns.tar.gz", "w:gz")
- # tar.add("cirros_ns")
- # tar.close()
- # r = test_rest.test("NSD5", "Onboard NSD step 3 replace with ZIP", "PUT",
- # "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- # r_header_zip, "@b./temp/cirros_ns.tar.gz", 204, None, 0)
- #
- # # nsd SHOW OSM format
- # r = test_rest.test("NSD6", "Show NSD OSM format", "GET", "/nsd/v1/ns_descriptors_content/{}".format(nsd_id),
- # headers_json, None, 200, r_header_json, "json")
- #
- # # nsd SHOW zip
- # r = test_rest.test("NSD7","Show NSD SOL005 zip","GET", "/nsd/v1/ns_descriptors/{}/nsd_content".format(nsd_id),
- # headers_zip, None, 200, r_header_zip, "zip")
- #
- # # nsd SHOW descriptor
- # r = test_rest.test("NSD8", "Show NSD descriptor", "GET", "/nsd/v1/ns_descriptors/{}/nsd".format(nsd_id),
- # headers_text, None, 200, r_header_text, "text")
- # # nsd SHOW actifact
- # r = test_rest.test("NSD9", "Show NSD artifact", "GET",
- # "/nsd/v1/ns_descriptors/{}/artifacts/icons/osm_2x.png".format(nsd_id),
- # headers_text, None, 200, r_header_octect, "text")
- #
- # # vnfd DELETE
- # r = test_rest.test("VNFD10", "Delete VNFD conflict", "DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
- # headers_yaml, None, 409, r_header_yaml, "yaml")
- #
- # # nsd DELETE
- # r = test_rest.test("NSD10", "Delete NSD SOL005 text", "DELETE", "/nsd/v1/ns_descriptors/{}".format(nsd_id),
- # headers_yaml, None, 204, None, 0)
- #
- # # vnfd DELETE
- # r = test_rest.test("VNFD10","Delete VNFD SOL005 text","DELETE", "/vnfpkgm/v1/vnf_packages/{}".format(vnfd_id),
- # headers_yaml, None, 204, None, 0)
print("PASS")
except TestException as e:
print("RX {}: {}".format(r.status_code, r.text))
if "id" in response:
print("---\nid: {}".format(response["id"]))
- except Exception as e:
+ except Exception:
raise
# Basis schemas
patern_name = "^[ -~]+$"
-nameshort_schema = {"type": "string", "minLength": 1, "maxLength": 60, "pattern": "^[^,;()\.\$'\"]+$"}
+nameshort_schema = {"type": "string", "minLength": 1, "maxLength": 60, "pattern": "^[^,;()\\.\\$'\"]+$"}
passwd_schema = {"type": "string", "minLength": 1, "maxLength": 60}
name_schema = {"type": "string", "minLength": 1, "maxLength": 255, "pattern": "^[^,;()'\"]+$"}
string_schema = {"type": "string", "minLength": 1, "maxLength": 255}
# "pattern": "^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$"
id_schema = {"type": "string", "pattern": "^[a-fA-F0-9]{8}(-[a-fA-F0-9]{4}){3}-[a-fA-F0-9]{12}$"}
time_schema = {"type": "string", "pattern": "^[0-9]{4}-[0-1][0-9]-[0-3][0-9]T[0-2][0-9]([0-5]:){2}"}
-pci_schema = {"type": "string", "pattern": "^[0-9a-fA-F]{4}(:[0-9a-fA-F]{2}){2}\.[0-9a-fA-F]$"}
+pci_schema = {"type": "string", "pattern": "^[0-9a-fA-F]{4}(:[0-9a-fA-F]{2}){2}\\.[0-9a-fA-F]$"}
+# allows [] for wildcards. For that reason huge length limit is set
+pci_extended_schema = {"type": "string", "pattern": "^[0-9a-fA-F.:-\\[\\]]{12,40}$"}
http_schema = {"type": "string", "pattern": "^https?://[^'\"=]+$"}
bandwidth_schema = {"type": "string", "pattern": "^[0-9]+ *([MG]bps)?$"}
memory_schema = {"type": "string", "pattern": "^[0-9]+ *([MG]i?[Bb])?$"}
integer0_schema = {"type": "integer", "minimum": 0}
integer1_schema = {"type": "integer", "minimum": 1}
-path_schema = {"type": "string", "pattern": "^(\.){0,2}(/[^/\"':{}\(\)]+)+$"}
+path_schema = {"type": "string", "pattern": "^(\\.){0,2}(/[^/\"':{}\\(\\)]+)+$"}
vlan_schema = {"type": "integer", "minimum": 1, "maximum": 4095}
vlan1000_schema = {"type": "integer", "minimum": 1000, "maximum": 4095}
mac_schema = {"type": "string",
dpid_Schema = {"type": "string", "pattern": "^[0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){7}$"}
# mac_schema={"type":"string", "pattern":"^([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}$"}
ip_schema = {"type": "string",
- "pattern": "^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"}
+ "pattern": "^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"}
ip_prefix_schema = {"type": "string",
- "pattern": "^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}"
+ "pattern": "^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}"
"(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/(30|[12]?[0-9])$"}
port_schema = {"type": "integer", "minimum": 1, "maximum": 65534}
object_schema = {"type": "object"}
array_edition_schema = {
"type": "object",
"patternProperties": {
- "^\$": "Any"
+ "^\\$": "Any"
},
"additionalProperties": False,
"minProperties": 1,
"nsDescription": {"oneOf": [description_schema, {"type": "null"}]},
"nsdId": id_schema,
"vimAccountId": id_schema,
- "ssh_keys": {"type": "string"},
+ "ssh_keys": {"type": "array", "items": {"type": "string"}},
"nsr_id": id_schema,
+ "vduImage": name_schema,
"vnf": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
- "pci": pci_schema,
+ "pci": pci_extended_schema,
"switch_port": nameshort_schema,
"switch_mac": mac_schema
},
basepython = python3
deps = flake8
commands = flake8 osm_nbi/ setup.py --max-line-length 120 \
- --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,vnfd_catalog.py,nsd_catalog.py --ignore W291,W293,E226,E402
+ --exclude .svn,CVS,.gz,.git,__pycache__,.tox,local,temp,osm_im --ignore W291,W293,E226,E402,W504
[testenv:build]
basepython = python3