__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+# TODO consider use this decorator for database access retries
+# @retry_mongocall
+# def retry_mongocall(call):
+# def _retry_mongocall(*args, **kwargs):
+# retry = 1
+# while True:
+# try:
+# return call(*args, **kwargs)
+# except pymongo.AutoReconnect as e:
+# if retry == 4:
+# raise DbException(str(e))
+# sleep(retry)
+# return _retry_mongocall
+
class DbMongo(DbBase):
conn_initial_timout = 120
"ncont", "neq"):
operator = "$" + query_k[dot_index+1:]
if operator == "$neq":
- operator = "$nq"
+ operator = "$ne"
k = query_k[:dot_index]
else:
operator = "$eq"
raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e),
http_code=HTTPStatus.BAD_REQUEST)
-
def get_list(self, table, filter={}):
try:
l = []
rows = collection.find(filter)
if rows.count() == 0:
if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.NOT_FOUND)
return None
elif rows.count() > 1:
if fail_on_more:
- raise DbException("Found more than one entry with filter='{}'".format(filter),
+ raise DbException("Found more than one {} with filter='{}'".format(table[:-1], filter),
HTTPStatus.CONFLICT)
return rows[0]
except Exception as e: # TODO refine
rows = collection.delete_one(self._format_filter(filter))
if rows.deleted_count == 0:
if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.NOT_FOUND)
return None
return {"deleted": rows.deleted_count}
except Exception as e: # TODO refine
rows = collection.update_one(self._format_filter(filter), {"$set": update_dict})
if rows.updated_count == 0:
if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], filter),
+ HTTPStatus.NOT_FOUND)
return None
return {"deleted": rows.deleted_count}
except Exception as e: # TODO refine
def replace(self, table, id, indata, fail_on_empty=True):
try:
+ _filter = {"_id": id}
collection = self.db[table]
- rows = collection.replace_one({"_id": id}, indata)
- if rows.modified_count == 0:
+ rows = collection.replace_one(_filter, indata)
+ if rows.matched_count == 0:
if fail_on_empty:
- raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND)
+ raise DbException("Not found any {} with filter='{}'".format(table[:-1], _filter),
+ HTTPStatus.NOT_FOUND)
return None
return {"replace": rows.modified_count}
except Exception as e: # TODO refine
except Exception as e:
raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR)
- def file_exists(self, storage):
+ def file_exists(self, storage, mode=None):
"""
Indicates if "storage" file exist
:param storage: can be a str or a str list
+ :param mode: can be 'file' exist as a regular file; 'dir' exists as a directory or; 'None' just exists
:return: True, False
"""
if isinstance(storage, str):
f = storage
else:
f = "/".join(storage)
- return os.path.exists(self.path + f)
+ if os.path.exists(self.path + f):
+ if mode == "file" and os.path.isfile(self.path + f):
+ return True
+ if mode == "dir" and os.path.isdir(self.path + f):
+ return True
+ return False
def file_size(self, storage):
"""
:param mode: file mode
:return: file object
"""
- if isinstance(storage, str):
- f = storage
- else:
- f = "/".join(storage)
- return open(self.path + f, mode)
+ try:
+ if isinstance(storage, str):
+ f = storage
+ else:
+ f = "/".join(storage)
+ return open(self.path + f, mode)
+ except FileNotFoundError:
+ raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ except IOError:
+ raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
+
+ def dir_ls(self, storage):
+ """
+ return folder content
+ :param storage: can be a str or list of str
+ :return: folder content
+ """
+ try:
+ if isinstance(storage, str):
+ f = storage
+ else:
+ f = "/".join(storage)
+ return os.listdir(self.path + f)
+ except NotADirectoryError:
+ raise FsException("File {} does not exist".format(f), http_code=HTTPStatus.NOT_FOUND)
+ except IOError:
+ raise FsException("File {} cannot be opened".format(f), http_code=HTTPStatus.BAD_REQUEST)
def file_delete(self, storage, ignore_non_exist=False):
"""
if os.path.exists(f):
rmtree(f)
elif not ignore_non_exist:
- raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.BAD_REQUEST)
+ raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.NOT_FOUND)
+import asyncio
from http import HTTPStatus
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
def connect(self, config):
pass
- def write(self, msg):
+ def disconnect(self):
pass
- def read(self):
+ def write(self, topic, key, msg):
pass
- def disconnect(self):
+ def read(self, topic):
pass
+ async def aiowrite(self, topic, key, msg, loop):
+ pass
+
+ async def aioread(self, topic, loop):
+ pass
except Exception as e: # TODO refine
raise MsgException(str(e))
+ def disconnect(self):
+ try:
+ self.loop.close()
+ except Exception as e: # TODO refine
+ raise MsgException(str(e))
+
def write(self, topic, key, msg):
try:
- self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True)))
+ self.loop.run_until_complete(self.aiowrite(topic=topic, key=key,
+ msg=yaml.safe_dump(msg, default_flow_style=True),
+ loop=self.loop))
except Exception as e:
raise MsgException("Error writing {} topic: {}".format(topic, str(e)))
def read(self, topic):
- #self.topic_lst.append(topic)
+ """
+ Read from one or several topics. it is non blocking returning None if nothing is available
+ :param topic: can be str: single topic; or str list: several topics
+ :return: topic, key, message; or None
+ """
try:
- return self.loop.run_until_complete(self.aioread(topic))
+ return self.loop.run_until_complete(self.aioread(topic, self.loop))
+ except MsgException:
+ raise
except Exception as e:
raise MsgException("Error reading {} topic: {}".format(topic, str(e)))
- async def aiowrite(self, topic, key, msg, loop=None):
+ async def aiowrite(self, topic, key, msg, loop):
try:
- if not loop:
- loop = self.loop
self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode,
bootstrap_servers=self.broker)
await self.producer.start()
finally:
await self.producer.stop()
- async def aioread(self, topic, loop=None):
- if not loop:
- loop = self.loop
- self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
- await self.consumer.start()
- self.consumer.subscribe([topic])
+ async def aioread(self, topic, loop):
+ """
+ Asyncio read from one or several topics. It blocks
+ :param topic: can be str: single topic; or str list: several topics
+ :param loop: asyncio loop
+ :return: topic, key, message
+ """
try:
+ if isinstance(topic, (list, tuple)):
+ topic_list = topic
+ else:
+ topic_list = (topic,)
+
+ self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker)
+ await self.consumer.start()
+ self.consumer.subscribe(topic_list)
async for message in self.consumer:
- return yaml.load(message.key), yaml.load(message.value)
+ return message.topic, yaml.load(message.key), yaml.load(message.value)
except KafkaError as e:
raise MsgException(str(e))
finally:
import yaml
import asyncio
from msgbase import MsgBase, MsgException
+from time import sleep
__author__ = "Alfonso Tierno <alfonso.tiernosepulveda@telefonica.com>"
+"""
+This emulated kafka bus by just using a shared file system. Usefull for testing or devops.
+One file is used per topic. Only one producer and one consumer is allowed per topic. Both consumer and producer
+access to the same file. e.g. same volume if running with docker.
+One text line per message is used in yaml format
+"""
class MsgLocal(MsgBase):
self.path = None
# create a different file for each topic
self.files = {}
+ self.buffer = {}
def connect(self, config):
try:
Insert a message into topic
:param topic: topic
:param key: key text to be inserted
- :param msg: value object to be inserted
+ :param msg: value object to be inserted, can be str, object ...
:return: None or raises and exception
"""
try:
if topic not in self.files:
- self.files[topic] = open(self.path + topic, "w+")
- yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True)
+ self.files[topic] = open(self.path + topic, "a+")
+ yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True, width=20000)
self.files[topic].flush()
except Exception as e: # TODO refine
raise MsgException(str(e))
- def read(self, topic):
+ def read(self, topic, blocks=True):
+ """
+ Read from one or several topics. it is non blocking returning None if nothing is available
+ :param topic: can be str: single topic; or str list: several topics
+ :param blocks: indicates if it should wait and block until a message is present or returns None
+ :return: topic, key, message; or None if blocks==True
+ """
try:
- msg = ""
- if topic not in self.files:
- self.files[topic] = open(self.path + topic, "a+")
- # ignore previous content
- for line in self.files[topic]:
- if not line.endswith("\n"):
- msg = line
- msg += self.files[topic].readline()
- if not msg.endswith("\n"):
- return None
- msg_dict = yaml.load(msg)
- assert len(msg_dict) == 1
- for k, v in msg_dict.items():
- return k, v
+ if isinstance(topic, (list, tuple)):
+ topic_list = topic
+ else:
+ topic_list = (topic, )
+ while True:
+ for single_topic in topic_list:
+ if single_topic not in self.files:
+ self.files[single_topic] = open(self.path + single_topic, "a+")
+ self.buffer[single_topic] = ""
+ self.buffer[single_topic] += self.files[single_topic].readline()
+ if not self.buffer[single_topic].endswith("\n"):
+ continue
+ msg_dict = yaml.load(self.buffer[single_topic])
+ self.buffer[single_topic] = ""
+ assert len(msg_dict) == 1
+ for k, v in msg_dict.items():
+ return single_topic, k, v
+ if not blocks:
+ return None
+ sleep(2)
except Exception as e: # TODO refine
raise MsgException(str(e))
- async def aioread(self, topic, loop=None):
+ async def aioread(self, topic, loop):
+ """
+ Asyncio read from one or several topics. It blocks
+ :param topic: can be str: single topic; or str list: several topics
+ :param loop: asyncio loop
+ :return: topic, key, message
+ """
try:
- msg = ""
- if not loop:
- loop = asyncio.get_event_loop()
- if topic not in self.files:
- self.files[topic] = open(self.path + topic, "a+")
- # ignore previous content
- for line in self.files[topic]:
- if not line.endswith("\n"):
- msg = line
while True:
- msg += self.files[topic].readline()
- if msg.endswith("\n"):
- break
+ msg = self.read(topic, blocks=False)
+ if msg:
+ return msg
await asyncio.sleep(2, loop=loop)
- msg_dict = yaml.load(msg)
- assert len(msg_dict) == 1
- for k, v in msg_dict.items():
- return k, v
+ except MsgException:
+ raise
except Exception as e: # TODO refine
raise MsgException(str(e))
+
vnfd or nsd content
:param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns'
:param indata: Content to be inspected
- :return: the useful part of indata (a reference, not a new dictionay) plus boolean indicating if it was enveloped
+ :return: the useful part of indata (a reference, not a new dictionay)
"""
clean_indata = indata
- enveloped = False
if not indata:
- return {}, False
+ return {}
if item == "vnfd":
if clean_indata.get('vnfd:vnfd-catalog'):
- enveloped = True
clean_indata = clean_indata['vnfd:vnfd-catalog']
elif clean_indata.get('vnfd-catalog'):
- enveloped = True
clean_indata = clean_indata['vnfd-catalog']
if clean_indata.get('vnfd'):
if not isinstance(clean_indata['vnfd'], list) or len(clean_indata['vnfd']) != 1:
clean_indata = clean_indata['vnfd'][0]
elif item == "nsd":
if clean_indata.get('nsd:nsd-catalog'):
- enveloped = True
clean_indata = clean_indata['nsd:nsd-catalog']
elif clean_indata.get('nsd-catalog'):
- enveloped = True
clean_indata = clean_indata['nsd-catalog']
if clean_indata.get('nsd'):
if not isinstance(clean_indata['nsd'], list) or len(clean_indata['nsd']) != 1:
raise ROClientException("'nsd' must be a list only one element")
clean_indata = clean_indata['nsd'][0]
+ elif item == "sdn":
+ if len(indata) == 1 and "sdn_controller" in indata:
+ clean_indata = indata["sdn_controller"]
elif item == "tenant":
if len(indata) == 1 and "tenant" in indata:
- enveloped = True
clean_indata = indata["tenant"]
- elif item == "vim" or item == "datacenter":
+ elif item in ("vim", "vim_account", "datacenters"):
if len(indata) == 1 and "datacenter" in indata:
- enveloped = True
clean_indata = indata["datacenter"]
elif item == "ns" or item == "instances":
if len(indata) == 1 and "instance" in indata:
- enveloped = True
clean_indata = indata["instance"]
else:
assert False, "remove_envelop with unknown item {}".format(item)
- return clean_indata, enveloped
+ return clean_indata
class ROClient:
headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'}
- client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vnfd': 'vnfs', 'nsd': 'scenarios',
+ client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vim_account': 'datacenters', 'sdn': 'sdn_controllers',
+ 'vnfd': 'vnfs', 'nsd': 'scenarios',
'ns': 'instances'}
mandatory_for_create = {
'tenant': ("name", ),
- 'vim': ("name", "vim_url"),
'vnfd': ("name", "id", "connection-point", "vdu"),
'nsd': ("name", "id", "constituent-vnfd"),
'ns': ("name", "scenario", "datacenter"),
+ 'vim': ("name", "vim_url"),
+ 'vim_account': (),
+ 'sdn': ("name", "port", 'ip', 'dpid', 'type'),
}
timeout_large = 120
timeout_short = 30
def __setitem__(self,index, value):
if index == 'tenant':
self.tenant_id_name = value
- elif index == 'datacenter':
+ elif index == 'datacenter' or index == 'vim':
self.datacenter_id_name = value
elif index == 'username':
self.username = value
return {'nsd-catalog': {'nsd': [indata]}}
elif item == "tenant":
return {'tenant': indata}
- elif item == "vim" or item == "datacenter":
+ elif item in ("vim", "vim_account", "datacenter"):
return {'datacenter': indata}
elif item == "ns" or item == "instances":
return {'instance': indata}
else:
assert False, "_create_envelop with unknown item {}".format(item)
+ @staticmethod
+ def update_descriptor(desc, kwargs):
+ desc = deepcopy(desc) # do not modify original descriptor
+ try:
+ for k, v in kwargs.items():
+ update_content = desc
+ kitem_old = None
+ klist = k.split(".")
+ for kitem in klist:
+ if kitem_old is not None:
+ update_content = update_content[kitem_old]
+ if isinstance(update_content, dict):
+ kitem_old = kitem
+ elif isinstance(update_content, list):
+ kitem_old = int(kitem)
+ else:
+ raise ROClientException(
+ "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
+ if v == "__DELETE__":
+ del update_content[kitem_old]
+ else:
+ update_content[kitem_old] = v
+ return desc
+ except KeyError:
+ raise ROClientException(
+ "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
+ except ValueError:
+ raise ROClientException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
+ k, kitem))
+ except IndexError:
+ raise ROClientException(
+ "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
+
@staticmethod
def check_ns_status(ns_descriptor):
"""
for net in ns_descriptor["nets"]:
net_total += 1
- if net["status"] == "ERROR":
+ if net["status"] in ("ERROR", "VIM_ERROR"):
return "ERROR", net["error_msg"]
elif net["status"] == "ACTIVE":
net_done += 1
for vnf in ns_descriptor["vnfs"]:
for vm in vnf["vms"]:
vm_total += 1
- if vm["status"] == "ERROR":
+ if vm["status"] in ("ERROR", "VIM_ERROR"):
return "ERROR", vm["error_msg"]
elif vm["status"] == "ACTIVE":
vm_done += 1
# vnf[vms]
return ns_ip
- async def get_list(self, item, all_tenants=False, filter_by=None):
- """
- Obtain a list of items filtering by the specigy filter_by.
- :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns'
- :param all_tenants: True if not filtering by tenant. Only allowed for admin
- :param filter_by: dictionary with filtering
- :return: a list of dict. It can be empty. Raises ROClientException on Error,
- """
- try:
- if item not in self.client_to_RO:
- raise ROClientException("Invalid item {}".format(item))
- if item == 'tenant':
- all_tenants = None
- with aiohttp.ClientSession(loop=self.loop) as session:
- content = await self._list_item(session, self.client_to_RO[item], all_tenants=all_tenants,
- filter_dict=filter_by)
- if isinstance(content, dict):
- if len(content) == 1:
- for _, v in content.items():
- return v
- return content.values()[0]
- else:
- raise ROClientException("Output not a list neither dict with len equal 1", http_code=500)
- return content
- except aiohttp.errors.ClientOSError as e:
- raise ROClientException(e, http_code=504)
-
async def _get_item_uuid(self, session, item, item_id_name, all_tenants=False):
if all_tenants:
tenant_text = "/any"
uuid = item_id_name
else:
# check that exist
- uuid = self._get_item_uuid(session, item, item_id_name, all_tenants)
+ uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants)
url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid)
self.logger.debug("GET %s", url )
self.datacenter = await self._get_item_uuid(session, "datacenters", self.datacenter_id_name, True)
return self.datacenter
- async def _create_item(self, session, item, descriptor, all_tenants=False):
+ async def _create_item(self, session, item, descriptor, item_id_name=None, action=None, all_tenants=False):
if all_tenants:
tenant_text = "/any"
elif all_tenants is None:
await self._get_tenant(session)
tenant_text = "/" + self.tenant
payload_req = yaml.safe_dump(descriptor)
+ #print payload_req
api_version_text = ""
if item == "vnfs":
api_version_text = "/v3"
item = "nsd"
- #print payload_req
+ if not item_id_name:
+ uuid=""
+ elif self.check_if_uuid(item_id_name):
+ uuid = "/{}".format(item_id_name)
+ else:
+ # check that exist
+ uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants)
+ uuid = "/{}".format(uuid)
+ if not action:
+ action = ""
+ else:
+ action = "/".format(action)
- url = "{}{apiver}{tenant}/{item}".format(self.endpoint_url, apiver=api_version_text, tenant=tenant_text,
- item=item)
+ url = "{}{apiver}{tenant}/{item}{id}{action}".format(self.endpoint_url, apiver=api_version_text, tenant=tenant_text,
+ item=item, id=uuid, action=action)
self.logger.debug("openmano POST %s %s", url, payload_req)
with aiohttp.Timeout(self.timeout_large):
async with session.post(url, headers=self.headers_req, data=payload_req) as response:
if response.status >= 300:
raise ROClientException(response_text, http_code=response.status)
- response_desc = self._parse_yaml(response_text, response=True)
- desc, _ = remove_envelop(item, response_desc)
- return desc
+ return self._parse_yaml(response_text, response=True)
async def _del_item(self, session, item, item_id_name, all_tenants=False):
if all_tenants:
tenant_text = "/" + self.tenant
if not self.check_if_uuid(item_id_name):
# check that exist
- uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants)
+ _all_tenants = all_tenants
+ if item == "datacenters":
+ _all_tenants = True
+ uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants=_all_tenants)
else:
uuid = item_id_name
raise ROClientException(response_text, http_code=response.status)
return self._parse_yaml(response_text, response=True)
- async def _edit_item(self, session, item, descriptor, item_id_name, all_tenants=False):
+ async def _edit_item(self, session, item, item_id, descriptor, all_tenants=False):
if all_tenants:
tenant_text = "/any"
elif all_tenants is None:
await self._get_tenant(session)
tenant_text = "/" + self.tenant
- if not uuid:
- #check that exist
- uuid = self._get_item_uuid(session, "tenants", item_id_name, all_tenants)
-
payload_req = yaml.safe_dump(descriptor)
#print payload_req
- url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, uuid)
+ url = "{}{}/{}/{}".format(self.endpoint_url, tenant_text, item, item_id)
self.logger.debug("openmano PUT %s %s", url, payload_req)
with aiohttp.Timeout(self.timeout_large):
async with session.put(url, headers=self.headers_req, data=payload_req) as response:
raise ROClientException(response_text, http_code=response.status)
return self._parse_yaml(response_text, response=True)
+ async def get_list(self, item, all_tenants=False, filter_by=None):
+ """
+ Obtain a list of items filtering by the specigy filter_by.
+ :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns'
+ :param all_tenants: True if not filtering by tenant. Only allowed for admin
+ :param filter_by: dictionary with filtering
+ :return: a list of dict. It can be empty. Raises ROClientException on Error,
+ """
+ try:
+ if item not in self.client_to_RO:
+ raise ROClientException("Invalid item {}".format(item))
+ if item == 'tenant':
+ all_tenants = None
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ content = await self._list_item(session, self.client_to_RO[item], all_tenants=all_tenants,
+ filter_dict=filter_by)
+ if isinstance(content, dict):
+ if len(content) == 1:
+ for _, v in content.items():
+ return v
+ return content.values()[0]
+ else:
+ raise ROClientException("Output not a list neither dict with len equal 1", http_code=500)
+ return content
+ except aiohttp.errors.ClientOSError as e:
+ raise ROClientException(e, http_code=504)
+
async def show(self, item, item_id_name=None, all_tenants=False):
"""
Obtain the information of an item from its id or name
raise ROClientException("Invalid item {}".format(item))
if item == 'tenant':
all_tenants = None
+ elif item == 'vim':
+ all_tenants = True
+ elif item == 'vim_account':
+ all_tenants = False
with aiohttp.ClientSession(loop=self.loop) as session:
content = await self._get_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants)
- if len(content) == 1:
- return content.values()[0]
- else:
- return content
+ return remove_envelop(item, content)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
try:
if item not in self.client_to_RO:
raise ROClientException("Invalid item {}".format(item))
- if item == 'tenant':
+ if item == 'tenant' or item == 'vim':
all_tenants = None
with aiohttp.ClientSession(loop=self.loop) as session:
- if item == 'vim':
- # check that exist
- item_id = await self._get_item_uuid(session, "datacenters", item_id_name, all_tenants=True)
- all_tenants = None
return await self._del_item(session, self.client_to_RO[item], item_id_name, all_tenants=all_tenants)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
- async def create(self, item, descriptor=None, descriptor_format=None, **kwargs):
- """
- Creates an item from its descriptor
- :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns'
+ async def edit(self, item, item_id_name, descriptor=None, descriptor_format=None, **kwargs):
+ """ Edit an item
+ :param item: can be 'tenant', 'vim', 'vnfd', 'nsd', 'ns', 'vim'
:param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided
:param descriptor_format: Can be 'json' or 'yaml'
:param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type
if item not in self.client_to_RO:
raise ROClientException("Invalid item {}".format(item))
- desc, enveloped = remove_envelop(item, descriptor)
+ desc = remove_envelop(item, descriptor)
# Override descriptor with kwargs
if kwargs:
- desc = deepcopy(desc) # do not modify original descriptor
- try:
- for k, v in kwargs.items():
- update_content = desc
- kitem_old = None
- klist = k.split(".")
- for kitem in klist:
- if kitem_old is not None:
- update_content = update_content[kitem_old]
- if isinstance(update_content, dict):
- kitem_old = kitem
- elif isinstance(update_content, list):
- kitem_old = int(kitem)
- else:
- raise ROClientException(
- "Invalid query string '{}'. Descriptor is not a list nor dict at '{}'".format(k, kitem))
- if v == "__DELETE__":
- del update_content[kitem_old]
- else:
- update_content[kitem_old] = v
- except KeyError:
- raise ROClientException(
- "Invalid query string '{}'. Descriptor does not contain '{}'".format(k, kitem_old))
- except ValueError:
- raise ROClientException("Invalid query string '{}'. Expected integer index list instead of '{}'".format(
- k, kitem))
- except IndexError:
- raise ROClientException(
- "Invalid query string '{}'. Index '{}' out of range".format(k, kitem_old))
-
- for mandatory in self.mandatory_for_create[item]:
- if mandatory not in desc:
- raise ROClientException("'{}' is mandatory parameter for {}".format(mandatory, item))
-
+ desc = self.update_descriptor(desc, kwargs)
all_tenants = False
if item in ('tenant', 'vim'):
all_tenants = None
- if not enveloped:
- create_desc = self._create_envelop(item, desc)
- else:
- create_desc = descriptor
+ create_desc = self._create_envelop(item, desc)
with aiohttp.ClientSession(loop=self.loop) as session:
- return await self._create_item(session, self.client_to_RO[item], create_desc, all_tenants)
+ _all_tenants = all_tenants
+ if item == 'vim':
+ _all_tenants = True
+ item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name, all_tenants=_all_tenants)
+ # await self._get_tenant(session)
+ outdata = await self._edit_item(session, self.client_to_RO[item], item_id, create_desc, all_tenants=all_tenants)
+ return remove_envelop(item, outdata)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
- def edit_tenant(self, uuid=None, name=None, descriptor=None, descriptor_format=None, new_name=None, new_description=None):
- """Edit the parameters of a tenant
- Params: must supply a descriptor or/and a new_name or new_description
- uuid or/and name. If only name is supplied, there must be only one or an exception is raised
- descriptor: with format {'tenant':{params to change info}}
- must be a dictionary or a json/yaml text.
- name: the tenant name. Overwrite descriptor name if any
- description: tenant descriptor.. Overwrite descriptor description if any
- Return: Raises an exception on error, not found or found several
- Obtain a dictionary with format {'tenant':{newtenant_info}}
+ async def create(self, item, descriptor=None, descriptor_format=None, **kwargs):
+ """
+ Creates an item from its descriptor
+ :param item: can be 'tenant', 'vnfd', 'nsd', 'ns', 'vim', 'vim_account', 'sdn'
+ :param descriptor: can be a dict, or a yaml/json text. Autodetect unless descriptor_format is provided
+ :param descriptor_format: Can be 'json' or 'yaml'
+ :param kwargs: Overrides descriptor with values as name, description, vim_url, vim_url_admin, vim_type
+ keys can be a dot separated list to specify elements inside dict
+ :return: dictionary with the information or raises ROClientException on Error
"""
try:
- # TODO revise
if isinstance(descriptor, str):
- descriptor = self.parse(descriptor, descriptor_format)
+ descriptor = self._parse(descriptor, descriptor_format)
elif descriptor:
pass
- elif new_name or new_description:
- descriptor={"tenant": {}}
else:
- raise ROClientException("Missing descriptor")
+ descriptor = {}
- if 'tenant' not in descriptor or len(descriptor)!=1:
- raise ROClientException("Descriptor must contain only one 'tenant' field")
- if new_name:
- descriptor['tenant']['name'] = new_name
- if new_description:
- descriptor['tenant']['description'] = new_description
+ if item not in self.client_to_RO:
+ raise ROClientException("Invalid item {}".format(item))
+ desc = remove_envelop(item, descriptor)
+
+ # Override descriptor with kwargs
+ if kwargs:
+ desc = self.update_descriptor(desc, kwargs)
+
+ for mandatory in self.mandatory_for_create[item]:
+ if mandatory not in desc:
+ raise ROClientException("'{}' is mandatory parameter for {}".format(mandatory, item))
+
+ all_tenants = False
+ if item in ('tenant', 'vim'):
+ all_tenants = None
+
+ create_desc = self._create_envelop(item, desc)
- return self._edit_item("tenants", descriptor, uuid, name, all_tenants=None)
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ outdata = await self._create_item(session, self.client_to_RO[item], create_desc,
+ all_tenants=all_tenants)
+ return remove_envelop(item, outdata)
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
+ async def attach_datacenter(self, datacenter=None, descriptor=None, descriptor_format=None, **kwargs):
+
+ if isinstance(descriptor, str):
+ descriptor = self._parse(descriptor, descriptor_format)
+ elif descriptor:
+ pass
+ else:
+ descriptor = {}
+ desc = remove_envelop("vim", descriptor)
+
+ # # check that exist
+ # uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True)
+ # tenant_text = "/" + self._get_tenant()
+ if kwargs:
+ desc = self.update_descriptor(desc, kwargs)
+
+ if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"):
+ raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be provided")
+ create_desc = self._create_envelop("vim", desc)
+ payload_req = yaml.safe_dump(create_desc)
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ # check that exist
+ item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=True)
+ await self._get_tenant(session)
+
+ url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=self.tenant,
+ datacenter=item_id)
+ self.logger.debug("openmano POST %s %s", url, payload_req)
+ with aiohttp.Timeout(self.timeout_large):
+ async with session.post(url, headers=self.headers_req, data=payload_req) as response:
+ response_text = await response.read()
+ self.logger.debug("POST {} [{}] {}".format(url, response.status, response_text[:100]))
+ if response.status >= 300:
+ raise ROClientException(response_text, http_code=response.status)
+
+ response_desc = self._parse_yaml(response_text, response=True)
+ desc = remove_envelop("vim", response_desc)
+ return desc
+
+ async def detach_datacenter(self, datacenter=None):
+ #TODO replace the code with delete_item(vim_account,...)
+ with aiohttp.ClientSession(loop=self.loop) as session:
+ # check that exist
+ item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=False)
+ tenant = await self._get_tenant(session)
+
+ url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=tenant,
+ datacenter=item_id)
+ self.logger.debug("openmano DELETE %s", url)
+ with aiohttp.Timeout(self.timeout_large):
+ async with session.delete(url, headers=self.headers_req) as response:
+ response_text = await response.read()
+ self.logger.debug("DELETE {} [{}] {}".format(url, response.status, response_text[:100]))
+ if response.status >= 300:
+ raise ROClientException(response_text, http_code=response.status)
+
+ response_desc = self._parse_yaml(response_text, response=True)
+ desc = remove_envelop("vim", response_desc)
+ return desc
+
+
+ # TODO convert to asyncio
+
#DATACENTERS
def edit_datacenter(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs):
descriptor['datacenter'][param] = kwargs[param]
return self._edit_item("datacenters", descriptor, uuid, name, all_tenants=None)
- def attach_datacenter(self, uuid_name=None, descriptor=None, descriptor_format=None, vim_user=None, vim_password=None, vim_tenant_name=None, vim_tenant_id=None):
- #check that exist
- uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True)
- tenant_text = "/"+self._get_tenant()
-
- if isinstance(descriptor, str):
- descriptor = self.parse(descriptor, descriptor_format)
- elif descriptor:
- pass
- elif vim_user or vim_password or vim_tenant_name or vim_tenant_id:
- descriptor={"datacenter": {}}
- else:
- raise ROClientException("Missing descriptor or params")
-
- if vim_user or vim_password or vim_tenant_name or vim_tenant_id:
- #print args.name
- try:
- if vim_user:
- descriptor['datacenter']['vim_user'] = vim_user
- if vim_password:
- descriptor['datacenter']['vim_password'] = vim_password
- if vim_tenant_name:
- descriptor['datacenter']['vim_tenant_name'] = vim_tenant_name
- if vim_tenant_id:
- descriptor['datacenter']['vim_tenant'] = vim_tenant_id
- except (KeyError, TypeError) as e:
- if str(e)=='datacenter': error_pos= "missing field 'datacenter'"
- else: error_pos="wrong format"
- raise ROClientException("Wrong datacenter descriptor: " + error_pos)
-
- payload_req = yaml.safe_dump(descriptor)
- #print payload_req
- url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid)
- self.logger.debug("openmano POST %s %s", url, payload_req)
- mano_response = requests.post(url, headers = self.headers_req, data=payload_req)
- self.logger.debug("openmano response: %s", mano_response.text )
-
- content = self._parse_yaml(mano_response.text, response=True)
- if mano_response.status_code==200:
- return content
- else:
- raise ROClientException(str(content), http_code=mano_response.status)
-
- def detach_datacenter(self, uuid_name=None):
- if not uuid:
- #check that exist
- uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=False)
- tenant_text = "/"+self._get_tenant()
- url = "{}{}/datacenters/{}".format(self.endpoint_url, tenant_text, uuid)
- self.logger.debug("openmano DELETE %s", url)
- mano_response = requests.delete(url, headers = self.headers_req)
- self.logger.debug("openmano response: %s", mano_response.text )
-
- content = self._parse_yaml(mano_response.text, response=True)
- if mano_response.status_code==200:
- return content
- else:
- raise ROClientException(str(content), http_code=mano_response.status)
-
- #VNFS
def edit_scenario(self, uuid=None, name=None, descriptor=None, descriptor_format=None, all_tenants=False, **kwargs):
"""Edit the parameters of a scenario
Return: Raises an exception on error, not found or found several
Obtain a dictionary with format {'scenario':{new_scenario_info}}
"""
-
+
if isinstance(descriptor, str):
descriptor = self.parse(descriptor, descriptor_format)
elif descriptor:
descriptor={"scenario": {}}
else:
raise ROClientException("Missing descriptor")
-
+
if 'scenario' not in descriptor or len(descriptor)>2:
raise ROClientException("Descriptor must contain only one 'scenario' field")
for param in kwargs:
if __name__ == '__main__':
RO_URL = "http://localhost:9090/openmano"
- RO_TENANT = "osm"
- RO_VIM = "myvim"
+ TEST_TENANT = "myTenant"
+ TEST_VIM1 = "myvim"
+ TEST_URL1 = "https://localhost:5000/v1"
+ TEST_TYPE1 = "openstack"
+ TEST_CONFIG1 = {"use_floating_ip": True}
+ TEST_VIM2 = "myvim2"
+ TEST_URL2 = "https://localhost:5000/v2"
+ TEST_TYPE2 = "openvim"
+ TEST_CONFIG2 = {"config2": "config2", "config3": True}
streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
logging.basicConfig(format=streamformat)
+ logger = logging.getLogger("ROClient")
+ tenant_id = None
+ vim_id = False
loop = asyncio.get_event_loop()
- myClient = ROClient(endpoint_url=RO_URL, loop=loop, tenant_id=RO_TENANT, datacenter_id=RO_VIM, debug=True)
- content = loop.run_until_complete(myClient.list_tenants())
- print(content)
+ myClient = ROClient(endpoint_url=RO_URL, loop=loop, loglevel="DEBUG")
+ try:
+ # test tenant
+ content = loop.run_until_complete(myClient.get_list("tenant"))
+ print("tenants", content)
+ content = loop.run_until_complete(myClient.create("tenant", name=TEST_TENANT))
+ tenant_id = True
+ content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT))
+ print("tenant", TEST_TENANT, content)
+ content = loop.run_until_complete(myClient.edit("tenant", TEST_TENANT, description="another description"))
+ content = loop.run_until_complete(myClient.show("tenant", TEST_TENANT))
+ print("tenant edited", TEST_TENANT, content)
+ myClient["tenant"] = TEST_TENANT
+
+
+ # test VIM
+ content = loop.run_until_complete(myClient.create("vim", name=TEST_VIM1, type=TEST_TYPE1, vim_url=TEST_URL1, config=TEST_CONFIG1))
+ vim_id = True
+ content = loop.run_until_complete(myClient.get_list("vim"))
+ print("vim", content)
+ content = loop.run_until_complete(myClient.show("vim", TEST_VIM1))
+ print("vim", TEST_VIM1, content)
+ content = loop.run_until_complete(myClient.edit("vim", TEST_VIM1, description="another description",
+ name=TEST_VIM2, type=TEST_TYPE2, vim_url=TEST_URL2,
+ config=TEST_CONFIG2))
+ content = loop.run_until_complete(myClient.show("vim", TEST_VIM2))
+ print("vim edited", TEST_VIM2, content)
+
+ # test VIM_ACCOUNT
+ content = loop.run_until_complete(myClient.attach_datacenter(TEST_VIM2, vim_username='user',
+ vim_password='pass', vim_tenant_name='vimtenant1', config=TEST_CONFIG1))
+ vim_id = True
+ content = loop.run_until_complete(myClient.get_list("vim_account"))
+ print("vim_account", content)
+ content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2))
+ print("vim_account", TEST_VIM2, content)
+ content = loop.run_until_complete(myClient.edit("vim_account", TEST_VIM2, vim_username='user2', vim_password='pass2',
+ vim_tenant_name="vimtenant2", config=TEST_CONFIG2))
+ content = loop.run_until_complete(myClient.show("vim_account", TEST_VIM2))
+ print("vim_account edited", TEST_VIM2, content)
+
+ myClient["vim"] = TEST_VIM2
+
+ except Exception as e:
+ logger.error("Error {}".format(e), exc_info=True)
+
+ for item in (("vim_account", TEST_VIM1), ("vim", TEST_VIM1),
+ ("vim_account", TEST_VIM2), ("vim", TEST_VIM2),
+ ("tenant", TEST_TENANT)):
+ try:
+ content = loop.run_until_complete(myClient.delete(item[0], item[1]))
+ print("{} {} deleted; {}".format(item[0], item[1], content))
+ except Exception as e:
+ if e.http_code == 404:
+ print("{} {} not present or already deleted".format(item[0], item[1]))
+ else:
+ logger.error("Error {}".format(e), exc_info=True)
+
loop.close()
#[message]
message:
- driver: local # local or kafka
+ driver: kafka # local or kafka
# for local provide file path
path: /app/storage/kafka
# for kafka provide host and port
import msgkafka
import logging
import functools
+import sys
from dbbase import DbException
from fsbase import FsException
from msgbase import MsgException
from os import environ
# from vca import DeployApplication, RemoveApplication
from n2vc.vnf import N2VC
-import os.path
-import time
+# import os.path
+# import time
from copy import deepcopy
from http import HTTPStatus
+
class LcmException(Exception):
pass
:param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
:return: None
"""
+
+ self.db = None
+ self.msg = None
+ self.fs = None
# contains created tasks/futures to be able to cancel
- self.lcm_tasks = {}
+
+ self.lcm_ns_tasks = {}
+ self.lcm_vim_tasks = {}
+ self.lcm_sdn_tasks = {}
# logging
self.logger = logging.getLogger('lcm')
# load configuration
self.logger.critical(str(e), exc_info=True)
raise LcmException(str(e))
- def update_nsr_db(self, nsr_id, nsr_desc):
+ def update_db(self, item, _id, _desc):
try:
- self.db.replace("nsrs", nsr_id, nsr_desc)
+ self.db.replace(item, _id, _desc)
except DbException as e:
- self.logger.error("Updating nsr_id={}: {}".format(nsr_id, e))
+ self.logger.error("Updating {} _id={}: {}".format(item, _id, e))
+
+ async def create_vim(self, vim_content, order_id):
+ vim_id = vim_content["_id"]
+ logging_text = "Task create_vim={} ".format(vim_id)
+ self.logger.debug(logging_text + "Enter")
+ db_vim = None
+ exc = None
+ try:
+ step = "Getting vim from db"
+ db_vim = self.db.get_one("vims", {"_id": vim_id})
+ if "_admin" not in db_vim:
+ db_vim["_admin"] = {}
+ if "deploy" not in db_vim["_admin"]:
+ db_vim["_admin"]["deploy"] = {}
+ db_vim["_admin"]["deploy"]["RO"] = None
+
+ step = "Creating vim at RO"
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ vim_RO = deepcopy(vim_content)
+ vim_RO.pop("_id", None)
+ vim_RO.pop("_admin", None)
+ vim_RO.pop("schema_version", None)
+ vim_RO.pop("schema_type", None)
+ vim_RO.pop("vim_tenant_name", None)
+ vim_RO["type"] = vim_RO.pop("vim_type")
+ vim_RO.pop("vim_user", None)
+ vim_RO.pop("vim_password", None)
+ desc = await RO.create("vim", descriptor=vim_RO)
+ RO_vim_id = desc["uuid"]
+ db_vim["_admin"]["deploy"]["RO"] = RO_vim_id
+ self.update_db("vims", vim_id, db_vim)
+
+ step = "Attach vim to RO tenant"
+ vim_RO = {"vim_tenant_name": vim_content["vim_tenant_name"],
+ "vim_username": vim_content["vim_user"],
+ "vim_password": vim_content["vim_password"],
+ "config": vim_content["config"]
+ }
+ desc = await RO.attach_datacenter(RO_vim_id , descriptor=vim_RO)
+ db_vim["_admin"]["operationalState"] = "ENABLED"
+ self.update_db("vims", vim_id, db_vim)
+
+ self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
+ return RO_vim_id
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_vim:
+ db_vim["_admin"]["operationalState"] = "ERROR"
+ db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
+ self.update_db("vims", vim_id, db_vim)
+
+ async def edit_vim(self, vim_content, order_id):
+ vim_id = vim_content["_id"]
+ logging_text = "Task edit_vim={} ".format(vim_id)
+ self.logger.debug(logging_text + "Enter")
+ db_vim = None
+ exc = None
+ step = "Getting vim from db"
+ try:
+ db_vim = self.db.get_one("vims", {"_id": vim_id})
+ if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
+ RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
+ step = "Editing vim at RO"
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ vim_RO = deepcopy(vim_content)
+ vim_RO.pop("_id", None)
+ vim_RO.pop("_admin", None)
+ vim_RO.pop("schema_version", None)
+ vim_RO.pop("schema_type", None)
+ vim_RO.pop("vim_tenant_name", None)
+ vim_RO["type"] = vim_RO.pop("vim_type")
+ vim_RO.pop("vim_user", None)
+ vim_RO.pop("vim_password", None)
+ if vim_RO:
+ desc = await RO.edit("vim", RO_vim_id, descriptor=vim_RO)
+
+ step = "Editing vim-account at RO tenant"
+ vim_RO = {}
+ for k in ("vim_tenant_name", "vim_password", "config"):
+ if k in vim_content:
+ vim_RO[k] = vim_content[k]
+ if "vim_user" in vim_content:
+ vim_content["vim_username"] = vim_content["vim_user"]
+ if vim_RO:
+ desc = await RO.edit("vim_account", RO_vim_id, descriptor=vim_RO)
+ db_vim["_admin"]["operationalState"] = "ENABLED"
+ self.update_db("vims", vim_id, db_vim)
+
+ self.logger.debug(logging_text + "Exit Ok RO_vim_id".format(RO_vim_id))
+ return RO_vim_id
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_vim:
+ db_vim["_admin"]["operationalState"] = "ERROR"
+ db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
+ self.update_db("vims", vim_id, db_vim)
+
+ async def delete_vim(self, vim_id, order_id):
+ logging_text = "Task delete_vim={} ".format(vim_id)
+ self.logger.debug(logging_text + "Enter")
+ db_vim = None
+ exc = None
+ step = "Getting vim from db"
+ try:
+ db_vim = self.db.get_one("vims", {"_id": vim_id})
+ if db_vim.get("_admin") and db_vim["_admin"].get("deploy") and db_vim["_admin"]["deploy"].get("RO"):
+ RO_vim_id = db_vim["_admin"]["deploy"]["RO"]
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ step = "Detaching vim from RO tenant"
+ try:
+ await RO.detach_datacenter(RO_vim_id)
+ except ROclient.ROClientException as e:
+ if e.http_code == 404: # not found
+ self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
+ else:
+ raise
+
+ step = "Deleting vim from RO"
+ try:
+ await RO.delete("vim", RO_vim_id)
+ except ROclient.ROClientException as e:
+ if e.http_code == 404: # not found
+ self.logger.debug(logging_text + "RO_vim_id={} already deleted".format(RO_vim_id))
+ else:
+ raise
+ else:
+ # nothing to delete
+ self.logger.error(logging_text + "Skipping. There is not RO information at database")
+ self.db.del_one("vims", {"_id": vim_id})
+ self.logger.debug("delete_vim task vim_id={} Exit Ok".format(vim_id))
+ return None
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_vim:
+ db_vim["_admin"]["operationalState"] = "ERROR"
+ db_vim["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
+ self.update_db("vims", vim_id, db_vim)
+
+ async def create_sdn(self, sdn_content, order_id):
+ sdn_id = sdn_content["_id"]
+ logging_text = "Task create_sdn={} ".format(sdn_id)
+ self.logger.debug(logging_text + "Enter")
+ db_sdn = None
+ exc = None
+ try:
+ step = "Getting sdn from db"
+ db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
+ if "_admin" not in db_sdn:
+ db_sdn["_admin"] = {}
+ if "deploy" not in db_sdn["_admin"]:
+ db_sdn["_admin"]["deploy"] = {}
+ db_sdn["_admin"]["deploy"]["RO"] = None
+
+ step = "Creating sdn at RO"
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ sdn_RO = deepcopy(sdn_content)
+ sdn_RO.pop("_id", None)
+ sdn_RO.pop("_admin", None)
+ sdn_RO.pop("schema_version", None)
+ sdn_RO.pop("schema_type", None)
+ desc = await RO.create("sdn", descriptor=sdn_RO)
+ RO_sdn_id = desc["uuid"]
+ db_sdn["_admin"]["deploy"]["RO"] = RO_sdn_id
+ db_sdn["_admin"]["operationalState"] = "ENABLED"
+ self.update_db("sdns", sdn_id, db_sdn)
+ self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
+ return RO_sdn_id
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_sdn:
+ db_sdn["_admin"]["operationalState"] = "ERROR"
+ db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
+ self.update_db("sdns", sdn_id, db_sdn)
+
+ async def edit_sdn(self, sdn_content, order_id):
+ sdn_id = sdn_content["_id"]
+ logging_text = "Task edit_sdn={} ".format(sdn_id)
+ self.logger.debug(logging_text + "Enter")
+ db_sdn = None
+ exc = None
+ step = "Getting sdn from db"
+ try:
+ db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
+ if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
+ RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ step = "Editing sdn at RO"
+ sdn_RO = deepcopy(sdn_content)
+ sdn_RO.pop("_id", None)
+ sdn_RO.pop("_admin", None)
+ sdn_RO.pop("schema_version", None)
+ sdn_RO.pop("schema_type", None)
+ if sdn_RO:
+ desc = await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
+ db_sdn["_admin"]["operationalState"] = "ENABLED"
+ self.update_db("sdns", sdn_id, db_sdn)
+
+ self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
+ return RO_sdn_id
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_sdn:
+ db_sdn["_admin"]["operationalState"] = "ERROR"
+ db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
+ self.update_db("sdns", sdn_id, db_sdn)
+
+ async def delete_sdn(self, sdn_id, order_id):
+ logging_text = "Task delete_sdn={} ".format(sdn_id)
+ self.logger.debug(logging_text + "Enter")
+ db_sdn = None
+ exc = None
+ step = "Getting sdn from db"
+ try:
+ db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
+ if db_sdn.get("_admin") and db_sdn["_admin"].get("deploy") and db_sdn["_admin"]["deploy"].get("RO"):
+ RO_sdn_id = db_sdn["_admin"]["deploy"]["RO"]
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ step = "Deleting sdn from RO"
+ try:
+ await RO.delete("sdn", RO_sdn_id)
+ except ROclient.ROClientException as e:
+ if e.http_code == 404: # not found
+ self.logger.debug(logging_text + "RO_sdn_id={} already deleted".format(RO_sdn_id))
+ else:
+ raise
+ else:
+ # nothing to delete
+ self.logger.error(logging_text + "Skipping. There is not RO information at database")
+ self.db.del_one("sdns", {"_id": sdn_id})
+ self.logger.debug("delete_sdn task sdn_id={} Exit Ok".format(sdn_id))
+ return None
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_sdn:
+ db_sdn["_admin"]["operationalState"] = "ERROR"
+ db_sdn["_admin"]["detailed-status"] = "ERROR {}: {}".format(step , exc)
+ self.update_db("sdns", sdn_id, db_sdn)
+
+ def vnfd2RO(self, vnfd, new_id=None):
+ """
+ Converts creates a new vnfd descriptor for RO base on input OSM IM vnfd
+ :param vnfd: input vnfd
+ :param new_id: overrides vnf id if provided
+ :return: copy of vnfd
+ """
+ ci_file = None
+ try:
+ vnfd_RO = deepcopy(vnfd)
+ vnfd_RO.pop("_id", None)
+ vnfd_RO.pop("_admin", None)
+ if new_id:
+ vnfd_RO["id"] = new_id
+ for vdu in vnfd_RO["vdu"]:
+ if "cloud-init-file" in vdu:
+ base_folder = vnfd["_admin"]["storage"]
+ clout_init_file = "{}/{}/cloud_init/{}".format(
+ base_folder["folder"],
+ base_folder["pkg-dir"],
+ vdu["cloud-init-file"]
+ )
+ ci_file = self.fs.file_open(clout_init_file, "r")
+ # TODO: detect if binary or text. Propose to read as binary and try to decode to utf8. If fails convert to base 64 or similar
+ clout_init_content = ci_file.read()
+ ci_file.close()
+ ci_file = None
+ vdu.pop("cloud-init-file", None)
+ vdu["cloud-init"] = clout_init_content
+ return vnfd_RO
+ except FsException as e:
+ raise LcmException("Error reading file at vnfd {}: {} ".format(vnfd["_id"], e))
+ finally:
+ if ci_file:
+ ci_file.close()
def n2vc_callback(self, model_name, application_name, workload_status, db_nsr, vnf_member_index, task=None):
"""Update the lcm database with the status of the charm.
- active: The unit is deployed, configured, and ready
- error: The charm has failed and needs attention.
- terminated: The charm has been destroyed
+ - removing,
+ - removed
Updates the network service's config-status to reflect the state of all
charms.
"""
-
- if not workload_status and not task:
- self.logger.error("Task create_ns={} n2vc_callback Enter with bad parameters")
- return
-
- if not workload_status:
- self.logger.error("Task create_ns={} n2vc_callback Enter with bad parameters, no workload_status")
- return
-
+ nsr_id = None
try:
- self.logger.debug("[n2vc_callback] Workload status \"{}\"".format(workload_status))
nsr_id = db_nsr["_id"]
nsr_lcm = db_nsr["_admin"]["deploy"]
- nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
-
if task:
if task.cancelled():
+ self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Cancelled".format(nsr_id, vnf_member_index))
return
if task.done():
exc = task.exception()
if exc:
- nsr_lcm = db_nsr["_admin"]["deploy"]
- nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "failed"
- db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index, exc)
- db_nsr["config-status"] = "failed"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.logger.error(
+ "[n2vc_callback] create_ns={} vnf_index={} task Exception={}".format(nsr_id, vnf_member_index, exc))
+ nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "error"
+ nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = str(exc)
+ else:
+ self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} task Done".format(nsr_id, vnf_member_index))
+ # TODO it seams that task Done, but callback is still ongoing. For the moment comment this two lines
+ # nsr_lcm["VCA"][vnf_member_index]['operational-status'] = "active"
+ # nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
+ elif workload_status:
+ self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} Enter workload_status={}".format(nsr_id, vnf_member_index, workload_status))
+ if nsr_lcm["VCA"][vnf_member_index]['operational-status'] == workload_status:
+ return # same status, ignore
+ nsr_lcm["VCA"][vnf_member_index]['operational-status'] = workload_status
+ # TODO N2VC some error message in case of error should be obtained from N2VC
+ nsr_lcm["VCA"][vnf_member_index]['detailed-status'] = ""
else:
- vca_status = nsr_lcm["VCA"][vnf_member_index]['operational-status']
-
- units = len(nsr_lcm["VCA"])
- active = 0
- statusmap = {}
- for vnf_index in nsr_lcm["VCA"]:
- if vca_status not in statusmap:
- # Initialize it
- statusmap[vca_status] = 0
-
- statusmap[vca_status] += 1
+ self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Enter with bad parameters".format(nsr_id, vnf_member_index), exc_info=True)
+ return
- if vca_status == "active":
- active += 1
+ some_failed = False
+ all_active = True
+ status_map = {}
+ for vnf_index, vca_info in nsr_lcm["VCA"].items():
+ vca_status = vca_info["operational-status"]
+ if vca_status not in status_map:
+ # Initialize it
+ status_map[vca_status] = 0
+ status_map[vca_status] += 1
+
+ if vca_status != "active":
+ all_active = False
+ if vca_status == "error":
+ some_failed = True
+ db_nsr["config-status"] = "failed"
+ db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_member_index,
+ vca_info["detailed-status"])
+ break
- cs = ""
- for status in statusmap:
- cs += "{} ({}) ".format(status, statusmap[status])
+ if all_active:
+ self.logger.debug("[n2vc_callback] create_ns={} vnf_index={} All active".format(nsr_id, vnf_member_index))
+ db_nsr["config-status"] = "configured"
+ db_nsr["detailed-status"] = "done"
+ elif some_failed:
+ pass
+ else:
+ cs = "configuring: "
+ separator = ""
+ for status, num in status_map.items():
+ cs += separator + "{}: {}".format(status, num)
+ separator = ", "
db_nsr["config-status"] = cs
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
except Exception as e:
- # self.logger.critical("Task create_ns={} n2vc_callback Exception {}".format(nsr_id, e), exc_info=True)
- self.logger.critical("Task create_ns n2vc_callback Exception {}".format(e), exc_info=True)
- pass
-
- def vca_deploy_callback(self, db_nsr, vnf_index, status, task):
- # TODO study using this callback when VCA.DeployApplication success from VCAMonitor
- # By the moment this callback is used only to capture exception conditions from VCA DeployApplication
- nsr_id = db_nsr["_id"]
- self.logger.debug("Task create_ns={} vca_deploy_callback Enter".format(nsr_id))
- try:
- if task.cancelled():
- return
- if task.done():
- exc = task.exception()
- if exc:
- nsr_lcm = db_nsr["_admin"]["deploy"]
- nsr_lcm["VCA"][vnf_index]['operational-status'] = "failed"
- db_nsr["detailed-status"] = "fail configuring vnf_index={} {}".format(vnf_index, exc)
- db_nsr["config-status"] = "failed"
- self.update_nsr_db(nsr_id, db_nsr)
- else:
- # TODO may be used to be called when VCA monitor status changes
- pass
- # except DbException as e:
- # self.logger.error("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e))
- except Exception as e:
- self.logger.critical("Task create_ns={} vca_deploy_callback Exception {}".format(nsr_id, e), exc_info=True)
+ self.logger.critical("[n2vc_callback] create_ns={} vnf_index={} Exception {}".format(nsr_id, vnf_member_index, e), exc_info=True)
async def create_ns(self, nsr_id, order_id):
logging_text = "Task create_ns={} ".format(nsr_id)
try:
db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
nsd = db_nsr["nsd"]
+ nsr_name = db_nsr["name"] # TODO short-name??
needed_vnfd = {}
for c_vnf in nsd["constituent-vnfd"]:
vnfd_id = c_vnf["vnfd-id-ref"]
self.logger.debug(logging_text + "RO vnfd={} exist. Using RO_id={}".format(
vnfd_id, vnfd_list[0]["uuid"]))
else:
- vnfd_RO = deepcopy(vnfd)
- vnfd_RO.pop("_id", None)
- vnfd_RO.pop("_admin", None)
- vnfd_RO["id"] = vnfd_id_RO
+ vnfd_RO = self.vnfd2RO(vnfd, vnfd_id_RO)
desc = await RO.create("vnfd", descriptor=vnfd_RO)
nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
# create nsd at RO
nsd_id = nsd["id"]
c_vnf["vnfd-id-ref"] = nsr_id + "." + vnfd_id[:200]
desc = await RO.create("nsd", descriptor=nsd_RO)
nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
# Crate ns at RO
# if present use it unless in error status
scenario=nsr_lcm["RO"]["nsd_id"])
RO_nsr_id = nsr_lcm["RO"]["nsr_id"] = desc["uuid"]
nsr_lcm["RO"]["nsr_status"] = "BUILD"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
# wait until NS is ready
step = ns_status_detailed = "Waiting ns ready at RO"
raise ROclient.ROClientException(ns_status_info)
elif ns_status == "BUILD":
db_nsr["detailed-status"] = ns_status_detailed + "; {}".format(ns_status_info)
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
elif ns_status == "ACTIVE":
nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
break
if deloyment_timeout <= 0:
raise ROclient.ROClientException("Timeout waiting ns to be ready")
db_nsr["detailed-status"] = "Configuring vnfr"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
vnfd_to_config = 0
step = "Looking for needed vnfd to configure"
# Note: The charm needs to exist on disk at the location
# specified by charm_path.
base_folder = vnfd["_admin"]["storage"]
+ storage_params = self.fs.get_params()
charm_path = "{}{}/{}/charms/{}".format(
- base_folder["path"],
+ storage_params["path"],
base_folder["folder"],
- base_folder["file"],
+ base_folder["pkg-dir"],
proxy_charm
)
'rw_mgmt_ip': nsr_lcm['nsr_ip'][vnf_index],
}
- # ns_name will be ignored in the current version of N2VC
+ # model_name will be ignored in the current version of N2VC
# but will be implemented for the next point release.
- ns_name = 'default'
+ model_name = 'default'
application_name = self.n2vc.FormatApplicationName(
- 'default',
- vnfd['name'],
+ nsr_name, # 'default',
vnf_index,
+ vnfd['name'],
)
+ # TODO N2VC implement this inside n2vc.FormatApplicationName
+ application_name = application_name[:50]
nsr_lcm["VCA"][vnf_index] = {
- "model": ns_name,
+ "model": model_name,
"application": application_name,
"operational-status": "init",
+ "detailed-status": "",
"vnfd_id": vnfd_id,
}
- self.logger.debug("Passing artifacts path '{}' for {}".format(charm_path, proxy_charm))
+ self.logger.debug("Task create_ns={} Passing artifacts path '{}' for {}".format(nsr_id, charm_path, proxy_charm))
task = asyncio.ensure_future(
self.n2vc.DeployCharms(
- ns_name, # The network service name
+ model_name, # The network service name
application_name, # The application name
vnfd, # The vnf descriptor
charm_path, # Path to charm
None, # Callback parameter (task)
)
)
- task.add_done_callback(functools.partial(self.n2vc_callback, None, None, None, None, db_nsr))
+ task.add_done_callback(functools.partial(self.n2vc_callback, model_name, application_name,
+ None, db_nsr, vnf_index))
- self.lcm_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
+ self.lcm_ns_tasks[nsr_id][order_id]["create_charm:" + vnf_index] = task
db_nsr["config-status"] = "configuring" if vnfd_to_config else "configured"
- db_nsr["detailed-status"] = "Configuring 1/{}".format(vnfd_to_config) if vnfd_to_config else "done"
+ db_nsr["detailed-status"] = "configuring: init: {}".format(vnfd_to_config) if vnfd_to_config else "done"
db_nsr["operational-status"] = "running"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
- self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
+ self.logger.debug("Task create_ns={} Exit Ok".format(nsr_id))
return nsr_lcm
- except (ROclient.ROClientException, DbException) as e:
+ except (ROclient.ROClientException, DbException, LcmException) as e:
self.logger.error(logging_text + "Exit Exception {}".format(e))
exc = e
except Exception as e:
if exc and db_nsr:
db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
db_nsr["operational-status"] = "failed"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
async def delete_ns(self, nsr_id, order_id):
logging_text = "Task delete_ns={} ".format(nsr_id)
nsd = db_nsr["nsd"]
nsr_lcm = db_nsr["_admin"]["deploy"]
- db_nsr["operational-status"] = "terminate"
- db_nsr["config-status"] = "terminate"
+ db_nsr["operational-status"] = "terminating"
+ db_nsr["config-status"] = "terminating"
db_nsr["detailed-status"] = "Deleting charms"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
try:
- step = db_nsr["detailed-status"] = "Deleting charms"
self.logger.debug(logging_text + step)
for vnf_index, deploy_info in nsr_lcm["VCA"].items():
if deploy_info and deploy_info.get("application"):
vnf_index,
)
)
-
- self.lcm_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
+ # task.add_done_callback(functools.partial(self.n2vc_callback, deploy_info['model'],
+ # deploy_info['application'],None, db_nsr, vnf_index))
+ self.lcm_ns_tasks[nsr_id][order_id]["delete_charm:" + vnf_index] = task
except Exception as e:
self.logger.debug(logging_text + "Failed while deleting charms: {}".format(e))
# remove from RO
self.logger.debug(logging_text + "RO_ns_id={} delete conflict: {}".format(RO_nsr_id, e))
else:
self.logger.error(logging_text + "RO_ns_id={} delete error: {}".format(RO_nsr_id, e))
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
# Delete nsd
RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
self.logger.debug(logging_text + "RO_nsd_id={} delete conflict: {}".format(RO_nsd_id, e))
else:
self.logger.error(logging_text + "RO_nsd_id={} delete error: {}".format(RO_nsd_id, e))
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
if not RO_vnfd_id:
self.logger.debug(logging_text + "RO_vnfd_id={} delete conflict: {}".format(RO_vnfd_id, e))
else:
self.logger.error(logging_text + "RO_vnfd_id={} delete error: {}".format(RO_vnfd_id, e))
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
# TODO delete from database or mark as deleted???
db_nsr["operational-status"] = "terminated"
if exc and db_nsr:
db_nsr["detailed-status"] = "ERROR {}: {}".format(step , exc)
db_nsr["operational-status"] = "failed"
- self.update_nsr_db(nsr_id, db_nsr)
+ self.update_db("nsrs", nsr_id, db_nsr)
async def test(self, param=None):
self.logger.debug("Starting/Ending test task: {}".format(param))
- def cancel_tasks(self, nsr_id):
+ def cancel_tasks(self, topic, _id):
"""
- Cancel all active tasks of a concrete nsr identified for nsr_id
- :param nsr_id: nsr identity
+ Cancel all active tasks of a concrete nsr or vim identified for _id
+ :param topic: can be ns or vim_account
+ :param _id: nsr or vim identity
:return: None, or raises an exception if not possible
"""
- if not self.lcm_tasks.get(nsr_id):
+ if topic == "ns":
+ lcm_tasks = self.lcm_ns_tasks
+ elif topic== "vim_account":
+ lcm_tasks = self.lcm_vim_tasks
+ elif topic== "sdn":
+ lcm_tasks = self.lcm_sdn_tasks
+
+ if not lcm_tasks.get(_id):
return
- for order_id, tasks_set in self.lcm_tasks[nsr_id].items():
+ for order_id, tasks_set in lcm_tasks[_id].items():
for task_name, task in tasks_set.items():
result = task.cancel()
if result:
- self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
- self.lcm_tasks[nsr_id] = {}
+ self.logger.debug("{} _id={} order_id={} task={} cancelled".format(topic, _id, order_id, task_name))
+ lcm_tasks[_id] = {}
async def read_kafka(self):
- self.logger.debug("kafka task Enter")
+ self.logger.debug("Task Kafka Enter")
order_id = 1
# future = asyncio.Future()
+ consecutive_errors = 0
+ while consecutive_errors < 10:
+ try:
+ topic, command, params = await self.msg.aioread(("ns", "vim_account", "sdn"), self.loop)
+ self.logger.debug("Task Kafka receives {} {}: {}".format(topic, command, params))
+ consecutive_errors = 0
+ order_id += 1
+ if command == "exit":
+ print("Bye!")
+ break
+ elif command.startswith("#"):
+ continue
+ elif command == "echo":
+ # just for test
+ print(params)
+ sys.stdout.flush()
+ continue
+ elif command == "test":
+ asyncio.Task(self.test(params), loop=self.loop)
+ continue
- while True:
- command, params = await self.msg.aioread("ns", self.loop)
- order_id += 1
- if command == "exit":
- print("Bye!")
- break
- elif command.startswith("#"):
- continue
- elif command == "echo":
- # just for test
- print(params)
- elif command == "test":
- asyncio.Task(self.test(params), loop=self.loop)
- elif command == "break":
- print("put a break in this line of code")
- elif command == "create":
- nsr_id = params.strip()
- self.logger.debug("Deploying NS {}".format(nsr_id))
- task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
- if nsr_id not in self.lcm_tasks:
- self.lcm_tasks[nsr_id] = {}
- self.lcm_tasks[nsr_id][order_id] = {"create_ns": task}
- elif command == "delete":
- nsr_id = params.strip()
- self.logger.debug("Deleting NS {}".format(nsr_id))
- self.cancel_tasks(nsr_id)
- task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
- if nsr_id not in self.lcm_tasks:
- self.lcm_tasks[nsr_id] = {}
- self.lcm_tasks[nsr_id][order_id] = {"delete_ns": task}
- elif command == "show":
- # just for test
- nsr_id = params.strip()
- try:
- db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
- print("nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
- "{}\n deploy: {}\n tasks: {}".format(
+ if topic == "ns":
+ nsr_id = params.strip()
+ if command == "create":
+ # self.logger.debug("Deploying NS {}".format(nsr_id))
+ task = asyncio.ensure_future(self.create_ns(nsr_id, order_id))
+ if nsr_id not in self.lcm_ns_tasks:
+ self.lcm_ns_tasks[nsr_id] = {}
+ self.lcm_ns_tasks[nsr_id][order_id] = {"create_ns": task}
+ continue
+ elif command == "delete":
+ # self.logger.debug("Deleting NS {}".format(nsr_id))
+ self.cancel_tasks(topic, nsr_id)
+ task = asyncio.ensure_future(self.delete_ns(nsr_id, order_id))
+ if nsr_id not in self.lcm_ns_tasks:
+ self.lcm_ns_tasks[nsr_id] = {}
+ self.lcm_ns_tasks[nsr_id][order_id] = {"delete_ns": task}
+ continue
+ elif command == "show":
+ try:
+ db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
+ print(
+ "nsr:\n _id={}\n operational-status: {}\n config-status: {}\n detailed-status: "
+ "{}\n deploy: {}\n tasks: {}".format(
nsr_id, db_nsr["operational-status"],
db_nsr["config-status"], db_nsr["detailed-status"],
- db_nsr["_admin"]["deploy"], self.lcm_tasks.get(nsr_id)))
- except Exception as e:
- print("nsr {} not found: {}".format(nsr_id, e))
- else:
- self.logger.critical("unknown command '{}'".format(command))
- self.logger.debug("kafka task Exit")
-
+ db_nsr["_admin"]["deploy"], self.lcm_ns_tasks.get(nsr_id)))
+ except Exception as e:
+ print("nsr {} not found: {}".format(nsr_id, e))
+ sys.stdout.flush()
+ continue
+ elif topic == "vim_account":
+ vim_id = params["_id"]
+ if command == "create":
+ task = asyncio.ensure_future(self.create_vim(params, order_id))
+ if vim_id not in self.lcm_vim_tasks:
+ self.lcm_vim_tasks[vim_id] = {}
+ self.lcm_vim_tasks[vim_id][order_id] = {"create_vim": task}
+ continue
+ elif command == "delete":
+ self.cancel_tasks(topic, vim_id)
+ task = asyncio.ensure_future(self.delete_vim(vim_id, order_id))
+ if vim_id not in self.lcm_vim_tasks:
+ self.lcm_vim_tasks[vim_id] = {}
+ self.lcm_vim_tasks[vim_id][order_id] = {"delete_vim": task}
+ continue
+ elif command == "show":
+ print("not implemented show with vim_account")
+ sys.stdout.flush()
+ continue
+ elif command == "edit":
+ task = asyncio.ensure_future(self.edit_vim(vim_id, order_id))
+ if vim_id not in self.lcm_vim_tasks:
+ self.lcm_vim_tasks[vim_id] = {}
+ self.lcm_vim_tasks[vim_id][order_id] = {"edit_vim": task}
+ continue
+ elif topic == "sdn":
+ _sdn_id = params["_id"]
+ if command == "create":
+ task = asyncio.ensure_future(self.create_sdn(params, order_id))
+ if _sdn_id not in self.lcm_sdn_tasks:
+ self.lcm_sdn_tasks[_sdn_id] = {}
+ self.lcm_sdn_tasks[_sdn_id][order_id] = {"create_sdn": task}
+ continue
+ elif command == "delete":
+ self.cancel_tasks(topic, _sdn_id)
+ task = asyncio.ensure_future(self.delete_sdn(_sdn_id, order_id))
+ if _sdn_id not in self.lcm_sdn_tasks:
+ self.lcm_sdn_tasks[_sdn_id] = {}
+ self.lcm_sdn_tasks[_sdn_id][order_id] = {"delete_sdn": task}
+ continue
+ elif command == "edit":
+ task = asyncio.ensure_future(self.edit_sdn(_sdn_id, order_id))
+ if _sdn_id not in self.lcm_sdn_tasks:
+ self.lcm_sdn_tasks[_sdn_id] = {}
+ self.lcm_sdn_tasks[_sdn_id][order_id] = {"edit_sdn": task}
+ continue
+ self.logger.critical("unknown topic {} and command '{}'".format(topic, command))
+ except Exception as e:
+ if consecutive_errors == 5:
+ self.logger.error("Task Kafka task exit error too many errors. Exception: {}".format(e))
+ break
+ else:
+ consecutive_errors += 1
+ self.logger.error("Task Kafka Exception {}".format(e))
+ await asyncio.sleep(1, loop=self.loop)
+ self.logger.debug("Task Kafka terminating")
+ # TODO
+ # self.cancel_tasks("ALL", "create")
+ # timeout = 200
+ # while self.is_pending_tasks():
+ # self.logger.debug("Task Kafka terminating. Waiting for tasks termination")
+ # await asyncio.sleep(2, loop=self.loop)
+ # timeout -= 2
+ # if not timeout:
+ # self.cancel_tasks("ALL", "ALL")
+ self.logger.debug("Task Kafka exit")
def start(self):
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self.read_kafka())
self.loop.close()
self.loop = None
+ if self.db:
+ self.db.db_disconnect()
+ if self.msg:
+ self.msg.disconnect()
+ if self.fs:
+ self.fs.fs_disconnect()
def read_config_file(self, config_file):
return conf
except Exception as e:
self.logger.critical("At config file '{}': {}".format(config_file, e))
+ exit(1)
if __name__ == '__main__':