06393e5f7d3aa3a463327796a4648e2d20981306
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 import msgkafka
12 from dbbase import DbException
13 from fsbase import FsException
14 from msgbase import MsgException
15 from os import environ
16 import logging
17
# Root logger setup for the whole process: everything from DEBUG upwards is emitted.
# Alternative format that also prints the timestamp:
# streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
streamformat = "%(name)s %(levelname)s: %(message)s"
logging.basicConfig(level=logging.DEBUG, format=streamformat)
21
22
class LcmException(Exception):
    """Exception type used by the Lcm class to report its own errors."""
25
26
class Lcm:
    """
    Life-cycle manager of network services (NS).

    Reads orders ("create", "delete", ...) from the message bus and runs them as asyncio tasks that
    create/delete the needed items at RO, storing the progress at the "nsrs" database records.
    """

    def __init__(self, config_file):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config_file: path of the YAML configuration file. Its content is a two level dictionary that
            must contain the sections 'RO', 'VCA', 'database', 'storage' and 'message'
        :return: None
        :raises LcmException: if the configuration cannot be read, a 'driver' is not valid, or the connection to
            database, storage or messaging fails
        """
        # contains created tasks/futures to be able to cancel
        # format: {nsr_id: {order_id: {task_name: task}}}
        self.lcm_tasks = {}
        # logging
        self.logger = logging.getLogger('lcm')
        # load configuration
        config = self.read_config_file(config_file)
        self.config = config
        self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"])
        self.ro_tenant = config["RO"]["tenant"]
        self.vca = config["VCA"]  # TODO VCA
        self.loop = None  # set at start()
        try:
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.dbmongo()
            elif db_driver == "memory":
                self.db = dbmemory.dbmemory()
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
            self.db.db_connect(config["database"])

            fs_driver = config["storage"]["driver"]
            if fs_driver == "local":
                self.fs = fslocal.FsLocal()
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(fs_driver))
            self.fs.fs_connect(config["storage"])

            msg_driver = config["message"]["driver"]
            if msg_driver == "local":
                self.msg = msglocal.msgLocal()
            elif msg_driver == "kafka":
                self.msg = msgkafka.MsgKafka()
            else:
                # report the offending *message* driver (it was wrongly reporting the storage one)
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(msg_driver))
            self.msg.connect(config["message"])
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

    # def update_nsr_db(self, nsr_id, nsr_desc):
    #    self.db.replace("nsrs", nsr_id, nsr_desc)

    async def _ro_get_or_create(self, RO, nsr_id, item, osm_id, descriptor):
        """
        Look for a descriptor at RO by its osm_id and create it if it is not present
        :param RO: ROclient.ROClient instance
        :param nsr_id: nsr identity, used only for logging
        :param item: RO item type: "vnfd" or "nsd"
        :param osm_id: identity of the descriptor, used as the "osm_id" filter at RO
        :param descriptor: content used to create the item at RO when it is not found
        :return: the RO uuid of the found or created item
        """
        found = await RO.get_list(item, filter_by={"osm_id": osm_id})
        if found:
            self.logger.debug("create_ns task nsr_id={} RO {}={} exist. Using RO_id={}".format(
                nsr_id, item, osm_id, found[0]["uuid"]))
            return found[0]["uuid"]
        desc = await RO.create(item, descriptor=descriptor)
        return desc["uuid"]

    async def create_ns(self, nsr_id):
        """
        Task that deploys a network service: creates vnfds, nsd and ns at RO and waits until the ns is ready
        Progress is stored at the "nsrs" record: 'detailed-status', 'operational-status', 'config-status' and
        '_admin.deploy'. On any error the record is marked as "failed" and the exception is not propagated
        :param nsr_id: "_id" of the record at "nsrs" database table
        :return: the deployment information (content of '_admin.deploy') if ok, None on error
        """
        self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id))
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = {
            "id": nsr_id,
            "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
            "nsr_ip": {},
            "VCA": {},  # "TODO"
        }
        db_nsr["_admin"]["deploy"] = nsr_lcm
        db_nsr["detailed-status"] = "creating"
        db_nsr["operational-status"] = "init"

        try:
            nsd = db_nsr["nsd"]
            RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant,
                                   datacenter=db_nsr["datacenter"])

            # get vnfds, instantiate at RO
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                self.logger.debug("create_ns task nsr_id={} RO vnfd={} creating".format(nsr_id, vnfd_id))
                db_nsr["detailed-status"] = "Creating vnfd {} at RO".format(vnfd_id)
                vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
                # internal database fields are not sent to RO
                vnfd.pop("_admin", None)
                vnfd.pop("_id", None)

                nsr_lcm["RO"]["vnfd_id"][vnfd_id] = await self._ro_get_or_create(RO, nsr_id, "vnfd", vnfd_id, vnfd)
                self.db.replace("nsrs", nsr_id, db_nsr)

            # db_new("vnfr", vnfr)
            # db_update("ns_request", nsr_id, ns_request)

            # create nsd at RO
            nsd_id = db_nsr["nsd"]["id"]
            self.logger.debug("create_ns task nsr_id={} RO nsd={} creating".format(nsr_id, nsd_id))
            db_nsr["detailed-status"] = "Creating nsd {} at RO".format(nsd_id)
            nsd = self.db.get_one("nsds", {"id": nsd_id})
            nsd.pop("_admin", None)
            nsd.pop("_id", None)

            nsr_lcm["RO"]["nsd_id"] = await self._ro_get_or_create(RO, nsr_id, "nsd", nsd_id, nsd)
            self.db.replace("nsrs", nsr_id, db_nsr)

            # Create ns at RO
            self.logger.debug("create_ns task nsr_id={} RO ns creating".format(nsr_id))
            db_nsr["detailed-status"] = "Creating ns at RO"
            desc = await RO.create("ns", name=db_nsr["name"], datacenter=db_nsr["datacenter"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = desc["uuid"]
            nsr_lcm["RO"]["nsr_id"] = RO_nsr_id
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
            self.db.replace("nsrs", nsr_id, db_nsr)

            # wait until NS is ready, polling RO every 5 seconds
            self.logger.debug("create_ns task nsr_id={} RO ns_id={} waiting to be ready".format(nsr_id, RO_nsr_id))
            poll_interval = 5
            deployment_timeout = 600
            while deployment_timeout > 0:
                ns_status_detailed = "Waiting ns ready at RO"
                db_nsr["detailed-status"] = ns_status_detailed
                desc = await RO.show("ns", RO_nsr_id)
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    db_nsr["detailed-status"] = ns_status_detailed + "; nsr_id: '{}', {}".format(nsr_id,
                                                                                                  ns_status_info)
                elif ns_status == "ACTIVE":
                    nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                    break
                else:
                    # explicit raise instead of 'assert', that is stripped when running with -O
                    raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

                # the 'loop' argument of asyncio.sleep is not supported on Python >= 3.10;
                # this coroutine already runs inside self.loop
                await asyncio.sleep(poll_interval)
                deployment_timeout -= poll_interval
            if deployment_timeout <= 0:
                raise ROclient.ROClientException("Timeot wating ns to be ready")
            db_nsr["detailed-status"] = "Configuring vnfr"
            self.db.replace("nsrs", nsr_id, db_nsr)

            # for nsd in nsr_lcm["descriptors"]["nsd"]:

            self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id))
            # set once before the loop, so that a vnf without charm does not overwrite the
            # "configuring" status set by a previous vnf
            db_nsr["config-status"] = "config_not_needed"
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                vnfd_index = str(c_vnf["member-vnf-index"])
                vnfd = self.db.get_one("vnfds", {"id": vnfd_id})
                vnf_config = vnfd.get("vnf-configuration")
                if vnf_config and vnf_config.get("juju"):
                    db_nsr["config-status"] = "configuring"
                    # get parameters for juju charm. They are not used until the VCA charm is launched (TODO
                    # below), but a missing key makes the deployment fail
                    proxy_charm = vnf_config["juju"]["charm"]
                    config_primitive = vnf_config.get("config-primitive")
                    base_folder = vnfd["_admin"]["storage"]
                    # the charm name is now part of the path (it was passed to format() but never used)
                    path = "{}{}/{}/charms/{}".format(base_folder["path"], base_folder["folder"],
                                                      base_folder["file"], proxy_charm)
                    mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index]
                    # TODO launch VCA charm
                    # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive))
            db_nsr["detailed-status"] = "Done"
            db_nsr["operational-status"] = "running"
            self.db.replace("nsrs", nsr_id, db_nsr)

            self.logger.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id))
            return nsr_lcm

        except Exception as e:  # includes ROclient.ROClientException
            db_nsr["operational-status"] = "failed"
            db_nsr["detailed-status"] += ": ERROR {}".format(e)
            # log before writing to database, so that the error is not lost if the write fails too
            self.logger.error(
                "create_ns nsr_id={} Exit Exception on '{}': {}".format(nsr_id, db_nsr["detailed-status"], e),
                exc_info=True)
            self.db.replace("nsrs", nsr_id, db_nsr)

    async def _ro_delete(self, RO, nsr_id, item, RO_id):
        """
        Delete one item at RO, logging the result
        :param RO: ROclient.ROClient instance
        :param nsr_id: nsr identity, used only for logging
        :param item: RO item type: "ns", "nsd" or "vnfd"
        :param RO_id: RO uuid of the item to delete
        :return: True if the item is deleted or it was not found at RO (http 404). False on conflict (http 409)
            or any other RO client error, that are logged but not raised
        """
        try:
            await RO.delete(item, RO_id)
            self.logger.debug("delete_ns task nsr_id={} RO {}={} deleted".format(nsr_id, item, RO_id))
            return True
        except ROclient.ROClientException as e:
            if e.http_code == 404:  # not found
                self.logger.debug("delete_ns task nsr_id={} RO {}={} already deleted".format(nsr_id, item, RO_id))
                return True
            if e.http_code == 409:  # conflict
                self.logger.debug("delete_ns task nsr_id={} RO {}={} delete conflict: {}".format(
                    nsr_id, item, RO_id, e))
            else:
                self.logger.error("delete_ns task nsr_id={} RO {}={} delete error: {}".format(
                    nsr_id, item, RO_id, e))
            return False

    async def delete_ns(self, nsr_id):
        """
        Task that removes a network service: deletes ns, nsd and vnfds from RO and removes the "nsrs" record
        RO client errors are logged and do not stop the removal of the rest of items
        :param nsr_id: "_id" of the record at "nsrs" database table
        :return: None
        """
        self.logger.debug("delete_ns task nsr_id={}, Delete_ns task nsr_id={} Enter".format(nsr_id, nsr_id))
        db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
        nsr_lcm = db_nsr["_admin"]["deploy"]

        db_nsr["operational-status"] = "terminate"
        db_nsr["config-status"] = "terminate"
        db_nsr["detailed-status"] = "Deleting charms"
        self.db.replace("nsrs", nsr_id, db_nsr)
        # TODO destroy VCA charm

        # remove from RO
        RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant,
                               datacenter=db_nsr["datacenter"])
        # Delete ns
        RO_nsr_id = nsr_lcm["RO"]["nsr_id"]
        if RO_nsr_id:
            db_nsr["detailed-status"] = "Deleting ns at RO"
            if await self._ro_delete(RO, nsr_id, "ns", RO_nsr_id):
                nsr_lcm["RO"]["nsr_id"] = None
                nsr_lcm["RO"]["nsr_status"] = "DELETED"
        self.db.replace("nsrs", nsr_id, db_nsr)

        # Delete nsd
        RO_nsd_id = nsr_lcm["RO"]["nsd_id"]
        if RO_nsd_id:
            db_nsr["detailed-status"] = "Deleting nsd at RO"
            if await self._ro_delete(RO, nsr_id, "nsd", RO_nsd_id):
                nsr_lcm["RO"]["nsd_id"] = None
        self.db.replace("nsrs", nsr_id, db_nsr)

        # Delete vnfds. Only values are modified while iterating, never the set of keys
        for vnf_id, RO_vnfd_id in nsr_lcm["RO"]["vnfd_id"].items():
            if not RO_vnfd_id:
                continue
            db_nsr["detailed-status"] = "Deleting vnfd {} at RO".format(vnf_id)
            if await self._ro_delete(RO, nsr_id, "vnfd", RO_vnfd_id):
                nsr_lcm["RO"]["vnfd_id"][vnf_id] = None
        self.db.replace("nsrs", nsr_id, db_nsr)

        # TODO delete from database or mark as deleted???
        db_nsr["operational-status"] = "terminated"
        self.db.del_one("nsrs", {"_id": nsr_id})
        self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id))

    async def test(self, param=None):
        """
        Dummy task that only logs its parameter
        :param param: anything to be logged
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    def cancel_tasks(self, nsr_id):
        """
        Cancel all active tasks of a concrete nsr identified for nsr_id
        :param nsr_id:  nsr identity
        :return: None, or raises an exception if not possible
        """
        if not self.lcm_tasks.get(nsr_id):
            return
        for order_id, tasks_set in self.lcm_tasks[nsr_id].items():
            for task_name, task in tasks_set.items():
                # cancel() returns False when the task is already done
                if task.cancel():
                    self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
        self.lcm_tasks[nsr_id] = {}

    async def read_kafka(self):
        """
        Main loop: reads commands of topic "ns" from the message bus and serves them until "exit" is received
        "create" and "delete" are launched as background tasks that are registered at self.lcm_tasks
        :return: None
        """
        self.logger.debug("kafka task Enter")
        order_id = 1
        # future = asyncio.Future()

        while True:
            command, params = await self.msg.aioread("ns", self.loop)
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                continue
            elif command == "echo":
                print(params)
            elif command == "test":
                asyncio.ensure_future(self.test(params), loop=self.loop)
            elif command == "break":
                print("put a break in this line of code")
            elif command == "create":
                nsr_id = params.strip()
                self.logger.debug("Deploying NS {}".format(nsr_id))
                task = asyncio.ensure_future(self.create_ns(nsr_id))
                self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"create_ns": task}
            elif command == "delete":
                nsr_id = params.strip()
                self.logger.debug("Deleting NS {}".format(nsr_id))
                self.cancel_tasks(nsr_id)
                task = asyncio.ensure_future(self.delete_ns(nsr_id))
                self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {"delete_ns": task}
            elif command == "show":
                nsr_id = params.strip()
                # deployment info is stored by create_ns at '_admin.deploy' of the "nsrs" record.
                # A database error (e.g. unknown nsr_id) must not break the command loop
                try:
                    db_nsr = self.db.get_one("nsrs", {"_id": nsr_id})
                    print("nsr_lcm", db_nsr.get("_admin", {}).get("deploy"))
                except DbException as e:
                    print("nsr_lcm", "ERROR {}".format(e))
                print("self.lcm_tasks", self.lcm_tasks.get(nsr_id))
            else:
                self.logger.debug("unknown command '{}'".format(command))
                print("Usage:\n   echo: <>\n   create: <ns1|ns2>\n   delete: <ns1|ns2>\n   show: <ns1|ns2>")
        self.logger.debug("kafka task Exit")

    def start(self):
        """
        Run the main loop (read_kafka) until it finishes, then close the asyncio event loop
        :return: None
        """
        self.loop = asyncio.get_event_loop()
        try:
            self.loop.run_until_complete(self.read_kafka())
        finally:
            # close the loop also when read_kafka ends with an exception
            self.loop.close()
            self.loop = None

    def read_config_file(self, config_file):
        """
        Load the YAML configuration file and override its values with the 'OSMLCM_*' environment variables
        A variable OSMLCM_<SECTION>_<KEY> sets conf[<section>][<key>] (sections 'ro' and 'vca' are uppercased;
        a key 'port' is converted to integer). Variables that cannot be applied are skipped with a warning
        :param config_file: path of the configuration file
        :return: dictionary with the configuration
        :raises LcmException: if the file cannot be read or parsed, or its content is not a dictionary
        """
        # TODO make a [ini] + yaml inside parser
        # the configparser library is not suitable, because it does not admit comments at the end of line,
        # and not parse integer or boolean
        try:
            with open(config_file) as f:
                # NOTE: safe_load instead of load: yaml.load without Loader can build arbitrary python objects,
                # and it is rejected by PyYAML >= 6
                conf = yaml.safe_load(f)
        except Exception as e:
            self.logger.critical("At config file '{}': {}".format(config_file, e))
            # fail here with a clear error instead of returning None
            raise LcmException("At config file '{}': {}".format(config_file, e)) from e
        if not isinstance(conf, dict):
            self.logger.critical("At config file '{}': content is not a dictionary".format(config_file))
            raise LcmException("At config file '{}': content is not a dictionary".format(config_file))

        for k, v in environ.items():
            if not k.startswith("OSMLCM_"):
                continue
            k_items = k.lower().split("_")
            c = conf
            try:
                # walk down the sections; the last item is the key to set
                for k_item in k_items[1:-1]:
                    if k_item in ("ro", "vca"):
                        # put in capital letter
                        k_item = k_item.upper()
                    c = c[k_item]
                if k_items[-1] == "port":
                    c[k_items[-1]] = int(v)
                else:
                    c[k_items[-1]] = v
            except Exception as e:
                self.logger.warning("skipping environ '{}' on exception '{}'".format(k, e))

        return conf
