X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_lcm%2Flcm.py;fp=lcm%2Fosm_lcm%2Flcm.py;h=f35ec60631b009657ed8435dcee1508082a8bc6a;hb=0aef0dbb3c8b50426f31812e7f386dc9188823d2;hp=0000000000000000000000000000000000000000;hpb=204e39e475d59271fb240234c16169b2ccc6231f;p=osm%2FRO.git diff --git a/lcm/osm_lcm/lcm.py b/lcm/osm_lcm/lcm.py new file mode 100644 index 00000000..f35ec606 --- /dev/null +++ b/lcm/osm_lcm/lcm.py @@ -0,0 +1,403 @@ +#!/usr/bin/python3 +# -*- coding: utf-8 -*- + +import asyncio +import yaml +import ROclient +import dbmemory +import dbmongo +import fslocal +import msglocal +from dbbase import DbException +from fsbase import FsException +from msgbase import MsgException +import logging + +#streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" +streamformat = "%(name)s %(levelname)s: %(message)s" +logging.basicConfig(format=streamformat, level=logging.DEBUG) + + +class LcmException(Exception): + pass + + +class Lcm: + + def __init__(self, config): + """ + Init, Connect to database, filesystem storage, and messaging + :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', + :return: None + """ + # contains created tasks/futures to be able to cancel + self.lcm_tasks = {} + + self.config = config + # logging + self.logger = logging.getLogger('lcm') + self.config = config + self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]) + self.ro_tenant = config["RO"]["tenant"] + self.vca = config["VCA"] # TODO VCA + self.loop = None + try: + if config["database"]["driver"] == "mongo": + self.db = dbmongo.dbmongo() + self.db.db_connect(config["database"]) + elif config["database"]["driver"] == "memory": + self.db = dbmemory.dbmemory() + self.db.db_connect(config["database"]) + else: + raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format( + config["database"]["driver"])) + + if config["storage"]["driver"] == "local": + self.fs = fslocal.FsLocal() + self.fs.fs_connect(config["storage"]) + else: + raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format( + config["storage"]["driver"])) + + if config["message"]["driver"] == "local": + self.msg = msglocal.msgLocal() + self.msg.connect(config["message"]) + else: + raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( + config["storage"]["driver"])) + except (DbException, FsException, MsgException) as e: + self.self.logger.critical(str(e), exc_info=True) + raise LcmException(str(e)) + + async def create_ns(self, nsr_id): + self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = { + "id": nsr_id, + "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, + "nsr_ip": {}, + "VCA": {"TODO"}, + "status": "BUILD", + "status_detailed": "", + } + + deloyment_timeout = 120 + try: + ns_request = self.db.get_one("ns_request", {"id": nsr_id}) + nsd = self.db.get_one("nsd", {"id": ns_request["nsd_id"]}) + RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, + datacenter=ns_request["vim"]) + nsr_lcm["status_detailed"] = "Creating vnfd at RO" + # ns_request["constituent-vnfr-ref"] = [] + + self.db.create("nsr_lcm", nsr_lcm) + + # get vnfds, instantiate at RO + self.logger.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + vnfd.pop("_admin", None) + vnfd.pop("_id", None) + # vnfr = deepcopy(vnfd) + # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] + # vnfr["nsr-id"] = nsr_id + # vnfr["id"] = uuid4() + # vnfr["vnf-id"] = vnfd["id"] + # ns_request["constituent-vnfr-ref"],append(vnfd_id) + + # TODO change id for RO in case it is present + try: + desc = await RO.create("vnfd", descriptor=vnfd) + nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, vnfd already present + print("debug", e) + else: + raise + + # db_new("vnfr", vnfr) + # db_update("ns_request", nsr_id, ns_request) + + # create nsd at RO + self.logger.debug("create_ns task nsr_id={} RO NSD".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating nsd at RO" + nsd_id = ns_request["nsd_id"] + nsd = self.db.get_one("nsd", {"id": nsd_id}) + nsd.pop("_admin", None) + nsd.pop("_id", None) + try: + desc = await RO.create("nsd", descriptor=nsd) + nsr_lcm["RO"]["nsd_id"] = desc["uuid"] + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 409: # conflict, nsd already present + print("debug", e) + else: + raise + + # Crate ns at RO + self.logger.debug("create_ns task nsr_id={} RO NS".format(nsr_id)) + nsr_lcm["status_detailed"] = "Creating ns at RO" + desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) + RO_nsr_id = desc["uuid"] + nsr_lcm["RO"]["nsr_id"] = RO_nsr_id + nsr_lcm["RO"]["nsr_status"] = "BUILD" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + # wait until NS is ready + deloyment_timeout = 600 + while deloyment_timeout > 0: + ns_status_detailed = "Waiting ns ready at RO" + nsr_lcm["status_detailed"] = ns_status_detailed + desc = await RO.show("ns", RO_nsr_id) + ns_status, ns_status_info = RO.check_ns_status(desc) + nsr_lcm["RO"]["nsr_status"] = ns_status + if ns_status == "ERROR": + raise ROclient.ROClientException(ns_status_info) + elif ns_status == "BUILD": + nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) + elif ns_status == "ACTIVE": + nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) + break + else: + assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status) + + await asyncio.sleep(5, loop=self.loop) + deloyment_timeout -= 5 + if deloyment_timeout <= 0: + raise ROclient.ROClientException("Timeot wating ns to be ready") + nsr_lcm["status_detailed"] = "Configuring vnfr" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + #for nsd in nsr_lcm["descriptors"]["nsd"]: + + self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id)) + for c_vnf in nsd["constituent-vnfd"]: + vnfd_id = c_vnf["vnfd-id-ref"] + vnfd_index = int(c_vnf["member-vnf-index"]) + vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] + config_primitive = vnfd["vnf-configuration"].get("config-primitive") + # get parameters for juju charm + base_folder = vnfd["_admin"]["storage"] + path = base_folder + "/charms/" + proxy_charm + mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] + # TODO launch VCA charm + # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive)) + nsr_lcm["status"] = "DONE" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + + return nsr_lcm + + except (ROclient.ROClientException, Exception) as e: + self.logger.debug("create_ns nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) + nsr_lcm["status"] = "ERROR" + nsr_lcm["status_detailed"] += ": ERROR {}".format(e) + finally: + self.logger.debug("create_ns task nsr_id={} Exit".format(nsr_id)) + + + async def delete_ns(self, nsr_id): + self.logger.debug("delete_ns task nsr_id={} Enter".format(nsr_id)) + nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) + ns_request = self.db.get_one("ns_request", {"id": nsr_id}) + + nsr_lcm["status"] = "DELETING" + nsr_lcm["status_detailed"] = "Deleting charms" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + # TODO destroy VCA charm + + # remove from RO + RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, + datacenter=ns_request["vim"]) + # Delete ns + try: + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + if RO_nsr_id: + nsr_lcm["status_detailed"] = "Deleting ns at RO" + desc = await RO.delete("ns", RO_nsr_id) + print("debug", "deleted RO ns {}".format(RO_nsr_id)) + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + print("warning", e) + else: + print("error", e) + + # Delete nsd + try: + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] + if RO_nsd_id: + nsr_lcm["status_detailed"] = "Deleting nsd at RO" + desc = await RO.delete("nsd", RO_nsd_id) + print("debug", "deleted RO nsd {}".format(RO_nsd_id)) + nsr_lcm["RO"]["nsd_id"] = None + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["nsd_id"] = None + print("warning", e) + else: + print("error", e) + + for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + try: + if RO_vnfd_id: + nsr_lcm["status_detailed"] = "Deleting vnfd at RO" + desc = await RO.delete("vnfd", RO_vnfd_id) + print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + except ROclient.ROClientException as e: + if e.http_code == 404: + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None + print("warning", e) + else: + print("error", e) + self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) + + + async def test(self, param=None): + self.logger.debug("Starting/Ending test task: {}".format(param)) + + + def cancel_tasks(self, nsr_id): + """ + Cancel all active tasks of a concrete nsr identified for nsr_id + :param nsr_id: nsr identity + :return: None, or raises an exception if not possible + """ + if not self.lcm_tasks.get(nsr_id): + return + for order_id, tasks_set in self.lcm_tasks[nsr_id].items(): + for task_name, task in tasks_set.items(): + result = task.cancel() + if result: + self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) + self.lcm_tasks[nsr_id] = {} + + + + async def read_kafka(self): + self.logger.debug("kafka task Enter") + order_id = 1 + # future = asyncio.Future() + + while True: + command, params = await self.msg.aioread(self.loop, "ns") + order_id += 1 + if command == "exit": + print("Bye!") + break + elif command.startswith("#"): + continue + elif command == "echo": + print(params) + elif command == "test": + asyncio.Task(self.test(params), loop=self.loop) + elif command == "break": + print("put a break in this line of code") + elif command == "create": + nsr_id = params.strip() + self.logger.debug("Deploying NS {}".format(nsr_id)) + task = asyncio.ensure_future(self.create_ns(nsr_id)) + if nsr_id not in self.lcm_tasks: + self.lcm_tasks[nsr_id] = {} + self.lcm_tasks[nsr_id][order_id] = {"create_ns": task} + elif command == "delete": + nsr_id = params.strip() + self.logger.debug("Deleting NS {}".format(nsr_id)) + self.cancel_tasks(nsr_id) + task = asyncio.ensure_future(self.delete_ns(nsr_id)) + if nsr_id not in self.lcm_tasks: + self.lcm_tasks[nsr_id] = {} + self.lcm_tasks[nsr_id][order_id] = {"delete_ns": task} + elif command == "show": + nsr_id = params.strip() + nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) + print("nsr_lcm", nsr_lcm) + print("self.lcm_tasks", self.lcm_tasks.get(nsr_id)) + else: + self.logger.debug("unknown command '{}'".format(command)) + print("Usage:\n echo: <>\n create: \n delete: \n show: ") + self.logger.debug("kafka task Exit") + + + def start(self): + self.loop = asyncio.get_event_loop() + self.loop.run_until_complete(self.read_kafka()) + self.loop.close() + self.loop = None + + +def read_config_file(config_file): + # TODO make a [ini] + yaml inside parser + # the configparser library is not suitable, because it does not admit comments at the end of line, + # and not parse integer or boolean + try: + with open(config_file) as f: + conf = yaml.load(f) + # TODO insert envioronment + # for k, v in environ.items(): + # if k.startswith("OSMLCM_"): + # split _ lower add to config + return conf + except Exception as e: + self.logger.critical("At config file '{}': {}".format(config_file, e)) + + + +if __name__ == '__main__': + + config_file = "lcm.cfg" + conf = read_config_file(config_file) + lcm = Lcm(conf) + + # FOR TEST + RO_VIM = "OST2_MRT" + + #FILL DATABASE + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + lcm.db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + vnfd = yaml.load(f) + vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + lcm.db.create("vnfd", vnfd_clean) + with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + nsd = yaml.load(f) + nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + lcm.db.create("nsd", nsd_clean) + + ns_request = { + "id": "ns1", + "nsr_id": "ns1", + "name": "pingpongOne", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + lcm.db.create("ns_request", ns_request) + ns_request = { + "id": "ns2", + "nsr_id": "ns2", + "name": "pingpongTwo", + "vim": RO_VIM, + "nsd_id": nsd_clean["id"], # nsd_ping_pong + } + lcm.db.create("ns_request", ns_request) + + lcm.start() + + +