From 6ce7cc879dda3b729b10d563bd26df613dc9f70f Mon Sep 17 00:00:00 2001 From: tierno Date: Mon, 29 Jan 2018 18:44:04 +0100 Subject: [PATCH] Initial commit for lightweight build Change-Id: I22c7c64b76327822dc8eb8a40de70b1821b4cac2 Signed-off-by: tierno --- lcm/ROclient.py | 880 ++++++++++++++++++++++++++++++++++++++++++++++++ lcm/dbbase.py | 35 ++ lcm/dbmemory.py | 116 +++++++ lcm/lcm.py | 380 +++++++++++++++++++++ 4 files changed, 1411 insertions(+) create mode 100644 lcm/ROclient.py create mode 100644 lcm/dbbase.py create mode 100644 lcm/dbmemory.py create mode 100644 lcm/lcm.py diff --git a/lcm/ROclient.py b/lcm/ROclient.py new file mode 100644 index 00000000..84ce7aa1 --- /dev/null +++ b/lcm/ROclient.py @@ -0,0 +1,880 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +## +# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. +# This file is part of openmano +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +""" +asyncio RO python client to interact with RO-server +""" + +import asyncio +import aiohttp + +import json +import yaml +import logging +import sys +from urllib.parse import quote +from uuid import UUID + +__author__ = "Alfonso Tierno, Pablo Montes" +__date__ = "$09-Jan-2018 09:09:48$" +__version__ = "0.1.0-r470" +version_date = "Jan 2018" +requests = None + +class ROClientException(Exception): + def __init__(self, message, http_code=400): + self.http_code = http_code + Exception.__init__(self, message) + """Common Exception for all openmano client exceptions""" + + +def remove_envelop(item, indata=None): + """ + Obtain the useful data removing the envelop. It goes through the vnfd or nsd catalog and returns the + vnfd or nsd content + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param indata: Content to be inspected + :return: the useful part of indata (a reference, not a new dictionay) plus boolean indicating if it was enveloped + """ + clean_indata = indata + enveloped = False + if not indata: + return {}, False + if item == "vnfd": + if clean_indata.get('vnfd:vnfd-catalog'): + enveloped = True + clean_indata = clean_indata['vnfd:vnfd-catalog'] + elif clean_indata.get('vnfd-catalog'): + enveloped = True + clean_indata = clean_indata['vnfd-catalog'] + if clean_indata.get('vnfd'): + if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1: + raise ROClientException("'vnfd' must be a list only one element") + clean_indata = clean_indata['vnfd'][0] + elif item == "nsd": + if clean_indata.get('nsd:nsd-catalog'): + enveloped = True + clean_indata = clean_indata['nsd:nsd-catalog'] + elif clean_indata.get('nsd-catalog'): + enveloped = True + clean_indata = clean_indata['nsd-catalog'] + if clean_indata.get('nsd'): + if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: + raise ROClientException("'nsd' must be a list only one element") + clean_indata = clean_indata['nsd'][0] + elif item == "tenant": + if len(indata) == 1 and "tenant" in indata: + enveloped = True + clean_indata = indata["tenant"] + elif item == "vim" or item == "datacenter": + if len(indata) == 1 and "datacenter" in indata: + enveloped = True + clean_indata = indata["datacenter"] + elif item == "ns" or item == "instances": + if len(indata) == 1 and "instance" in indata: + enveloped = True + clean_indata = indata["instance"] + else: + assert False, "remove_envelop with unknown item {}".format(item) + + return clean_indata, enveloped + + +class ROClient: + headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} + client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vnfd': 'vnfs', 'nsd': 'scenarios', + 'ns': 'instances'} + mandatory_for_create = { + 'tenant': ("name", ), + 'vim': ("name", "vim_url"), + 'vnfd': ("name", "id", "connection-point", "vdu"), + 'nsd': ("name", "id", "constituent-vnfd"), + 'ns': ("name", "scenario", "datacenter"), + } + timeout_large = 120 + timeout_short = 30 + + def __init__(self, loop, endpoint_url, **kwargs): + self.loop = loop + self.endpoint_url = endpoint_url + + self.username = kwargs.get("username") + self.password = kwargs.get("password") + self.tenant_id_name = kwargs.get("tenant") + self.tenant = None + self.datacenter_id_name = kwargs.get("datacenter") + self.datacenter = None + self.logger = logging.getLogger(kwargs.get('logger', 'ROClient')) + if kwargs.get("debug"): + self.logger.setLevel(logging.DEBUG) + global requests + requests = kwargs.get("TODO remove") + + def __getitem__(self, index): + if index == 'tenant': + return self.tenant_id_name + elif index == 'datacenter': + return self.datacenter_id_name + elif index == 'username': + return self.username + elif index == 'password': + return self.password + elif index == 'endpoint_url': + return self.endpoint_url + else: + raise KeyError("Invalid key '%s'" %str(index)) + + def __setitem__(self,index, value): + if index == 'tenant': + self.tenant_id_name = value + elif index == 'datacenter': + self.datacenter_id_name = value + elif index == 'username': + self.username = value + elif index == 'password': + self.password = value + elif index == 'endpoint_url': + self.endpoint_url = value + else: + raise KeyError("Invalid key '{}'".format(index)) + self.tenant = None # force to reload tenant with different credentials + self.datacenter = None # force to reload datacenter with different credentials + + def _parse(self, descriptor, descriptor_format, response=False): + #try yaml + if descriptor_format and descriptor_format != "json" and descriptor_format != "yaml": + raise ROClientException("'descriptor_format' must be a 'json' or 'yaml' text") + if descriptor_format != "json": + try: + return yaml.load(descriptor) + except yaml.YAMLError as exc: + error_pos = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + error_pos = " at line:{} column:{}s".format(mark.line+1, mark.column+1) + error_text = "yaml format error" + error_pos + elif descriptor_format != "yaml": + try: + return json.loads(descriptor) + except Exception as e: + if response: + error_text = "json format error" + str(e) + + if response: + raise ROClientException(error_text) + raise ROClientException(error_text) + + def _parse_yaml(self, descriptor, response=False): + try: + return yaml.load(descriptor) + except yaml.YAMLError as exc: + error_pos = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + error_pos = " at line:{} column:{}s".format(mark.line+1, mark.column+1) + error_text = "yaml format error" + error_pos + if response: + raise ROClientException(error_text) + raise ROClientException(error_text) + + @staticmethod + def check_if_uuid(uuid_text): + """ + Check if text correspond to an uuid foramt + :param uuid_text: + :return: True if it is an uuid False if not + """ + try: + UUID(uuid_text) + return True + except (ValueError, TypeError): + return False + + @staticmethod + def _create_envelop(item, indata=None): + """ + Returns a new dict that incledes indata with the expected envelop + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param indata: Content to be enveloped + :return: a new dic with {: {indata} } where envelop can be e.g. tenant, datacenter, ... + """ + if item == "vnfd": + return {'vnfd-catalog': {'vnfd': [indata]}} + elif item == "nsd": + return {'nsd-catalog': {'nsd': [indata]}} + elif item == "tenant": + return {'tenant': indata} + elif item == "vim" or item == "datacenter": + return {'datacenter': indata} + elif item == "ns" or item == "instances": + return {'instance': indata} + else: + assert False, "_create_envelop with unknown item {}".format(item) + + @staticmethod + def check_ns_status(ns_descriptor): + """ + Inspect RO instance descriptor and indicates the status + :param ns_descriptor: instance descriptor obtained with self.show("ns", ) + :return: status, message: status can be BUILD,ACTIVE,ERROR, message is a text message + """ + net_total = 0 + vm_total = 0 + net_done = 0 + vm_done = 0 + + for net in ns_descriptor["nets"]: + net_total += 1 + if net["status"] == "ERROR": + return "ERROR", net["error_msg"] + elif net["status"] == "ACTIVE": + net_done += 1 + for vnf in ns_descriptor["vnfs"]: + for vm in vnf["vms"]: + vm_total += 1 + if vm["status"] == "ERROR": + return "ERROR", vm["error_msg"] + elif vm["status"] == "ACTIVE": + vm_done += 1 + + if net_total == net_done and vm_total == vm_done: + return "ACTIVE", "VMs {}, networks: {}".format(vm_total, net_total) + else: + return "BUILD", "VMs: {}/{}, networks: {}/{}".format(vm_done, vm_total, net_done, net_total) + + @staticmethod + def get_ns_vnf_ip(ns_descriptor): + """ + Get a dict with the IPs of every vnf. + :param ns_descriptor: instance descriptor obtained with self.show("ns", ) + :return: dict iwth key member_vnf_index, value ip_address + """ + ns_ip={} + for vnf in ns_descriptor["vnfs"]: + ns_ip[vnf["member_vnf_index"]] = vnf["ip_address"] + #uuid sce_vnf_id + # vnf[mgmt_access]: '{interface_id: cf3cbf00-385c-49b4-9a3f-b400b7b15dc6, vm_id: d0dd22a9-91ef-46f1-8e8f-8cf4b2d5b2d7}' + # vnf[vms] + return ns_ip + + async def get_list(self, item, all_tenants=False, filter_by=None): + """ + Obtain a list of items filtering by the specigy filter_by. + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param all_tenants: True if not filtering by tenant. Only allowed for admin + :param filter_by: dictionary with filtering + :return: a list of dict. It can be empty. Raises ROClientException on Error, + """ + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + if item == 'tenant': + all_tenants = None + with aiohttp.ClientSession(loop=self.loop) as session: + content = await self._list_item(session, self.client_to_RO[item], all_tenants=all_tenants, + filter_dict=filter_by) + if isinstance(content, dict): + if len(content) == 1: + return content.values()[0] + else: + raise ROClientException("Output not a list neither dict with len equal 1", http_code=500) + return content + + async def _get_item_uuid(self, session, item, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + item_id = 0 + url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) + if self.check_if_uuid(item_id_name): + item_id = item_id_name + url += "/" + item_id_name + elif item_id_name and item_id_name.startswith("'") and item_id_name.endswith("'"): + item_id_name = item_id_name[1:-1] + self.logger.debug("openmano GET %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status == 404: # NOT_FOUND + raise ROClientException("No {} found with id '{}'".format(item[:-1], item_id_name), + http_code=404) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + content = self._parse_yaml(response_text, response=True) + + if item_id: + return item_id + desc = content[item] + assert isinstance(desc, list), "_get_item_uuid get a non dict with a list inside {}".format(type(desc)) + uuid = None + for i in desc: + if item_id_name and i["name"] != item_id_name: + continue + if uuid: # found more than one + raise ROClientException( + "Found more than one {} with name '{}'. uuid must be used".format(item, item_id_name), + http_code=404) + uuid = i["uuid"] + if not uuid: + raise ROClientException("No {} found with name '{}'".format(item[:-1], item_id_name), http_code=404) + return uuid + + async def _get_item(self, session, item, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + if self.check_if_uuid(item_id_name): + uuid = item_id_name + else: + # check that exist + uuid = self._get_item_uuid(session, item, item_id_name, all_tenants) + + url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + self.logger.debug("GET %s", url ) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + + return self._parse_yaml(response_text, response=True) + + async def _get_tenant(self, session): + if not self.tenant: + self.tenant = await self._get_item_uuid(session, "tenants", self.tenant_id_name, None) + return self.tenant + + async def _get_datacenter(self, session): + if not self.tenant: + await self._get_tenant(session) + if not self.datacenter: + self.datacenter = await self._get_item_uuid(session, "datacenters", self.datacenter_id_name, True) + return self.datacenter + + async def _create_item(self, session, item, descriptor, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + payload_req = yaml.safe_dump(descriptor) + + api_version_text = "" + if item == "vnfs": + # assumes version v3 only + api_version_text = "/v3" + item = "vnfd" + elif item == "scenarios": + # assumes version v3 only + api_version_text = "/v3" + item = "nsd" + + #print payload_req + + url = "{}{apiver}{tenant}/{item}".format(self.endpoint_url, apiver=api_version_text, tenant=tenant_text, + item=item) + self.logger.debug("openmano POST %s %s", url, payload_req) + with aiohttp.Timeout(self.timeout_large): + async with session.post(url, headers=self.headers_req, data=payload_req) as response: + response_text = await response.read() + self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + + response_desc = self._parse_yaml(response_text, response=True) + desc, _ = remove_envelop(item, response_desc) + return desc + + async def _del_item(self, session, item, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + if not self.check_if_uuid(item_id_name): + # check that exist + uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants) + else: + uuid = item_id_name + + url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + self.logger.debug("DELETE %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.delete(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + async def _list_item(self, session, item, all_tenants=False, filter_dict=None): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) + separator = "?" + if filter_dict: + for k in filter_dict: + url += separator + quote(str(k)) + "=" + quote(str(filter_dict[k])) + separator = "&" + self.logger.debug("openmano GET %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + async def _edit_item(self, session, item, descriptor, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + if not uuid: + #check that exist + uuid = self._get_item_uuid(session, "tenants", item_id_name, all_tenants) + + payload_req = yaml.safe_dump(descriptor) + + #print payload_req + + url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + self.logger.debug("openmano PUT %s %s", url, payload_req) + with aiohttp.Timeout(self.timeout_large): + async with session.put(url, headers=self.headers_req, data=payload_req) as response: + response_text = await response.read() + self.logger.debug("PUT {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + async def show(self, item, item_id_name=None, all_tenants=False): + """ + Obtain the information of an item from its id or name + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param all_tenants: True if not filtering by tenant. Only allowed for admin + :return: dictionary with the information or raises ROClientException on Error, NotFound, found several + """ + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + if item == 'tenant': + all_tenants = None + + with aiohttp.ClientSession(loop=self.loop) as session: + content = await self._get_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) + if len(content) == 1: + return content.values()[0] + else: + return content + + async def delete(self, item, item_id_name=None, all_tenants=False): + """ + Delete the information of an item from its id or name + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param all_tenants: True if not filtering by tenant. Only allowed for admin + :return: dictionary with the information or raises ROClientException on Error, NotFound, found several + """ + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + if item == 'tenant': + all_tenants = None + + with aiohttp.ClientSession(loop=self.loop) as session: + if item == 'vim': + # check that exist + item_id = await self._get_item_uuid(session, "datacenters", item_id_name, all_tenants=True) + all_tenants = None + return await self._del_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) + + async def create(self, item, descriptor=None, descriptor_format=None, **kwargs): + """ + Creates an item from its descriptor + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided + :param descriptor_format: Can be 'json' or 'yaml' + :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type + keys can be a dot separated list to specify elements inside dict + :return: dictionary with the information or raises ROClientException on Error + """ + if isinstance(descriptor, str): + descriptor = self._parse(descriptor, descriptor_format) + elif descriptor: + pass + else: + descriptor = {} + + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + desc, enveloped = remove_envelop(item, descriptor) + + # Override descriptor with kwargs + if kwargs: + try: + for k, v in kwargs.items(): + update_content = desc + kitem_old = None + klist = k.split(".") + for kitem in klist: + if kitem_old is not None: + update_content = update_content[kitem_old] + if isinstance(update_content, dict): + kitem_old = kitem + elif isinstance(update_content, list): + kitem_old = int(kitem) + else: + raise ROClientException( + "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) + update_content[kitem_old] = v + except KeyError: + raise ROClientException( + "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) + except ValueError: + raise ROClientException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( + k, kitem)) + except IndexError: + raise ROClientException( + "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) + + for mandatory in self.mandatory_for_create[item]: + if mandatory not in desc: + raise ROClientException("'{}' is mandatory parameter for {}".format(mandatory, item)) + + all_tenants = False + if item in ('tenant', 'vim'): + all_tenants = None + + if not enveloped: + create_desc = self._create_envelop(item, desc) + else: + create_desc = descriptor + + with aiohttp.ClientSession(loop=self.loop) as session: + return await self._create_item(session, self.client_to_RO[item], create_desc, all_tenants) + + def edit_tenant(self, uuid=None, name=None, descriptor=None, descriptor_format=None, new_name=None, new_description=None): + """Edit the parameters of a tenant + Params: must supply a descriptor or/and a new_name or new_description + uuid or/and name. If only name is supplied, there must be only one or an exception is raised + descriptor: with format {'tenant':{params to change info}} + must be a dictionary or a json/yaml text. + name: the tenant name. Overwrite descriptor name if any + description: tenant descriptor.. Overwrite descriptor description if any + Return: Raises an exception on error, not found or found several + Obtain a dictionary with format {'tenant':{newtenant_info}} + """ + # TODO revise + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif new_name or new_description: + descriptor={"tenant": {}} + else: + raise ROClientException("Missing descriptor") + + if 'tenant' not in descriptor or len(descriptor)!=1: + raise ROClientException("Descriptor must contain only one 'tenant' field") + if new_name: + descriptor['tenant']['name'] = new_name + if new_description: + descriptor['tenant']['description'] = new_description + + return self._edit_item("tenants", descriptor, uuid, name, all_tenants=None) + + #DATACENTERS + + def edit_datacenter(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs): + """Edit the parameters of a datacenter + Params: must supply a descriptor or/and a parameter to change + uuid or/and name. If only name is supplied, there must be only one or an exception is raised + descriptor: with format {'datacenter':{params to change info}} + must be a dictionary or a json/yaml text. + parameters to change can be supplyied by the descriptor or as parameters: + new_name: the datacenter name + vim_url: the datacenter URL + vim_url_admin: the datacenter URL for administrative issues + vim_type: the datacenter type, can be openstack or openvim. + public: boolean, available to other tenants + description: datacenter description + Return: Raises an exception on error, not found or found several + Obtain a dictionary with format {'datacenter':{new_datacenter_info}} + """ + + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif kwargs: + descriptor={"datacenter": {}} + else: + raise ROClientException("Missing descriptor") + + if 'datacenter' not in descriptor or len(descriptor)!=1: + raise ROClientException("Descriptor must contain only one 'datacenter' field") + for param in kwargs: + if param=='new_name': + descriptor['datacenter']['name'] = kwargs[param] + else: + descriptor['datacenter'][param] = kwargs[param] + return self._edit_item("datacenters", descriptor, uuid, name, all_tenants=None) + + def attach_datacenter(self, uuid_name=None, descriptor=None, descriptor_format=None, vim_user=None, vim_password=None, vim_tenant_name=None, vim_tenant_id=None): + #check that exist + uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True) + tenant_text = "/"+self._get_tenant() + + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif vim_user or vim_password or vim_tenant_name or vim_tenant_id: + descriptor={"datacenter": {}} + else: + raise ROClientException("Missing descriptor or params") + + if vim_user or vim_password or vim_tenant_name or vim_tenant_id: + #print args.name + try: + if vim_user: + descriptor['datacenter']['vim_user'] = vim_user + if vim_password: + descriptor['datacenter']['vim_password'] = vim_password + if vim_tenant_name: + descriptor['datacenter']['vim_tenant_name'] = vim_tenant_name + if vim_tenant_id: + descriptor['datacenter']['vim_tenant'] = vim_tenant_id + except (KeyError, TypeError) as e: + if str(e)=='datacenter': error_pos= "missing field 'datacenter'" + else: error_pos="wrong format" + raise ROClientException("Wrong datacenter descriptor: " + error_pos) + + payload_req = yaml.safe_dump(descriptor) + #print payload_req + url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid) + self.logger.debug("openmano POST %s %s", url, payload_req) + mano_response = requests.post(url, headers = self.headers_req, data=payload_req) + self.logger.debug("openmano response: %s", mano_response.text ) + + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + + def detach_datacenter(self, uuid_name=None): + if not uuid: + #check that exist + uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=False) + tenant_text = "/"+self._get_tenant() + url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid) + self.logger.debug("openmano DELETE %s", url) + mano_response = requests.delete(url, headers = self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + + #VNFS + + def edit_scenario(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs): + """Edit the parameters of a scenario + Params: must supply a descriptor or/and a parameters to change + uuid or/and name. If only name is supplied, there must be only one or an exception is raised + descriptor: with format {'scenario':{params to change info}} + must be a dictionary or a json/yaml text. + parameters to change can be supplyied by the descriptor or as parameters: + new_name: the scenario name + public: boolean, available to other tenants + description: scenario description + tenant_id. Propietary tenant + Return: Raises an exception on error, not found or found several + Obtain a dictionary with format {'scenario':{new_scenario_info}} + """ + + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif kwargs: + descriptor={"scenario": {}} + else: + raise ROClientException("Missing descriptor") + + if 'scenario' not in descriptor or len(descriptor)>2: + raise ROClientException("Descriptor must contain only one 'scenario' field") + for param in kwargs: + if param=='new_name': + descriptor['scenario']['name'] = kwargs[param] + else: + descriptor['scenario'][param] = kwargs[param] + return self._edit_item("scenarios", descriptor, uuid, name, all_tenants=None) + + #VIM ACTIONS + def vim_action(self, action, item, uuid=None, all_tenants=False, **kwargs): + """Perform an action over a vim + Params: + action: can be 'list', 'get'/'show', 'delete' or 'create' + item: can be 'tenants' or 'networks' + uuid: uuid of the tenant/net to show or to delete. Ignore otherwise + other parameters: + datacenter_name, datacenter_id: datacenters to act on, if missing uses classes store datacenter + descriptor, descriptor_format: descriptor needed on creation, can be a dict or a yaml/json str + must be a dictionary or a json/yaml text. + name: for created tenant/net Overwrite descriptor name if any + description: tenant descriptor. Overwrite descriptor description if any + + Return: Raises an exception on error + Obtain a dictionary with format {'tenant':{new_tenant_info}} + """ + if item not in ("tenants", "networks", "images"): + raise ROClientException("Unknown value for item '{}', must be 'tenants', 'nets' or " + "images".format(str(item))) + + image_actions = ['list','get','show','delete'] + if item == "images" and action not in image_actions: + raise ROClientException("Only available actions for item '{}' are {}\n" + "Requested action was '{}'".format(item, ', '.join(image_actions), action)) + if all_tenants: + tenant_text = "/any" + else: + tenant_text = "/"+self._get_tenant() + + if "datacenter_id" in kwargs or "datacenter_name" in kwargs: + datacenter = self._get_item_uuid(session, "datacenters", kwargs.get("datacenter"), all_tenants=all_tenants) + else: + datacenter = self.get_datacenter(session) + + if action=="list": + url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) + self.logger.debug("GET %s", url ) + mano_response = requests.get(url, headers=self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + elif action=="get" or action=="show": + url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) + self.logger.debug("GET %s", url ) + mano_response = requests.get(url, headers=self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + elif action=="delete": + url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) + self.logger.debug("DELETE %s", url ) + mano_response = requests.delete(url, headers=self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + elif action=="create": + if "descriptor" in kwargs: + if isinstance(kwargs["descriptor"], str): + descriptor = self._parse(kwargs["descriptor"], kwargs.get("descriptor_format") ) + else: + descriptor = kwargs["descriptor"] + elif "name" in kwargs: + descriptor={item[:-1]: {"name": kwargs["name"]}} + else: + raise ROClientException("Missing descriptor") + + if item[:-1] not in descriptor or len(descriptor)!=1: + raise ROClientException("Descriptor must contain only one 'tenant' field") + if "name" in kwargs: + descriptor[ item[:-1] ]['name'] = kwargs["name"] + if "description" in kwargs: + descriptor[ item[:-1] ]['description'] = kwargs["description"] + payload_req = yaml.safe_dump(descriptor) + #print payload_req + url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) + self.logger.debug("openmano POST %s %s", url, payload_req) + mano_response = requests.post(url, headers = self.headers_req, data=payload_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + else: + raise ROClientException("Unknown value for action '{}".format(str(action))) + + +if __name__ == '__main__': + RO_URL = "http://localhost:9090/openmano" + RO_TENANT = "2c94f639-cefc-4f3a-a8f9-bbab0471946a" + RO_VIM = "3e70deb6-aea1-11e7-af13-080027429aaf" + + streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" + logging.basicConfig(format=streamformat) + + loop = asyncio.get_event_loop() + myClient = ROClient(endpoint_url=RO_URL, loop=loop, tenant_id=RO_TENANT, datacenter_id=RO_VIM, debug=True) + content = loop.run_until_complete(myClient.list_tenants()) + print(content) + loop.close() + + diff --git a/lcm/dbbase.py b/lcm/dbbase.py new file mode 100644 index 00000000..55041b68 --- /dev/null +++ b/lcm/dbbase.py @@ -0,0 +1,35 @@ + + +class DbException(Exception): + + def __init__(self, message, http_code=404): + self.http_code = http_code + Exception.__init__(self, message) + +class dbbase(object): + + def __init__(self): + pass + + def db_connect(self, config): + pass + + def db_disconnect(self): + pass + + def get_list(self, table, filter={}): + pass + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + pass + + def create(self, table, indata): + pass + + def del_list(self, table, filter={}): + pass + + def del_one(self, table, filter={}, fail_on_empty=True): + pass + + diff --git a/lcm/dbmemory.py b/lcm/dbmemory.py new file mode 100644 index 00000000..75d1f883 --- /dev/null +++ b/lcm/dbmemory.py @@ -0,0 +1,116 @@ +from dbbase import DbException, dbbase +from http import HTTPStatus +from uuid import uuid4 +from copy import deepcopy + + +class dbmemory(dbbase): + + def __init__(self): + self.db = {} + + @staticmethod + def _format_filter(filter): + return filter # TODO + + def _find(self, table, filter): + for i, row in enumerate(self.db.get(table, ())): + match = True + if filter: + for k, v in filter.items(): + if k not in row or v != row[k]: + match = False + if match: + yield i, row + + def get_list(self, table, filter={}): + try: + l = [] + for _, row in self._find(table, self._format_filter(filter)): + l.append(deepcopy(row)) + return l + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + try: + l = None + for _, row in self._find(table, self._format_filter(filter)): + if not fail_on_more: + return deepcopy(row) + if l: + raise DbException("Found more than one entry with filter='{}'".format(filter), + HTTPStatus.CONFLICT.value) + l = row + if not l and fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return deepcopy(l) + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_list(self, table, filter={}): + try: + id_list = [] + for i, _ in self._find(table, self._format_filter(filter)): + id_list.append(i) + deleted = len(id_list) + for i in id_list: + del self.db[table][i] + return {"deleted": deleted} + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_one(self, table, filter={}, fail_on_empty=True): + try: + for i, _ in self._find(table, self._format_filter(filter)): + break + else: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + del self.db[table][i] + return {"deleted": 1} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def replace(self, table, filter, indata, fail_on_empty=True): + try: + for i, _ in self._find(table, self._format_filter(filter)): + break + else: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + self.db[table][i] = deepcopy(indata) + return {"upadted": 1} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def create(self, table, indata): + try: + id = indata.get("_id") + if not id: + id = str(uuid4()) + indata["_id"] = id + if table not in self.db: + self.db[table] = [] + self.db[table].append(deepcopy(indata)) + return id + except Exception as e: # TODO refine + raise DbException(str(e)) + + +if __name__ == '__main__': + # some test code + db = dbmemory() + db.create("test", {"_id": 1, "data": 1}) + db.create("test", {"_id": 2, "data": 2}) + db.create("test", {"_id": 3, "data": 3}) + print("must be 3 items:", db.get_list("test")) + print("must return item 2:", db.get_list("test", {"_id": 2})) + db.del_one("test", {"_id": 2}) + print("must be emtpy:", db.get_list("test", {"_id": 2})) diff --git a/lcm/lcm.py b/lcm/lcm.py new file mode 100644 index 00000000..99581428 --- /dev/null +++ b/lcm/lcm.py @@ -0,0 +1,380 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import asyncio +import aiohttp +import yaml +import ROclient +import time +import dbmemory +import logging + +from copy import deepcopy +from uuid import uuid4 + +#streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" +streamformat = "%(name)s %(levelname)s: %(message)s" +logging.basicConfig(format=streamformat, level=logging.DEBUG) +logger = logging.getLogger('lcm') + +ro_account = { + "url": "http://localhost:9090/openmano", + "tenant": "osm" +} + +vca_account = { + # TODO +} + +# conains created tasks/futures to be able to cancel +lcm_tasks = {} + +headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} +ns_status = ("CREATION-SCHEDULED", "DEPLOYING", "CONFIGURING", "DELETION-SCHEDULED", "UN-CONFIGURING", "UNDEPLOYING") + +# TODO replace with database calls +db = dbmemory.dbmemory() + + + +async def CreateNS(loop, nsr_id): + logger.debug("CreateNS task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = { + "id": nsr_id, + "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, + "nsr_ip": {}, + "VCA": {"TODO"}, + "status": "BUILD", + "status_detailed": "", + } + + deloyment_timeout = 120 + try: + ns_request = db.get_one("ns_request", {"id": nsr_id}) + nsd = db.get_one("nsd", {"id": ns_request["nsd_id"]}) + RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], + datacenter=ns_request["vim"]) + nsr_lcm["status_detailed"] = "Creating vnfd at RO" + # ns_request["constituent-vnfr-ref"] = [] + + db.create("nsr_lcm", nsr_lcm) + + # get vnfds, instantiate at RO + logger.debug("CreateNS task nsr_id={} RO VNFD".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd = db.get_one("vnfd", {"id": vnfd_id}) + vnfd.pop("_admin", None) + vnfd.pop("_id", None) + # vnfr = deepcopy(vnfd) + # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] + # vnfr["nsr-id"] = nsr_id + # vnfr["id"] = uuid4() + # vnfr["vnf-id"] = vnfd["id"] + # ns_request["constituent-vnfr-ref"],append(vnfd_id) + + # TODO change id for RO in case it is present + try: + desc = await RO.create("vnfd", descriptor=vnfd) + nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, vnfd already present + print("debug", e) + else: + raise + + # db_new("vnfr", vnfr) + # db_update("ns_request", nsr_id, ns_request) + + # create nsd at RO + logger.debug("CreateNS task nsr_id={} RO NSD".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating nsd at RO" + nsd_id = ns_request["nsd_id"] + nsd = db.get_one("nsd", {"id": nsd_id}) + nsd.pop("_admin", None) + nsd.pop("_id", None) + try: + desc = await RO.create("nsd", descriptor=nsd) + nsr_lcm["RO"]["nsd_id"] = desc["uuid"] + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, nsd already present + print("debug", e) + else: + raise + + # Crate ns at RO + logger.debug("CreateNS task nsr_id={} RO NS".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating ns at RO" + desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) + RO_nsr_id = desc["uuid"] + nsr_lcm["RO"]["nsr_id"] = RO_nsr_id + nsr_lcm["RO"]["nsr_status"] = "BUILD" + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + # wait until NS is ready + deloyment_timeout = 600 + while deloyment_timeout > 0: + ns_status_detailed = "Waiting ns ready at RO" + nsr_lcm["status_detailed"] = ns_status_detailed + desc = await RO.show("ns", RO_nsr_id) + ns_status, ns_status_info = RO.check_ns_status(desc) + nsr_lcm["RO"]["nsr_status"] = ns_status + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) + elif ns_status == "ACTIVE": + nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) + break + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + + await asyncio.sleep(5, loop=loop) + deloyment_timeout -= 5 + if deloyment_timeout <= 0: + raise ROclient.ROClientException("Timeot wating ns to be ready") + nsr_lcm["status_detailed"] = "Configuring vnfr" + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + #for nsd in nsr_lcm["descriptors"]["nsd"]: + + logger.debug("CreateNS task nsr_id={} VCA look for".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd_index = int(c_vnf["member-vnf-index"]) + vnfd = db.get_one("vnfd", {"id": vnfd_id}) + if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] + config_primitive = vnfd["vnf-configuration"].get("config-primitive") + # get parameters for juju charm + base_folder = vnfd["_admin"]["storage"] + path = base_folder + "/charms/" + proxy_charm + mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] + # TODO launch VCA charm + # task = asyncio.ensure_future(DeployCharm(loop, path, mgmt_ip, config_primitive)) + nsr_lcm["status"] = "DONE" + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + return nsr_lcm + + except (ROclient.ROClientException, Exception) as e: + logger.debug("CreateNS nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) + nsr_lcm["status"] = "ERROR" + nsr_lcm["status_detailed"] += ": ERROR {}".format(e) + finally: + logger.debug("CreateNS task nsr_id={} Exit".format(nsr_id)) + + +async def DestroyNS(loop, nsr_id): + logger.debug("DestroyNS task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) + ns_request = db.get_one("ns_request", {"id": nsr_id}) + + nsr_lcm["status"] = "DELETING" + nsr_lcm["status_detailed"] = "Deleting charms" + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + # TODO destroy VCA charm + + # remove from RO + RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], + datacenter=ns_request["vim"]) + # Delete ns + try: + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + if RO_nsr_id: + nsr_lcm["status_detailed"] = "Deleting ns at RO" + desc = await RO.delete("ns", RO_nsr_id) + print("debug", "deleted RO ns {}".format(RO_nsr_id)) + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + print("warning", e) + else: + print("error", e) + + # Delete nsd + try: + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] + if RO_nsd_id: + nsr_lcm["status_detailed"] = "Deleting nsd at RO" + desc = await RO.delete("nsd", RO_nsd_id) + print("debug", "deleted RO nsd {}".format(RO_nsd_id)) + nsr_lcm["RO"]["nsd_id"] = None + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsd_id"] = None + print("warning", e) + else: + print("error", e) + + for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + try: + if RO_vnfd_id: + nsr_lcm["status_detailed"] = "Deleting vnfd at RO" + desc = await RO.delete("vnfd", RO_vnfd_id) + print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + print("warning", e) + else: + print("error", e) + logger.debug("DestroyNS task nsr_id={} Exit".format(nsr_id)) + + +async def test(loop, param=None): + logger.debug("Starting/Ending test task: {}".format(param)) + + +def cancel_tasks(loop, nsr_id): + """ + Cancel all active tasks of a concrete nsr identified for nsr_id + :param loop: loop + :param nsr_id: nsr identity + :return: None, or raises an exception if not possible + """ + global lcm_tasks + if not lcm_tasks.get(nsr_id): + return + for order_id, tasks_set in lcm_tasks[nsr_id].items(): + for task_name, task in tasks_set.items(): + result = task.cancel() + if result: + logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) + lcm_tasks[nsr_id] = {} + + + +async def read_kafka(loop, bus_info): + global lcm_tasks + logger.debug("kafka task Enter") + order_id = 1 + # future = asyncio.Future() + with open(bus_info["file"]) as f: + + # ignore old orders. Read file + command = "fake" + while command: + command = f.read() + + while True: + command = f.read() + if not command: + await asyncio.sleep(2, loop=loop) + continue + order_id += 1 + command = command.strip() + command, _, params = command.partition(" ") + if command == "exit": + print("Bye!") + break + elif command.startswith("#"): + continue + elif command == "echo": + print(params) + elif command == "test": + asyncio.Task(test(loop, params), loop=loop) + elif command == "break": + print("put a break in this line of code") + elif command == "new-ns": + nsr_id = params.strip() + logger.debug("Deploying NS {}".format(nsr_id)) + task = asyncio.ensure_future(CreateNS(loop, nsr_id)) + if nsr_id not in lcm_tasks: + lcm_tasks[nsr_id] = {} + lcm_tasks[nsr_id][order_id] = {"CreateNS": task} + elif command == "del-ns": + nsr_id = params.strip() + logger.debug("Deleting NS {}".format(nsr_id)) + cancel_tasks(loop, nsr_id) + task = asyncio.ensure_future(DestroyNS(loop, nsr_id)) + if nsr_id not in lcm_tasks: + lcm_tasks[nsr_id] = {} + lcm_tasks[nsr_id][order_id] = {"DestroyNS": task} + elif command == "get-ns": + nsr_id = params.strip() + nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) + print("nsr_lcm", nsr_lcm) + print("lcm_tasks", lcm_tasks.get(nsr_id)) + else: + logger.debug("unknown command '{}'".format(command)) + print("Usage:\n echo <>\n new-ns \n del-ns \n get-ns ") + logger.debug("kafka task Exit") + + +def lcm(): + loop = asyncio.get_event_loop() + loop.run_until_complete(read_kafka(loop, {"file": "/home/atierno/OSM/osm/NBI/kafka"})) + return + + +def lcm2(): + loop = asyncio.get_event_loop() + # asyncio.ensure_future(CreateNS, loop) + try: + content = loop.run_until_complete(CreateNS(loop, "ns1")) + print("Done: {}".format(content)) + except ROclient.ROClientException as e: + print("Error {}".format(e)) + + time.sleep(10) + + content = loop.run_until_complete(DestroyNS(loop, "ns1")) + print(content) + + loop.close() + + +if __name__ == '__main__': + + # FOR TEST + RO_VIM = "OST2_MRT" + + #FILL DATABASE + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + nsd = yaml.load(f) + nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + db.create("nsd", nsd_clean) + + ns_request = { + "id": "ns1", + "nsr_id": "ns1", + "name": "pingpongOne", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + db.create("ns_request", ns_request) + ns_request = { + "id": "ns2", + "nsr_id": "ns2", + "name": "pingpongTwo", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + db.create("ns_request", ns_request) + # lcm2() + lcm() + + + -- 2.17.1