X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=lcm%2Fosm_lcm%2Flcm.py;fp=lcm%2Fosm_lcm%2Flcm.py;h=06393e5f7d3aa3a463327796a4648e2d20981306;hb=f3c4dbc42e206bcc0d4d3369f6d0d156d7ffe669;hp=f35ec60631b009657ed8435dcee1508082a8bc6a;hpb=13fd64d81fed0ccbc67bcd2cbe9d63c8b06d20eb;p=osm%2FRO.git diff --git a/lcm/osm_lcm/lcm.py b/lcm/osm_lcm/lcm.py index f35ec606..06393e5f 100644 --- a/lcm/osm_lcm/lcm.py +++ b/lcm/osm_lcm/lcm.py @@ -8,9 +8,11 @@ import dbmemory import dbmongo import fslocal import msglocal +import msgkafka from dbbase import DbException from fsbase import FsException from msgbase import MsgException +from os import environ import logging #streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" @@ -24,7 +26,7 @@ class LcmException(Exception): class Lcm: - def __init__(self, config): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -32,10 +34,11 @@ class Lcm: """ # contains created tasks/futures to be able to cancel self.lcm_tasks = {} - - self.config = config # logging self.logger = logging.getLogger('lcm') + # load configuration + config = self.read_config_file(config_file) + self.config = config self.config = config self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]) self.ro_tenant = config["RO"]["tenant"] @@ -62,101 +65,102 @@ class Lcm: if config["message"]["driver"] == "local": self.msg = msglocal.msgLocal() self.msg.connect(config["message"]) + elif config["message"]["driver"] == "kafka": + self.msg = msgkafka.MsgKafka() + self.msg.connect(config["message"]) else: raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( config["storage"]["driver"])) except (DbException, FsException, MsgException) as e: - self.self.logger.critical(str(e), exc_info=True) + self.logger.critical(str(e), exc_info=True) raise LcmException(str(e)) + # def update_nsr_db(self, nsr_id, nsr_desc): + # self.db.replace("nsrs", nsr_id, nsr_desc) + async def create_ns(self, nsr_id): self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id)) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) nsr_lcm = { "id": nsr_id, "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, "nsr_ip": {}, - "VCA": {"TODO"}, - "status": "BUILD", - "status_detailed": "", + "VCA": {}, # "TODO" } + db_nsr["_admin"]["deploy"] = nsr_lcm + db_nsr["detailed-status"] = "creating" + db_nsr["operational-status"] = "init" deloyment_timeout = 120 try: - ns_request = self.db.get_one("ns_request", {"id": nsr_id}) - nsd = self.db.get_one("nsd", {"id": ns_request["nsd_id"]}) + nsd = db_nsr["nsd"] RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, - datacenter=ns_request["vim"]) - nsr_lcm["status_detailed"] = "Creating vnfd at RO" - # ns_request["constituent-vnfr-ref"] = [] - - self.db.create("nsr_lcm", nsr_lcm) + datacenter=db_nsr["datacenter"]) # get vnfds, instantiate at RO - self.logger.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id)) for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] - vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + self.logger.debug("create_ns task nsr_id={} RO vnfd={} creating".format(nsr_id, vnfd_id)) + db_nsr["detailed-status"] = "Creating vnfd {} at RO".format(vnfd_id) + vnfd = self.db.get_one("vnfds", {"id": vnfd_id}) vnfd.pop("_admin", None) vnfd.pop("_id", None) - # vnfr = deepcopy(vnfd) - # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] - # vnfr["nsr-id"] = nsr_id - # vnfr["id"] = uuid4() - # vnfr["vnf-id"] = vnfd["id"] - # ns_request["constituent-vnfr-ref"],append(vnfd_id) - - # TODO change id for RO in case it is present - try: + + # look if present + vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id}) + if vnfd_list: + nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"] + self.logger.debug("create_ns task nsr_id={} RO vnfd={} exist. Using RO_id={}".format( + nsr_id, vnfd_id, vnfd_list[0]["uuid"])) + else: desc = await RO.create("vnfd", descriptor=vnfd) nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, vnfd already present - print("debug", e) - else: - raise + self.db.replace("nsrs", nsr_id, db_nsr) # db_new("vnfr", vnfr) # db_update("ns_request", nsr_id, ns_request) # create nsd at RO - self.logger.debug("create_ns task nsr_id={} RO NSD".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating nsd at RO" - nsd_id = ns_request["nsd_id"] - nsd = self.db.get_one("nsd", {"id": nsd_id}) + nsd_id = db_nsr["nsd"]["id"] + self.logger.debug("create_ns task nsr_id={} RO nsd={} creating".format(nsr_id, nsd_id)) + db_nsr["detailed-status"] = "Creating nsd {} at RO".format(nsd_id) + nsd = self.db.get_one("nsds", {"id": nsd_id}) nsd.pop("_admin", None) nsd.pop("_id", None) - try: + + nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id}) + if nsd_list: + nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"] + self.logger.debug("create_ns task nsr_id={} RO nsd={} exist. Using RO_id={}".format( + nsr_id, nsd_id, nsd_list[0]["uuid"])) + else: desc = await RO.create("nsd", descriptor=nsd) nsr_lcm["RO"]["nsd_id"] = desc["uuid"] - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, nsd already present - print("debug", e) - else: - raise + self.db.replace("nsrs", nsr_id, db_nsr) # Crate ns at RO - self.logger.debug("create_ns task nsr_id={} RO NS".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating ns at RO" - desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) + self.logger.debug("create_ns task nsr_id={} RO ns creating".format(nsr_id)) + db_nsr["detailed-status"] = "Creating ns at RO" + desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"], + scenario=nsr_lcm["RO"]["nsd_id"]) RO_nsr_id = desc["uuid"] nsr_lcm["RO"]["nsr_id"] = RO_nsr_id nsr_lcm["RO"]["nsr_status"] = "BUILD" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + self.db.replace("nsrs", nsr_id, db_nsr) # wait until NS is ready + self.logger.debug("create_ns task nsr_id={} RO ns_id={} waiting to be ready".format(nsr_id, RO_nsr_id)) deloyment_timeout = 600 while deloyment_timeout > 0: ns_status_detailed = "Waiting ns ready at RO" - nsr_lcm["status_detailed"] = ns_status_detailed + db_nsr["detailed-status"] = ns_status_detailed desc = await RO.show("ns", RO_nsr_id) ns_status, ns_status_info = RO.check_ns_status(desc) nsr_lcm["RO"]["nsr_status"] = ns_status if ns_status == "ERROR": raise ROclient.ROClientException(ns_status_info) elif ns_status == "BUILD": - nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) + db_nsr["detailed-status"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) elif ns_status == "ACTIVE": nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) break @@ -167,107 +171,127 @@ class Lcm: deloyment_timeout -= 5 if deloyment_timeout <= 0: raise ROclient.ROClientException("Timeot wating ns to be ready") - nsr_lcm["status_detailed"] = "Configuring vnfr" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + db_nsr["detailed-status"] = "Configuring vnfr" + self.db.replace("nsrs", nsr_id, db_nsr) #for nsd in nsr_lcm["descriptors"]["nsd"]: self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id)) for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] - vnfd_index = int(c_vnf["member-vnf-index"]) - vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + vnfd_index = str(c_vnf["member-vnf-index"]) + vnfd = self.db.get_one("vnfds", {"id": vnfd_id}) + db_nsr["config-status"] = "config_not_needed" if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + db_nsr["config-status"] = "configuring" proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] config_primitive = vnfd["vnf-configuration"].get("config-primitive") # get parameters for juju charm base_folder = vnfd["_admin"]["storage"] - path = base_folder + "/charms/" + proxy_charm + path = "{}{}/{}/charms".format(base_folder["path"], base_folder["folder"], base_folder["file"], + proxy_charm) mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] # TODO launch VCA charm # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive)) - nsr_lcm["status"] = "DONE" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + db_nsr["detailed-status"] = "Done" + db_nsr["operational-status"] = "running" + self.db.replace("nsrs", nsr_id, db_nsr) + self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id)) return nsr_lcm except (ROclient.ROClientException, Exception) as e: - self.logger.debug("create_ns nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) - nsr_lcm["status"] = "ERROR" - nsr_lcm["status_detailed"] += ": ERROR {}".format(e) - finally: - self.logger.debug("create_ns task nsr_id={} Exit".format(nsr_id)) - + db_nsr["operational-status"] = "failed" + db_nsr["detailed-status"] += ": ERROR {}".format(e) + self.db.replace("nsrs", nsr_id, db_nsr) + self.logger.debug( + "create_ns nsr_id={} Exit Exception on '{}': {}".format(nsr_id, db_nsr["detailed-status"], e), + exc_info=True) async def delete_ns(self, nsr_id): - self.logger.debug("delete_ns task nsr_id={} Enter".format(nsr_id)) - nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) - ns_request = self.db.get_one("ns_request", {"id": nsr_id}) - - nsr_lcm["status"] = "DELETING" - nsr_lcm["status_detailed"] = "Deleting charms" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + self.logger.debug("delete_ns task nsr_id={}, Delete_ns task nsr_id={} Enter".format(nsr_id, nsr_id)) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + nsr_lcm = db_nsr["_admin"]["deploy"] + + db_nsr["operational-status"] = "terminate" + db_nsr["config-status"] = "terminate" + db_nsr["detailed-status"] = "Deleting charms" + self.db.replace("nsrs", nsr_id, db_nsr) # TODO destroy VCA charm # remove from RO RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, - datacenter=ns_request["vim"]) + datacenter=db_nsr["datacenter"]) # Delete ns - try: - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] - if RO_nsr_id: - nsr_lcm["status_detailed"] = "Deleting ns at RO" + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + if RO_nsr_id: + try: + db_nsr["detailed-status"] = "Deleting ns at RO" desc = await RO.delete("ns", RO_nsr_id) - print("debug", "deleted RO ns {}".format(RO_nsr_id)) - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: + self.logger.debug("delete_ns task nsr_id={} RO ns={} deleted".format(nsr_id, RO_nsr_id)) nsr_lcm["RO"]["nsr_id"] = None nsr_lcm["RO"]["nsr_status"] = "DELETED" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - print("warning", e) - else: - print("error", e) + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.logger.debug("delete_ns task nsr_id={} RO ns={} already deleted".format(nsr_id, RO_nsr_id)) + elif e.http_code == 409: #conflict + self.logger.debug("delete_ns task nsr_id={} RO ns={} delete conflict: {}".format(nsr_id, RO_nsr_id, + e)) + else: + self.logger.error("delete_ns task nsr_id={} RO ns={} delete error: {}".format(nsr_id, RO_nsr_id, e)) + self.db.replace("nsrs", nsr_id, db_nsr) # Delete nsd - try: - RO_nsd_id = nsr_lcm["RO"]["nsd_id"] - if RO_nsd_id: - nsr_lcm["status_detailed"] = "Deleting nsd at RO" + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] + if RO_nsd_id: + try: + db_nsr["detailed-status"] = "Deleting nsd at RO" desc = await RO.delete("nsd", RO_nsd_id) - print("debug", "deleted RO nsd {}".format(RO_nsd_id)) - nsr_lcm["RO"]["nsd_id"] = None - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: + self.logger.debug("delete_ns task nsr_id={} RO nsd={} deleted".format(nsr_id, RO_nsd_id)) nsr_lcm["RO"]["nsd_id"] = None - print("warning", e) - else: - print("error", e) + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + nsr_lcm["RO"]["nsd_id"] = None + self.logger.debug("delete_ns task nsr_id={} RO nsd={} already deleted".format(nsr_id, RO_nsd_id)) + elif e.http_code == 409: #conflict + self.logger.debug("delete_ns task nsr_id={} RO nsd={} delete conflict: {}".format(nsr_id, RO_nsd_id, + e)) + else: + self.logger.error("delete_ns task nsr_id={} RO nsd={} delete error: {}".format(nsr_id, RO_nsd_id, + e)) + self.db.replace("nsrs", nsr_id, db_nsr) for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + if not RO_vnfd_id: + continue try: - if RO_vnfd_id: - nsr_lcm["status_detailed"] = "Deleting vnfd at RO" - desc = await RO.delete("vnfd", RO_vnfd_id) - print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + db_nsr["detailed-status"] = "Deleting vnfd {} at RO".format(vnf_id) + desc = await RO.delete("vnfd", RO_vnfd_id) + self.logger.debug("delete_ns task nsr_id={} RO vnfd={} deleted".format(nsr_id, RO_vnfd_id)) + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None except ROclient.ROClientException as e: - if e.http_code == 404: + if e.http_code == 404: # not found nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - print("warning", e) + self.logger.debug("delete_ns task nsr_id={} RO vnfd={} already deleted ".format(nsr_id, RO_vnfd_id)) + elif e.http_code == 409: #conflict + self.logger.debug("delete_ns task nsr_id={} RO vnfd={} delete conflict: {}".format( + nsr_id, RO_vnfd_id, e)) else: - print("error", e) - self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) + self.logger.error("delete_ns task nsr_id={} RO vnfd={} delete error: {}".format( + nsr_id, RO_vnfd_id, e)) + self.db.replace("nsrs", nsr_id, db_nsr) + + # TODO delete from database or mark as deleted??? + db_nsr["operational-status"] = "terminated" + self.db.del_one("nsrs", {"_id": nsr_id}) + self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) async def test(self, param=None): self.logger.debug("Starting/Ending test task: {}".format(param)) - def cancel_tasks(self, nsr_id): """ Cancel all active tasks of a concrete nsr identified for nsr_id @@ -283,15 +307,13 @@ class Lcm: self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) self.lcm_tasks[nsr_id] = {} - - async def read_kafka(self): self.logger.debug("kafka task Enter") order_id = 1 # future = asyncio.Future() while True: - command, params = await self.msg.aioread(self.loop, "ns") + command, params = await self.msg.aioread("ns", self.loop) order_id += 1 if command == "exit": print("Bye!") @@ -337,65 +359,78 @@ class Lcm: self.loop = None -def read_config_file(config_file): - # TODO make a [ini] + yaml inside parser - # the configparser library is not suitable, because it does not admit comments at the end of line, - # and not parse integer or boolean - try: - with open(config_file) as f: - conf = yaml.load(f) - # TODO insert envioronment - # for k, v in environ.items(): - # if k.startswith("OSMLCM_"): - # split _ lower add to config - return conf - except Exception as e: - self.logger.critical("At config file '{}': {}".format(config_file, e)) + def read_config_file(self, config_file): + # TODO make a [ini] + yaml inside parser + # the configparser library is not suitable, because it does not admit comments at the end of line, + # and not parse integer or boolean + try: + with open(config_file) as f: + conf = yaml.load(f) + for k, v in environ.items(): + if not k.startswith("OSMLCM_"): + continue + k_items = k.lower().split("_") + c = conf + try: + for k_item in k_items[1:-1]: + if k_item in ("ro", "vca"): + # put in capital letter + k_item = k_item.upper() + c = c[k_item] + if k_items[-1] == "port": + c[k_items[-1]] = int(v) + else: + c[k_items[-1]] = v + except Exception as e: + self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e)) + + return conf + except Exception as e: + self.logger.critical("At config file '{}': {}".format(config_file, e)) if __name__ == '__main__': config_file = "lcm.cfg" - conf = read_config_file(config_file) - lcm = Lcm(conf) - - # FOR TEST - RO_VIM = "OST2_MRT" - - #FILL DATABASE - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} - lcm.db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} - lcm.db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: - nsd = yaml.load(f) - nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) - nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} - lcm.db.create("nsd", nsd_clean) - - ns_request = { - "id": "ns1", - "nsr_id": "ns1", - "name": "pingpongOne", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - lcm.db.create("ns_request", ns_request) - ns_request = { - "id": "ns2", - "nsr_id": "ns2", - "name": "pingpongTwo", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - lcm.db.create("ns_request", ns_request) + lcm = Lcm(config_file) + + # # FOR TEST + # RO_VIM = "OST2_MRT" + # + # #FILL DATABASE + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + # vnfd = yaml.load(f) + # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + # lcm.db.create("vnfd", vnfd_clean) + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + # vnfd = yaml.load(f) + # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + # lcm.db.create("vnfd", vnfd_clean) + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + # nsd = yaml.load(f) + # nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + # nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + # lcm.db.create("nsd", nsd_clean) + # + # ns_request = { + # "id": "ns1", + # "nsr_id": "ns1", + # "name": "pingpongOne", + # "vim": RO_VIM, + # "nsd_id": nsd_clean["id"], # nsd_ping_pong + # } + # lcm.db.create("ns_request", ns_request) + # ns_request = { + # "id": "ns2", + # "nsr_id": "ns2", + # "name": "pingpongTwo", + # "vim": RO_VIM, + # "nsd_id": nsd_clean["id"], # nsd_ping_pong + # } + # lcm.db.create("ns_request", ns_request) lcm.start()