From: tierno Date: Thu, 1 Feb 2018 18:13:07 +0000 (+0100) Subject: lightweight build structure X-Git-Tag: v3.0.3~6 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=0aef0dbb3c8b50426f31812e7f386dc9188823d2;p=osm%2FRO.git lightweight build structure Change-Id: I7a04acdd31dd6ce97546fd762c3c5d550387806d Signed-off-by: tierno --- diff --git a/lcm/ROclient.py b/lcm/ROclient.py deleted file mode 100644 index 84ce7aa1..00000000 --- a/lcm/ROclient.py +++ /dev/null @@ -1,880 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -## -# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. -# This file is part of openmano -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# -# For those usages not covered by the Apache License, Version 2.0 please -# contact with: nfvlabs@tid.es -## - -""" -asyncio RO python client to interact with RO-server -""" - -import asyncio -import aiohttp - -import json -import yaml -import logging -import sys -from urllib.parse import quote -from uuid import UUID - -__author__ = "Alfonso Tierno, Pablo Montes" -__date__ = "$09-Jan-2018 09:09:48$" -__version__ = "0.1.0-r470" -version_date = "Jan 2018" -requests = None - -class ROClientException(Exception): - def __init__(self, message, http_code=400): - self.http_code = http_code - Exception.__init__(self, message) - """Common Exception for all openmano client exceptions""" - - -def remove_envelop(item, indata=None): - """ - Obtain the useful data removing the envelop. It goes through the vnfd or nsd catalog and returns the - vnfd or nsd content - :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' - :param indata: Content to be inspected - :return: the useful part of indata (a reference, not a new dictionay) plus boolean indicating if it was enveloped - """ - clean_indata = indata - enveloped = False - if not indata: - return {}, False - if item == "vnfd": - if clean_indata.get('vnfd:vnfd-catalog'): - enveloped = True - clean_indata = clean_indata['vnfd:vnfd-catalog'] - elif clean_indata.get('vnfd-catalog'): - enveloped = True - clean_indata = clean_indata['vnfd-catalog'] - if clean_indata.get('vnfd'): - if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1: - raise ROClientException("'vnfd' must be a list only one element") - clean_indata = clean_indata['vnfd'][0] - elif item == "nsd": - if clean_indata.get('nsd:nsd-catalog'): - enveloped = True - clean_indata = clean_indata['nsd:nsd-catalog'] - elif clean_indata.get('nsd-catalog'): - enveloped = True - clean_indata = clean_indata['nsd-catalog'] - if clean_indata.get('nsd'): - if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: - raise ROClientException("'nsd' must be a list only one element") - clean_indata = clean_indata['nsd'][0] - elif item == "tenant": - if len(indata) == 1 and "tenant" in indata: - enveloped = True - clean_indata = indata["tenant"] - elif item == "vim" or item == "datacenter": - if len(indata) == 1 and "datacenter" in indata: - enveloped = True - clean_indata = indata["datacenter"] - elif item == "ns" or item == "instances": - if len(indata) == 1 and "instance" in indata: - enveloped = True - clean_indata = indata["instance"] - else: - assert False, "remove_envelop with unknown item {}".format(item) - - return clean_indata, enveloped - - -class ROClient: - headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} - client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vnfd': 'vnfs', 'nsd': 'scenarios', - 'ns': 'instances'} - mandatory_for_create = { - 'tenant': ("name", ), - 'vim': ("name", "vim_url"), - 'vnfd': ("name", "id", "connection-point", "vdu"), - 'nsd': ("name", "id", "constituent-vnfd"), - 'ns': ("name", "scenario", "datacenter"), - } - timeout_large = 120 - timeout_short = 30 - - def __init__(self, loop, endpoint_url, **kwargs): - self.loop = loop - self.endpoint_url = endpoint_url - - self.username = kwargs.get("username") - self.password = kwargs.get("password") - self.tenant_id_name = kwargs.get("tenant") - self.tenant = None - self.datacenter_id_name = kwargs.get("datacenter") - self.datacenter = None - self.logger = logging.getLogger(kwargs.get('logger', 'ROClient')) - if kwargs.get("debug"): - self.logger.setLevel(logging.DEBUG) - global requests - requests = kwargs.get("TODO remove") - - def __getitem__(self, index): - if index == 'tenant': - return self.tenant_id_name - elif index == 'datacenter': - return self.datacenter_id_name - elif index == 'username': - return self.username - elif index == 'password': - return self.password - elif index == 'endpoint_url': - return self.endpoint_url - else: - raise KeyError("Invalid key '%s'" %str(index)) - - def __setitem__(self,index, value): - if index == 'tenant': - self.tenant_id_name = value - elif index == 'datacenter': - self.datacenter_id_name = value - elif index == 'username': - self.username = value - elif index == 'password': - self.password = value - elif index == 'endpoint_url': - self.endpoint_url = value - else: - raise KeyError("Invalid key '{}'".format(index)) - self.tenant = None # force to reload tenant with different credentials - self.datacenter = None # force to reload datacenter with different credentials - - def _parse(self, descriptor, descriptor_format, response=False): - #try yaml - if descriptor_format and descriptor_format != "json" and descriptor_format != "yaml": - raise ROClientException("'descriptor_format' must be a 'json' or 'yaml' text") - if descriptor_format != "json": - try: - return yaml.load(descriptor) - except yaml.YAMLError as exc: - error_pos = "" - if hasattr(exc, 'problem_mark'): - mark = exc.problem_mark - error_pos = " at line:{} column:{}s".format(mark.line+1, mark.column+1) - error_text = "yaml format error" + error_pos - elif descriptor_format != "yaml": - try: - return json.loads(descriptor) - except Exception as e: - if response: - error_text = "json format error" + str(e) - - if response: - raise ROClientException(error_text) - raise ROClientException(error_text) - - def _parse_yaml(self, descriptor, response=False): - try: - return yaml.load(descriptor) - except yaml.YAMLError as exc: - error_pos = "" - if hasattr(exc, 'problem_mark'): - mark = exc.problem_mark - error_pos = " at line:{} column:{}s".format(mark.line+1, mark.column+1) - error_text = "yaml format error" + error_pos - if response: - raise ROClientException(error_text) - raise ROClientException(error_text) - - @staticmethod - def check_if_uuid(uuid_text): - """ - Check if text correspond to an uuid foramt - :param uuid_text: - :return: True if it is an uuid False if not - """ - try: - UUID(uuid_text) - return True - except (ValueError, TypeError): - return False - - @staticmethod - def _create_envelop(item, indata=None): - """ - Returns a new dict that incledes indata with the expected envelop - :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' - :param indata: Content to be enveloped - :return: a new dic with {: {indata} } where envelop can be e.g. tenant, datacenter, ... - """ - if item == "vnfd": - return {'vnfd-catalog': {'vnfd': [indata]}} - elif item == "nsd": - return {'nsd-catalog': {'nsd': [indata]}} - elif item == "tenant": - return {'tenant': indata} - elif item == "vim" or item == "datacenter": - return {'datacenter': indata} - elif item == "ns" or item == "instances": - return {'instance': indata} - else: - assert False, "_create_envelop with unknown item {}".format(item) - - @staticmethod - def check_ns_status(ns_descriptor): - """ - Inspect RO instance descriptor and indicates the status - :param ns_descriptor: instance descriptor obtained with self.show("ns", ) - :return: status, message: status can be BUILD,ACTIVE,ERROR, message is a text message - """ - net_total = 0 - vm_total = 0 - net_done = 0 - vm_done = 0 - - for net in ns_descriptor["nets"]: - net_total += 1 - if net["status"] == "ERROR": - return "ERROR", net["error_msg"] - elif net["status"] == "ACTIVE": - net_done += 1 - for vnf in ns_descriptor["vnfs"]: - for vm in vnf["vms"]: - vm_total += 1 - if vm["status"] == "ERROR": - return "ERROR", vm["error_msg"] - elif vm["status"] == "ACTIVE": - vm_done += 1 - - if net_total == net_done and vm_total == vm_done: - return "ACTIVE", "VMs {}, networks: {}".format(vm_total, net_total) - else: - return "BUILD", "VMs: {}/{}, networks: {}/{}".format(vm_done, vm_total, net_done, net_total) - - @staticmethod - def get_ns_vnf_ip(ns_descriptor): - """ - Get a dict with the IPs of every vnf. - :param ns_descriptor: instance descriptor obtained with self.show("ns", ) - :return: dict iwth key member_vnf_index, value ip_address - """ - ns_ip={} - for vnf in ns_descriptor["vnfs"]: - ns_ip[vnf["member_vnf_index"]] = vnf["ip_address"] - #uuid sce_vnf_id - # vnf[mgmt_access]: '{interface_id: cf3cbf00-385c-49b4-9a3f-b400b7b15dc6, vm_id: d0dd22a9-91ef-46f1-8e8f-8cf4b2d5b2d7}' - # vnf[vms] - return ns_ip - - async def get_list(self, item, all_tenants=False, filter_by=None): - """ - Obtain a list of items filtering by the specigy filter_by. - :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' - :param all_tenants: True if not filtering by tenant. Only allowed for admin - :param filter_by: dictionary with filtering - :return: a list of dict. It can be empty. Raises ROClientException on Error, - """ - if item not in self.client_to_RO: - raise ROClientException("Invalid item {}".format(item)) - if item == 'tenant': - all_tenants = None - with aiohttp.ClientSession(loop=self.loop) as session: - content = await self._list_item(session, self.client_to_RO[item], all_tenants=all_tenants, - filter_dict=filter_by) - if isinstance(content, dict): - if len(content) == 1: - return content.values()[0] - else: - raise ROClientException("Output not a list neither dict with len equal 1", http_code=500) - return content - - async def _get_item_uuid(self, session, item, item_id_name, all_tenants=False): - if all_tenants: - tenant_text = "/any" - elif all_tenants is None: - tenant_text = "" - else: - if not self.tenant: - await self._get_tenant(session) - tenant_text = "/" + self.tenant - - item_id = 0 - url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) - if self.check_if_uuid(item_id_name): - item_id = item_id_name - url += "/" + item_id_name - elif item_id_name and item_id_name.startswith("'") and item_id_name.endswith("'"): - item_id_name = item_id_name[1:-1] - self.logger.debug("openmano GET %s", url) - with aiohttp.Timeout(self.timeout_short): - async with session.get(url, headers=self.headers_req) as response: - response_text = await response.read() - self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status == 404: # NOT_FOUND - raise ROClientException("No {} found with id '{}'".format(item[:-1], item_id_name), - http_code=404) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - content = self._parse_yaml(response_text, response=True) - - if item_id: - return item_id - desc = content[item] - assert isinstance(desc, list), "_get_item_uuid get a non dict with a list inside {}".format(type(desc)) - uuid = None - for i in desc: - if item_id_name and i["name"] != item_id_name: - continue - if uuid: # found more than one - raise ROClientException( - "Found more than one {} with name '{}'. uuid must be used".format(item, item_id_name), - http_code=404) - uuid = i["uuid"] - if not uuid: - raise ROClientException("No {} found with name '{}'".format(item[:-1], item_id_name), http_code=404) - return uuid - - async def _get_item(self, session, item, item_id_name, all_tenants=False): - if all_tenants: - tenant_text = "/any" - elif all_tenants is None: - tenant_text = "" - else: - if not self.tenant: - await self._get_tenant(session) - tenant_text = "/" + self.tenant - - if self.check_if_uuid(item_id_name): - uuid = item_id_name - else: - # check that exist - uuid = self._get_item_uuid(session, item, item_id_name, all_tenants) - - url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) - self.logger.debug("GET %s", url ) - with aiohttp.Timeout(self.timeout_short): - async with session.get(url, headers=self.headers_req) as response: - response_text = await response.read() - self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - - return self._parse_yaml(response_text, response=True) - - async def _get_tenant(self, session): - if not self.tenant: - self.tenant = await self._get_item_uuid(session, "tenants", self.tenant_id_name, None) - return self.tenant - - async def _get_datacenter(self, session): - if not self.tenant: - await self._get_tenant(session) - if not self.datacenter: - self.datacenter = await self._get_item_uuid(session, "datacenters", self.datacenter_id_name, True) - return self.datacenter - - async def _create_item(self, session, item, descriptor, all_tenants=False): - if all_tenants: - tenant_text = "/any" - elif all_tenants is None: - tenant_text = "" - else: - if not self.tenant: - await self._get_tenant(session) - tenant_text = "/" + self.tenant - payload_req = yaml.safe_dump(descriptor) - - api_version_text = "" - if item == "vnfs": - # assumes version v3 only - api_version_text = "/v3" - item = "vnfd" - elif item == "scenarios": - # assumes version v3 only - api_version_text = "/v3" - item = "nsd" - - #print payload_req - - url = "{}{apiver}{tenant}/{item}".format(self.endpoint_url, apiver=api_version_text, tenant=tenant_text, - item=item) - self.logger.debug("openmano POST %s %s", url, payload_req) - with aiohttp.Timeout(self.timeout_large): - async with session.post(url, headers=self.headers_req, data=payload_req) as response: - response_text = await response.read() - self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - - response_desc = self._parse_yaml(response_text, response=True) - desc, _ = remove_envelop(item, response_desc) - return desc - - async def _del_item(self, session, item, item_id_name, all_tenants=False): - if all_tenants: - tenant_text = "/any" - elif all_tenants is None: - tenant_text = "" - else: - if not self.tenant: - await self._get_tenant(session) - tenant_text = "/" + self.tenant - if not self.check_if_uuid(item_id_name): - # check that exist - uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants) - else: - uuid = item_id_name - - url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) - self.logger.debug("DELETE %s", url) - with aiohttp.Timeout(self.timeout_short): - async with session.delete(url, headers=self.headers_req) as response: - response_text = await response.read() - self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - return self._parse_yaml(response_text, response=True) - - async def _list_item(self, session, item, all_tenants=False, filter_dict=None): - if all_tenants: - tenant_text = "/any" - elif all_tenants is None: - tenant_text = "" - else: - if not self.tenant: - await self._get_tenant(session) - tenant_text = "/" + self.tenant - - url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) - separator = "?" - if filter_dict: - for k in filter_dict: - url += separator + quote(str(k)) + "=" + quote(str(filter_dict[k])) - separator = "&" - self.logger.debug("openmano GET %s", url) - with aiohttp.Timeout(self.timeout_short): - async with session.get(url, headers=self.headers_req) as response: - response_text = await response.read() - self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - return self._parse_yaml(response_text, response=True) - - async def _edit_item(self, session, item, descriptor, item_id_name, all_tenants=False): - if all_tenants: - tenant_text = "/any" - elif all_tenants is None: - tenant_text = "" - else: - if not self.tenant: - await self._get_tenant(session) - tenant_text = "/" + self.tenant - - if not uuid: - #check that exist - uuid = self._get_item_uuid(session, "tenants", item_id_name, all_tenants) - - payload_req = yaml.safe_dump(descriptor) - - #print payload_req - - url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) - self.logger.debug("openmano PUT %s %s", url, payload_req) - with aiohttp.Timeout(self.timeout_large): - async with session.put(url, headers=self.headers_req, data=payload_req) as response: - response_text = await response.read() - self.logger.debug("PUT {} [{}] {}".format(url, response.status, response_text[:100])) - if response.status >= 300: - raise ROClientException(response_text, http_code=response.status) - return self._parse_yaml(response_text, response=True) - - async def show(self, item, item_id_name=None, all_tenants=False): - """ - Obtain the information of an item from its id or name - :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' - :param item_id_name: RO id or name of the item. Raise and exception if more than one found - :param all_tenants: True if not filtering by tenant. Only allowed for admin - :return: dictionary with the information or raises ROClientException on Error, NotFound, found several - """ - if item not in self.client_to_RO: - raise ROClientException("Invalid item {}".format(item)) - if item == 'tenant': - all_tenants = None - - with aiohttp.ClientSession(loop=self.loop) as session: - content = await self._get_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) - if len(content) == 1: - return content.values()[0] - else: - return content - - async def delete(self, item, item_id_name=None, all_tenants=False): - """ - Delete the information of an item from its id or name - :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' - :param item_id_name: RO id or name of the item. Raise and exception if more than one found - :param all_tenants: True if not filtering by tenant. Only allowed for admin - :return: dictionary with the information or raises ROClientException on Error, NotFound, found several - """ - if item not in self.client_to_RO: - raise ROClientException("Invalid item {}".format(item)) - if item == 'tenant': - all_tenants = None - - with aiohttp.ClientSession(loop=self.loop) as session: - if item == 'vim': - # check that exist - item_id = await self._get_item_uuid(session, "datacenters", item_id_name, all_tenants=True) - all_tenants = None - return await self._del_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) - - async def create(self, item, descriptor=None, descriptor_format=None, **kwargs): - """ - Creates an item from its descriptor - :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' - :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided - :param descriptor_format: Can be 'json' or 'yaml' - :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type - keys can be a dot separated list to specify elements inside dict - :return: dictionary with the information or raises ROClientException on Error - """ - if isinstance(descriptor, str): - descriptor = self._parse(descriptor, descriptor_format) - elif descriptor: - pass - else: - descriptor = {} - - if item not in self.client_to_RO: - raise ROClientException("Invalid item {}".format(item)) - desc, enveloped = remove_envelop(item, descriptor) - - # Override descriptor with kwargs - if kwargs: - try: - for k, v in kwargs.items(): - update_content = desc - kitem_old = None - klist = k.split(".") - for kitem in klist: - if kitem_old is not None: - update_content = update_content[kitem_old] - if isinstance(update_content, dict): - kitem_old = kitem - elif isinstance(update_content, list): - kitem_old = int(kitem) - else: - raise ROClientException( - "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) - update_content[kitem_old] = v - except KeyError: - raise ROClientException( - "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) - except ValueError: - raise ROClientException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( - k, kitem)) - except IndexError: - raise ROClientException( - "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) - - for mandatory in self.mandatory_for_create[item]: - if mandatory not in desc: - raise ROClientException("'{}' is mandatory parameter for {}".format(mandatory, item)) - - all_tenants = False - if item in ('tenant', 'vim'): - all_tenants = None - - if not enveloped: - create_desc = self._create_envelop(item, desc) - else: - create_desc = descriptor - - with aiohttp.ClientSession(loop=self.loop) as session: - return await self._create_item(session, self.client_to_RO[item], create_desc, all_tenants) - - def edit_tenant(self, uuid=None, name=None, descriptor=None, descriptor_format=None, new_name=None, new_description=None): - """Edit the parameters of a tenant - Params: must supply a descriptor or/and a new_name or new_description - uuid or/and name. If only name is supplied, there must be only one or an exception is raised - descriptor: with format {'tenant':{params to change info}} - must be a dictionary or a json/yaml text. - name: the tenant name. Overwrite descriptor name if any - description: tenant descriptor.. Overwrite descriptor description if any - Return: Raises an exception on error, not found or found several - Obtain a dictionary with format {'tenant':{newtenant_info}} - """ - # TODO revise - if isinstance(descriptor, str): - descriptor = self.parse(descriptor, descriptor_format) - elif descriptor: - pass - elif new_name or new_description: - descriptor={"tenant": {}} - else: - raise ROClientException("Missing descriptor") - - if 'tenant' not in descriptor or len(descriptor)!=1: - raise ROClientException("Descriptor must contain only one 'tenant' field") - if new_name: - descriptor['tenant']['name'] = new_name - if new_description: - descriptor['tenant']['description'] = new_description - - return self._edit_item("tenants", descriptor, uuid, name, all_tenants=None) - - #DATACENTERS - - def edit_datacenter(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs): - """Edit the parameters of a datacenter - Params: must supply a descriptor or/and a parameter to change - uuid or/and name. If only name is supplied, there must be only one or an exception is raised - descriptor: with format {'datacenter':{params to change info}} - must be a dictionary or a json/yaml text. - parameters to change can be supplyied by the descriptor or as parameters: - new_name: the datacenter name - vim_url: the datacenter URL - vim_url_admin: the datacenter URL for administrative issues - vim_type: the datacenter type, can be openstack or openvim. - public: boolean, available to other tenants - description: datacenter description - Return: Raises an exception on error, not found or found several - Obtain a dictionary with format {'datacenter':{new_datacenter_info}} - """ - - if isinstance(descriptor, str): - descriptor = self.parse(descriptor, descriptor_format) - elif descriptor: - pass - elif kwargs: - descriptor={"datacenter": {}} - else: - raise ROClientException("Missing descriptor") - - if 'datacenter' not in descriptor or len(descriptor)!=1: - raise ROClientException("Descriptor must contain only one 'datacenter' field") - for param in kwargs: - if param=='new_name': - descriptor['datacenter']['name'] = kwargs[param] - else: - descriptor['datacenter'][param] = kwargs[param] - return self._edit_item("datacenters", descriptor, uuid, name, all_tenants=None) - - def attach_datacenter(self, uuid_name=None, descriptor=None, descriptor_format=None, vim_user=None, vim_password=None, vim_tenant_name=None, vim_tenant_id=None): - #check that exist - uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True) - tenant_text = "/"+self._get_tenant() - - if isinstance(descriptor, str): - descriptor = self.parse(descriptor, descriptor_format) - elif descriptor: - pass - elif vim_user or vim_password or vim_tenant_name or vim_tenant_id: - descriptor={"datacenter": {}} - else: - raise ROClientException("Missing descriptor or params") - - if vim_user or vim_password or vim_tenant_name or vim_tenant_id: - #print args.name - try: - if vim_user: - descriptor['datacenter']['vim_user'] = vim_user - if vim_password: - descriptor['datacenter']['vim_password'] = vim_password - if vim_tenant_name: - descriptor['datacenter']['vim_tenant_name'] = vim_tenant_name - if vim_tenant_id: - descriptor['datacenter']['vim_tenant'] = vim_tenant_id - except (KeyError, TypeError) as e: - if str(e)=='datacenter': error_pos= "missing field 'datacenter'" - else: error_pos="wrong format" - raise ROClientException("Wrong datacenter descriptor: " + error_pos) - - payload_req = yaml.safe_dump(descriptor) - #print payload_req - url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid) - self.logger.debug("openmano POST %s %s", url, payload_req) - mano_response = requests.post(url, headers = self.headers_req, data=payload_req) - self.logger.debug("openmano response: %s", mano_response.text ) - - content = self._parse_yaml(mano_response.text, response=True) - if mano_response.status_code==200: - return content - else: - raise ROClientException(str(content), http_code=mano_response.status) - - def detach_datacenter(self, uuid_name=None): - if not uuid: - #check that exist - uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=False) - tenant_text = "/"+self._get_tenant() - url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid) - self.logger.debug("openmano DELETE %s", url) - mano_response = requests.delete(url, headers = self.headers_req) - self.logger.debug("openmano response: %s", mano_response.text ) - - content = self._parse_yaml(mano_response.text, response=True) - if mano_response.status_code==200: - return content - else: - raise ROClientException(str(content), http_code=mano_response.status) - - #VNFS - - def edit_scenario(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs): - """Edit the parameters of a scenario - Params: must supply a descriptor or/and a parameters to change - uuid or/and name. If only name is supplied, there must be only one or an exception is raised - descriptor: with format {'scenario':{params to change info}} - must be a dictionary or a json/yaml text. - parameters to change can be supplyied by the descriptor or as parameters: - new_name: the scenario name - public: boolean, available to other tenants - description: scenario description - tenant_id. Propietary tenant - Return: Raises an exception on error, not found or found several - Obtain a dictionary with format {'scenario':{new_scenario_info}} - """ - - if isinstance(descriptor, str): - descriptor = self.parse(descriptor, descriptor_format) - elif descriptor: - pass - elif kwargs: - descriptor={"scenario": {}} - else: - raise ROClientException("Missing descriptor") - - if 'scenario' not in descriptor or len(descriptor)>2: - raise ROClientException("Descriptor must contain only one 'scenario' field") - for param in kwargs: - if param=='new_name': - descriptor['scenario']['name'] = kwargs[param] - else: - descriptor['scenario'][param] = kwargs[param] - return self._edit_item("scenarios", descriptor, uuid, name, all_tenants=None) - - #VIM ACTIONS - def vim_action(self, action, item, uuid=None, all_tenants=False, **kwargs): - """Perform an action over a vim - Params: - action: can be 'list', 'get'/'show', 'delete' or 'create' - item: can be 'tenants' or 'networks' - uuid: uuid of the tenant/net to show or to delete. Ignore otherwise - other parameters: - datacenter_name, datacenter_id: datacenters to act on, if missing uses classes store datacenter - descriptor, descriptor_format: descriptor needed on creation, can be a dict or a yaml/json str - must be a dictionary or a json/yaml text. - name: for created tenant/net Overwrite descriptor name if any - description: tenant descriptor. Overwrite descriptor description if any - - Return: Raises an exception on error - Obtain a dictionary with format {'tenant':{new_tenant_info}} - """ - if item not in ("tenants", "networks", "images"): - raise ROClientException("Unknown value for item '{}', must be 'tenants', 'nets' or " - "images".format(str(item))) - - image_actions = ['list','get','show','delete'] - if item == "images" and action not in image_actions: - raise ROClientException("Only available actions for item '{}' are {}\n" - "Requested action was '{}'".format(item, ', '.join(image_actions), action)) - if all_tenants: - tenant_text = "/any" - else: - tenant_text = "/"+self._get_tenant() - - if "datacenter_id" in kwargs or "datacenter_name" in kwargs: - datacenter = self._get_item_uuid(session, "datacenters", kwargs.get("datacenter"), all_tenants=all_tenants) - else: - datacenter = self.get_datacenter(session) - - if action=="list": - url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) - self.logger.debug("GET %s", url ) - mano_response = requests.get(url, headers=self.headers_req) - self.logger.debug("openmano response: %s", mano_response.text ) - content = self._parse_yaml(mano_response.text, response=True) - if mano_response.status_code==200: - return content - else: - raise ROClientException(str(content), http_code=mano_response.status) - elif action=="get" or action=="show": - url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) - self.logger.debug("GET %s", url ) - mano_response = requests.get(url, headers=self.headers_req) - self.logger.debug("openmano response: %s", mano_response.text ) - content = self._parse_yaml(mano_response.text, response=True) - if mano_response.status_code==200: - return content - else: - raise ROClientException(str(content), http_code=mano_response.status) - elif action=="delete": - url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) - self.logger.debug("DELETE %s", url ) - mano_response = requests.delete(url, headers=self.headers_req) - self.logger.debug("openmano response: %s", mano_response.text ) - content = self._parse_yaml(mano_response.text, response=True) - if mano_response.status_code==200: - return content - else: - raise ROClientException(str(content), http_code=mano_response.status) - elif action=="create": - if "descriptor" in kwargs: - if isinstance(kwargs["descriptor"], str): - descriptor = self._parse(kwargs["descriptor"], kwargs.get("descriptor_format") ) - else: - descriptor = kwargs["descriptor"] - elif "name" in kwargs: - descriptor={item[:-1]: {"name": kwargs["name"]}} - else: - raise ROClientException("Missing descriptor") - - if item[:-1] not in descriptor or len(descriptor)!=1: - raise ROClientException("Descriptor must contain only one 'tenant' field") - if "name" in kwargs: - descriptor[ item[:-1] ]['name'] = kwargs["name"] - if "description" in kwargs: - descriptor[ item[:-1] ]['description'] = kwargs["description"] - payload_req = yaml.safe_dump(descriptor) - #print payload_req - url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) - self.logger.debug("openmano POST %s %s", url, payload_req) - mano_response = requests.post(url, headers = self.headers_req, data=payload_req) - self.logger.debug("openmano response: %s", mano_response.text ) - content = self._parse_yaml(mano_response.text, response=True) - if mano_response.status_code==200: - return content - else: - raise ROClientException(str(content), http_code=mano_response.status) - else: - raise ROClientException("Unknown value for action '{}".format(str(action))) - - -if __name__ == '__main__': - RO_URL = "http://localhost:9090/openmano" - RO_TENANT = "2c94f639-cefc-4f3a-a8f9-bbab0471946a" - RO_VIM = "3e70deb6-aea1-11e7-af13-080027429aaf" - - streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" - logging.basicConfig(format=streamformat) - - loop = asyncio.get_event_loop() - myClient = ROClient(endpoint_url=RO_URL, loop=loop, tenant_id=RO_TENANT, datacenter_id=RO_VIM, debug=True) - content = loop.run_until_complete(myClient.list_tenants()) - print(content) - loop.close() - - diff --git a/lcm/dbbase.py b/lcm/dbbase.py deleted file mode 100644 index 55041b68..00000000 --- a/lcm/dbbase.py +++ /dev/null @@ -1,35 +0,0 @@ - - -class DbException(Exception): - - def __init__(self, message, http_code=404): - self.http_code = http_code - Exception.__init__(self, message) - -class dbbase(object): - - def __init__(self): - pass - - def db_connect(self, config): - pass - - def db_disconnect(self): - pass - - def get_list(self, table, filter={}): - pass - - def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): - pass - - def create(self, table, indata): - pass - - def del_list(self, table, filter={}): - pass - - def del_one(self, table, filter={}, fail_on_empty=True): - pass - - diff --git a/lcm/dbmemory.py b/lcm/dbmemory.py deleted file mode 100644 index 75d1f883..00000000 --- a/lcm/dbmemory.py +++ /dev/null @@ -1,116 +0,0 @@ -from dbbase import DbException, dbbase -from http import HTTPStatus -from uuid import uuid4 -from copy import deepcopy - - -class dbmemory(dbbase): - - def __init__(self): - self.db = {} - - @staticmethod - def _format_filter(filter): - return filter # TODO - - def _find(self, table, filter): - for i, row in enumerate(self.db.get(table, ())): - match = True - if filter: - for k, v in filter.items(): - if k not in row or v != row[k]: - match = False - if match: - yield i, row - - def get_list(self, table, filter={}): - try: - l = [] - for _, row in self._find(table, self._format_filter(filter)): - l.append(deepcopy(row)) - return l - except DbException: - raise - except Exception as e: # TODO refine - raise DbException(str(e)) - - def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): - try: - l = None - for _, row in self._find(table, self._format_filter(filter)): - if not fail_on_more: - return deepcopy(row) - if l: - raise DbException("Found more than one entry with filter='{}'".format(filter), - HTTPStatus.CONFLICT.value) - l = row - if not l and fail_on_empty: - raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) - return deepcopy(l) - except Exception as e: # TODO refine - raise DbException(str(e)) - - def del_list(self, table, filter={}): - try: - id_list = [] - for i, _ in self._find(table, self._format_filter(filter)): - id_list.append(i) - deleted = len(id_list) - for i in id_list: - del self.db[table][i] - return {"deleted": deleted} - except DbException: - raise - except Exception as e: # TODO refine - raise DbException(str(e)) - - def del_one(self, table, filter={}, fail_on_empty=True): - try: - for i, _ in self._find(table, self._format_filter(filter)): - break - else: - if fail_on_empty: - raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) - return None - del self.db[table][i] - return {"deleted": 1} - except Exception as e: # TODO refine - raise DbException(str(e)) - - def replace(self, table, filter, indata, fail_on_empty=True): - try: - for i, _ in self._find(table, self._format_filter(filter)): - break - else: - if fail_on_empty: - raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) - return None - self.db[table][i] = deepcopy(indata) - return {"upadted": 1} - except Exception as e: # TODO refine - raise DbException(str(e)) - - def create(self, table, indata): - try: - id = indata.get("_id") - if not id: - id = str(uuid4()) - indata["_id"] = id - if table not in self.db: - self.db[table] = [] - self.db[table].append(deepcopy(indata)) - return id - except Exception as e: # TODO refine - raise DbException(str(e)) - - -if __name__ == '__main__': - # some test code - db = dbmemory() - db.create("test", {"_id": 1, "data": 1}) - db.create("test", {"_id": 2, "data": 2}) - db.create("test", {"_id": 3, "data": 3}) - print("must be 3 items:", db.get_list("test")) - print("must return item 2:", db.get_list("test", {"_id": 2})) - db.del_one("test", {"_id": 2}) - print("must be emtpy:", db.get_list("test", {"_id": 2})) diff --git a/lcm/lcm.py b/lcm/lcm.py deleted file mode 100644 index 99581428..00000000 --- a/lcm/lcm.py +++ /dev/null @@ -1,380 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - -import asyncio -import aiohttp -import yaml -import ROclient -import time -import dbmemory -import logging - -from copy import deepcopy -from uuid import uuid4 - -#streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" -streamformat = "%(name)s %(levelname)s: %(message)s" -logging.basicConfig(format=streamformat, level=logging.DEBUG) -logger = logging.getLogger('lcm') - -ro_account = { - "url": "http://localhost:9090/openmano", - "tenant": "osm" -} - -vca_account = { - # TODO -} - -# conains created tasks/futures to be able to cancel -lcm_tasks = {} - -headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} -ns_status = ("CREATION-SCHEDULED", "DEPLOYING", "CONFIGURING", "DELETION-SCHEDULED", "UN-CONFIGURING", "UNDEPLOYING") - -# TODO replace with database calls -db = dbmemory.dbmemory() - - - -async def CreateNS(loop, nsr_id): - logger.debug("CreateNS task nsr_id={} Enter".format(nsr_id)) - nsr_lcm = { - "id": nsr_id, - "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, - "nsr_ip": {}, - "VCA": {"TODO"}, - "status": "BUILD", - "status_detailed": "", - } - - deloyment_timeout = 120 - try: - ns_request = db.get_one("ns_request", {"id": nsr_id}) - nsd = db.get_one("nsd", {"id": ns_request["nsd_id"]}) - RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], - datacenter=ns_request["vim"]) - nsr_lcm["status_detailed"] = "Creating vnfd at RO" - # ns_request["constituent-vnfr-ref"] = [] - - db.create("nsr_lcm", nsr_lcm) - - # get vnfds, instantiate at RO - logger.debug("CreateNS task nsr_id={} RO VNFD".format(nsr_id)) - for c_vnf in nsd["constituent-vnfd"]: - vnfd_id = c_vnf["vnfd-id-ref"] - vnfd = db.get_one("vnfd", {"id": vnfd_id}) - vnfd.pop("_admin", None) - vnfd.pop("_id", None) - # vnfr = deepcopy(vnfd) - # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] - # vnfr["nsr-id"] = nsr_id - # vnfr["id"] = uuid4() - # vnfr["vnf-id"] = vnfd["id"] - # ns_request["constituent-vnfr-ref"],append(vnfd_id) - - # TODO change id for RO in case it is present - try: - desc = await RO.create("vnfd", descriptor=vnfd) - nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, vnfd already present - print("debug", e) - else: - raise - - # db_new("vnfr", vnfr) - # db_update("ns_request", nsr_id, ns_request) - - # create nsd at RO - logger.debug("CreateNS task nsr_id={} RO NSD".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating nsd at RO" - nsd_id = ns_request["nsd_id"] - nsd = db.get_one("nsd", {"id": nsd_id}) - nsd.pop("_admin", None) - nsd.pop("_id", None) - try: - desc = await RO.create("nsd", descriptor=nsd) - nsr_lcm["RO"]["nsd_id"] = desc["uuid"] - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, nsd already present - print("debug", e) - else: - raise - - # Crate ns at RO - logger.debug("CreateNS task nsr_id={} RO NS".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating ns at RO" - desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) - RO_nsr_id = desc["uuid"] - nsr_lcm["RO"]["nsr_id"] = RO_nsr_id - nsr_lcm["RO"]["nsr_status"] = "BUILD" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - - # wait until NS is ready - deloyment_timeout = 600 - while deloyment_timeout > 0: - ns_status_detailed = "Waiting ns ready at RO" - nsr_lcm["status_detailed"] = ns_status_detailed - desc = await RO.show("ns", RO_nsr_id) - ns_status, ns_status_info = RO.check_ns_status(desc) - nsr_lcm["RO"]["nsr_status"] = ns_status - if ns_status == "ERROR": - raise ROclient.ROClientException(ns_status_info) - elif ns_status == "BUILD": - nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) - elif ns_status == "ACTIVE": - nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) - break - else: - assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) - - await asyncio.sleep(5, loop=loop) - deloyment_timeout -= 5 - if deloyment_timeout <= 0: - raise ROclient.ROClientException("Timeot wating ns to be ready") - nsr_lcm["status_detailed"] = "Configuring vnfr" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - - #for nsd in nsr_lcm["descriptors"]["nsd"]: - - logger.debug("CreateNS task nsr_id={} VCA look for".format(nsr_id)) - for c_vnf in nsd["constituent-vnfd"]: - vnfd_id = c_vnf["vnfd-id-ref"] - vnfd_index = int(c_vnf["member-vnf-index"]) - vnfd = db.get_one("vnfd", {"id": vnfd_id}) - if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): - proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] - config_primitive = vnfd["vnf-configuration"].get("config-primitive") - # get parameters for juju charm - base_folder = vnfd["_admin"]["storage"] - path = base_folder + "/charms/" + proxy_charm - mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] - # TODO launch VCA charm - # task = asyncio.ensure_future(DeployCharm(loop, path, mgmt_ip, config_primitive)) - nsr_lcm["status"] = "DONE" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - - return nsr_lcm - - except (ROclient.ROClientException, Exception) as e: - logger.debug("CreateNS nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) - nsr_lcm["status"] = "ERROR" - nsr_lcm["status_detailed"] += ": ERROR {}".format(e) - finally: - logger.debug("CreateNS task nsr_id={} Exit".format(nsr_id)) - - -async def DestroyNS(loop, nsr_id): - logger.debug("DestroyNS task nsr_id={} Enter".format(nsr_id)) - nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) - ns_request = db.get_one("ns_request", {"id": nsr_id}) - - nsr_lcm["status"] = "DELETING" - nsr_lcm["status_detailed"] = "Deleting charms" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - # TODO destroy VCA charm - - # remove from RO - RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], - datacenter=ns_request["vim"]) - # Delete ns - try: - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] - if RO_nsr_id: - nsr_lcm["status_detailed"] = "Deleting ns at RO" - desc = await RO.delete("ns", RO_nsr_id) - print("debug", "deleted RO ns {}".format(RO_nsr_id)) - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - print("warning", e) - else: - print("error", e) - - # Delete nsd - try: - RO_nsd_id = nsr_lcm["RO"]["nsd_id"] - if RO_nsd_id: - nsr_lcm["status_detailed"] = "Deleting nsd at RO" - desc = await RO.delete("nsd", RO_nsd_id) - print("debug", "deleted RO nsd {}".format(RO_nsd_id)) - nsr_lcm["RO"]["nsd_id"] = None - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: - nsr_lcm["RO"]["nsd_id"] = None - print("warning", e) - else: - print("error", e) - - for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): - try: - if RO_vnfd_id: - nsr_lcm["status_detailed"] = "Deleting vnfd at RO" - desc = await RO.delete("vnfd", RO_vnfd_id) - print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - print("warning", e) - else: - print("error", e) - logger.debug("DestroyNS task nsr_id={} Exit".format(nsr_id)) - - -async def test(loop, param=None): - logger.debug("Starting/Ending test task: {}".format(param)) - - -def cancel_tasks(loop, nsr_id): - """ - Cancel all active tasks of a concrete nsr identified for nsr_id - :param loop: loop - :param nsr_id: nsr identity - :return: None, or raises an exception if not possible - """ - global lcm_tasks - if not lcm_tasks.get(nsr_id): - return - for order_id, tasks_set in lcm_tasks[nsr_id].items(): - for task_name, task in tasks_set.items(): - result = task.cancel() - if result: - logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) - lcm_tasks[nsr_id] = {} - - - -async def read_kafka(loop, bus_info): - global lcm_tasks - logger.debug("kafka task Enter") - order_id = 1 - # future = asyncio.Future() - with open(bus_info["file"]) as f: - - # ignore old orders. Read file - command = "fake" - while command: - command = f.read() - - while True: - command = f.read() - if not command: - await asyncio.sleep(2, loop=loop) - continue - order_id += 1 - command = command.strip() - command, _, params = command.partition(" ") - if command == "exit": - print("Bye!") - break - elif command.startswith("#"): - continue - elif command == "echo": - print(params) - elif command == "test": - asyncio.Task(test(loop, params), loop=loop) - elif command == "break": - print("put a break in this line of code") - elif command == "new-ns": - nsr_id = params.strip() - logger.debug("Deploying NS {}".format(nsr_id)) - task = asyncio.ensure_future(CreateNS(loop, nsr_id)) - if nsr_id not in lcm_tasks: - lcm_tasks[nsr_id] = {} - lcm_tasks[nsr_id][order_id] = {"CreateNS": task} - elif command == "del-ns": - nsr_id = params.strip() - logger.debug("Deleting NS {}".format(nsr_id)) - cancel_tasks(loop, nsr_id) - task = asyncio.ensure_future(DestroyNS(loop, nsr_id)) - if nsr_id not in lcm_tasks: - lcm_tasks[nsr_id] = {} - lcm_tasks[nsr_id][order_id] = {"DestroyNS": task} - elif command == "get-ns": - nsr_id = params.strip() - nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) - print("nsr_lcm", nsr_lcm) - print("lcm_tasks", lcm_tasks.get(nsr_id)) - else: - logger.debug("unknown command '{}'".format(command)) - print("Usage:\n echo <>\n new-ns \n del-ns \n get-ns ") - logger.debug("kafka task Exit") - - -def lcm(): - loop = asyncio.get_event_loop() - loop.run_until_complete(read_kafka(loop, {"file": "/home/atierno/OSM/osm/NBI/kafka"})) - return - - -def lcm2(): - loop = asyncio.get_event_loop() - # asyncio.ensure_future(CreateNS, loop) - try: - content = loop.run_until_complete(CreateNS(loop, "ns1")) - print("Done: {}".format(content)) - except ROclient.ROClientException as e: - print("Error {}".format(e)) - - time.sleep(10) - - content = loop.run_until_complete(DestroyNS(loop, "ns1")) - print(content) - - loop.close() - - -if __name__ == '__main__': - - # FOR TEST - RO_VIM = "OST2_MRT" - - #FILL DATABASE - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} - db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} - db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: - nsd = yaml.load(f) - nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) - nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} - db.create("nsd", nsd_clean) - - ns_request = { - "id": "ns1", - "nsr_id": "ns1", - "name": "pingpongOne", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - db.create("ns_request", ns_request) - ns_request = { - "id": "ns2", - "nsr_id": "ns2", - "name": "pingpongTwo", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - db.create("ns_request", ns_request) - # lcm2() - lcm() - - - diff --git a/lcm/osm_common/__init__.py b/lcm/osm_common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lcm/osm_common/dbbase.py b/lcm/osm_common/dbbase.py new file mode 100644 index 00000000..a2768ae0 --- /dev/null +++ b/lcm/osm_common/dbbase.py @@ -0,0 +1,35 @@ + + +class DbException(Exception): + + def __init__(self, message, http_code=404): + # TODO change to http.HTTPStatus instead of int that allows .value and .name + self.http_code = http_code + Exception.__init__(self, message) + + +class dbbase(object): + + def __init__(self): + pass + + def db_connect(self, config): + pass + + def db_disconnect(self): + pass + + def get_list(self, table, filter={}): + pass + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + pass + + def create(self, table, indata): + pass + + def del_list(self, table, filter={}): + pass + + def del_one(self, table, filter={}, fail_on_empty=True): + pass diff --git a/lcm/osm_common/dbmemory.py b/lcm/osm_common/dbmemory.py new file mode 100644 index 00000000..75d1f883 --- /dev/null +++ b/lcm/osm_common/dbmemory.py @@ -0,0 +1,116 @@ +from dbbase import DbException, dbbase +from http import HTTPStatus +from uuid import uuid4 +from copy import deepcopy + + +class dbmemory(dbbase): + + def __init__(self): + self.db = {} + + @staticmethod + def _format_filter(filter): + return filter # TODO + + def _find(self, table, filter): + for i, row in enumerate(self.db.get(table, ())): + match = True + if filter: + for k, v in filter.items(): + if k not in row or v != row[k]: + match = False + if match: + yield i, row + + def get_list(self, table, filter={}): + try: + l = [] + for _, row in self._find(table, self._format_filter(filter)): + l.append(deepcopy(row)) + return l + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + try: + l = None + for _, row in self._find(table, self._format_filter(filter)): + if not fail_on_more: + return deepcopy(row) + if l: + raise DbException("Found more than one entry with filter='{}'".format(filter), + HTTPStatus.CONFLICT.value) + l = row + if not l and fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return deepcopy(l) + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_list(self, table, filter={}): + try: + id_list = [] + for i, _ in self._find(table, self._format_filter(filter)): + id_list.append(i) + deleted = len(id_list) + for i in id_list: + del self.db[table][i] + return {"deleted": deleted} + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_one(self, table, filter={}, fail_on_empty=True): + try: + for i, _ in self._find(table, self._format_filter(filter)): + break + else: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + del self.db[table][i] + return {"deleted": 1} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def replace(self, table, filter, indata, fail_on_empty=True): + try: + for i, _ in self._find(table, self._format_filter(filter)): + break + else: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + self.db[table][i] = deepcopy(indata) + return {"upadted": 1} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def create(self, table, indata): + try: + id = indata.get("_id") + if not id: + id = str(uuid4()) + indata["_id"] = id + if table not in self.db: + self.db[table] = [] + self.db[table].append(deepcopy(indata)) + return id + except Exception as e: # TODO refine + raise DbException(str(e)) + + +if __name__ == '__main__': + # some test code + db = dbmemory() + db.create("test", {"_id": 1, "data": 1}) + db.create("test", {"_id": 2, "data": 2}) + db.create("test", {"_id": 3, "data": 3}) + print("must be 3 items:", db.get_list("test")) + print("must return item 2:", db.get_list("test", {"_id": 2})) + db.del_one("test", {"_id": 2}) + print("must be emtpy:", db.get_list("test", {"_id": 2})) diff --git a/lcm/osm_common/dbmongo.py b/lcm/osm_common/dbmongo.py new file mode 100644 index 00000000..38454b3b --- /dev/null +++ b/lcm/osm_common/dbmongo.py @@ -0,0 +1,153 @@ +#import pymongo +from pymongo import MongoClient +from dbbase import DbException, dbbase +from http import HTTPStatus + +class dbmongo(dbbase): + + def __init__(self): + pass + + def db_connect(self, config): + try: + self.client = MongoClient(config["host"], config["port"]) + self.db = self.client[config["name"]] + # get data to try a connection + self.db.users.find_one({"username": "admin"}) + except Exception as e: # TODO refine + raise DbException(str(e)) + + def db_disconnect(self): + pass # TODO + + @staticmethod + def _format_filter(filter): + try: + db_filter = {} + for query_k, query_v in filter.items(): + dot_index = query_k.rfind(".") + if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", + "ncont", "neq"): + operator = "$" + query_k[dot_index+1:] + if operator == "$neq": + operator = "$nq" + k = query_k[:dot_index] + else: + operator = "$eq" + k = query_k + + v = query_v + if isinstance(v, list): + if operator in ("$eq", "$cont"): + operator = "$in" + v = query_v + elif operator in ("$ne", "$ncont"): + operator = "$nin" + v = query_v + else: + v = query_v.join(",") + + if operator in ("$eq", "$cont"): + # v cannot be a comma separated list, because operator would have been changed to $in + db_filter[k] = v + elif operator == "$ncount": + # v cannot be a comma separated list, because operator would have been changed to $nin + db_filter[k] = {"$ne": v} + else: + # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8" + if k not in db_filter: + db_filter[k] = {} + db_filter[k][operator] = v + + return db_filter + except Exception as e: + raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e), + http_code=HTTPStatus.BAD_REQUEST.value) + + + def get_list(self, table, filter={}): + try: + l = [] + collection = self.db[table] + rows = collection.find(self._format_filter(filter)) + for row in rows: + l.append(row) + return l + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + try: + if filter: + filter = self._format_filter(filter) + collection = self.db[table] + if not (fail_on_empty and fail_on_more): + return collection.find_one(filter) + rows = collection.find(filter) + if rows.count() == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + elif rows.count() > 1: + if fail_on_more: + raise DbException("Found more than one entry with filter='{}'".format(filter), + HTTPStatus.CONFLICT.value) + return rows[0] + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_list(self, table, filter={}): + try: + collection = self.db[table] + rows = collection.delete_many(self._format_filter(filter)) + return {"deleted": rows.deleted_count} + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_one(self, table, filter={}, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.delete_one(self._format_filter(filter)) + if rows.deleted_count == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + return {"deleted": rows.deleted_count} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def create(self, table, indata): + try: + collection = self.db[table] + data = collection.insert_one(indata) + return data.inserted_id + except Exception as e: # TODO refine + raise DbException(str(e)) + + def set_one(self, table, filter, update_dict, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.update_one(self._format_filter(filter), {"$set": update_dict}) + if rows.updated_count == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + return {"deleted": rows.deleted_count} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def replace(self, table, id, indata, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.replace_one({"_id": id}, indata) + if rows.modified_count == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + return {"replace": rows.modified_count} + except Exception as e: # TODO refine + raise DbException(str(e)) diff --git a/lcm/osm_common/fsbase.py b/lcm/osm_common/fsbase.py new file mode 100644 index 00000000..85562bdd --- /dev/null +++ b/lcm/osm_common/fsbase.py @@ -0,0 +1,40 @@ + +class FsException(Exception): + + def __init__(self, message, http_code=404): + self.http_code = http_code + Exception.__init__(self, message) + + +class FsBase(object): + + def __init__(self): + pass + + def get_params(self): + return {} + + def fs_connect(self, config): + pass + + def fs_disconnect(self): + pass + + def mkdir(self, folder): + pass + + def file_exists(self, storage): + pass + + def file_size(self, storage): + pass + + def file_extract(self, tar_object, path): + pass + + def file_open(self, storage, mode): + pass + + def file_delete(self, storage, ignore_non_exist=False): + pass + diff --git a/lcm/osm_common/fslocal.py b/lcm/osm_common/fslocal.py new file mode 100644 index 00000000..b88475f5 --- /dev/null +++ b/lcm/osm_common/fslocal.py @@ -0,0 +1,108 @@ +import os +import tarfile +from http import HTTPStatus +from shutil import rmtree +from fsbase import FsBase, FsException + + +class FsLocal(FsBase): + + def __init__(self): + self.path = None + + def get_params(self): + return {"fs": "local", "path": self.path} + + def fs_connect(self, config): + try: + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format( + config["path"])) + except FsException: + raise + except Exception as e: # TODO refine + raise FsException(str(e)) + + def fs_disconnect(self): + pass # TODO + + def mkdir(self, folder): + """ + Creates a folder or parent object location + :param folder: + :return: None or raises and exception + """ + try: + os.mkdir(self.path + folder) + except Exception as e: + raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) + + def file_exists(self, storage): + """ + Indicates if "storage" file exist + :param storage: can be a str or a str list + :return: True, False + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return os.path.exists(self.path + f) + + def file_size(self, storage): + """ + return file size + :param storage: can be a str or a str list + :return: file size + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return os.path.getsize(self.path + f) + + def file_extract(self, tar_object, path): + """ + extract a tar file + :param tar_object: object of type tar + :param path: can be a str or a str list, or a tar object where to extract the tar_object + :return: None + """ + if isinstance(path, str): + f = self.path + path + else: + f = self.path + "/".join(path) + tar_object.extractall(path=f) + + def file_open(self, storage, mode): + """ + Open a file + :param storage: can be a str or list of str + :param mode: file mode + :return: file object + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return open(self.path + f, mode) + + def file_delete(self, storage, ignore_non_exist=False): + """ + Delete storage content recursivelly + :param storage: can be a str or list of str + :param ignore_non_exist: not raise exception if storage does not exist + :return: None + """ + + if isinstance(storage, str): + f = self.path + storage + else: + f = self.path + "/".join(storage) + if os.path.exists(f): + rmtree(f) + elif not ignore_non_exist: + raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.BAD_REQUEST.value) diff --git a/lcm/osm_common/msgbase.py b/lcm/osm_common/msgbase.py new file mode 100644 index 00000000..745df7f6 --- /dev/null +++ b/lcm/osm_common/msgbase.py @@ -0,0 +1,39 @@ + +from http import HTTPStatus + + +class MsgException(Exception): + """ + Base Exception class for all msgXXXX exceptions + """ + + def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR): + """ + General exception + :param message: descriptive text + :param http_code: type. It contains ".value" (http error code) and ".name" (http error name + """ + self.http_code = http_code + Exception.__init__(self, message) + + +class MsgBase(object): + """ + Base class for all msgXXXX classes + """ + + def __init__(self): + pass + + def connect(self, config): + pass + + def write(self, msg): + pass + + def read(self): + pass + + def disconnect(self): + pass + diff --git a/lcm/osm_common/msglocal.py b/lcm/osm_common/msglocal.py new file mode 100644 index 00000000..5045181f --- /dev/null +++ b/lcm/osm_common/msglocal.py @@ -0,0 +1,78 @@ +import os +import yaml +import asyncio +from msgbase import MsgBase, MsgException + + +class msgLocal(MsgBase): + + def __init__(self): + self.path = None + # create a different file for each topic + self.files = {} + + def connect(self, config): + try: + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + os.mkdir(self.path) + except MsgException: + raise + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def disconnect(self): + for f in self.files.values(): + try: + f.close() + except Exception as e: # TODO refine + pass + + def write(self, topic, key, msg): + """ + Insert a message into topic + :param topic: topic + :param key: key text to be inserted + :param msg: value object to be inserted + :return: None or raises and exception + """ + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "w+") + yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True) + self.files[topic].flush() + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def read(self, topic): + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "r+") + msg = self.files[topic].read() + msg_dict = yaml.load(msg) + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return k, v + except Exception as e: # TODO refine + raise MsgException(str(e)) + + async def aioread(self, loop, topic): + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "r+") + # ignore previous content + while self.files[topic].read(): + pass + while True: + msg = self.files[topic].read() + if msg: + break + await asyncio.sleep(2, loop=loop) + msg_dict = yaml.load(msg) + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return k, v + except Exception as e: # TODO refine + raise MsgException(str(e)) diff --git a/lcm/osm_lcm/ROclient.py b/lcm/osm_lcm/ROclient.py new file mode 100644 index 00000000..84ce7aa1 --- /dev/null +++ b/lcm/osm_lcm/ROclient.py @@ -0,0 +1,880 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +## +# Copyright 2015 Telefónica Investigación y Desarrollo, S.A.U. +# This file is part of openmano +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# For those usages not covered by the Apache License, Version 2.0 please +# contact with: nfvlabs@tid.es +## + +""" +asyncio RO python client to interact with RO-server +""" + +import asyncio +import aiohttp + +import json +import yaml +import logging +import sys +from urllib.parse import quote +from uuid import UUID + +__author__ = "Alfonso Tierno, Pablo Montes" +__date__ = "$09-Jan-2018 09:09:48$" +__version__ = "0.1.0-r470" +version_date = "Jan 2018" +requests = None + +class ROClientException(Exception): + def __init__(self, message, http_code=400): + self.http_code = http_code + Exception.__init__(self, message) + """Common Exception for all openmano client exceptions""" + + +def remove_envelop(item, indata=None): + """ + Obtain the useful data removing the envelop. It goes through the vnfd or nsd catalog and returns the + vnfd or nsd content + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param indata: Content to be inspected + :return: the useful part of indata (a reference, not a new dictionay) plus boolean indicating if it was enveloped + """ + clean_indata = indata + enveloped = False + if not indata: + return {}, False + if item == "vnfd": + if clean_indata.get('vnfd:vnfd-catalog'): + enveloped = True + clean_indata = clean_indata['vnfd:vnfd-catalog'] + elif clean_indata.get('vnfd-catalog'): + enveloped = True + clean_indata = clean_indata['vnfd-catalog'] + if clean_indata.get('vnfd'): + if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1: + raise ROClientException("'vnfd' must be a list only one element") + clean_indata = clean_indata['vnfd'][0] + elif item == "nsd": + if clean_indata.get('nsd:nsd-catalog'): + enveloped = True + clean_indata = clean_indata['nsd:nsd-catalog'] + elif clean_indata.get('nsd-catalog'): + enveloped = True + clean_indata = clean_indata['nsd-catalog'] + if clean_indata.get('nsd'): + if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1: + raise ROClientException("'nsd' must be a list only one element") + clean_indata = clean_indata['nsd'][0] + elif item == "tenant": + if len(indata) == 1 and "tenant" in indata: + enveloped = True + clean_indata = indata["tenant"] + elif item == "vim" or item == "datacenter": + if len(indata) == 1 and "datacenter" in indata: + enveloped = True + clean_indata = indata["datacenter"] + elif item == "ns" or item == "instances": + if len(indata) == 1 and "instance" in indata: + enveloped = True + clean_indata = indata["instance"] + else: + assert False, "remove_envelop with unknown item {}".format(item) + + return clean_indata, enveloped + + +class ROClient: + headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} + client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vnfd': 'vnfs', 'nsd': 'scenarios', + 'ns': 'instances'} + mandatory_for_create = { + 'tenant': ("name", ), + 'vim': ("name", "vim_url"), + 'vnfd': ("name", "id", "connection-point", "vdu"), + 'nsd': ("name", "id", "constituent-vnfd"), + 'ns': ("name", "scenario", "datacenter"), + } + timeout_large = 120 + timeout_short = 30 + + def __init__(self, loop, endpoint_url, **kwargs): + self.loop = loop + self.endpoint_url = endpoint_url + + self.username = kwargs.get("username") + self.password = kwargs.get("password") + self.tenant_id_name = kwargs.get("tenant") + self.tenant = None + self.datacenter_id_name = kwargs.get("datacenter") + self.datacenter = None + self.logger = logging.getLogger(kwargs.get('logger', 'ROClient')) + if kwargs.get("debug"): + self.logger.setLevel(logging.DEBUG) + global requests + requests = kwargs.get("TODO remove") + + def __getitem__(self, index): + if index == 'tenant': + return self.tenant_id_name + elif index == 'datacenter': + return self.datacenter_id_name + elif index == 'username': + return self.username + elif index == 'password': + return self.password + elif index == 'endpoint_url': + return self.endpoint_url + else: + raise KeyError("Invalid key '%s'" %str(index)) + + def __setitem__(self,index, value): + if index == 'tenant': + self.tenant_id_name = value + elif index == 'datacenter': + self.datacenter_id_name = value + elif index == 'username': + self.username = value + elif index == 'password': + self.password = value + elif index == 'endpoint_url': + self.endpoint_url = value + else: + raise KeyError("Invalid key '{}'".format(index)) + self.tenant = None # force to reload tenant with different credentials + self.datacenter = None # force to reload datacenter with different credentials + + def _parse(self, descriptor, descriptor_format, response=False): + #try yaml + if descriptor_format and descriptor_format != "json" and descriptor_format != "yaml": + raise ROClientException("'descriptor_format' must be a 'json' or 'yaml' text") + if descriptor_format != "json": + try: + return yaml.load(descriptor) + except yaml.YAMLError as exc: + error_pos = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + error_pos = " at line:{} column:{}s".format(mark.line+1, mark.column+1) + error_text = "yaml format error" + error_pos + elif descriptor_format != "yaml": + try: + return json.loads(descriptor) + except Exception as e: + if response: + error_text = "json format error" + str(e) + + if response: + raise ROClientException(error_text) + raise ROClientException(error_text) + + def _parse_yaml(self, descriptor, response=False): + try: + return yaml.load(descriptor) + except yaml.YAMLError as exc: + error_pos = "" + if hasattr(exc, 'problem_mark'): + mark = exc.problem_mark + error_pos = " at line:{} column:{}s".format(mark.line+1, mark.column+1) + error_text = "yaml format error" + error_pos + if response: + raise ROClientException(error_text) + raise ROClientException(error_text) + + @staticmethod + def check_if_uuid(uuid_text): + """ + Check if text correspond to an uuid foramt + :param uuid_text: + :return: True if it is an uuid False if not + """ + try: + UUID(uuid_text) + return True + except (ValueError, TypeError): + return False + + @staticmethod + def _create_envelop(item, indata=None): + """ + Returns a new dict that incledes indata with the expected envelop + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param indata: Content to be enveloped + :return: a new dic with {: {indata} } where envelop can be e.g. tenant, datacenter, ... + """ + if item == "vnfd": + return {'vnfd-catalog': {'vnfd': [indata]}} + elif item == "nsd": + return {'nsd-catalog': {'nsd': [indata]}} + elif item == "tenant": + return {'tenant': indata} + elif item == "vim" or item == "datacenter": + return {'datacenter': indata} + elif item == "ns" or item == "instances": + return {'instance': indata} + else: + assert False, "_create_envelop with unknown item {}".format(item) + + @staticmethod + def check_ns_status(ns_descriptor): + """ + Inspect RO instance descriptor and indicates the status + :param ns_descriptor: instance descriptor obtained with self.show("ns", ) + :return: status, message: status can be BUILD,ACTIVE,ERROR, message is a text message + """ + net_total = 0 + vm_total = 0 + net_done = 0 + vm_done = 0 + + for net in ns_descriptor["nets"]: + net_total += 1 + if net["status"] == "ERROR": + return "ERROR", net["error_msg"] + elif net["status"] == "ACTIVE": + net_done += 1 + for vnf in ns_descriptor["vnfs"]: + for vm in vnf["vms"]: + vm_total += 1 + if vm["status"] == "ERROR": + return "ERROR", vm["error_msg"] + elif vm["status"] == "ACTIVE": + vm_done += 1 + + if net_total == net_done and vm_total == vm_done: + return "ACTIVE", "VMs {}, networks: {}".format(vm_total, net_total) + else: + return "BUILD", "VMs: {}/{}, networks: {}/{}".format(vm_done, vm_total, net_done, net_total) + + @staticmethod + def get_ns_vnf_ip(ns_descriptor): + """ + Get a dict with the IPs of every vnf. + :param ns_descriptor: instance descriptor obtained with self.show("ns", ) + :return: dict iwth key member_vnf_index, value ip_address + """ + ns_ip={} + for vnf in ns_descriptor["vnfs"]: + ns_ip[vnf["member_vnf_index"]] = vnf["ip_address"] + #uuid sce_vnf_id + # vnf[mgmt_access]: '{interface_id: cf3cbf00-385c-49b4-9a3f-b400b7b15dc6, vm_id: d0dd22a9-91ef-46f1-8e8f-8cf4b2d5b2d7}' + # vnf[vms] + return ns_ip + + async def get_list(self, item, all_tenants=False, filter_by=None): + """ + Obtain a list of items filtering by the specigy filter_by. + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param all_tenants: True if not filtering by tenant. Only allowed for admin + :param filter_by: dictionary with filtering + :return: a list of dict. It can be empty. Raises ROClientException on Error, + """ + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + if item == 'tenant': + all_tenants = None + with aiohttp.ClientSession(loop=self.loop) as session: + content = await self._list_item(session, self.client_to_RO[item], all_tenants=all_tenants, + filter_dict=filter_by) + if isinstance(content, dict): + if len(content) == 1: + return content.values()[0] + else: + raise ROClientException("Output not a list neither dict with len equal 1", http_code=500) + return content + + async def _get_item_uuid(self, session, item, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + item_id = 0 + url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) + if self.check_if_uuid(item_id_name): + item_id = item_id_name + url += "/" + item_id_name + elif item_id_name and item_id_name.startswith("'") and item_id_name.endswith("'"): + item_id_name = item_id_name[1:-1] + self.logger.debug("openmano GET %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status == 404: # NOT_FOUND + raise ROClientException("No {} found with id '{}'".format(item[:-1], item_id_name), + http_code=404) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + content = self._parse_yaml(response_text, response=True) + + if item_id: + return item_id + desc = content[item] + assert isinstance(desc, list), "_get_item_uuid get a non dict with a list inside {}".format(type(desc)) + uuid = None + for i in desc: + if item_id_name and i["name"] != item_id_name: + continue + if uuid: # found more than one + raise ROClientException( + "Found more than one {} with name '{}'. uuid must be used".format(item, item_id_name), + http_code=404) + uuid = i["uuid"] + if not uuid: + raise ROClientException("No {} found with name '{}'".format(item[:-1], item_id_name), http_code=404) + return uuid + + async def _get_item(self, session, item, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + if self.check_if_uuid(item_id_name): + uuid = item_id_name + else: + # check that exist + uuid = self._get_item_uuid(session, item, item_id_name, all_tenants) + + url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + self.logger.debug("GET %s", url ) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + + return self._parse_yaml(response_text, response=True) + + async def _get_tenant(self, session): + if not self.tenant: + self.tenant = await self._get_item_uuid(session, "tenants", self.tenant_id_name, None) + return self.tenant + + async def _get_datacenter(self, session): + if not self.tenant: + await self._get_tenant(session) + if not self.datacenter: + self.datacenter = await self._get_item_uuid(session, "datacenters", self.datacenter_id_name, True) + return self.datacenter + + async def _create_item(self, session, item, descriptor, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + payload_req = yaml.safe_dump(descriptor) + + api_version_text = "" + if item == "vnfs": + # assumes version v3 only + api_version_text = "/v3" + item = "vnfd" + elif item == "scenarios": + # assumes version v3 only + api_version_text = "/v3" + item = "nsd" + + #print payload_req + + url = "{}{apiver}{tenant}/{item}".format(self.endpoint_url, apiver=api_version_text, tenant=tenant_text, + item=item) + self.logger.debug("openmano POST %s %s", url, payload_req) + with aiohttp.Timeout(self.timeout_large): + async with session.post(url, headers=self.headers_req, data=payload_req) as response: + response_text = await response.read() + self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + + response_desc = self._parse_yaml(response_text, response=True) + desc, _ = remove_envelop(item, response_desc) + return desc + + async def _del_item(self, session, item, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + if not self.check_if_uuid(item_id_name): + # check that exist + uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants) + else: + uuid = item_id_name + + url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + self.logger.debug("DELETE %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.delete(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + async def _list_item(self, session, item, all_tenants=False, filter_dict=None): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + url = "{}{}/{}".format(self.endpoint_url, tenant_text, item) + separator = "?" + if filter_dict: + for k in filter_dict: + url += separator + quote(str(k)) + "=" + quote(str(filter_dict[k])) + separator = "&" + self.logger.debug("openmano GET %s", url) + with aiohttp.Timeout(self.timeout_short): + async with session.get(url, headers=self.headers_req) as response: + response_text = await response.read() + self.logger.debug("GET {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + async def _edit_item(self, session, item, descriptor, item_id_name, all_tenants=False): + if all_tenants: + tenant_text = "/any" + elif all_tenants is None: + tenant_text = "" + else: + if not self.tenant: + await self._get_tenant(session) + tenant_text = "/" + self.tenant + + if not uuid: + #check that exist + uuid = self._get_item_uuid(session, "tenants", item_id_name, all_tenants) + + payload_req = yaml.safe_dump(descriptor) + + #print payload_req + + url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid) + self.logger.debug("openmano PUT %s %s", url, payload_req) + with aiohttp.Timeout(self.timeout_large): + async with session.put(url, headers=self.headers_req, data=payload_req) as response: + response_text = await response.read() + self.logger.debug("PUT {} [{}] {}".format(url, response.status, response_text[:100])) + if response.status >= 300: + raise ROClientException(response_text, http_code=response.status) + return self._parse_yaml(response_text, response=True) + + async def show(self, item, item_id_name=None, all_tenants=False): + """ + Obtain the information of an item from its id or name + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param all_tenants: True if not filtering by tenant. Only allowed for admin + :return: dictionary with the information or raises ROClientException on Error, NotFound, found several + """ + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + if item == 'tenant': + all_tenants = None + + with aiohttp.ClientSession(loop=self.loop) as session: + content = await self._get_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) + if len(content) == 1: + return content.values()[0] + else: + return content + + async def delete(self, item, item_id_name=None, all_tenants=False): + """ + Delete the information of an item from its id or name + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param item_id_name: RO id or name of the item. Raise and exception if more than one found + :param all_tenants: True if not filtering by tenant. Only allowed for admin + :return: dictionary with the information or raises ROClientException on Error, NotFound, found several + """ + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + if item == 'tenant': + all_tenants = None + + with aiohttp.ClientSession(loop=self.loop) as session: + if item == 'vim': + # check that exist + item_id = await self._get_item_uuid(session, "datacenters", item_id_name, all_tenants=True) + all_tenants = None + return await self._del_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants) + + async def create(self, item, descriptor=None, descriptor_format=None, **kwargs): + """ + Creates an item from its descriptor + :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns' + :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided + :param descriptor_format: Can be 'json' or 'yaml' + :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type + keys can be a dot separated list to specify elements inside dict + :return: dictionary with the information or raises ROClientException on Error + """ + if isinstance(descriptor, str): + descriptor = self._parse(descriptor, descriptor_format) + elif descriptor: + pass + else: + descriptor = {} + + if item not in self.client_to_RO: + raise ROClientException("Invalid item {}".format(item)) + desc, enveloped = remove_envelop(item, descriptor) + + # Override descriptor with kwargs + if kwargs: + try: + for k, v in kwargs.items(): + update_content = desc + kitem_old = None + klist = k.split(".") + for kitem in klist: + if kitem_old is not None: + update_content = update_content[kitem_old] + if isinstance(update_content, dict): + kitem_old = kitem + elif isinstance(update_content, list): + kitem_old = int(kitem) + else: + raise ROClientException( + "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem)) + update_content[kitem_old] = v + except KeyError: + raise ROClientException( + "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old)) + except ValueError: + raise ROClientException("Invalid query string '{}'. Expected integer index list instead of '{}'".format( + k, kitem)) + except IndexError: + raise ROClientException( + "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old)) + + for mandatory in self.mandatory_for_create[item]: + if mandatory not in desc: + raise ROClientException("'{}' is mandatory parameter for {}".format(mandatory, item)) + + all_tenants = False + if item in ('tenant', 'vim'): + all_tenants = None + + if not enveloped: + create_desc = self._create_envelop(item, desc) + else: + create_desc = descriptor + + with aiohttp.ClientSession(loop=self.loop) as session: + return await self._create_item(session, self.client_to_RO[item], create_desc, all_tenants) + + def edit_tenant(self, uuid=None, name=None, descriptor=None, descriptor_format=None, new_name=None, new_description=None): + """Edit the parameters of a tenant + Params: must supply a descriptor or/and a new_name or new_description + uuid or/and name. If only name is supplied, there must be only one or an exception is raised + descriptor: with format {'tenant':{params to change info}} + must be a dictionary or a json/yaml text. + name: the tenant name. Overwrite descriptor name if any + description: tenant descriptor.. Overwrite descriptor description if any + Return: Raises an exception on error, not found or found several + Obtain a dictionary with format {'tenant':{newtenant_info}} + """ + # TODO revise + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif new_name or new_description: + descriptor={"tenant": {}} + else: + raise ROClientException("Missing descriptor") + + if 'tenant' not in descriptor or len(descriptor)!=1: + raise ROClientException("Descriptor must contain only one 'tenant' field") + if new_name: + descriptor['tenant']['name'] = new_name + if new_description: + descriptor['tenant']['description'] = new_description + + return self._edit_item("tenants", descriptor, uuid, name, all_tenants=None) + + #DATACENTERS + + def edit_datacenter(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs): + """Edit the parameters of a datacenter + Params: must supply a descriptor or/and a parameter to change + uuid or/and name. If only name is supplied, there must be only one or an exception is raised + descriptor: with format {'datacenter':{params to change info}} + must be a dictionary or a json/yaml text. + parameters to change can be supplyied by the descriptor or as parameters: + new_name: the datacenter name + vim_url: the datacenter URL + vim_url_admin: the datacenter URL for administrative issues + vim_type: the datacenter type, can be openstack or openvim. + public: boolean, available to other tenants + description: datacenter description + Return: Raises an exception on error, not found or found several + Obtain a dictionary with format {'datacenter':{new_datacenter_info}} + """ + + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif kwargs: + descriptor={"datacenter": {}} + else: + raise ROClientException("Missing descriptor") + + if 'datacenter' not in descriptor or len(descriptor)!=1: + raise ROClientException("Descriptor must contain only one 'datacenter' field") + for param in kwargs: + if param=='new_name': + descriptor['datacenter']['name'] = kwargs[param] + else: + descriptor['datacenter'][param] = kwargs[param] + return self._edit_item("datacenters", descriptor, uuid, name, all_tenants=None) + + def attach_datacenter(self, uuid_name=None, descriptor=None, descriptor_format=None, vim_user=None, vim_password=None, vim_tenant_name=None, vim_tenant_id=None): + #check that exist + uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True) + tenant_text = "/"+self._get_tenant() + + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif vim_user or vim_password or vim_tenant_name or vim_tenant_id: + descriptor={"datacenter": {}} + else: + raise ROClientException("Missing descriptor or params") + + if vim_user or vim_password or vim_tenant_name or vim_tenant_id: + #print args.name + try: + if vim_user: + descriptor['datacenter']['vim_user'] = vim_user + if vim_password: + descriptor['datacenter']['vim_password'] = vim_password + if vim_tenant_name: + descriptor['datacenter']['vim_tenant_name'] = vim_tenant_name + if vim_tenant_id: + descriptor['datacenter']['vim_tenant'] = vim_tenant_id + except (KeyError, TypeError) as e: + if str(e)=='datacenter': error_pos= "missing field 'datacenter'" + else: error_pos="wrong format" + raise ROClientException("Wrong datacenter descriptor: " + error_pos) + + payload_req = yaml.safe_dump(descriptor) + #print payload_req + url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid) + self.logger.debug("openmano POST %s %s", url, payload_req) + mano_response = requests.post(url, headers = self.headers_req, data=payload_req) + self.logger.debug("openmano response: %s", mano_response.text ) + + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + + def detach_datacenter(self, uuid_name=None): + if not uuid: + #check that exist + uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=False) + tenant_text = "/"+self._get_tenant() + url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid) + self.logger.debug("openmano DELETE %s", url) + mano_response = requests.delete(url, headers = self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + + #VNFS + + def edit_scenario(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs): + """Edit the parameters of a scenario + Params: must supply a descriptor or/and a parameters to change + uuid or/and name. If only name is supplied, there must be only one or an exception is raised + descriptor: with format {'scenario':{params to change info}} + must be a dictionary or a json/yaml text. + parameters to change can be supplyied by the descriptor or as parameters: + new_name: the scenario name + public: boolean, available to other tenants + description: scenario description + tenant_id. Propietary tenant + Return: Raises an exception on error, not found or found several + Obtain a dictionary with format {'scenario':{new_scenario_info}} + """ + + if isinstance(descriptor, str): + descriptor = self.parse(descriptor, descriptor_format) + elif descriptor: + pass + elif kwargs: + descriptor={"scenario": {}} + else: + raise ROClientException("Missing descriptor") + + if 'scenario' not in descriptor or len(descriptor)>2: + raise ROClientException("Descriptor must contain only one 'scenario' field") + for param in kwargs: + if param=='new_name': + descriptor['scenario']['name'] = kwargs[param] + else: + descriptor['scenario'][param] = kwargs[param] + return self._edit_item("scenarios", descriptor, uuid, name, all_tenants=None) + + #VIM ACTIONS + def vim_action(self, action, item, uuid=None, all_tenants=False, **kwargs): + """Perform an action over a vim + Params: + action: can be 'list', 'get'/'show', 'delete' or 'create' + item: can be 'tenants' or 'networks' + uuid: uuid of the tenant/net to show or to delete. Ignore otherwise + other parameters: + datacenter_name, datacenter_id: datacenters to act on, if missing uses classes store datacenter + descriptor, descriptor_format: descriptor needed on creation, can be a dict or a yaml/json str + must be a dictionary or a json/yaml text. + name: for created tenant/net Overwrite descriptor name if any + description: tenant descriptor. Overwrite descriptor description if any + + Return: Raises an exception on error + Obtain a dictionary with format {'tenant':{new_tenant_info}} + """ + if item not in ("tenants", "networks", "images"): + raise ROClientException("Unknown value for item '{}', must be 'tenants', 'nets' or " + "images".format(str(item))) + + image_actions = ['list','get','show','delete'] + if item == "images" and action not in image_actions: + raise ROClientException("Only available actions for item '{}' are {}\n" + "Requested action was '{}'".format(item, ', '.join(image_actions), action)) + if all_tenants: + tenant_text = "/any" + else: + tenant_text = "/"+self._get_tenant() + + if "datacenter_id" in kwargs or "datacenter_name" in kwargs: + datacenter = self._get_item_uuid(session, "datacenters", kwargs.get("datacenter"), all_tenants=all_tenants) + else: + datacenter = self.get_datacenter(session) + + if action=="list": + url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) + self.logger.debug("GET %s", url ) + mano_response = requests.get(url, headers=self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + elif action=="get" or action=="show": + url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) + self.logger.debug("GET %s", url ) + mano_response = requests.get(url, headers=self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + elif action=="delete": + url = "{}{}/vim/{}/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item, uuid) + self.logger.debug("DELETE %s", url ) + mano_response = requests.delete(url, headers=self.headers_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + elif action=="create": + if "descriptor" in kwargs: + if isinstance(kwargs["descriptor"], str): + descriptor = self._parse(kwargs["descriptor"], kwargs.get("descriptor_format") ) + else: + descriptor = kwargs["descriptor"] + elif "name" in kwargs: + descriptor={item[:-1]: {"name": kwargs["name"]}} + else: + raise ROClientException("Missing descriptor") + + if item[:-1] not in descriptor or len(descriptor)!=1: + raise ROClientException("Descriptor must contain only one 'tenant' field") + if "name" in kwargs: + descriptor[ item[:-1] ]['name'] = kwargs["name"] + if "description" in kwargs: + descriptor[ item[:-1] ]['description'] = kwargs["description"] + payload_req = yaml.safe_dump(descriptor) + #print payload_req + url = "{}{}/vim/{}/{}".format(self.endpoint_url, tenant_text, datacenter, item) + self.logger.debug("openmano POST %s %s", url, payload_req) + mano_response = requests.post(url, headers = self.headers_req, data=payload_req) + self.logger.debug("openmano response: %s", mano_response.text ) + content = self._parse_yaml(mano_response.text, response=True) + if mano_response.status_code==200: + return content + else: + raise ROClientException(str(content), http_code=mano_response.status) + else: + raise ROClientException("Unknown value for action '{}".format(str(action))) + + +if __name__ == '__main__': + RO_URL = "http://localhost:9090/openmano" + RO_TENANT = "2c94f639-cefc-4f3a-a8f9-bbab0471946a" + RO_VIM = "3e70deb6-aea1-11e7-af13-080027429aaf" + + streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" + logging.basicConfig(format=streamformat) + + loop = asyncio.get_event_loop() + myClient = ROClient(endpoint_url=RO_URL, loop=loop, tenant_id=RO_TENANT, datacenter_id=RO_VIM, debug=True) + content = loop.run_until_complete(myClient.list_tenants()) + print(content) + loop.close() + + diff --git a/lcm/osm_lcm/lcm.cfg b/lcm/osm_lcm/lcm.cfg new file mode 100644 index 00000000..c93323f5 --- /dev/null +++ b/lcm/osm_lcm/lcm.cfg @@ -0,0 +1,45 @@ + +# TODO currently is a pure yaml format. Transform it to [ini] style with yaml inside to be coherent with other modules + +#[global] +global: + log_file: "" + log_level: DEBUG + +#[RO] +RO: + #host: ro # hostname or IP + host: localhost + port: 9090 + tenant: osm + +#[VCA] +VCA: + host: vca + port: 17070 + user: admin + secret: secret + +#[database] +database: + #driver: mongo # mongo or memory + driver: memory + host: mongo # hostname or IP + port: 27017 + name: osm + user: user + password: password + +#[storage] +storage: + driver: local # local filesystem + # for local provide file path + #path: /app/storage + path: /home/atierno/OSM/osm/RO/lcm/local/storage + +#[message] +message: + driver: local # local or kafka + # for local provide file path + #path: /app/storage/kafka + path: /home/atierno/OSM/osm/RO/lcm/local/kafka diff --git a/lcm/osm_lcm/lcm.py b/lcm/osm_lcm/lcm.py new file mode 100644 index 00000000..f35ec606 --- /dev/null +++ b/lcm/osm_lcm/lcm.py @@ -0,0 +1,403 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import asyncio +import yaml +import ROclient +import dbmemory +import dbmongo +import fslocal +import msglocal +from dbbase import DbException +from fsbase import FsException +from msgbase import MsgException +import logging + +#streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" +streamformat = "%(name)s %(levelname)s: %(message)s" +logging.basicConfig(format=streamformat, level=logging.DEBUG) + + +class LcmException(Exception): + pass + + +class Lcm: + + def __init__(self, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + # contains created tasks/futures to be able to cancel + self.lcm_tasks = {} + + self.config = config + # logging + self.logger = logging.getLogger('lcm') + self.config = config + self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]) + self.ro_tenant = config["RO"]["tenant"] + self.vca = config["VCA"] # TODO VCA + self.loop = None + try: + if config["database"]["driver"] == "mongo": + self.db = dbmongo.dbmongo() + self.db.db_connect(config["database"]) + elif config["database"]["driver"] == "memory": + self.db = dbmemory.dbmemory() + self.db.db_connect(config["database"]) + else: + raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format( + config["database"]["driver"])) + + if config["storage"]["driver"] == "local": + self.fs = fslocal.FsLocal() + self.fs.fs_connect(config["storage"]) + else: + raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format( + config["storage"]["driver"])) + + if config["message"]["driver"] == "local": + self.msg = msglocal.msgLocal() + self.msg.connect(config["message"]) + else: + raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( + config["storage"]["driver"])) + except (DbException, FsException, MsgException) as e: + self.self.logger.critical(str(e), exc_info=True) + raise LcmException(str(e)) + + async def create_ns(self, nsr_id): + self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = { + "id": nsr_id, + "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, + "nsr_ip": {}, + "VCA": {"TODO"}, + "status": "BUILD", + "status_detailed": "", + } + + deloyment_timeout = 120 + try: + ns_request = self.db.get_one("ns_request", {"id": nsr_id}) + nsd = self.db.get_one("nsd", {"id": ns_request["nsd_id"]}) + RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, + datacenter=ns_request["vim"]) + nsr_lcm["status_detailed"] = "Creating vnfd at RO" + # ns_request["constituent-vnfr-ref"] = [] + + self.db.create("nsr_lcm", nsr_lcm) + + # get vnfds, instantiate at RO + self.logger.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + vnfd.pop("_admin", None) + vnfd.pop("_id", None) + # vnfr = deepcopy(vnfd) + # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] + # vnfr["nsr-id"] = nsr_id + # vnfr["id"] = uuid4() + # vnfr["vnf-id"] = vnfd["id"] + # ns_request["constituent-vnfr-ref"],append(vnfd_id) + + # TODO change id for RO in case it is present + try: + desc = await RO.create("vnfd", descriptor=vnfd) + nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, vnfd already present + print("debug", e) + else: + raise + + # db_new("vnfr", vnfr) + # db_update("ns_request", nsr_id, ns_request) + + # create nsd at RO + self.logger.debug("create_ns task nsr_id={} RO NSD".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating nsd at RO" + nsd_id = ns_request["nsd_id"] + nsd = self.db.get_one("nsd", {"id": nsd_id}) + nsd.pop("_admin", None) + nsd.pop("_id", None) + try: + desc = await RO.create("nsd", descriptor=nsd) + nsr_lcm["RO"]["nsd_id"] = desc["uuid"] + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, nsd already present + print("debug", e) + else: + raise + + # Crate ns at RO + self.logger.debug("create_ns task nsr_id={} RO NS".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating ns at RO" + desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) + RO_nsr_id = desc["uuid"] + nsr_lcm["RO"]["nsr_id"] = RO_nsr_id + nsr_lcm["RO"]["nsr_status"] = "BUILD" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + # wait until NS is ready + deloyment_timeout = 600 + while deloyment_timeout > 0: + ns_status_detailed = "Waiting ns ready at RO" + nsr_lcm["status_detailed"] = ns_status_detailed + desc = await RO.show("ns", RO_nsr_id) + ns_status, ns_status_info = RO.check_ns_status(desc) + nsr_lcm["RO"]["nsr_status"] = ns_status + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) + elif ns_status == "ACTIVE": + nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) + break + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + + await asyncio.sleep(5, loop=self.loop) + deloyment_timeout -= 5 + if deloyment_timeout <= 0: + raise ROclient.ROClientException("Timeot wating ns to be ready") + nsr_lcm["status_detailed"] = "Configuring vnfr" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + #for nsd in nsr_lcm["descriptors"]["nsd"]: + + self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd_index = int(c_vnf["member-vnf-index"]) + vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] + config_primitive = vnfd["vnf-configuration"].get("config-primitive") + # get parameters for juju charm + base_folder = vnfd["_admin"]["storage"] + path = base_folder + "/charms/" + proxy_charm + mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] + # TODO launch VCA charm + # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive)) + nsr_lcm["status"] = "DONE" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + return nsr_lcm + + except (ROclient.ROClientException, Exception) as e: + self.logger.debug("create_ns nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) + nsr_lcm["status"] = "ERROR" + nsr_lcm["status_detailed"] += ": ERROR {}".format(e) + finally: + self.logger.debug("create_ns task nsr_id={} Exit".format(nsr_id)) + + + async def delete_ns(self, nsr_id): + self.logger.debug("delete_ns task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) + ns_request = self.db.get_one("ns_request", {"id": nsr_id}) + + nsr_lcm["status"] = "DELETING" + nsr_lcm["status_detailed"] = "Deleting charms" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + # TODO destroy VCA charm + + # remove from RO + RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, + datacenter=ns_request["vim"]) + # Delete ns + try: + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + if RO_nsr_id: + nsr_lcm["status_detailed"] = "Deleting ns at RO" + desc = await RO.delete("ns", RO_nsr_id) + print("debug", "deleted RO ns {}".format(RO_nsr_id)) + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + print("warning", e) + else: + print("error", e) + + # Delete nsd + try: + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] + if RO_nsd_id: + nsr_lcm["status_detailed"] = "Deleting nsd at RO" + desc = await RO.delete("nsd", RO_nsd_id) + print("debug", "deleted RO nsd {}".format(RO_nsd_id)) + nsr_lcm["RO"]["nsd_id"] = None + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsd_id"] = None + print("warning", e) + else: + print("error", e) + + for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + try: + if RO_vnfd_id: + nsr_lcm["status_detailed"] = "Deleting vnfd at RO" + desc = await RO.delete("vnfd", RO_vnfd_id) + print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + print("warning", e) + else: + print("error", e) + self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) + + + async def test(self, param=None): + self.logger.debug("Starting/Ending test task: {}".format(param)) + + + def cancel_tasks(self, nsr_id): + """ + Cancel all active tasks of a concrete nsr identified for nsr_id + :param nsr_id: nsr identity + :return: None, or raises an exception if not possible + """ + if not self.lcm_tasks.get(nsr_id): + return + for order_id, tasks_set in self.lcm_tasks[nsr_id].items(): + for task_name, task in tasks_set.items(): + result = task.cancel() + if result: + self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) + self.lcm_tasks[nsr_id] = {} + + + + async def read_kafka(self): + self.logger.debug("kafka task Enter") + order_id = 1 + # future = asyncio.Future() + + while True: + command, params = await self.msg.aioread(self.loop, "ns") + order_id += 1 + if command == "exit": + print("Bye!") + break + elif command.startswith("#"): + continue + elif command == "echo": + print(params) + elif command == "test": + asyncio.Task(self.test(params), loop=self.loop) + elif command == "break": + print("put a break in this line of code") + elif command == "create": + nsr_id = params.strip() + self.logger.debug("Deploying NS {}".format(nsr_id)) + task = asyncio.ensure_future(self.create_ns(nsr_id)) + if nsr_id not in self.lcm_tasks: + self.lcm_tasks[nsr_id] = {} + self.lcm_tasks[nsr_id][order_id] = {"create_ns": task} + elif command == "delete": + nsr_id = params.strip() + self.logger.debug("Deleting NS {}".format(nsr_id)) + self.cancel_tasks(nsr_id) + task = asyncio.ensure_future(self.delete_ns(nsr_id)) + if nsr_id not in self.lcm_tasks: + self.lcm_tasks[nsr_id] = {} + self.lcm_tasks[nsr_id][order_id] = {"delete_ns": task} + elif command == "show": + nsr_id = params.strip() + nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) + print("nsr_lcm", nsr_lcm) + print("self.lcm_tasks", self.lcm_tasks.get(nsr_id)) + else: + self.logger.debug("unknown command '{}'".format(command)) + print("Usage:\n echo: <>\n create: \n delete: \n show: ") + self.logger.debug("kafka task Exit") + + + def start(self): + self.loop = asyncio.get_event_loop() + self.loop.run_until_complete(self.read_kafka()) + self.loop.close() + self.loop = None + + +def read_config_file(config_file): + # TODO make a [ini] + yaml inside parser + # the configparser library is not suitable, because it does not admit comments at the end of line, + # and not parse integer or boolean + try: + with open(config_file) as f: + conf = yaml.load(f) + # TODO insert envioronment + # for k, v in environ.items(): + # if k.startswith("OSMLCM_"): + # split _ lower add to config + return conf + except Exception as e: + self.logger.critical("At config file '{}': {}".format(config_file, e)) + + + +if __name__ == '__main__': + + config_file = "lcm.cfg" + conf = read_config_file(config_file) + lcm = Lcm(conf) + + # FOR TEST + RO_VIM = "OST2_MRT" + + #FILL DATABASE + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + lcm.db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + lcm.db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + nsd = yaml.load(f) + nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + lcm.db.create("nsd", nsd_clean) + + ns_request = { + "id": "ns1", + "nsr_id": "ns1", + "name": "pingpongOne", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + lcm.db.create("ns_request", ns_request) + ns_request = { + "id": "ns2", + "nsr_id": "ns2", + "name": "pingpongTwo", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + lcm.db.create("ns_request", ns_request) + + lcm.start() + + +