X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Flcm.py;h=8932d8910664bdbd86c6e4bc3b07f133d140b6af;hb=506398326f3d6c95e423ff3b9185a3ffe6a8abe0;hp=851ba090c2831d5ce2f4498e4a9b7d0e65e60e31;hpb=5697b8b03a3acd17827ce536cb8aff15df8776ad;p=osm%2FLCM.git diff --git a/osm_lcm/lcm.py b/osm_lcm/lcm.py index 851ba09..8932d89 100644 --- a/osm_lcm/lcm.py +++ b/osm_lcm/lcm.py @@ -29,7 +29,7 @@ import logging.handlers import getopt import sys -from osm_lcm import ns, prometheus, vim_sdn, netslice +from osm_lcm import ns, vim_sdn, netslice from osm_lcm.ng_ro import NgRoException, NgRoClient from osm_lcm.ROclient import ROClient, ROClientException @@ -44,6 +44,7 @@ from osm_common.fsbase import FsException from osm_common.msgbase import MsgException from osm_lcm.data_utils.database.database import Database from osm_lcm.data_utils.filesystem.filesystem import Filesystem +from osm_lcm.lcm_hc import get_health_check_file from os import environ, path from random import choice as random_choice from n2vc import version as n2vc_version @@ -58,9 +59,6 @@ min_RO_version = "6.0.2" min_n2vc_version = "0.0.2" min_common_version = "0.1.19" -health_check_file = ( - path.expanduser("~") + "/time_last_ping" -) # TODO find better location for this file class Lcm: @@ -98,6 +96,7 @@ class Lcm: # load configuration config = self.read_config_file(config_file) self.config = config + self.health_check_file = get_health_check_file(self.config) self.config["ro_config"] = { "ng": config["RO"].get("ng", False), "uri": config["RO"].get("uri"), @@ -185,6 +184,7 @@ class Lcm: self.db = Database(config).instance.db self.fs = Filesystem(config).instance.fs + self.fs.sync() # copy message configuration in order to remove 'group_id' for msg_admin config_message = config["message"].copy() @@ -214,20 +214,6 @@ class Lcm: # contains created tasks/futures to be able to cancel self.lcm_tasks = TaskRegistry(self.worker_id, self.logger) - if self.config.get("tsdb") and self.config["tsdb"].get("driver"): - if self.config["tsdb"]["driver"] == "prometheus": - self.prometheus = prometheus.Prometheus( - self.config["tsdb"], self.worker_id, self.loop - ) - else: - raise LcmException( - "Invalid configuration param '{}' at '[tsdb]':'driver'".format( - config["tsdb"]["driver"] - ) - ) - else: - self.prometheus = None - async def check_RO_version(self): tries = 14 last_error = None @@ -358,12 +344,12 @@ class Lcm: return self.pings_not_received = 0 try: - with open(health_check_file, "w") as f: + with open(self.health_check_file, "w") as f: f.write(str(time())) except Exception as e: self.logger.error( "Cannot write into '{}' for healthcheck: {}".format( - health_check_file, e + self.health_check_file, e ) ) return @@ -452,6 +438,14 @@ class Lcm: task = asyncio.ensure_future(self.ns.action(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_action", task) return + elif command == "update": + # self.logger.debug("Update NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.update(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_update", task) + return elif command == "scale": # self.logger.debug("Update NS {}".format(nsr_id)) nslcmop = params @@ -460,6 +454,32 @@ class Lcm: task = asyncio.ensure_future(self.ns.scale(nsr_id, nslcmop_id)) self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_scale", task) return + elif command == "heal": + # self.logger.debug("Healing NS {}".format(nsr_id)) + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.heal(nsr_id, nslcmop_id)) + self.lcm_tasks.register( + "ns", nsr_id, nslcmop_id, "ns_heal", task + ) + return + elif command == "migrate": + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.migrate(nsr_id, nslcmop_id)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_migrate", task) + return + elif command == "verticalscale": + nslcmop = params + nslcmop_id = nslcmop["_id"] + nsr_id = nslcmop["nsInstanceId"] + task = asyncio.ensure_future(self.ns.vertical_scale(nsr_id, nslcmop_id)) + self.logger.debug("nsr_id,nslcmop_id,task {},{},{}".format(nsr_id, nslcmop_id, task)) + self.lcm_tasks.register("ns", nsr_id, nslcmop_id, "ns_verticalscale", task) + self.logger.debug("LCM task registered {},{},{} ".format(nsr_id, nslcmop_id, task)) + return elif command == "show": nsr_id = params try: @@ -483,12 +503,19 @@ class Lcm: elif command == "deleted": return # TODO cleaning of task just in case should be done elif command in ( + "vnf_terminated", + "policy_updated", "terminated", "instantiated", "scaled", + "healed", "actioned", + "updated", + "migrated", + "verticalscaled", ): # "scaled-cooldown-time" return + elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc) if command == "instantiate": # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"])) @@ -541,6 +568,7 @@ class Lcm: "terminated", "instantiated", "scaled", + "healed", "actioned", ): # "scaled-cooldown-time" return @@ -684,9 +712,7 @@ class Lcm: # check RO version self.loop.run_until_complete(self.check_RO_version()) - self.ns = ns.NsLcm( - self.msg, self.lcm_tasks, self.config, self.loop, self.prometheus - ) + self.ns = ns.NsLcm(self.msg, self.lcm_tasks, self.config, self.loop) self.netslice = netslice.NetsliceLcm( self.msg, self.lcm_tasks, self.config, self.loop, self.ns ) @@ -701,13 +727,10 @@ class Lcm: self.msg, self.lcm_tasks, self.config, self.loop ) - # configure tsdb prometheus - if self.prometheus: - self.loop.run_until_complete(self.prometheus.start()) - self.loop.run_until_complete( asyncio.gather(self.kafka_read(), self.kafka_ping()) ) + # TODO # self.logger.debug("Terminating cancelling creation tasks") # self.lcm_tasks.cancel("ALL", "create") @@ -736,7 +759,7 @@ class Lcm: try: # read file as yaml format with open(config_file) as f: - conf = yaml.load(f, Loader=yaml.Loader) + conf = yaml.safe_load(f) # Ensure all sections are not empty for k in ( "global", @@ -854,7 +877,7 @@ if __name__ == "__main__": elif o == "--health-check": from osm_lcm.lcm_hc import health_check - health_check(health_check_file, Lcm.ping_interval_pace) + health_check(config_file, Lcm.ping_interval_pace) # elif o == "--log-socket-port": # log_socket_port = a # elif o == "--log-socket-host":