9bff8c196ee75b8cd5ed476b095c65cefbfcc9b1
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 from dbbase import DbException
13 from fsbase import FsException
14 from msgbase import MsgException
15 from os import environ
16 import logging
17 from vca import DeployApplication, RemoveApplication
18
# Root logger setup: everything from DEBUG up is printed as "<logger name> <level>: <message>".
# To also print the timestamp use "%(asctime)s %(name)s %(levelname)s: %(message)s" instead.
streamformat = "%(name)s %(levelname)s: %(message)s"
logging.basicConfig(level=logging.DEBUG, format=streamformat)
22
23
class LcmException(Exception):
    """Error raised by the LCM itself, e.g. on an invalid configuration or a failed backend connection."""
26
27
class Lcm:
    """
    Life cycle manager of network services (ns).
    Reads commands with self.msg.aioread("ns", ...) and, for each 'create'/'delete' command, launches an asyncio
    task that deploys/removes the ns at RO and launches the deployment/removal of its charms (VCA).
    """

    def __init__(self, config_file):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the YAML configuration file. Top level must contain the sections 'RO', 'VCA',
            'database', 'storage' and 'message'; the last three need a 'driver' entry
        :return: None
        :raises LcmException: on an unknown 'driver' value, or if connecting to database, storage or messaging fails
        """
        # contains created tasks/futures to be able to cancel
        # format: {nsr_id: {order_id: {task_name: task}}}
        self.lcm_tasks = {}
        # logging
        self.logger = logging.getLogger('lcm')
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"])
        self.ro_tenant = config["RO"]["tenant"]
        self.vca = config["VCA"]  # TODO VCA
        self.loop = None  # set by start() while the event loop is running
        try:
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.dbmongo()
            elif db_driver == "memory":
                self.db = dbmemory.dbmemory()
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
            self.db.db_connect(config["database"])

            fs_driver = config["storage"]["driver"]
            if fs_driver == "local":
                self.fs = fslocal.FsLocal()
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
            self.fs.fs_connect(config["storage"])

            msg_driver = config["message"]["driver"]
            if msg_driver == "local":
                self.msg = msglocal.msgLocal()
            elif msg_driver == "kafka":
                self.msg = msgkafka.MsgKafka()
            else:
                # report the offending [message] value (the [storage] one was reported here by mistake)
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
            self.msg.connect(config["message"])
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

    async def create_ns(self, nsr_id):
        """
        Deploy a network service. Creates at RO the vnfds, the nsd and the ns instance (reusing vnfd/nsd that RO
        already has with the same osm_id), polls RO until the ns is ACTIVE and launches DeployApplication for every
        vnfd that declares a juju vnf-configuration. Progress is written to the 'nsrs' database record.
        :param nsr_id: '_id' of the ns record at database table 'nsrs'
        :return: the deployment info dictionary stored at db_nsr["_admin"]["deploy"]. None if the deployment fails;
            in that case the record is marked as 'failed' and the error is logged, not raised
        """
        self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id))
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = {
            "id": nsr_id,
            "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
            "nsr_ip": {},
            "VCA": {},  # "TODO"
        }
        # nsr_lcm is stored by reference: later changes on it are saved with every db.replace of db_nsr
        db_nsr["_admin"]["deploy"] = nsr_lcm
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        try:
            nsd = db_nsr["nsd"]
            RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant,
                                   datacenter=db_nsr["datacenter"])

            # get vnfds, instantiate at RO
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                self.logger.debug("create_ns task nsr_id={} RO vnfd={} creating".format(nsr_id, vnfd_id))
                db_nsr["detailed-status"] = "Creating vnfd {} at RO".format(vnfd_id)
                vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
                # database bookkeeping entries are not sent to RO
                vnfd.pop("_admin", None)
                vnfd.pop("_id", None)

                # look if present
                vnfd_list = await RO.get_list("vnfd", filter_by={"osm_id": vnfd_id})
                if vnfd_list:
                    nsr_lcm["RO"]["vnfd_id"][vnfd_id] = vnfd_list[0]["uuid"]
                    self.logger.debug("create_ns task nsr_id={} RO vnfd={} exist. Using RO_id={}".format(
                        nsr_id, vnfd_id, vnfd_list[0]["uuid"]))
                else:
                    desc = await RO.create("vnfd", descriptor=vnfd)
                    nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
                self.db.replace("nsrs", nsr_id, db_nsr)

            # create nsd at RO
            nsd_id = db_nsr["nsd"]["id"]
            self.logger.debug("create_ns task nsr_id={} RO nsd={} creating".format(nsr_id, nsd_id))
            db_nsr["detailed-status"] = "Creating nsd {} at RO".format(nsd_id)
            # from here on 'nsd' is the descriptor stored at table 'nsds', not the copy embedded in the ns record
            nsd = self.db.get_one("nsds", {"id": nsd_id})
            nsd.pop("_admin", None)
            nsd.pop("_id", None)

            nsd_list = await RO.get_list("nsd", filter_by={"osm_id": nsd_id})
            if nsd_list:
                nsr_lcm["RO"]["nsd_id"] = nsd_list[0]["uuid"]
                self.logger.debug("create_ns task nsr_id={} RO nsd={} exist. Using RO_id={}".format(
                    nsr_id, nsd_id, nsd_list[0]["uuid"]))
            else:
                desc = await RO.create("nsd", descriptor=nsd)
                nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
            self.db.replace("nsrs", nsr_id, db_nsr)

            # Create ns at RO
            self.logger.debug("create_ns task nsr_id={} RO ns creating".format(nsr_id))
            db_nsr["detailed-status"] = "Creating ns at RO"
            desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = desc["uuid"]
            nsr_lcm["RO"]["nsr_id"] = RO_nsr_id
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
            self.db.replace("nsrs", nsr_id, db_nsr)

            # wait until NS is ready
            self.logger.debug("create_ns task nsr_id={} RO ns_id={} waiting to be ready".format(nsr_id, RO_nsr_id))
            deployment_timeout = 600  # seconds; RO is polled every 5 seconds
            while deployment_timeout > 0:
                ns_status_detailed = "Waiting ns ready at RO"
                db_nsr["detailed-status"] = ns_status_detailed
                desc = await RO.show("ns", RO_nsr_id)
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    db_nsr["detailed-status"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id,
                                                                                                ns_status_info)
                elif ns_status == "ACTIVE":
                    nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                    break
                else:
                    # explicit raise (not assert) so that the check is kept when running with -O
                    raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

                # the 'loop' argument of asyncio.sleep was removed in python 3.10; the running loop is used
                await asyncio.sleep(5)
                deployment_timeout -= 5
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeot wating ns to be ready")
            db_nsr["detailed-status"] = "Configuring vnfr"
            self.db.replace("nsrs", nsr_id, db_nsr)

            self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id))
            # set once before the loop: a later vnfd without charm must not hide that a previous one is configuring
            db_nsr["config-status"] = "config_not_needed"
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                vnfd_index = str(c_vnf["member-vnf-index"])
                vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
                if not (vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju")):
                    continue
                db_nsr["config-status"] = "configuring"
                proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]

                # Note: The charm needs to exist on disk at the location
                # specified by charm_path.
                base_folder = vnfd["_admin"]["storage"]
                charm_path = "{}{}/{}/charms/{}".format(
                    base_folder["path"],
                    base_folder["folder"],
                    base_folder["file"],
                    proxy_charm
                )
                # launched in background: this task does not wait for it nor registers it at self.lcm_tasks
                asyncio.ensure_future(
                    DeployApplication(
                        self.config['VCA'],
                        self.db,
                        db_nsr,
                        vnfd,
                        vnfd_index,
                        charm_path,
                    )
                )

            db_nsr["detailed-status"] = "Done"
            db_nsr["operational-status"] = "running"
            self.db.replace("nsrs", nsr_id, db_nsr)

            self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
            return nsr_lcm

        except Exception as e:
            db_nsr["operational-status"] = "failed"
            db_nsr["detailed-status"] += ": ERROR {}".format(e)
            # log before writing to database, so that the cause is not lost if the database write fails too
            self.logger.error(
                "create_ns nsr_id={} Exit Exception on '{}': {}".format(nsr_id, db_nsr["detailed-status"], e),
                exc_info=True)
            self.db.replace("nsrs", nsr_id, db_nsr)

    async def _ro_delete(self, RO, item, RO_id, nsr_id):
        """
        Delete one item at RO, tolerating that it does not exist any more
        :param RO: ROclient.ROClient instance to use
        :param item: kind of RO item: "ns", "nsd" or "vnfd"
        :param RO_id: identity of the item at RO
        :param nsr_id: ns record identity, only used for logging
        :return: True if the item is not at RO any more (deleted now, or RO answers 404 not found);
            False if RO answers with a conflict (409) or any other error, that is logged
        """
        try:
            await RO.delete(item, RO_id)
            self.logger.debug("delete_ns task nsr_id={} RO {}={} deleted".format(nsr_id, item, RO_id))
            return True
        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                self.logger.debug("delete_ns task nsr_id={} RO {}={} already deleted".format(nsr_id, item, RO_id))
                return True
            if e.http_code == 409:  # conflict
                self.logger.debug("delete_ns task nsr_id={} RO {}={} delete conflict: {}".format(
                    nsr_id, item, RO_id, e))
            else:
                self.logger.error("delete_ns task nsr_id={} RO {}={} delete error: {}".format(
                    nsr_id, item, RO_id, e))
            return False

    async def delete_ns(self, nsr_id):
        """
        Remove a network service. Launches RemoveApplication for every vnfd with a juju vnf-configuration, deletes
        from RO the ns, the nsd and the vnfds registered at db_nsr["_admin"]["deploy"]["RO"], and finally deletes
        the record from database table 'nsrs'. Errors deleting at RO are logged and do not stop the removal.
        :param nsr_id: '_id' of the ns record at database table 'nsrs'
        :return: None
        """
        self.logger.debug("delete_ns task nsr_id={}, Delete_ns task nsr_id={} Enter".format(nsr_id, nsr_id))
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"]["deploy"]

        db_nsr["operational-status"] = "terminate"
        db_nsr["config-status"] = "terminate"
        db_nsr["detailed-status"] = "Deleting charms"
        self.db.replace("nsrs", nsr_id, db_nsr)

        try:
            self.logger.debug("Deleting charms")
            nsd = db_nsr["nsd"]
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                vnfd_index = str(c_vnf["member-vnf-index"])
                vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
                if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
                    # launched in background: the removal at RO below does not wait for it
                    asyncio.ensure_future(
                        RemoveApplication(
                            self.config['VCA'],
                            self.db,
                            db_nsr,
                            vnfd,
                            vnfd_index,
                        )
                    )
        except Exception as e:
            # best effort: a failure removing charms must not prevent the removal at RO
            self.logger.debug("Failed while deleting charms: {}".format(e))

        # remove from RO
        RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant,
                               datacenter=db_nsr["datacenter"])
        # Delete ns
        RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
        if RO_nsr_id:
            db_nsr["detailed-status"] = "Deleting ns at RO"
            if await self._ro_delete(RO, "ns", RO_nsr_id, nsr_id):
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
            self.db.replace("nsrs", nsr_id, db_nsr)

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            db_nsr["detailed-status"] = "Deleting nsd at RO"
            if await self._ro_delete(RO, "nsd", RO_nsd_id, nsr_id):
                nsr_lcm["RO"]["nsd_id"] = None
            self.db.replace("nsrs", nsr_id, db_nsr)

        # Delete vnfds. Only values of existing keys are changed, so iterating over items() is safe
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            db_nsr["detailed-status"] = "Deleting vnfd {} at RO".format(vnf_id)
            if await self._ro_delete(RO, "vnfd", RO_vnfd_id, nsr_id):
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
            self.db.replace("nsrs", nsr_id, db_nsr)

        # TODO delete from database or mark as deleted???
        # NOTE(review): this status is never saved, because the record is deleted right after
        db_nsr["operational-status"] = "terminated"
        self.db.del_one("nsrs", {"_id": nsr_id})
        self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id))

    async def test(self, param=None):
        """
        Dummy task launched by the 'test' command: only logs the received parameter
        :param param: anything, it is just logged
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    def cancel_tasks(self, nsr_id):
        """
        Cancel all active tasks of a concrete nsr identified for nsr_id
        :param nsr_id: nsr identity
        :return: None, or raises an exception if not possible
        """
        tasks_by_order = self.lcm_tasks.get(nsr_id)
        if not tasks_by_order:
            return
        for order_id, tasks_set in tasks_by_order.items():
            for task_name, task in tasks_set.items():
                # cancel() result is falsy when nothing was cancelled; then there is nothing to report
                if task.cancel():
                    self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
        self.lcm_tasks[nsr_id] = {}

    async def read_kafka(self):
        """
        Command loop. Reads (command, params) with self.msg.aioread("ns", ...) until the 'exit' command arrives
        Commands: 'exit'; 'echo' prints params; 'test' launches the test task; 'break' prints a fixed text;
        'create' and 'delete' launch create_ns/delete_ns for the nsr_id received as params ('delete' cancels
        before the tasks registered for that nsr_id); 'show' prints the stored info and tasks of an nsr_id.
        A command starting by '#' is ignored. Any other command prints the usage.
        :return: None
        """
        self.logger.debug("kafka task Enter")
        order_id = 1
        # future = asyncio.Future()

        while True:
            command, params = await self.msg.aioread("ns", self.loop)
            # incremented for every message received, including ignored and unknown ones
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                print(params)
            elif command == "test":
                asyncio.Task(self.test(params), loop=self.loop)
            elif command == "break":
                print("put a break in this line of code")
            elif command == "create":
                nsr_id = params.strip()
                self.logger.debug("Deploying NS {}".format(nsr_id))
                task = asyncio.ensure_future(self.create_ns(nsr_id))
                self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"create_ns": task}
            elif command == "delete":
                nsr_id = params.strip()
                self.logger.debug("Deleting NS {}".format(nsr_id))
                # stop a possible on-going deployment before removing
                self.cancel_tasks(nsr_id)
                task = asyncio.ensure_future(self.delete_ns(nsr_id))
                self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"delete_ns": task}
            elif command == "show":
                nsr_id = params.strip()
                # NOTE(review): reads table "nsr_lcm", but create_ns stores this info at table "nsrs"
                # ('_admin.deploy') - confirm which one is intended
                nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id})
                print("nsr_lcm", nsr_lcm)
                print("self.lcm_tasks", self.lcm_tasks.get(nsr_id))
            else:
                self.logger.debug("unknown command '{}'".format(command))
                print("Usage:\n echo: <>\n create: <ns1|ns2>\n delete: <ns1|ns2>\n show: <ns1|ns2>")
        self.logger.debug("kafka task Exit")

    def start(self):
        """
        Run the command loop (read_kafka) in the asyncio event loop until it finishes
        The event loop is closed and self.loop is reset at the end, also if the command loop raises.
        :return: None
        """
        self.loop = asyncio.get_event_loop()
        try:
            self.loop.run_until_complete(self.read_kafka())
        finally:
            self.loop.close()
            self.loop = None

    def read_config_file(self, config_file):
        """
        Load the YAML configuration file and override its values with the OSMLCM_* environment variables
        A variable OSMLCM_<SECTION>_<KEY> sets conf[<section>][<key>]. Names are lower-cased, except sections
        'RO' and 'VCA'. The value is stored as string, except for key 'port' that is converted to integer.
        A variable that does not match the configuration structure is skipped with a warning.
        :param config_file: path of the YAML configuration file
        :return: the configuration dictionary
        :raises LcmException: if the file cannot be read or parsed
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # safe_load: the configuration only needs plain YAML types, and yaml.load() without an explicit
                # Loader is deprecated (PyYAML 5.1) and rejected by PyYAML 6
                conf = yaml.safe_load(f)
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            # fail here with a clear cause, instead of returning None and failing later at __init__
            raise LcmException("At config file '{}': {}".format(config_file, e)) from e

        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down the intermediate sections; the last item is the key to set
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except (KeyError, TypeError, ValueError) as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
421
422
if __name__ == '__main__':
    # Script entry point: builds the LCM from the configuration file "lcm.cfg", relative to the current working
    # directory, and runs its command loop until it finishes
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)
    lcm.start()