elif item in ("vim", "vim_account", "datacenters"):
if len(indata) == 1 and "datacenter" in indata:
clean_indata = indata["datacenter"]
+ elif item == "wim":
+ if len(indata) == 1 and "wim" in indata:
+ clean_indata = indata["wim"]
+ elif item == "wim_account":
+ if len(indata) == 1 and "wim_account" in indata:
+ clean_indata = indata["wim_account"]
elif item == "ns" or item == "instances":
if len(indata) == 1 and "instance" in indata:
clean_indata = indata["instance"]
class ROClient:
headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'}
client_to_RO = {'tenant': 'tenants', 'vim': 'datacenters', 'vim_account': 'datacenters', 'sdn': 'sdn_controllers',
- 'vnfd': 'vnfs', 'nsd': 'scenarios',
+ 'vnfd': 'vnfs', 'nsd': 'scenarios', 'wim': 'wims', 'wim_account': 'wims',
'ns': 'instances'}
mandatory_for_create = {
'tenant': ("name", ),
'nsd': ("name", "id"),
'ns': ("name", "scenario", "datacenter"),
'vim': ("name", "vim_url"),
+ 'wim': ("name", "wim_url"),
'vim_account': (),
+ 'wim_account': (),
'sdn': ("name", "port", 'ip', 'dpid', 'type'),
}
timeout_large = 120
return {'tenant': indata}
elif item in ("vim", "vim_account", "datacenter"):
return {'datacenter': indata}
+ elif item == "wim":
+ return {'wim': indata}
+ elif item == "wim_account":
+ return {'wim_account': indata}
elif item == "ns" or item == "instances":
return {'instance': indata}
elif item == "sdn":
if not self.check_if_uuid(item_id_name):
# check that exist
_all_tenants = all_tenants
- if item == "datacenters":
+ if item in ("datacenters", 'wims'):
_all_tenants = True
uuid = await self._get_item_uuid(session, item, item_id_name, all_tenants=_all_tenants)
else:
try:
if item not in self.client_to_RO:
raise ROClientException("Invalid item {}".format(item))
- if item == 'tenant' or item == 'vim':
+ if item in ('tenant', 'vim', 'wim'):
all_tenants = None
with aiohttp.ClientSession(loop=self.loop) as session:
raise ROClientException("'{}' is mandatory parameter for {}".format(mandatory, item))
all_tenants = False
- if item in ('tenant', 'vim'):
+ if item in ('tenant', 'vim', 'wim'):
all_tenants = None
create_desc = self._create_envelop(item, desc)
except asyncio.TimeoutError:
raise ROClientException("Timeout", http_code=504)
- async def attach_datacenter(self, datacenter=None, descriptor=None, descriptor_format=None, **kwargs):
-
+ async def attach(self, item, item_id_name=None, descriptor=None, descriptor_format=None, **kwargs):
+ """
+ Attach a datacenter or wim to a tenant, creating a vim_account, wim_account
+ :param item: can be vim_account or wim_account
+ :param item_id_name: id or name of the datacenter, wim
+ :param descriptor:
+ :param descriptor_format:
+ :param kwargs:
+ :return:
+ """
try:
if isinstance(descriptor, str):
descriptor = self._parse(descriptor, descriptor_format)
pass
else:
descriptor = {}
- desc = remove_envelop("vim", descriptor)
+
+ desc = remove_envelop(item, descriptor)
# # check that exist
# uuid = self._get_item_uuid(session, "datacenters", uuid_name, all_tenants=True)
if kwargs:
desc = self.update_descriptor(desc, kwargs)
- if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"):
- raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be provided")
- create_desc = self._create_envelop("vim", desc)
+ if item == "vim_account":
+ if not desc.get("vim_tenant_name") and not desc.get("vim_tenant_id"):
+ raise ROClientException("Wrong descriptor. At least vim_tenant_name or vim_tenant_id must be "
+ "provided")
+ elif item != "wim_account":
+ raise ROClientException("Attach with unknown item {}. Must be 'vim_account' or 'wim_account'".
+ format(item))
+ create_desc = self._create_envelop(item, desc)
payload_req = yaml.safe_dump(create_desc)
with aiohttp.ClientSession(loop=self.loop) as session:
# check that exist
- item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=True)
+ item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name, all_tenants=True)
await self._get_tenant(session)
- url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=self.tenant,
- datacenter=item_id)
+ url = "{}/{tenant}/{item}/{item_id}".format(self.endpoint_url, tenant=self.tenant,
+ item=self.client_to_RO[item], item_id=item_id)
self.logger.debug("RO POST %s %s", url, payload_req)
with aiohttp.Timeout(self.timeout_large):
async with session.post(url, headers=self.headers_req, data=payload_req) as response:
raise ROClientException(response_text, http_code=response.status)
response_desc = self._parse_yaml(response_text, response=True)
- desc = remove_envelop("vim", response_desc)
+ desc = remove_envelop(item, response_desc)
return desc
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
except asyncio.TimeoutError:
raise ROClientException("Timeout", http_code=504)
- async def detach_datacenter(self, datacenter=None):
+ async def detach(self, item, item_id_name=None):
# TODO replace the code with delete_item(vim_account,...)
try:
with aiohttp.ClientSession(loop=self.loop) as session:
# check that exist
- item_id = await self._get_item_uuid(session, "datacenters", datacenter, all_tenants=False)
+ item_id = await self._get_item_uuid(session, self.client_to_RO[item], item_id_name, all_tenants=False)
tenant = await self._get_tenant(session)
- url = "{}/{tenant}/datacenters/{datacenter}".format(self.endpoint_url, tenant=tenant,
- datacenter=item_id)
+ url = "{}/{tenant}/{item}/{datacenter}".format(self.endpoint_url, tenant=tenant,
+ item=self.client_to_RO[item], datacenter=item_id)
self.logger.debug("RO DELETE %s", url)
with aiohttp.Timeout(self.timeout_large):
async with session.delete(url, headers=self.headers_req) as response:
raise ROClientException(response_text, http_code=response.status)
response_desc = self._parse_yaml(response_text, response=True)
- desc = remove_envelop("vim", response_desc)
+ desc = remove_envelop(item, response_desc)
return desc
except aiohttp.errors.ClientOSError as e:
raise ROClientException(e, http_code=504)
__author__ = "Alfonso Tierno"
-min_RO_version = [0, 5, 84]
+min_RO_version = [0, 6, 0]
min_n2vc_version = "0.0.2"
min_common_version = "0.1.11"
# uncomment if LCM is installed as library and installed, and get them from __init__.py
self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
self.vca_config, self.loop)
self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
+ self.wim = vim_sdn.WimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
async def check_RO_version(self):
first_start = True
while consecutive_errors < 10:
try:
- topics = ("admin", "ns", "vim_account", "sdn", "nsi")
+ topics = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi")
topic, command, params = await self.msg.aioread(topics, self.loop)
if topic != "admin" and command != "ping":
self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
task = asyncio.ensure_future(self.vim.edit(params, order_id))
self.lcm_tasks.register("vim_account", vim_id, order_id, "vim_edit", task)
continue
+ elif topic == "wim_account":
+ wim_id = params["_id"]
+ if command == "create":
+ task = asyncio.ensure_future(self.wim.create(params, order_id))
+ self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_create", task)
+ continue
+ elif command == "delete":
+ self.lcm_tasks.cancel(topic, wim_id)
+ task = asyncio.ensure_future(self.wim.delete(wim_id, order_id))
+ self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_delete", task)
+ continue
+ elif command == "show":
+ print("not implemented show with wim_account")
+ sys.stdout.flush()
+ continue
+ elif command == "edit":
+ task = asyncio.ensure_future(self.wim.edit(params, order_id))
+ self.lcm_tasks.register("wim_account", wim_id, order_id, "wim_edit", task)
+ continue
elif topic == "sdn":
_sdn_id = params["_id"]
if command == "create":
desc = await RO.create("vim", descriptor=vim_RO)
RO_vim_id = desc["uuid"]
db_vim_update["_admin.deployed.RO"] = RO_vim_id
+ self.logger.debug(logging_text + "VIM created at RO_vim_id={}".format(RO_vim_id))
step = "Creating vim_account at RO"
db_vim_update["_admin.detailed-status"] = step
schema_version=schema_version,
salt=vim_id)
- desc = await RO.attach_datacenter(RO_vim_id, descriptor=vim_account_RO)
+ desc = await RO.attach("vim_account", RO_vim_id, descriptor=vim_account_RO)
db_vim_update["_admin.deployed.RO-account"] = desc["uuid"]
db_vim_update["_admin.operationalState"] = "ENABLED"
db_vim_update["_admin.detailed-status"] = "Done"
# await asyncio.sleep(15) # TODO remove. This is for test
- self.logger.debug(logging_text + "Exit Ok RO_vim_id={}".format(RO_vim_id))
+ self.logger.debug(logging_text + "Exit Ok VIM account created at RO_vim_account_id={}".format(desc["uuid"]))
return
except (ROclient.ROClientException, DbException) as e:
RO = ROclient.ROClient(self.loop, **self.ro_config)
step = "Detaching vim from RO tenant"
try:
- await RO.detach_datacenter(RO_vim_id)
+ await RO.detach("vim_account", RO_vim_id)
except ROclient.ROClientException as e:
if e.http_code == 404: # not found
self.logger.debug(logging_text + "RO_vim_id={} already detached".format(RO_vim_id))
self.lcm_tasks.remove("vim_account", vim_id, order_id)
+class WimLcm(LcmBase):
+ # values that are encrypted at wim config because they are passwords
+ wim_config_encrypted = ()
+
+ def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
+ """
+ Init, Connect to database, filesystem storage, and messaging
+ :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
+ :return: None
+ """
+
+ self.logger = logging.getLogger('lcm.vim')
+ self.loop = loop
+ self.lcm_tasks = lcm_tasks
+ self.ro_config = ro_config
+
+ super().__init__(db, msg, fs, self.logger)
+
+ async def create(self, wim_content, order_id):
+ wim_id = wim_content["_id"]
+ logging_text = "Task wim_create={} ".format(wim_id)
+ self.logger.debug(logging_text + "Enter")
+ db_wim = None
+ db_wim_update = {}
+ exc = None
+ try:
+ step = "Getting wim-id='{}' from db".format(wim_id)
+ db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
+ db_wim_update["_admin.deployed.RO"] = None
+
+ step = "Creating wim at RO"
+ db_wim_update["_admin.detailed-status"] = step
+ self.update_db_2("wim_accounts", wim_id, db_wim_update)
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ wim_RO = deepcopy(wim_content)
+ wim_RO.pop("_id", None)
+ wim_RO.pop("_admin", None)
+ schema_version = wim_RO.pop("schema_version", None)
+ wim_RO.pop("schema_type", None)
+ wim_RO.pop("wim_tenant_name", None)
+ wim_RO["type"] = wim_RO.pop("wim_type")
+ wim_RO.pop("wim_user", None)
+ wim_RO.pop("wim_password", None)
+ desc = await RO.create("wim", descriptor=wim_RO)
+ RO_wim_id = desc["uuid"]
+ db_wim_update["_admin.deployed.RO"] = RO_wim_id
+ self.logger.debug(logging_text + "WIM created at RO_wim_id={}".format(RO_wim_id))
+
+ step = "Creating wim_account at RO"
+ db_wim_update["_admin.detailed-status"] = step
+ self.update_db_2("wim_accounts", wim_id, db_wim_update)
+
+ if wim_content.get("wim_password"):
+ wim_content["wim_password"] = self.db.decrypt(wim_content["wim_password"],
+ schema_version=schema_version,
+ salt=wim_id)
+ wim_account_RO = {"name": wim_content["name"],
+ "user": wim_content["user"],
+ "password": wim_content["password"]
+ }
+ if wim_RO.get("config"):
+ wim_account_RO["config"] = wim_RO["config"]
+ if "wim_port_mapping" in wim_account_RO["config"]:
+ del wim_account_RO["config"]["wim_port_mapping"]
+ for p in self.wim_config_encrypted:
+ if wim_account_RO["config"].get(p):
+ wim_account_RO["config"][p] = self.db.decrypt(wim_account_RO["config"][p],
+ schema_version=schema_version,
+ salt=wim_id)
+
+ desc = await RO.attach("wim_account", RO_wim_id, descriptor=wim_account_RO)
+ db_wim_update["_admin.deployed.RO-account"] = desc["uuid"]
+ db_wim_update["_admin.operationalState"] = "ENABLED"
+ db_wim_update["_admin.detailed-status"] = "Done"
+
+ self.logger.debug(logging_text + "Exit Ok WIM account created at RO_wim_account_id={}".format(desc["uuid"]))
+ return
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_wim:
+ db_wim_update["_admin.operationalState"] = "ERROR"
+ db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ if db_wim_update:
+ self.update_db_2("wim_accounts", wim_id, db_wim_update)
+ self.lcm_tasks.remove("wim_account", wim_id, order_id)
+
+ async def edit(self, wim_content, order_id):
+ wim_id = wim_content["_id"]
+ logging_text = "Task wim_edit={} ".format(wim_id)
+ self.logger.debug(logging_text + "Enter")
+ db_wim = None
+ exc = None
+ RO_wim_id = None
+ db_wim_update = {}
+ step = "Getting wim-id='{}' from db".format(wim_id)
+ try:
+ db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
+
+ # look if previous tasks in process
+ task_name, task_dependency = self.lcm_tasks.lookfor_related("wim_account", wim_id, order_id)
+ if task_dependency:
+ step = "Waiting for related tasks to be completed: {}".format(task_name)
+ self.logger.debug(logging_text + step)
+ # TODO write this to database
+ _, pending = await asyncio.wait(task_dependency, timeout=3600)
+ if pending:
+ raise LcmException("Timeout waiting related tasks to be completed")
+
+ if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
+
+ RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
+ step = "Editing wim at RO"
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ wim_RO = deepcopy(wim_content)
+ wim_RO.pop("_id", None)
+ wim_RO.pop("_admin", None)
+ schema_version = wim_RO.pop("schema_version", None)
+ wim_RO.pop("schema_type", None)
+ wim_RO.pop("wim_tenant_name", None)
+ if "wim_type" in wim_RO:
+ wim_RO["type"] = wim_RO.pop("wim_type")
+ wim_RO.pop("wim_user", None)
+ wim_RO.pop("wim_password", None)
+ # TODO make a deep update of wim_port_mapping
+ if wim_RO:
+ await RO.edit("wim", RO_wim_id, descriptor=wim_RO)
+
+ step = "Editing wim-account at RO tenant"
+ wim_account_RO = {}
+ if "config" in wim_content:
+ if "wim_port_mapping" in wim_content["config"]:
+ del wim_content["config"]["wim_port_mapping"]
+ if not wim_content["config"]:
+ del wim_content["config"]
+ if "wim_tenant_name" in wim_content:
+ wim_account_RO["wim_tenant_name"] = wim_content["wim_tenant_name"]
+ if "wim_password" in wim_content:
+ wim_account_RO["wim_password"] = wim_content["wim_password"]
+ if wim_content.get("wim_password"):
+ wim_account_RO["wim_password"] = self.db.decrypt(wim_content["wim_password"],
+ schema_version=schema_version,
+ salt=wim_id)
+ if "config" in wim_content:
+ wim_account_RO["config"] = wim_content["config"]
+ if wim_content.get("config"):
+ for p in self.wim_config_encrypted:
+ if wim_content["config"].get(p):
+ wim_account_RO["config"][p] = self.db.decrypt(wim_content["config"][p],
+ schema_version=schema_version,
+ salt=wim_id)
+
+ if "wim_user" in wim_content:
+ wim_content["wim_username"] = wim_content["wim_user"]
+ # wim_account must be edited always even if empty in order to ensure changes are translated to RO
+ # wim_thread. RO will remove and relaunch a new thread for this wim_account
+ await RO.edit("wim_account", RO_wim_id, descriptor=wim_account_RO)
+ db_wim_update["_admin.operationalState"] = "ENABLED"
+
+ self.logger.debug(logging_text + "Exit Ok RO_wim_id={}".format(RO_wim_id))
+ return
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ if exc and db_wim:
+ db_wim_update["_admin.operationalState"] = "ERROR"
+ db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ if db_wim_update:
+ self.update_db_2("wim_accounts", wim_id, db_wim_update)
+ self.lcm_tasks.remove("wim_account", wim_id, order_id)
+
+ async def delete(self, wim_id, order_id):
+ logging_text = "Task wim_delete={} ".format(wim_id)
+ self.logger.debug(logging_text + "Enter")
+ db_wim = None
+ db_wim_update = {}
+ exc = None
+ step = "Getting wim from db"
+ try:
+ db_wim = self.db.get_one("wim_accounts", {"_id": wim_id})
+ if db_wim.get("_admin") and db_wim["_admin"].get("deployed") and db_wim["_admin"]["deployed"].get("RO"):
+ RO_wim_id = db_wim["_admin"]["deployed"]["RO"]
+ RO = ROclient.ROClient(self.loop, **self.ro_config)
+ step = "Detaching wim from RO tenant"
+ try:
+ await RO.detach("wim_account", RO_wim_id)
+ except ROclient.ROClientException as e:
+ if e.http_code == 404: # not found
+ self.logger.debug(logging_text + "RO_wim_id={} already detached".format(RO_wim_id))
+ else:
+ raise
+
+ step = "Deleting wim from RO"
+ try:
+ await RO.delete("wim", RO_wim_id)
+ except ROclient.ROClientException as e:
+ if e.http_code == 404: # not found
+ self.logger.debug(logging_text + "RO_wim_id={} already deleted".format(RO_wim_id))
+ else:
+ raise
+ else:
+ # nothing to delete
+ self.logger.error(logging_text + "Nohing to remove at RO")
+ self.db.del_one("wim_accounts", {"_id": wim_id})
+ self.logger.debug(logging_text + "Exit Ok")
+ return
+
+ except (ROclient.ROClientException, DbException) as e:
+ self.logger.error(logging_text + "Exit Exception {}".format(e))
+ exc = e
+ except Exception as e:
+ self.logger.critical(logging_text + "Exit Exception {}".format(e), exc_info=True)
+ exc = e
+ finally:
+ self.lcm_tasks.remove("wim_account", wim_id, order_id)
+ if exc and db_wim:
+ db_wim_update["_admin.operationalState"] = "ERROR"
+ db_wim_update["_admin.detailed-status"] = "ERROR {}: {}".format(step, exc)
+ if db_wim_update:
+ self.update_db_2("wim_accounts", wim_id, db_wim_update)
+ self.lcm_tasks.remove("wim_account", wim_id, order_id)
+
+
class SdnLcm(LcmBase):
def __init__(self, db, msg, fs, lcm_tasks, ro_config, loop):
step = "Getting sdn from db"
try:
db_sdn = self.db.get_one("sdns", {"_id": sdn_id})
+ RO_sdn_id = None
if db_sdn.get("_admin") and db_sdn["_admin"].get("deployed") and db_sdn["_admin"]["deployed"].get("RO"):
RO_sdn_id = db_sdn["_admin"]["deployed"]["RO"]
RO = ROclient.ROClient(self.loop, **self.ro_config)
await RO.edit("sdn", RO_sdn_id, descriptor=sdn_RO)
db_sdn_update["_admin.operationalState"] = "ENABLED"
- self.logger.debug(logging_text + "Exit Ok RO_sdn_id".format(RO_sdn_id))
+ self.logger.debug(logging_text + "Exit Ok RO_sdn_id={}".format(RO_sdn_id))
return
except (ROclient.ROClientException, DbException) as e: