From: tierno Date: Mon, 5 Feb 2018 13:53:28 +0000 (+0100) Subject: lightweight build unify database record to nsrs X-Git-Tag: v3.0.3~4 X-Git-Url: https://osm.etsi.org/gitweb/?a=commitdiff_plain;h=f3c4dbc42e206bcc0d4d3369f6d0d156d7ffe669;p=osm%2FRO.git lightweight build unify database record to nsrs Change-Id: I39ac2baa21629361b16587aa8aebe7b358d85297 Signed-off-by: tierno --- diff --git a/lcm/Dockerfile b/lcm/Dockerfile new file mode 100644 index 00000000..f27dadf4 --- /dev/null +++ b/lcm/Dockerfile @@ -0,0 +1,53 @@ +FROM ubuntu:16.04 + +# Set the working directory to /app +WORKDIR /app/osm_lcm + +# Copy the current directory contents into the container at /app +ADD . /app + +RUN apt update && apt install -y git python3 \ + python3-pip python3-pymongo python3-yaml python3-aiohttp \ + && pip3 install aiokafka \ + && mkdir -p /app/storage/kafka && mkdir -p /app/log + + + +LABEL Maintainer="alfonso.tiernosepulveda@telefonica.com" \ + Description="This implements a north bound interface for OSM" \ + Version="1.0" + # Author="Alfonso Tierno" + +# Used for local storage +VOLUME /app/storage +# Used for logs +VOLUME /app/log + +# The following ENV can be added with "docker run -e xxx' to configure +# RO +ENV OSMLCM_RO_HOST ro +ENV OSMLCM_RO_PORT 9090 +ENV OSMLCM_RO_TENANT osm + +# VCA +ENV OSMLCM_VCA_HOST vca +ENV OSMLCM_VCA_PORT: 17070 +ENV OSMLCM_VCA_USER: admin +ENV OSMLCM_VCA_SECRET: secret + +# database +ENV OSMLCM_DATABASE_DRIVER mongo +ENV OSMLCM_DATABASE_HOST mongo +ENV OSMLCM_DATABASE_PORT 27017 +ENV OSMLCM_STORAGE_DRIVER local +ENV OSMLCM_STORAGE_PATH /app/storage + +# message +ENV OSMLCM_MESSAGE_DRIVER kafka +ENV OSMLCM_MESSAGE_HOST kafka +ENV OSMLCM_MESSAGE_PORT 9092 + +ENV PYTHONPATH /app/osm_common +# Run app.py when the container launches +CMD ["python3", "lcm.py"] + diff --git a/lcm/LICENSE b/lcm/LICENSE new file mode 100644 index 00000000..8dada3ed --- /dev/null +++ b/lcm/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/lcm/osm_common/msgkafka.py b/lcm/osm_common/msgkafka.py index 7b294fc0..f3507452 100644 --- a/lcm/osm_common/msgkafka.py +++ b/lcm/osm_common/msgkafka.py @@ -6,7 +6,7 @@ import asyncio import yaml #import json -class msgKafka(MsgBase): +class MsgKafka(MsgBase): def __init__(self): self.host = None self.port = None @@ -26,24 +26,25 @@ class msgKafka(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e)) - def write(self, topic, msg, key): - + def write(self, topic, key, msg): try: - self.loop.run_until_complete(self.aiowrite(key, msg=yaml.safe_dump(msg, default_flow_style=True), topic=topic)) + self.loop.run_until_complete(self.aiowrite(topic=topic, key=key, msg=yaml.safe_dump(msg, default_flow_style=True))) except Exception as e: raise MsgException("Error writing {} topic: {}".format(topic, str(e))) def read(self, topic): - self.topic_lst.append(topic) + #self.topic_lst.append(topic) try: - return self.loop.run_until_complete(self.aioread(self.topic_lst)) + return self.loop.run_until_complete(self.aioread(topic)) except Exception as e: raise MsgException("Error reading {} topic: {}".format(topic, str(e))) - async def aiowrite(self, key, msg, topic): + async def aiowrite(self, topic, key, msg, loop=None): try: - self.producer = AIOKafkaProducer(loop=self.loop, key_serializer=str.encode, value_serializer=str.encode, + if not loop: + loop = self.loop + self.producer = AIOKafkaProducer(loop=loop, key_serializer=str.encode, value_serializer=str.encode, bootstrap_servers=self.broker) await self.producer.start() await self.producer.send(topic=topic, key=key, value=msg) @@ -52,10 +53,12 @@ class msgKafka(MsgBase): finally: await self.producer.stop() - async def aioread(self, topic): - self.consumer = AIOKafkaConsumer(loop=self.loop, bootstrap_servers=self.broker) + async def aioread(self, topic, loop=None): + if not loop: + loop = self.loop + self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker) await self.consumer.start() - self.consumer.subscribe(topic) + self.consumer.subscribe([topic]) try: async for message in self.consumer: return yaml.load(message.key), yaml.load(message.value) diff --git a/lcm/osm_common/msglocal.py b/lcm/osm_common/msglocal.py index 5045181f..a380e618 100644 --- a/lcm/osm_common/msglocal.py +++ b/lcm/osm_common/msglocal.py @@ -58,8 +58,10 @@ class msgLocal(MsgBase): except Exception as e: # TODO refine raise MsgException(str(e)) - async def aioread(self, loop, topic): + async def aioread(self, topic, loop=None): try: + if not loop: + loop = asyncio.get_event_loop() if topic not in self.files: self.files[topic] = open(self.path + topic, "r+") # ignore previous content diff --git a/lcm/osm_lcm/ROclient.py b/lcm/osm_lcm/ROclient.py index 84ce7aa1..bbea5fe3 100644 --- a/lcm/osm_lcm/ROclient.py +++ b/lcm/osm_lcm/ROclient.py @@ -272,7 +272,7 @@ class ROClient: """ ns_ip={} for vnf in ns_descriptor["vnfs"]: - ns_ip[vnf["member_vnf_index"]] = vnf["ip_address"] + ns_ip[str(vnf["member_vnf_index"])] = vnf["ip_address"] #uuid sce_vnf_id # vnf[mgmt_access]: '{interface_id: cf3cbf00-385c-49b4-9a3f-b400b7b15dc6, vm_id: d0dd22a9-91ef-46f1-8e8f-8cf4b2d5b2d7}' # vnf[vms] @@ -295,6 +295,8 @@ class ROClient: filter_dict=filter_by) if isinstance(content, dict): if len(content) == 1: + for _, v in content.items(): + return v return content.values()[0] else: raise ROClientException("Output not a list neither dict with len equal 1", http_code=500) diff --git a/lcm/osm_lcm/lcm.cfg b/lcm/osm_lcm/lcm.cfg index c93323f5..a44ee75c 100644 --- a/lcm/osm_lcm/lcm.cfg +++ b/lcm/osm_lcm/lcm.cfg @@ -22,8 +22,7 @@ VCA: #[database] database: - #driver: mongo # mongo or memory - driver: memory + driver: mongo # mongo or memory host: mongo # hostname or IP port: 27017 name: osm @@ -34,12 +33,13 @@ database: storage: driver: local # local filesystem # for local provide file path - #path: /app/storage - path: /home/atierno/OSM/osm/RO/lcm/local/storage + path: /app/storage #[message] message: driver: local # local or kafka # for local provide file path - #path: /app/storage/kafka - path: /home/atierno/OSM/osm/RO/lcm/local/kafka + path: /app/storage/kafka + # for kafka provide host and port + host: kafka + port: 9092 diff --git a/lcm/osm_lcm/lcm.py b/lcm/osm_lcm/lcm.py index f35ec606..06393e5f 100644 --- a/lcm/osm_lcm/lcm.py +++ b/lcm/osm_lcm/lcm.py @@ -8,9 +8,11 @@ import dbmemory import dbmongo import fslocal import msglocal +import msgkafka from dbbase import DbException from fsbase import FsException from msgbase import MsgException +from os import environ import logging #streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s" @@ -24,7 +26,7 @@ class LcmException(Exception): class Lcm: - def __init__(self, config): + def __init__(self, config_file): """ Init, Connect to database, filesystem storage, and messaging :param config: two level dictionary with configuration. Top level should contain 'database', 'storage', @@ -32,10 +34,11 @@ class Lcm: """ # contains created tasks/futures to be able to cancel self.lcm_tasks = {} - - self.config = config # logging self.logger = logging.getLogger('lcm') + # load configuration + config = self.read_config_file(config_file) + self.config = config self.config = config self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"]) self.ro_tenant = config["RO"]["tenant"] @@ -62,101 +65,102 @@ class Lcm: if config["message"]["driver"] == "local": self.msg = msglocal.msgLocal() self.msg.connect(config["message"]) + elif config["message"]["driver"] == "kafka": + self.msg = msgkafka.MsgKafka() + self.msg.connect(config["message"]) else: raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format( config["storage"]["driver"])) except (DbException, FsException, MsgException) as e: - self.self.logger.critical(str(e), exc_info=True) + self.logger.critical(str(e), exc_info=True) raise LcmException(str(e)) + # def update_nsr_db(self, nsr_id, nsr_desc): + # self.db.replace("nsrs", nsr_id, nsr_desc) + async def create_ns(self, nsr_id): self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id)) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) nsr_lcm = { "id": nsr_id, "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"}, "nsr_ip": {}, - "VCA": {"TODO"}, - "status": "BUILD", - "status_detailed": "", + "VCA": {}, # "TODO" } + db_nsr["_admin"]["deploy"] = nsr_lcm + db_nsr["detailed-status"] = "creating" + db_nsr["operational-status"] = "init" deloyment_timeout = 120 try: - ns_request = self.db.get_one("ns_request", {"id": nsr_id}) - nsd = self.db.get_one("nsd", {"id": ns_request["nsd_id"]}) + nsd = db_nsr["nsd"] RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, - datacenter=ns_request["vim"]) - nsr_lcm["status_detailed"] = "Creating vnfd at RO" - # ns_request["constituent-vnfr-ref"] = [] - - self.db.create("nsr_lcm", nsr_lcm) + datacenter=db_nsr["datacenter"]) # get vnfds, instantiate at RO - self.logger.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id)) for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] - vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + self.logger.debug("create_ns task nsr_id={} RO vnfd={} creating".format(nsr_id, vnfd_id)) + db_nsr["detailed-status"] = "Creating vnfd {} at RO".format(vnfd_id) + vnfd = self.db.get_one("vnfds", {"id": vnfd_id}) vnfd.pop("_admin", None) vnfd.pop("_id", None) - # vnfr = deepcopy(vnfd) - # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"] - # vnfr["nsr-id"] = nsr_id - # vnfr["id"] = uuid4() - # vnfr["vnf-id"] = vnfd["id"] - # ns_request["constituent-vnfr-ref"],append(vnfd_id) - - # TODO change id for RO in case it is present - try: + + # look if present + vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id}) + if vnfd_list: + nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"] + self.logger.debug("create_ns task nsr_id={} RO vnfd={} exist. Using RO_id={}".format( + nsr_id, vnfd_id, vnfd_list[0]["uuid"])) + else: desc = await RO.create("vnfd", descriptor=vnfd) nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"] - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, vnfd already present - print("debug", e) - else: - raise + self.db.replace("nsrs", nsr_id, db_nsr) # db_new("vnfr", vnfr) # db_update("ns_request", nsr_id, ns_request) # create nsd at RO - self.logger.debug("create_ns task nsr_id={} RO NSD".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating nsd at RO" - nsd_id = ns_request["nsd_id"] - nsd = self.db.get_one("nsd", {"id": nsd_id}) + nsd_id = db_nsr["nsd"]["id"] + self.logger.debug("create_ns task nsr_id={} RO nsd={} creating".format(nsr_id, nsd_id)) + db_nsr["detailed-status"] = "Creating nsd {} at RO".format(nsd_id) + nsd = self.db.get_one("nsds", {"id": nsd_id}) nsd.pop("_admin", None) nsd.pop("_id", None) - try: + + nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id}) + if nsd_list: + nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"] + self.logger.debug("create_ns task nsr_id={} RO nsd={} exist. Using RO_id={}".format( + nsr_id, nsd_id, nsd_list[0]["uuid"])) + else: desc = await RO.create("nsd", descriptor=nsd) nsr_lcm["RO"]["nsd_id"] = desc["uuid"] - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 409: # conflict, nsd already present - print("debug", e) - else: - raise + self.db.replace("nsrs", nsr_id, db_nsr) # Crate ns at RO - self.logger.debug("create_ns task nsr_id={} RO NS".format(nsr_id)) - nsr_lcm["status_detailed"] = "Creating ns at RO" - desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"], scenario=nsr_lcm["RO"]["nsd_id"]) + self.logger.debug("create_ns task nsr_id={} RO ns creating".format(nsr_id)) + db_nsr["detailed-status"] = "Creating ns at RO" + desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"], + scenario=nsr_lcm["RO"]["nsd_id"]) RO_nsr_id = desc["uuid"] nsr_lcm["RO"]["nsr_id"] = RO_nsr_id nsr_lcm["RO"]["nsr_status"] = "BUILD" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + self.db.replace("nsrs", nsr_id, db_nsr) # wait until NS is ready + self.logger.debug("create_ns task nsr_id={} RO ns_id={} waiting to be ready".format(nsr_id, RO_nsr_id)) deloyment_timeout = 600 while deloyment_timeout > 0: ns_status_detailed = "Waiting ns ready at RO" - nsr_lcm["status_detailed"] = ns_status_detailed + db_nsr["detailed-status"] = ns_status_detailed desc = await RO.show("ns", RO_nsr_id) ns_status, ns_status_info = RO.check_ns_status(desc) nsr_lcm["RO"]["nsr_status"] = ns_status if ns_status == "ERROR": raise ROclient.ROClientException(ns_status_info) elif ns_status == "BUILD": - nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) + db_nsr["detailed-status"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id, ns_status_info) elif ns_status == "ACTIVE": nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc) break @@ -167,107 +171,127 @@ class Lcm: deloyment_timeout -= 5 if deloyment_timeout <= 0: raise ROclient.ROClientException("Timeot wating ns to be ready") - nsr_lcm["status_detailed"] = "Configuring vnfr" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + db_nsr["detailed-status"] = "Configuring vnfr" + self.db.replace("nsrs", nsr_id, db_nsr) #for nsd in nsr_lcm["descriptors"]["nsd"]: self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id)) for c_vnf in nsd["constituent-vnfd"]: vnfd_id = c_vnf["vnfd-id-ref"] - vnfd_index = int(c_vnf["member-vnf-index"]) - vnfd = self.db.get_one("vnfd", {"id": vnfd_id}) + vnfd_index = str(c_vnf["member-vnf-index"]) + vnfd = self.db.get_one("vnfds", {"id": vnfd_id}) + db_nsr["config-status"] = "config_not_needed" if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"): + db_nsr["config-status"] = "configuring" proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"] config_primitive = vnfd["vnf-configuration"].get("config-primitive") # get parameters for juju charm base_folder = vnfd["_admin"]["storage"] - path = base_folder + "/charms/" + proxy_charm + path = "{}{}/{}/charms".format(base_folder["path"], base_folder["folder"], base_folder["file"], + proxy_charm) mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index] # TODO launch VCA charm # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive)) - nsr_lcm["status"] = "DONE" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + db_nsr["detailed-status"] = "Done" + db_nsr["operational-status"] = "running" + self.db.replace("nsrs", nsr_id, db_nsr) + self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id)) return nsr_lcm except (ROclient.ROClientException, Exception) as e: - self.logger.debug("create_ns nsr_id={} Exception {}".format(nsr_id, e), exc_info=True) - nsr_lcm["status"] = "ERROR" - nsr_lcm["status_detailed"] += ": ERROR {}".format(e) - finally: - self.logger.debug("create_ns task nsr_id={} Exit".format(nsr_id)) - + db_nsr["operational-status"] = "failed" + db_nsr["detailed-status"] += ": ERROR {}".format(e) + self.db.replace("nsrs", nsr_id, db_nsr) + self.logger.debug( + "create_ns nsr_id={} Exit Exception on '{}': {}".format(nsr_id, db_nsr["detailed-status"], e), + exc_info=True) async def delete_ns(self, nsr_id): - self.logger.debug("delete_ns task nsr_id={} Enter".format(nsr_id)) - nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id}) - ns_request = self.db.get_one("ns_request", {"id": nsr_id}) - - nsr_lcm["status"] = "DELETING" - nsr_lcm["status_detailed"] = "Deleting charms" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + self.logger.debug("delete_ns task nsr_id={}, Delete_ns task nsr_id={} Enter".format(nsr_id, nsr_id)) + db_nsr = self.db.get_one("nsrs", {"_id": nsr_id}) + nsr_lcm = db_nsr["_admin"]["deploy"] + + db_nsr["operational-status"] = "terminate" + db_nsr["config-status"] = "terminate" + db_nsr["detailed-status"] = "Deleting charms" + self.db.replace("nsrs", nsr_id, db_nsr) # TODO destroy VCA charm # remove from RO RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant, - datacenter=ns_request["vim"]) + datacenter=db_nsr["datacenter"]) # Delete ns - try: - RO_nsr_id = nsr_lcm["RO"]["nsr_id"] - if RO_nsr_id: - nsr_lcm["status_detailed"] = "Deleting ns at RO" + RO_nsr_id = nsr_lcm["RO"]["nsr_id"] + if RO_nsr_id: + try: + db_nsr["detailed-status"] = "Deleting ns at RO" desc = await RO.delete("ns", RO_nsr_id) - print("debug", "deleted RO ns {}".format(RO_nsr_id)) - nsr_lcm["RO"]["nsr_id"] = None - nsr_lcm["RO"]["nsr_status"] = "DELETED" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: + self.logger.debug("delete_ns task nsr_id={} RO ns={} deleted".format(nsr_id, RO_nsr_id)) nsr_lcm["RO"]["nsr_id"] = None nsr_lcm["RO"]["nsr_status"] = "DELETED" - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - print("warning", e) - else: - print("error", e) + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + nsr_lcm["RO"]["nsr_id"] = None + nsr_lcm["RO"]["nsr_status"] = "DELETED" + self.logger.debug("delete_ns task nsr_id={} RO ns={} already deleted".format(nsr_id, RO_nsr_id)) + elif e.http_code == 409: #conflict + self.logger.debug("delete_ns task nsr_id={} RO ns={} delete conflict: {}".format(nsr_id, RO_nsr_id, + e)) + else: + self.logger.error("delete_ns task nsr_id={} RO ns={} delete error: {}".format(nsr_id, RO_nsr_id, e)) + self.db.replace("nsrs", nsr_id, db_nsr) # Delete nsd - try: - RO_nsd_id = nsr_lcm["RO"]["nsd_id"] - if RO_nsd_id: - nsr_lcm["status_detailed"] = "Deleting nsd at RO" + RO_nsd_id = nsr_lcm["RO"]["nsd_id"] + if RO_nsd_id: + try: + db_nsr["detailed-status"] = "Deleting nsd at RO" desc = await RO.delete("nsd", RO_nsd_id) - print("debug", "deleted RO nsd {}".format(RO_nsd_id)) - nsr_lcm["RO"]["nsd_id"] = None - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) - except ROclient.ROClientException as e: - if e.http_code == 404: + self.logger.debug("delete_ns task nsr_id={} RO nsd={} deleted".format(nsr_id, RO_nsd_id)) nsr_lcm["RO"]["nsd_id"] = None - print("warning", e) - else: - print("error", e) + except ROclient.ROClientException as e: + if e.http_code == 404: # not found + nsr_lcm["RO"]["nsd_id"] = None + self.logger.debug("delete_ns task nsr_id={} RO nsd={} already deleted".format(nsr_id, RO_nsd_id)) + elif e.http_code == 409: #conflict + self.logger.debug("delete_ns task nsr_id={} RO nsd={} delete conflict: {}".format(nsr_id, RO_nsd_id, + e)) + else: + self.logger.error("delete_ns task nsr_id={} RO nsd={} delete error: {}".format(nsr_id, RO_nsd_id, + e)) + self.db.replace("nsrs", nsr_id, db_nsr) for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items(): + if not RO_vnfd_id: + continue try: - if RO_vnfd_id: - nsr_lcm["status_detailed"] = "Deleting vnfd at RO" - desc = await RO.delete("vnfd", RO_vnfd_id) - print("debug", "deleted RO vnfd {}".format(RO_vnfd_id)) - nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm) + db_nsr["detailed-status"] = "Deleting vnfd {} at RO".format(vnf_id) + desc = await RO.delete("vnfd", RO_vnfd_id) + self.logger.debug("delete_ns task nsr_id={} RO vnfd={} deleted".format(nsr_id, RO_vnfd_id)) + nsr_lcm["RO"]["vnfd_id"][vnf_id] = None except ROclient.ROClientException as e: - if e.http_code == 404: + if e.http_code == 404: # not found nsr_lcm["RO"]["vnfd_id"][vnf_id] = None - print("warning", e) + self.logger.debug("delete_ns task nsr_id={} RO vnfd={} already deleted ".format(nsr_id, RO_vnfd_id)) + elif e.http_code == 409: #conflict + self.logger.debug("delete_ns task nsr_id={} RO vnfd={} delete conflict: {}".format( + nsr_id, RO_vnfd_id, e)) else: - print("error", e) - self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) + self.logger.error("delete_ns task nsr_id={} RO vnfd={} delete error: {}".format( + nsr_id, RO_vnfd_id, e)) + self.db.replace("nsrs", nsr_id, db_nsr) + + # TODO delete from database or mark as deleted??? + db_nsr["operational-status"] = "terminated" + self.db.del_one("nsrs", {"_id": nsr_id}) + self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id)) async def test(self, param=None): self.logger.debug("Starting/Ending test task: {}".format(param)) - def cancel_tasks(self, nsr_id): """ Cancel all active tasks of a concrete nsr identified for nsr_id @@ -283,15 +307,13 @@ class Lcm: self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name)) self.lcm_tasks[nsr_id] = {} - - async def read_kafka(self): self.logger.debug("kafka task Enter") order_id = 1 # future = asyncio.Future() while True: - command, params = await self.msg.aioread(self.loop, "ns") + command, params = await self.msg.aioread("ns", self.loop) order_id += 1 if command == "exit": print("Bye!") @@ -337,65 +359,78 @@ class Lcm: self.loop = None -def read_config_file(config_file): - # TODO make a [ini] + yaml inside parser - # the configparser library is not suitable, because it does not admit comments at the end of line, - # and not parse integer or boolean - try: - with open(config_file) as f: - conf = yaml.load(f) - # TODO insert envioronment - # for k, v in environ.items(): - # if k.startswith("OSMLCM_"): - # split _ lower add to config - return conf - except Exception as e: - self.logger.critical("At config file '{}': {}".format(config_file, e)) + def read_config_file(self, config_file): + # TODO make a [ini] + yaml inside parser + # the configparser library is not suitable, because it does not admit comments at the end of line, + # and not parse integer or boolean + try: + with open(config_file) as f: + conf = yaml.load(f) + for k, v in environ.items(): + if not k.startswith("OSMLCM_"): + continue + k_items = k.lower().split("_") + c = conf + try: + for k_item in k_items[1:-1]: + if k_item in ("ro", "vca"): + # put in capital letter + k_item = k_item.upper() + c = c[k_item] + if k_items[-1] == "port": + c[k_items[-1]] = int(v) + else: + c[k_items[-1]] = v + except Exception as e: + self.logger.warn("skipping environ '{}' on exception '{}'".format(k, e)) + + return conf + except Exception as e: + self.logger.critical("At config file '{}': {}".format(config_file, e)) if __name__ == '__main__': config_file = "lcm.cfg" - conf = read_config_file(config_file) - lcm = Lcm(conf) - - # FOR TEST - RO_VIM = "OST2_MRT" - - #FILL DATABASE - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} - lcm.db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: - vnfd = yaml.load(f) - vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) - vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} - lcm.db.create("vnfd", vnfd_clean) - with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: - nsd = yaml.load(f) - nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) - nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} - lcm.db.create("nsd", nsd_clean) - - ns_request = { - "id": "ns1", - "nsr_id": "ns1", - "name": "pingpongOne", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - lcm.db.create("ns_request", ns_request) - ns_request = { - "id": "ns2", - "nsr_id": "ns2", - "name": "pingpongTwo", - "vim": RO_VIM, - "nsd_id": nsd_clean["id"], # nsd_ping_pong - } - lcm.db.create("ns_request", ns_request) + lcm = Lcm(config_file) + + # # FOR TEST + # RO_VIM = "OST2_MRT" + # + # #FILL DATABASE + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f: + # vnfd = yaml.load(f) + # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"} + # lcm.db.create("vnfd", vnfd_clean) + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f: + # vnfd = yaml.load(f) + # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd) + # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"} + # lcm.db.create("vnfd", vnfd_clean) + # with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f: + # nsd = yaml.load(f) + # nsd_clean, _ = ROclient.remove_envelop("nsd", nsd) + # nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"} + # lcm.db.create("nsd", nsd_clean) + # + # ns_request = { + # "id": "ns1", + # "nsr_id": "ns1", + # "name": "pingpongOne", + # "vim": RO_VIM, + # "nsd_id": nsd_clean["id"], # nsd_ping_pong + # } + # lcm.db.create("ns_request", ns_request) + # ns_request = { + # "id": "ns2", + # "nsr_id": "ns2", + # "name": "pingpongTwo", + # "vim": RO_VIM, + # "nsd_id": nsd_clean["id"], # nsd_ping_pong + # } + # lcm.db.create("ns_request", ns_request) lcm.start()