import ROclient
import ns
import vim_sdn
+import netslice
from lcm_utils import versiontuple, LcmException, TaskRegistry
# from osm_lcm import version as lcm_version, version_date as lcm_version_date, ROclient
__author__ = "Alfonso Tierno"
-min_RO_version = [0, 5, 72]
+min_RO_version = [0, 5, 84]
min_n2vc_version = "0.0.2"
-min_common_version = "0.1.7"
+min_common_version = "0.1.11"
# uncomment if LCM is installed as library and installed, and get them from __init__.py
-lcm_version = '0.1.17'
-lcm_version_date = '2018-10-11'
+lcm_version = '0.1.27'
+lcm_version_date = '2018-11-27'
class Lcm:
+ ping_interval_pace = 120 # how many time ping is send once is confirmed all is running
+ ping_interval_boot = 5 # how many time ping is sent when booting
+
def __init__(self, config_file, loop=None):
"""
Init, Connect to database, filesystem storage, and messaging
raise LcmException("Not compatible osm/N2VC version '{}'. Needed '{}' or higher".format(
n2vc_version, min_n2vc_version))
# check version of common
- if versiontuple(common_version) < versiontuple("0.1.7"):
+ if versiontuple(common_version) < versiontuple(min_common_version):
raise LcmException("Not compatible osm/common version '{}'. Needed '{}' or higher".format(
common_version, min_common_version))
raise LcmException(str(e))
self.ns = ns.NsLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.vca_config, self.loop)
+ self.netslice = netslice.NetsliceLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config,
+ self.vca_config, self.loop)
self.vim = vim_sdn.VimLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
self.sdn = vim_sdn.SdnLcm(self.db, self.msg, self.fs, self.lcm_tasks, self.ro_config, self.loop)
try:
await self.msg.aiowrite("admin", "ping", {"from": "lcm", "to": "lcm"}, self.loop)
# time between pings are low when it is not received and at starting
- wait_time = 5 if not kafka_has_received else 120
+ wait_time = self.ping_interval_boot if not kafka_has_received else self.ping_interval_pace
if not self.pings_not_received:
kafka_has_received = True
self.pings_not_received += 1
first_start = True
while consecutive_errors < 10:
try:
- topics = ("admin", "ns", "vim_account", "sdn")
+ topics = ("admin", "ns", "vim_account", "sdn", "nsi")
topic, command, params = await self.msg.aioread(topics, self.loop)
if topic != "admin" and command != "ping":
self.logger.debug("Task kafka_read receives {} {}: {}".format(topic, command, params))
continue # TODO cleaning of task just in case should be done
elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
continue
+ elif topic == "nsi": # netslice LCM processes (instantiate, terminate, etc)
+ if command == "instantiate":
+ # self.logger.debug("Instantiating Network Slice {}".format(nsilcmop["netsliceInstanceId"]))
+ nsilcmop = params
+ nsilcmop_id = nsilcmop["_id"] # slice operation id
+ nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
+ task = asyncio.ensure_future(self.netslice.instantiate(nsir_id, nsilcmop_id))
+ self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_instantiate", task)
+ continue
+ elif command == "terminate":
+ # self.logger.debug("Terminating Network Slice NS {}".format(nsilcmop["netsliceInstanceId"]))
+ nsilcmop = params
+ nsilcmop_id = nsilcmop["_id"] # slice operation id
+ nsir_id = nsilcmop["netsliceInstanceId"] # slice record id
+ self.lcm_tasks.cancel(topic, nsir_id)
+ task = asyncio.ensure_future(self.netslice.terminate(nsir_id, nsilcmop_id))
+ self.lcm_tasks.register("nsi", nsir_id, nsilcmop_id, "nsi_terminate", task)
+ continue
+ elif command == "show":
+ try:
+ db_nsir = self.db.get_one("nsirs", {"_id": nsir_id})
+ print("nsir:\n _id={}\n operational-status: {}\n config-status: {}"
+ "\n detailed-status: {}\n deploy: {}\n tasks: {}"
+ "".format(nsir_id, db_nsir["operational-status"], db_nsir["config-status"],
+ db_nsir["detailed-status"],
+ db_nsir["_admin"]["deployed"], self.lcm_netslice_tasks.get(nsir_id)))
+ except Exception as e:
+ print("nsir {} not found: {}".format(nsir_id, e))
+ sys.stdout.flush()
+ continue
+ elif command == "deleted":
+ continue # TODO cleaning of task just in case should be done
+ elif command in ("terminated", "instantiated", "scaled", "actioned"): # "scaled-cooldown-time"
+ continue
elif topic == "vim_account":
vim_id = params["_id"]
if command == "create":
# self.logger.debug("Task kafka_read terminating")
self.logger.debug("Task kafka_read exit")
+ def health_check(self):
+
+ global exit_code
+ task = None
+ exit_code = 1
+
+ def health_check_callback(topic, command, params):
+ global exit_code
+ print("receiving callback {} {} {}".format(topic, command, params))
+ if topic == "admin" and command == "ping" and params["to"] == "lcm" and params["from"] == "lcm":
+ # print("received LCM ping")
+ exit_code = 0
+ task.cancel()
+
+ try:
+ task = asyncio.ensure_future(self.msg.aioread(("admin",), self.loop, health_check_callback))
+ self.loop.run_until_complete(task)
+ except Exception:
+ pass
+ exit(exit_code)
+
def start(self):
# check RO version
if not k.startswith("OSMLCM_"):
continue
k_items = k.lower().split("_")
+ if len(k_items) < 3:
+ continue
+ if k_items[1] in ("ro", "vca"):
+ # put in capital letter
+ k_items[1] = k_items[1].upper()
c = conf
try:
for k_item in k_items[1:-1]:
- if k_item in ("ro", "vca"):
- # put in capital letter
- k_item = k_item.upper()
c = c[k_item]
if k_items[-1] == "port":
c[k_items[-1]] = int(v)
def usage():
print("""Usage: {} [options]
-c|--config [configuration_file]: loads the configuration file (default: ./nbi.cfg)
+ --health-check: do not run lcm, but inspect kafka bus to determine if lcm is healthy
-h|--help: shows this help
""".format(sys.argv[0]))
# --log-socket-host HOST: send logs to this host")
if __name__ == '__main__':
try:
# load parameters and configuration
- opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help"])
+ opts, args = getopt.getopt(sys.argv[1:], "hc:", ["config=", "help", "health-check"])
# TODO add "log-socket-host=", "log-socket-port=", "log-file="
config_file = None
+ health_check = None
for o, a in opts:
if o in ("-h", "--help"):
usage()
sys.exit()
elif o in ("-c", "--config"):
config_file = a
+ elif o == "--health-check":
+ health_check = True
# elif o == "--log-socket-port":
# log_socket_port = a
# elif o == "--log-socket-host":
assert False, "Unhandled option"
if config_file:
if not path.isfile(config_file):
- print("configuration file '{}' that not exist".format(config_file), file=sys.stderr)
+ print("configuration file '{}' not exist".format(config_file), file=sys.stderr)
exit(1)
else:
for config_file in (__file__[:__file__.rfind(".")] + ".cfg", "./lcm.cfg", "/etc/osm/lcm.cfg"):
if path.isfile(config_file):
break
else:
- print("No configuration file 'nbi.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
+ print("No configuration file 'lcm.cfg' found neither at local folder nor at /etc/osm/", file=sys.stderr)
exit(1)
lcm = Lcm(config_file)
- lcm.start()
+ if health_check:
+ lcm.health_check()
+ else:
+ lcm.start()
except (LcmException, getopt.GetoptError) as e:
print(str(e), file=sys.stderr)
# usage()