From 0aef0dbb3c8b50426f31812e7f386dc9188823d2 Mon Sep 17 00:00:00 2001 From: tierno Date: Thu, 1 Feb 2018 19:13:07 +0100 Subject: [PATCH] lightweight build structure Change-Id: I7a04acdd31dd6ce97546fd762c3c5d550387806d Signed-off-by: tierno --- lcm/lcm.py | 380 ----------------------------- lcm/osm_common/__init__.py | 0 lcm/{ => osm_common}/dbbase.py | 4 +- lcm/{ => osm_common}/dbmemory.py | 0 lcm/osm_common/dbmongo.py | 153 ++++++++++++ lcm/osm_common/fsbase.py | 40 +++ lcm/osm_common/fslocal.py | 108 +++++++++ lcm/osm_common/msgbase.py | 39 +++ lcm/osm_common/msglocal.py | 78 ++++++ lcm/{ => osm_lcm}/ROclient.py | 0 lcm/osm_lcm/lcm.cfg | 45 ++++ lcm/osm_lcm/lcm.py | 403 +++++++++++++++++++++++++++++++ 12 files changed, 868 insertions(+), 382 deletions(-) delete mode 100644 lcm/lcm.py create mode 100644 lcm/osm_common/__init__.py rename lcm/{ => osm_common}/dbbase.py (88%) rename lcm/{ => osm_common}/dbmemory.py (100%) create mode 100644 lcm/osm_common/dbmongo.py create mode 100644 lcm/osm_common/fsbase.py create mode 100644 lcm/osm_common/fslocal.py create mode 100644 lcm/osm_common/msgbase.py create mode 100644 lcm/osm_common/msglocal.py rename lcm/{ => osm_lcm}/ROclient.py (100%) create mode 100644 lcm/osm_lcm/lcm.cfg create mode 100644 lcm/osm_lcm/lcm.py diff --git a/lcm/lcm.py b/lcm/lcm.py deleted file mode 100644 index 99581428..00000000 --- a/lcm/lcm.py +++ /dev/null @@ -1,380 +0,0 @@ -#!/usr/bin/python3 -# -*- coding: utf-8 -*- - -import asyncio -import aiohttp -import yaml -import ROclient -import time -import dbmemory -import logging - -from copy import deepcopy -from uuid import uuid4 - -#streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" -streamformat = "%(name)s %(levelname)s: %(message)s" -logging.basicConfig(format=streamformat, level=logging.DEBUG) -logger = logging.getLogger('lcm') - -ro_account = { - "url": "http://localhost:9090/openmano", - "tenant": "osm" -} - -vca_account = { - # TODO -} - -# conains created tasks/futures to be able to cancel -lcm_tasks = {} - -headers_req = {'Accept': 'application/yaml', 'content-type': 'application/yaml'} -ns_status = ("CREATION-SCHEDULED", "DEPLOYING", "CONFIGURING", "DELETION-SCHEDULED", "UN-CONFIGURING", "UNDEPLOYING") - -# TODO replace with database calls -db = dbmemory.dbmemory() - - - -async def CreateNS(loop, nsr_id): - logger.debug("CreateNS task nsr_id={} Enter".format(nsr_id)) - nsr_lcm = { - "id": nsr_id, - "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, - "nsr_ip": {}, - "VCA": {"TODO"}, - "status": "BUILD", - "status_detailed": "", - } - - deloyment_timeout = 120 - try: - ns_request = db.get_one("ns_request", {"id": nsr_id}) - nsd = db.get_one("nsd", {"id": ns_request["nsd_id"]}) - RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], - datacenter=ns_request["vim"]) - nsr_lcm["status_detailed"] = "Creating vnfd at RO" - # ns_request["constituent-vnfr-ref"] = [] - - db.create("nsr_lcm", nsr_lcm) - - # get vnfds, instantiate at RO - logger.debug("CreateNS task nsr_id={} RO VNFD".format(nsr_id)) - for c_vnf in nsd["constituent-vnfd"]: - vnfd_id = c_vnf["vnfd-id-ref"] - vnfd = db.get_one("vnfd", {"id": vnfd_id}) - vnfd.pop("_admin", None) - vnfd.pop("_id", None) - # vnfr = deepcopy(vnfd) - # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] - # vnfr["nsr-id"] = nsr_id - # vnfr["id"] = uuid4() - # vnfr["vnf-id"] = vnfd["id"] - # ns_request["constituent-vnfr-ref"],append(vnfd_id) - - # TODO change id for RO in case it is present - try: - desc = await RO.create("vnfd", descriptor=vnfd) - nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, vnfd already present - print("debug", e) - else: - raise - - # db_new("vnfr", vnfr) - # db_update("ns_request", nsr_id, ns_request) - - # create nsd at RO - logger.debug("CreateNS task nsr_id={} RO NSD".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating nsd at RO" - nsd_id = ns_request["nsd_id"] - nsd = db.get_one("nsd", {"id": nsd_id}) - nsd.pop("_admin", None) - nsd.pop("_id", None) - try: - desc = await RO.create("nsd", descriptor=nsd) - nsr_lcm["RO"]["nsd_id"] = desc["uuid"] - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, nsd already present - print("debug", e) - else: - raise - - # Crate ns at RO - logger.debug("CreateNS task nsr_id={} RO NS".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating ns at RO" - desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) - RO_nsr_id = desc["uuid"] - nsr_lcm["RO"]["nsr_id"] = RO_nsr_id - nsr_lcm["RO"]["nsr_status"] = "BUILD" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - - # wait until NS is ready - deloyment_timeout = 600 - while deloyment_timeout > 0: - ns_status_detailed = "Waiting ns ready at RO" - nsr_lcm["status_detailed"] = ns_status_detailed - desc = await RO.show("ns", RO_nsr_id) - ns_status, ns_status_info = RO.check_ns_status(desc) - nsr_lcm["RO"]["nsr_status"] = ns_status - if ns_status == "ERROR": - raise ROclient.ROClientException(ns_status_info) - elif ns_status == "BUILD": - nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) - elif ns_status == "ACTIVE": - nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) - break - else: - assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) - - await asyncio.sleep(5, loop=loop) - deloyment_timeout -= 5 - if deloyment_timeout <= 0: - raise ROclient.ROClientException("Timeot wating ns to be ready") - nsr_lcm["status_detailed"] = "Configuring vnfr" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - - #for nsd in nsr_lcm["descriptors"]["nsd"]: - - logger.debug("CreateNS task nsr_id={} VCA look for".format(nsr_id)) - for c_vnf in nsd["constituent-vnfd"]: - vnfd_id = c_vnf["vnfd-id-ref"] - vnfd_index = int(c_vnf["member-vnf-index"]) - vnfd = db.get_one("vnfd", {"id": vnfd_id}) - if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): - proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] - config_primitive = vnfd["vnf-configuration"].get("config-primitive") - # get parameters for juju charm - base_folder = vnfd["_admin"]["storage"] - path = base_folder + "/charms/" + proxy_charm - mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] - # TODO launch VCA charm - # task = asyncio.ensure_future(DeployCharm(loop, path, mgmt_ip, config_primitive)) - nsr_lcm["status"] = "DONE" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - - return nsr_lcm - - except (ROclient.ROClientException, Exception) as e: - logger.debug("CreateNS nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) - nsr_lcm["status"] = "ERROR" - nsr_lcm["status_detailed"] += ": ERROR {}".format(e) - finally: - logger.debug("CreateNS task nsr_id={} Exit".format(nsr_id)) - - -async def DestroyNS(loop, nsr_id): - logger.debug("DestroyNS task nsr_id={} Enter".format(nsr_id)) - nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) - ns_request = db.get_one("ns_request", {"id": nsr_id}) - - nsr_lcm["status"] = "DELETING" - nsr_lcm["status_detailed"] = "Deleting charms" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - # TODO destroy VCA charm - - # remove from RO - RO = ROclient.ROClient(loop, endpoint_url=ro_account["url"], tenant=ro_account["tenant"], - datacenter=ns_request["vim"]) - # Delete ns - try: - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] - if RO_nsr_id: - nsr_lcm["status_detailed"] = "Deleting ns at RO" - desc = await RO.delete("ns", RO_nsr_id) - print("debug", "deleted RO ns {}".format(RO_nsr_id)) - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - print("warning", e) - else: - print("error", e) - - # Delete nsd - try: - RO_nsd_id = nsr_lcm["RO"]["nsd_id"] - if RO_nsd_id: - nsr_lcm["status_detailed"] = "Deleting nsd at RO" - desc = await RO.delete("nsd", RO_nsd_id) - print("debug", "deleted RO nsd {}".format(RO_nsd_id)) - nsr_lcm["RO"]["nsd_id"] = None - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: - nsr_lcm["RO"]["nsd_id"] = None - print("warning", e) - else: - print("error", e) - - for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): - try: - if RO_vnfd_id: - nsr_lcm["status_detailed"] = "Deleting vnfd at RO" - desc = await RO.delete("vnfd", RO_vnfd_id) - print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - print("warning", e) - else: - print("error", e) - logger.debug("DestroyNS task nsr_id={} Exit".format(nsr_id)) - - -async def test(loop, param=None): - logger.debug("Starting/Ending test task: {}".format(param)) - - -def cancel_tasks(loop, nsr_id): - """ - Cancel all active tasks of a concrete nsr identified for nsr_id - :param loop: loop - :param nsr_id: nsr identity - :return: None, or raises an exception if not possible - """ - global lcm_tasks - if not lcm_tasks.get(nsr_id): - return - for order_id, tasks_set in lcm_tasks[nsr_id].items(): - for task_name, task in tasks_set.items(): - result = task.cancel() - if result: - logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) - lcm_tasks[nsr_id] = {} - - - -async def read_kafka(loop, bus_info): - global lcm_tasks - logger.debug("kafka task Enter") - order_id = 1 - # future = asyncio.Future() - with open(bus_info["file"]) as f: - - # ignore old orders. Read file - command = "fake" - while command: - command = f.read() - - while True: - command = f.read() - if not command: - await asyncio.sleep(2, loop=loop) - continue - order_id += 1 - command = command.strip() - command, _, params = command.partition(" ") - if command == "exit": - print("Bye!") - break - elif command.startswith("#"): - continue - elif command == "echo": - print(params) - elif command == "test": - asyncio.Task(test(loop, params), loop=loop) - elif command == "break": - print("put a break in this line of code") - elif command == "new-ns": - nsr_id = params.strip() - logger.debug("Deploying NS {}".format(nsr_id)) - task = asyncio.ensure_future(CreateNS(loop, nsr_id)) - if nsr_id not in lcm_tasks: - lcm_tasks[nsr_id] = {} - lcm_tasks[nsr_id][order_id] = {"CreateNS": task} - elif command == "del-ns": - nsr_id = params.strip() - logger.debug("Deleting NS {}".format(nsr_id)) - cancel_tasks(loop, nsr_id) - task = asyncio.ensure_future(DestroyNS(loop, nsr_id)) - if nsr_id not in lcm_tasks: - lcm_tasks[nsr_id] = {} - lcm_tasks[nsr_id][order_id] = {"DestroyNS": task} - elif command == "get-ns": - nsr_id = params.strip() - nsr_lcm = db.get_one("nsr_lcm", {"id": nsr_id}) - print("nsr_lcm", nsr_lcm) - print("lcm_tasks", lcm_tasks.get(nsr_id)) - else: - logger.debug("unknown command '{}'".format(command)) - print("Usage:\n echo <>\n new-ns \n del-ns \n get-ns ") - logger.debug("kafka task Exit") - - -def lcm(): - loop = asyncio.get_event_loop() - loop.run_until_complete(read_kafka(loop, {"file": "/home/atierno/OSM/osm/NBI/kafka"})) - return - - -def lcm2(): - loop = asyncio.get_event_loop() - # asyncio.ensure_future(CreateNS, loop) - try: - content = loop.run_until_complete(CreateNS(loop, "ns1")) - print("Done: {}".format(content)) - except ROclient.ROClientException as e: - print("Error {}".format(e)) - - time.sleep(10) - - content = loop.run_until_complete(DestroyNS(loop, "ns1")) - print(content) - - loop.close() - - -if __name__ == '__main__': - - # FOR TEST - RO_VIM = "OST2_MRT" - - #FILL DATABASE - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} - db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} - db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: - nsd = yaml.load(f) - nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) - nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} - db.create("nsd", nsd_clean) - - ns_request = { - "id": "ns1", - "nsr_id": "ns1", - "name": "pingpongOne", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - db.create("ns_request", ns_request) - ns_request = { - "id": "ns2", - "nsr_id": "ns2", - "name": "pingpongTwo", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - db.create("ns_request", ns_request) - # lcm2() - lcm() - - - diff --git a/lcm/osm_common/__init__.py b/lcm/osm_common/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lcm/dbbase.py b/lcm/osm_common/dbbase.py similarity index 88% rename from lcm/dbbase.py rename to lcm/osm_common/dbbase.py index 55041b68..a2768ae0 100644 --- a/lcm/dbbase.py +++ b/lcm/osm_common/dbbase.py @@ -3,9 +3,11 @@ class DbException(Exception): def __init__(self, message, http_code=404): + # TODO change to http.HTTPStatus instead of int that allows .value and .name self.http_code = http_code Exception.__init__(self, message) + class dbbase(object): def __init__(self): @@ -31,5 +33,3 @@ class dbbase(object): def del_one(self, table, filter={}, fail_on_empty=True): pass - - diff --git a/lcm/dbmemory.py b/lcm/osm_common/dbmemory.py similarity index 100% rename from lcm/dbmemory.py rename to lcm/osm_common/dbmemory.py diff --git a/lcm/osm_common/dbmongo.py b/lcm/osm_common/dbmongo.py new file mode 100644 index 00000000..38454b3b --- /dev/null +++ b/lcm/osm_common/dbmongo.py @@ -0,0 +1,153 @@ +#import pymongo +from pymongo import MongoClient +from dbbase import DbException, dbbase +from http import HTTPStatus + +class dbmongo(dbbase): + + def __init__(self): + pass + + def db_connect(self, config): + try: + self.client = MongoClient(config["host"], config["port"]) + self.db = self.client[config["name"]] + # get data to try a connection + self.db.users.find_one({"username": "admin"}) + except Exception as e: # TODO refine + raise DbException(str(e)) + + def db_disconnect(self): + pass # TODO + + @staticmethod + def _format_filter(filter): + try: + db_filter = {} + for query_k, query_v in filter.items(): + dot_index = query_k.rfind(".") + if dot_index > 1 and query_k[dot_index+1:] in ("eq", "ne", "gt", "gte", "lt", "lte", "cont", + "ncont", "neq"): + operator = "$" + query_k[dot_index+1:] + if operator == "$neq": + operator = "$nq" + k = query_k[:dot_index] + else: + operator = "$eq" + k = query_k + + v = query_v + if isinstance(v, list): + if operator in ("$eq", "$cont"): + operator = "$in" + v = query_v + elif operator in ("$ne", "$ncont"): + operator = "$nin" + v = query_v + else: + v = query_v.join(",") + + if operator in ("$eq", "$cont"): + # v cannot be a comma separated list, because operator would have been changed to $in + db_filter[k] = v + elif operator == "$ncount": + # v cannot be a comma separated list, because operator would have been changed to $nin + db_filter[k] = {"$ne": v} + else: + # maybe db_filter[k] exist. e.g. in the query string for values between 5 and 8: "a.gt=5&a.lt=8" + if k not in db_filter: + db_filter[k] = {} + db_filter[k][operator] = v + + return db_filter + except Exception as e: + raise DbException("Invalid query string filter at {}:{}. Error: {}".format(query_k, v, e), + http_code=HTTPStatus.BAD_REQUEST.value) + + + def get_list(self, table, filter={}): + try: + l = [] + collection = self.db[table] + rows = collection.find(self._format_filter(filter)) + for row in rows: + l.append(row) + return l + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def get_one(self, table, filter={}, fail_on_empty=True, fail_on_more=True): + try: + if filter: + filter = self._format_filter(filter) + collection = self.db[table] + if not (fail_on_empty and fail_on_more): + return collection.find_one(filter) + rows = collection.find(filter) + if rows.count() == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + elif rows.count() > 1: + if fail_on_more: + raise DbException("Found more than one entry with filter='{}'".format(filter), + HTTPStatus.CONFLICT.value) + return rows[0] + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_list(self, table, filter={}): + try: + collection = self.db[table] + rows = collection.delete_many(self._format_filter(filter)) + return {"deleted": rows.deleted_count} + except DbException: + raise + except Exception as e: # TODO refine + raise DbException(str(e)) + + def del_one(self, table, filter={}, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.delete_one(self._format_filter(filter)) + if rows.deleted_count == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + return {"deleted": rows.deleted_count} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def create(self, table, indata): + try: + collection = self.db[table] + data = collection.insert_one(indata) + return data.inserted_id + except Exception as e: # TODO refine + raise DbException(str(e)) + + def set_one(self, table, filter, update_dict, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.update_one(self._format_filter(filter), {"$set": update_dict}) + if rows.updated_count == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + return {"deleted": rows.deleted_count} + except Exception as e: # TODO refine + raise DbException(str(e)) + + def replace(self, table, id, indata, fail_on_empty=True): + try: + collection = self.db[table] + rows = collection.replace_one({"_id": id}, indata) + if rows.modified_count == 0: + if fail_on_empty: + raise DbException("Not found entry with filter='{}'".format(filter), HTTPStatus.NOT_FOUND.value) + return None + return {"replace": rows.modified_count} + except Exception as e: # TODO refine + raise DbException(str(e)) diff --git a/lcm/osm_common/fsbase.py b/lcm/osm_common/fsbase.py new file mode 100644 index 00000000..85562bdd --- /dev/null +++ b/lcm/osm_common/fsbase.py @@ -0,0 +1,40 @@ + +class FsException(Exception): + + def __init__(self, message, http_code=404): + self.http_code = http_code + Exception.__init__(self, message) + + +class FsBase(object): + + def __init__(self): + pass + + def get_params(self): + return {} + + def fs_connect(self, config): + pass + + def fs_disconnect(self): + pass + + def mkdir(self, folder): + pass + + def file_exists(self, storage): + pass + + def file_size(self, storage): + pass + + def file_extract(self, tar_object, path): + pass + + def file_open(self, storage, mode): + pass + + def file_delete(self, storage, ignore_non_exist=False): + pass + diff --git a/lcm/osm_common/fslocal.py b/lcm/osm_common/fslocal.py new file mode 100644 index 00000000..b88475f5 --- /dev/null +++ b/lcm/osm_common/fslocal.py @@ -0,0 +1,108 @@ +import os +import tarfile +from http import HTTPStatus +from shutil import rmtree +from fsbase import FsBase, FsException + + +class FsLocal(FsBase): + + def __init__(self): + self.path = None + + def get_params(self): + return {"fs": "local", "path": self.path} + + def fs_connect(self, config): + try: + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + raise FsException("Invalid configuration param at '[storage]': path '{}' does not exist".format( + config["path"])) + except FsException: + raise + except Exception as e: # TODO refine + raise FsException(str(e)) + + def fs_disconnect(self): + pass # TODO + + def mkdir(self, folder): + """ + Creates a folder or parent object location + :param folder: + :return: None or raises and exception + """ + try: + os.mkdir(self.path + folder) + except Exception as e: + raise FsException(str(e), http_code=HTTPStatus.INTERNAL_SERVER_ERROR.value) + + def file_exists(self, storage): + """ + Indicates if "storage" file exist + :param storage: can be a str or a str list + :return: True, False + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return os.path.exists(self.path + f) + + def file_size(self, storage): + """ + return file size + :param storage: can be a str or a str list + :return: file size + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return os.path.getsize(self.path + f) + + def file_extract(self, tar_object, path): + """ + extract a tar file + :param tar_object: object of type tar + :param path: can be a str or a str list, or a tar object where to extract the tar_object + :return: None + """ + if isinstance(path, str): + f = self.path + path + else: + f = self.path + "/".join(path) + tar_object.extractall(path=f) + + def file_open(self, storage, mode): + """ + Open a file + :param storage: can be a str or list of str + :param mode: file mode + :return: file object + """ + if isinstance(storage, str): + f = storage + else: + f = "/".join(storage) + return open(self.path + f, mode) + + def file_delete(self, storage, ignore_non_exist=False): + """ + Delete storage content recursivelly + :param storage: can be a str or list of str + :param ignore_non_exist: not raise exception if storage does not exist + :return: None + """ + + if isinstance(storage, str): + f = self.path + storage + else: + f = self.path + "/".join(storage) + if os.path.exists(f): + rmtree(f) + elif not ignore_non_exist: + raise FsException("File {} does not exist".format(storage), http_code=HTTPStatus.BAD_REQUEST.value) diff --git a/lcm/osm_common/msgbase.py b/lcm/osm_common/msgbase.py new file mode 100644 index 00000000..745df7f6 --- /dev/null +++ b/lcm/osm_common/msgbase.py @@ -0,0 +1,39 @@ + +from http import HTTPStatus + + +class MsgException(Exception): + """ + Base Exception class for all msgXXXX exceptions + """ + + def __init__(self, message, http_code=HTTPStatus.INTERNAL_SERVER_ERROR): + """ + General exception + :param message: descriptive text + :param http_code: type. It contains ".value" (http error code) and ".name" (http error name + """ + self.http_code = http_code + Exception.__init__(self, message) + + +class MsgBase(object): + """ + Base class for all msgXXXX classes + """ + + def __init__(self): + pass + + def connect(self, config): + pass + + def write(self, msg): + pass + + def read(self): + pass + + def disconnect(self): + pass + diff --git a/lcm/osm_common/msglocal.py b/lcm/osm_common/msglocal.py new file mode 100644 index 00000000..5045181f --- /dev/null +++ b/lcm/osm_common/msglocal.py @@ -0,0 +1,78 @@ +import os +import yaml +import asyncio +from msgbase import MsgBase, MsgException + + +class msgLocal(MsgBase): + + def __init__(self): + self.path = None + # create a different file for each topic + self.files = {} + + def connect(self, config): + try: + self.path = config["path"] + if not self.path.endswith("/"): + self.path += "/" + if not os.path.exists(self.path): + os.mkdir(self.path) + except MsgException: + raise + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def disconnect(self): + for f in self.files.values(): + try: + f.close() + except Exception as e: # TODO refine + pass + + def write(self, topic, key, msg): + """ + Insert a message into topic + :param topic: topic + :param key: key text to be inserted + :param msg: value object to be inserted + :return: None or raises and exception + """ + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "w+") + yaml.safe_dump({key: msg}, self.files[topic], default_flow_style=True) + self.files[topic].flush() + except Exception as e: # TODO refine + raise MsgException(str(e)) + + def read(self, topic): + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "r+") + msg = self.files[topic].read() + msg_dict = yaml.load(msg) + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return k, v + except Exception as e: # TODO refine + raise MsgException(str(e)) + + async def aioread(self, loop, topic): + try: + if topic not in self.files: + self.files[topic] = open(self.path + topic, "r+") + # ignore previous content + while self.files[topic].read(): + pass + while True: + msg = self.files[topic].read() + if msg: + break + await asyncio.sleep(2, loop=loop) + msg_dict = yaml.load(msg) + assert len(msg_dict) == 1 + for k, v in msg_dict.items(): + return k, v + except Exception as e: # TODO refine + raise MsgException(str(e)) diff --git a/lcm/ROclient.py b/lcm/osm_lcm/ROclient.py similarity index 100% rename from lcm/ROclient.py rename to lcm/osm_lcm/ROclient.py diff --git a/lcm/osm_lcm/lcm.cfg b/lcm/osm_lcm/lcm.cfg new file mode 100644 index 00000000..c93323f5 --- /dev/null +++ b/lcm/osm_lcm/lcm.cfg @@ -0,0 +1,45 @@ + +# TODO currently is a pure yaml format. Transform it to [ini] style with yaml inside to be coherent with other modules + +#[global] +global: + log_file: "" + log_level: DEBUG + +#[RO] +RO: + #host: ro # hostname or IP + host: localhost + port: 9090 + tenant: osm + +#[VCA] +VCA: + host: vca + port: 17070 + user: admin + secret: secret + +#[database] +database: + #driver: mongo # mongo or memory + driver: memory + host: mongo # hostname or IP + port: 27017 + name: osm + user: user + password: password + +#[storage] +storage: + driver: local # local filesystem + # for local provide file path + #path: /app/storage + path: /home/atierno/OSM/osm/RO/lcm/local/storage + +#[message] +message: + driver: local # local or kafka + # for local provide file path + #path: /app/storage/kafka + path: /home/atierno/OSM/osm/RO/lcm/local/kafka diff --git a/lcm/osm_lcm/lcm.py b/lcm/osm_lcm/lcm.py new file mode 100644 index 00000000..f35ec606 --- /dev/null +++ b/lcm/osm_lcm/lcm.py @@ -0,0 +1,403 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import asyncio +import yaml +import ROclient +import dbmemory +import dbmongo +import fslocal +import msglocal +from dbbase import DbException +from fsbase import FsException +from msgbase import MsgException +import logging + +#streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" +streamformat = "%(name)s %(levelname)s: %(message)s" +logging.basicConfig(format=streamformat, level=logging.DEBUG) + + +class LcmException(Exception): + pass + + +class Lcm: + + def __init__(self, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + # contains created tasks/futures to be able to cancel + self.lcm_tasks = {} + + self.config = config + # logging + self.logger = logging.getLogger('lcm') + self.config = config + self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]) + self.ro_tenant = config["RO"]["tenant"] + self.vca = config["VCA"] # TODO VCA + self.loop = None + try: + if config["database"]["driver"] == "mongo": + self.db = dbmongo.dbmongo() + self.db.db_connect(config["database"]) + elif config["database"]["driver"] == "memory": + self.db = dbmemory.dbmemory() + self.db.db_connect(config["database"]) + else: + raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format( + config["database"]["driver"])) + + if config["storage"]["driver"] == "local": + self.fs = fslocal.FsLocal() + self.fs.fs_connect(config["storage"]) + else: + raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format( + config["storage"]["driver"])) + + if config["message"]["driver"] == "local": + self.msg = msglocal.msgLocal() + self.msg.connect(config["message"]) + else: + raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( + config["storage"]["driver"])) + except (DbException, FsException, MsgException) as e: + self.self.logger.critical(str(e), exc_info=True) + raise LcmException(str(e)) + + async def create_ns(self, nsr_id): + self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = { + "id": nsr_id, + "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, + "nsr_ip": {}, + "VCA": {"TODO"}, + "status": "BUILD", + "status_detailed": "", + } + + deloyment_timeout = 120 + try: + ns_request = self.db.get_one("ns_request", {"id": nsr_id}) + nsd = self.db.get_one("nsd", {"id": ns_request["nsd_id"]}) + RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, + datacenter=ns_request["vim"]) + nsr_lcm["status_detailed"] = "Creating vnfd at RO" + # ns_request["constituent-vnfr-ref"] = [] + + self.db.create("nsr_lcm", nsr_lcm) + + # get vnfds, instantiate at RO + self.logger.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + vnfd.pop("_admin", None) + vnfd.pop("_id", None) + # vnfr = deepcopy(vnfd) + # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] + # vnfr["nsr-id"] = nsr_id + # vnfr["id"] = uuid4() + # vnfr["vnf-id"] = vnfd["id"] + # ns_request["constituent-vnfr-ref"],append(vnfd_id) + + # TODO change id for RO in case it is present + try: + desc = await RO.create("vnfd", descriptor=vnfd) + nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, vnfd already present + print("debug", e) + else: + raise + + # db_new("vnfr", vnfr) + # db_update("ns_request", nsr_id, ns_request) + + # create nsd at RO + self.logger.debug("create_ns task nsr_id={} RO NSD".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating nsd at RO" + nsd_id = ns_request["nsd_id"] + nsd = self.db.get_one("nsd", {"id": nsd_id}) + nsd.pop("_admin", None) + nsd.pop("_id", None) + try: + desc = await RO.create("nsd", descriptor=nsd) + nsr_lcm["RO"]["nsd_id"] = desc["uuid"] + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, nsd already present + print("debug", e) + else: + raise + + # Crate ns at RO + self.logger.debug("create_ns task nsr_id={} RO NS".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating ns at RO" + desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) + RO_nsr_id = desc["uuid"] + nsr_lcm["RO"]["nsr_id"] = RO_nsr_id + nsr_lcm["RO"]["nsr_status"] = "BUILD" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + # wait until NS is ready + deloyment_timeout = 600 + while deloyment_timeout > 0: + ns_status_detailed = "Waiting ns ready at RO" + nsr_lcm["status_detailed"] = ns_status_detailed + desc = await RO.show("ns", RO_nsr_id) + ns_status, ns_status_info = RO.check_ns_status(desc) + nsr_lcm["RO"]["nsr_status"] = ns_status + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) + elif ns_status == "ACTIVE": + nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) + break + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + + await asyncio.sleep(5, loop=self.loop) + deloyment_timeout -= 5 + if deloyment_timeout <= 0: + raise ROclient.ROClientException("Timeot wating ns to be ready") + nsr_lcm["status_detailed"] = "Configuring vnfr" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + #for nsd in nsr_lcm["descriptors"]["nsd"]: + + self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd_index = int(c_vnf["member-vnf-index"]) + vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] + config_primitive = vnfd["vnf-configuration"].get("config-primitive") + # get parameters for juju charm + base_folder = vnfd["_admin"]["storage"] + path = base_folder + "/charms/" + proxy_charm + mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] + # TODO launch VCA charm + # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive)) + nsr_lcm["status"] = "DONE" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + return nsr_lcm + + except (ROclient.ROClientException, Exception) as e: + self.logger.debug("create_ns nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) + nsr_lcm["status"] = "ERROR" + nsr_lcm["status_detailed"] += ": ERROR {}".format(e) + finally: + self.logger.debug("create_ns task nsr_id={} Exit".format(nsr_id)) + + + async def delete_ns(self, nsr_id): + self.logger.debug("delete_ns task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) + ns_request = self.db.get_one("ns_request", {"id": nsr_id}) + + nsr_lcm["status"] = "DELETING" + nsr_lcm["status_detailed"] = "Deleting charms" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + # TODO destroy VCA charm + + # remove from RO + RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, + datacenter=ns_request["vim"]) + # Delete ns + try: + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + if RO_nsr_id: + nsr_lcm["status_detailed"] = "Deleting ns at RO" + desc = await RO.delete("ns", RO_nsr_id) + print("debug", "deleted RO ns {}".format(RO_nsr_id)) + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + print("warning", e) + else: + print("error", e) + + # Delete nsd + try: + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] + if RO_nsd_id: + nsr_lcm["status_detailed"] = "Deleting nsd at RO" + desc = await RO.delete("nsd", RO_nsd_id) + print("debug", "deleted RO nsd {}".format(RO_nsd_id)) + nsr_lcm["RO"]["nsd_id"] = None + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsd_id"] = None + print("warning", e) + else: + print("error", e) + + for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + try: + if RO_vnfd_id: + nsr_lcm["status_detailed"] = "Deleting vnfd at RO" + desc = await RO.delete("vnfd", RO_vnfd_id) + print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + print("warning", e) + else: + print("error", e) + self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) + + + async def test(self, param=None): + self.logger.debug("Starting/Ending test task: {}".format(param)) + + + def cancel_tasks(self, nsr_id): + """ + Cancel all active tasks of a concrete nsr identified for nsr_id + :param nsr_id: nsr identity + :return: None, or raises an exception if not possible + """ + if not self.lcm_tasks.get(nsr_id): + return + for order_id, tasks_set in self.lcm_tasks[nsr_id].items(): + for task_name, task in tasks_set.items(): + result = task.cancel() + if result: + self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) + self.lcm_tasks[nsr_id] = {} + + + + async def read_kafka(self): + self.logger.debug("kafka task Enter") + order_id = 1 + # future = asyncio.Future() + + while True: + command, params = await self.msg.aioread(self.loop, "ns") + order_id += 1 + if command == "exit": + print("Bye!") + break + elif command.startswith("#"): + continue + elif command == "echo": + print(params) + elif command == "test": + asyncio.Task(self.test(params), loop=self.loop) + elif command == "break": + print("put a break in this line of code") + elif command == "create": + nsr_id = params.strip() + self.logger.debug("Deploying NS {}".format(nsr_id)) + task = asyncio.ensure_future(self.create_ns(nsr_id)) + if nsr_id not in self.lcm_tasks: + self.lcm_tasks[nsr_id] = {} + self.lcm_tasks[nsr_id][order_id] = {"create_ns": task} + elif command == "delete": + nsr_id = params.strip() + self.logger.debug("Deleting NS {}".format(nsr_id)) + self.cancel_tasks(nsr_id) + task = asyncio.ensure_future(self.delete_ns(nsr_id)) + if nsr_id not in self.lcm_tasks: + self.lcm_tasks[nsr_id] = {} + self.lcm_tasks[nsr_id][order_id] = {"delete_ns": task} + elif command == "show": + nsr_id = params.strip() + nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) + print("nsr_lcm", nsr_lcm) + print("self.lcm_tasks", self.lcm_tasks.get(nsr_id)) + else: + self.logger.debug("unknown command '{}'".format(command)) + print("Usage:\n echo: <>\n create: \n delete: \n show: ") + self.logger.debug("kafka task Exit") + + + def start(self): + self.loop = asyncio.get_event_loop() + self.loop.run_until_complete(self.read_kafka()) + self.loop.close() + self.loop = None + + +def read_config_file(config_file): + # TODO make a [ini] + yaml inside parser + # the configparser library is not suitable, because it does not admit comments at the end of line, + # and not parse integer or boolean + try: + with open(config_file) as f: + conf = yaml.load(f) + # TODO insert envioronment + # for k, v in environ.items(): + # if k.startswith("OSMLCM_"): + # split _ lower add to config + return conf + except Exception as e: + self.logger.critical("At config file '{}': {}".format(config_file, e)) + + + +if __name__ == '__main__': + + config_file = "lcm.cfg" + conf = read_config_file(config_file) + lcm = Lcm(conf) + + # FOR TEST + RO_VIM = "OST2_MRT" + + #FILL DATABASE + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + lcm.db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + lcm.db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + nsd = yaml.load(f) + nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + lcm.db.create("nsd", nsd_clean) + + ns_request = { + "id": "ns1", + "nsr_id": "ns1", + "name": "pingpongOne", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + lcm.db.create("ns_request", ns_request) + ns_request = { + "id": "ns2", + "nsr_id": "ns2", + "name": "pingpongTwo", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + lcm.db.create("ns_request", ns_request) + + lcm.start() + + + -- 2.25.1