390
391
392
if __name__ == "__main__":

    # the configuration file is looked for at the current working directory
    config_file = "lcm.cfg"
    lcm = Lcm(config_file)

    # Disabled manual test: fills the database with the ping-pong descriptors and two ns requests
    # # FOR TEST
    # RO_VIM = "OST2_MRT"
    #
    # #FILL DATABASE
    # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f:
    #     vnfd = yaml.load(f)
    #     vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd)
    #     vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"}
    #     lcm.db.create("vnfd", vnfd_clean)
    # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f:
    #     vnfd = yaml.load(f)
    #     vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd)
    #     vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"}
    #     lcm.db.create("vnfd", vnfd_clean)
    # with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f:
    #     nsd = yaml.load(f)
    #     nsd_clean, _ = ROclient.remove_envelop("nsd", nsd)
    #     nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"}
    #     lcm.db.create("nsd", nsd_clean)
    #
    # ns_request = {
    #     "id": "ns1",
    #     "nsr_id": "ns1",
    #     "name": "pingpongOne",
    #     "vim": RO_VIM,
    #     "nsd_id": nsd_clean["id"],  # nsd_ping_pong
    # }
    # lcm.db.create("ns_request", ns_request)
    # ns_request = {
    #     "id": "ns2",
    #     "nsr_id": "ns2",
    #     "name": "pingpongTwo",
    #     "vim": RO_VIM,
    #     "nsd_id": nsd_clean["id"],  # nsd_ping_pong
    # }
    # lcm.db.create("ns_request", ns_request)

    # blocks serving orders from the message bus until "exit" is received
    lcm.start()
436
437
438