f35ec60631b009657ed8435dcee1508082a8bc6a
[osm/RO.git] / lcm / osm_lcm / lcm.py
1 #!/usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 import asyncio
5 import yaml
6 import ROclient
7 import dbmemory
8 import dbmongo
9 import fslocal
10 import msglocal
11 from dbbase import DbException
12 from fsbase import FsException
13 from msgbase import MsgException
14 import logging
15
# Layout of every log record. The variant that also prints the timestamp is kept for reference:
# streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
streamformat = "%(name)s %(levelname)s: %(message)s"
logging.basicConfig(level=logging.DEBUG, format=streamformat)
19
20
class LcmException(Exception):
    """Error raised by the LCM module itself, e.g. on a wrong configuration or a failed connection."""
23
24
class Lcm:
    """
    Life Cycle Manager. Reads commands from the message bus (see read_kafka) and runs each network service
    (NS) creation/deletion as an asyncio task that talks to RO and records its progress at database table
    "nsr_lcm".
    """

    def __init__(self, config):
        """
        Init, Connect to database, filesystem storage, and messaging
        :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
            'message', 'RO' and 'VCA'
        :return: None
        :raises LcmException: if a driver name is not supported or a connection cannot be established
        """
        # contains created tasks/futures to be able to cancel
        # layout: {nsr_id: {order_id: {task_name: task}}}
        self.lcm_tasks = {}

        # logging
        self.logger = logging.getLogger('lcm')
        self.config = config
        self.ro_url = "http://{}:{}/openmano".format(config["RO"]["host"], config["RO"]["port"])
        self.ro_tenant = config["RO"]["tenant"]
        self.vca = config["VCA"]  # TODO VCA
        self.loop = None  # set by start() while the event loop is alive
        try:
            db_driver = config["database"]["driver"]
            if db_driver == "mongo":
                self.db = dbmongo.dbmongo()
            elif db_driver == "memory":
                self.db = dbmemory.dbmemory()
            else:
                raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(db_driver))
            self.db.db_connect(config["database"])

            storage_driver = config["storage"]["driver"]
            if storage_driver == "local":
                self.fs = fslocal.FsLocal()
            else:
                raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
                    storage_driver))
            self.fs.fs_connect(config["storage"])

            message_driver = config["message"]["driver"]
            if message_driver == "local":
                self.msg = msglocal.msgLocal()
            else:
                # report the offending 'message' value (it used to print the 'storage' driver by mistake)
                raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
                    message_driver))
            self.msg.connect(config["message"])
        except (DbException, FsException, MsgException) as e:
            self.logger.critical(str(e), exc_info=True)
            raise LcmException(str(e)) from e

    async def create_ns(self, nsr_id):
        """
        Deploy a network service: creates its vnfds, nsd and ns at RO, then waits until RO reports it ACTIVE.
        Progress is stored at database table "nsr_lcm"
        :param nsr_id: id of the entry at "ns_request" that describes what to deploy
        :return: the nsr_lcm dictionary on success. None on failure; in that case nsr_lcm "status" is "ERROR"
        """
        self.logger.debug("create_ns task nsr_id={} Enter".format(nsr_id))
        nsr_lcm = {
            "id": nsr_id,
            "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
            "nsr_ip": {},
            # placeholder until VCA is implemented. A list, because a set literal cannot be serialized
            # to BSON/JSON by the database backends
            "VCA": ["TODO"],
            "status": "BUILD",
            "status_detailed": "",
        }
        nsr_lcm_stored = False  # True once the record exists at database, so that it can be replaced

        try:
            ns_request = self.db.get_one("ns_request", {"id": nsr_id})
            nsd = self.db.get_one("nsd", {"id": ns_request["nsd_id"]})
            RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant,
                                   datacenter=ns_request["vim"])
            nsr_lcm["status_detailed"] = "Creating vnfd at RO"

            self.db.create("nsr_lcm", nsr_lcm)
            nsr_lcm_stored = True

            # get vnfds, instantiate at RO
            self.logger.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id))
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                vnfd = self.db.get_one("vnfd", {"id": vnfd_id})
                # internal database fields must not be sent to RO
                vnfd.pop("_admin", None)
                vnfd.pop("_id", None)
                # TODO create a vnfr record from the vnfd (member-vnf-index, nsr-id, new id) and
                #  reference it at ns_request "constituent-vnfr-ref"

                # TODO change id for RO in case it is present
                try:
                    desc = await RO.create("vnfd", descriptor=vnfd)
                    nsr_lcm["RO"]["vnfd_id"][vnfd_id] = desc["uuid"]
                    self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)
                except ROclient.ROClientException as e:
                    if e.http_code != 409:
                        raise
                    # conflict, vnfd already present
                    self.logger.debug("create_ns nsr_id={} vnfd '{}' already at RO: {}".format(nsr_id, vnfd_id, e))

            # create nsd at RO
            self.logger.debug("create_ns task nsr_id={} RO NSD".format(nsr_id))
            nsr_lcm["status_detailed"] = "Creating nsd at RO"
            nsd_id = ns_request["nsd_id"]
            nsd = self.db.get_one("nsd", {"id": nsd_id})
            nsd.pop("_admin", None)
            nsd.pop("_id", None)
            try:
                desc = await RO.create("nsd", descriptor=nsd)
                nsr_lcm["RO"]["nsd_id"] = desc["uuid"]
                self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)
            except ROclient.ROClientException as e:
                if e.http_code != 409:
                    raise
                # conflict, nsd already present
                # NOTE(review): in this case the RO uuid stays unknown, so "nsd_id" is still None when it
                #  is used as 'scenario' below -- confirm how RO must be asked for the existing one
                self.logger.debug("create_ns nsr_id={} nsd '{}' already at RO: {}".format(nsr_id, nsd_id, e))

            # Create ns at RO
            self.logger.debug("create_ns task nsr_id={} RO NS".format(nsr_id))
            nsr_lcm["status_detailed"] = "Creating ns at RO"
            desc = await RO.create("ns", name=ns_request["name"], datacenter=ns_request["vim"],
                                   scenario=nsr_lcm["RO"]["nsd_id"])
            RO_nsr_id = desc["uuid"]
            nsr_lcm["RO"]["nsr_id"] = RO_nsr_id
            nsr_lcm["RO"]["nsr_status"] = "BUILD"
            self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)

            # wait until NS is ready, polling RO every 5 seconds
            deployment_timeout = 600
            ns_status_detailed = "Waiting ns ready at RO"
            while deployment_timeout > 0:
                nsr_lcm["status_detailed"] = ns_status_detailed
                desc = await RO.show("ns", RO_nsr_id)
                ns_status, ns_status_info = RO.check_ns_status(desc)
                nsr_lcm["RO"]["nsr_status"] = ns_status
                if ns_status == "ERROR":
                    raise ROclient.ROClientException(ns_status_info)
                elif ns_status == "BUILD":
                    nsr_lcm["status_detailed"] = ns_status_detailed + "; nsr_id: '{}', {}".format(
                        nsr_id, ns_status_info)
                elif ns_status == "ACTIVE":
                    nsr_lcm["nsr_ip"] = RO.get_ns_vnf_ip(desc)
                    break
                else:
                    # explicit exception instead of 'assert', which is removed when running with -O
                    raise LcmException("ROclient.check_ns_status returns unknown {}".format(ns_status))

                # no 'loop' argument: it was removed in python 3.10, the running loop is used
                await asyncio.sleep(5)
                deployment_timeout -= 5
            else:
                # the loop finished without reaching ACTIVE
                raise ROclient.ROClientException("Timeot wating ns to be ready")
            nsr_lcm["status_detailed"] = "Configuring vnfr"
            self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)

            self.logger.debug("create_ns task nsr_id={} VCA look for".format(nsr_id))
            for c_vnf in nsd["constituent-vnfd"]:
                vnfd_id = c_vnf["vnfd-id-ref"]
                vnfd_index = int(c_vnf["member-vnf-index"])
                vnfd = self.db.get_one("vnfd", {"id": vnfd_id})
                if vnfd.get("vnf-configuration") and vnfd["vnf-configuration"].get("juju"):
                    # get parameters for juju charm. Not used yet, they are the input of the pending
                    # charm deployment
                    proxy_charm = vnfd["vnf-configuration"]["juju"]["charm"]
                    config_primitive = vnfd["vnf-configuration"].get("config-primitive")
                    base_folder = vnfd["_admin"]["storage"]
                    path = base_folder + "/charms/" + proxy_charm
                    mgmt_ip = nsr_lcm['nsr_ip'][vnfd_index]
                    # TODO launch VCA charm
                    # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive))
            nsr_lcm["status"] = "DONE"
            self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)

            return nsr_lcm

        except Exception as e:
            self.logger.error("create_ns nsr_id={} Exception {}".format(nsr_id, e), exc_info=True)
            nsr_lcm["status"] = "ERROR"
            nsr_lcm["status_detailed"] += ": ERROR {}".format(e)
            # store the failure, otherwise the 'show' command keeps reporting the last stored step
            if nsr_lcm_stored:
                try:
                    self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)
                except DbException as db_e:
                    self.logger.error("create_ns nsr_id={} cannot store ERROR status: {}".format(nsr_id, db_e))
            return None
        finally:
            self.logger.debug("create_ns task nsr_id={} Exit".format(nsr_id))

    async def _ro_delete(self, RO, item, ro_id):
        """
        Delete one item at RO, logging the result
        :param RO: ROclient.ROClient instance to use
        :param item: kind of item: "ns", "nsd" or "vnfd"
        :param ro_id: uuid of the item at RO
        :return: True if the item does not exist at RO anymore (deleted now, or RO answered 404),
            False if RO reported any other error
        """
        try:
            await RO.delete(item, ro_id)
        except ROclient.ROClientException as e:
            if e.http_code == 404:
                # not found means that there is nothing left to delete
                self.logger.warning("RO {} {} not found: {}".format(item, ro_id, e))
                return True
            self.logger.error("cannot delete RO {} {}: {}".format(item, ro_id, e))
            return False
        self.logger.debug("deleted RO {} {}".format(item, ro_id))
        return True

    async def delete_ns(self, nsr_id):
        """
        Remove a network service from RO: first the ns, then its nsd and finally its vnfds. An error
        reported by RO on one item is logged and the following items are still attempted
        :param nsr_id: id of the entries at "nsr_lcm" and "ns_request"
        :return: None
        """
        self.logger.debug("delete_ns task nsr_id={} Enter".format(nsr_id))
        nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id})
        ns_request = self.db.get_one("ns_request", {"id": nsr_id})

        nsr_lcm["status"] = "DELETING"
        nsr_lcm["status_detailed"] = "Deleting charms"
        self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)
        # TODO destroy VCA charm

        # remove from RO
        RO = ROclient.ROClient(self.loop, endpoint_url=self.ro_url, tenant=self.ro_tenant,
                               datacenter=ns_request["vim"])
        ro_ids = nsr_lcm["RO"]

        # Delete ns
        RO_nsr_id = ro_ids["nsr_id"]
        if RO_nsr_id:
            nsr_lcm["status_detailed"] = "Deleting ns at RO"
            if await self._ro_delete(RO, "ns", RO_nsr_id):
                ro_ids["nsr_id"] = None
                ro_ids["nsr_status"] = "DELETED"
                self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)

        # Delete nsd
        RO_nsd_id = ro_ids["nsd_id"]
        if RO_nsd_id:
            nsr_lcm["status_detailed"] = "Deleting nsd at RO"
            if await self._ro_delete(RO, "nsd", RO_nsd_id):
                ro_ids["nsd_id"] = None
                self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)

        # Delete vnfds. Iterate over a copy, because the values are updated inside the loop
        for vnf_id, RO_vnfd_id in list(ro_ids["vnfd_id"].items()):
            if not RO_vnfd_id:
                continue
            nsr_lcm["status_detailed"] = "Deleting vnfd at RO"
            if await self._ro_delete(RO, "vnfd", RO_vnfd_id):
                ro_ids["vnfd_id"][vnf_id] = None
                self.db.replace("nsr_lcm", {"id": nsr_id}, nsr_lcm)
        self.logger.debug("delete_ns task nsr_id={} Exit".format(nsr_id))

    async def test(self, param=None):
        """
        Dummy task for testing the task launching: it only logs its parameter
        :param param: anything printable
        :return: None
        """
        self.logger.debug("Starting/Ending test task: {}".format(param))

    def cancel_tasks(self, nsr_id):
        """
        Cancel all active tasks of a concrete nsr identified for nsr_id
        :param nsr_id:  nsr identity
        :return: None, or raises an exception if not possible
        """
        if not self.lcm_tasks.get(nsr_id):
            return
        for order_id, tasks_set in self.lcm_tasks[nsr_id].items():
            for task_name, task in tasks_set.items():
                # cancel() returns False when the task is already done
                if task.cancel():
                    self.logger.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id, order_id, task_name))
        self.lcm_tasks[nsr_id] = {}

    def _register_task(self, nsr_id, order_id, task_name, coro):
        """
        Schedule a coroutine and store its task at self.lcm_tasks, so that cancel_tasks can find it
        :param nsr_id: nsr identity the task works on
        :param order_id: sequence number of the command that launched the task
        :param task_name: name used as key of the task
        :param coro: coroutine object to run
        :return: the created task
        """
        task = asyncio.ensure_future(coro, loop=self.loop)
        self.lcm_tasks.setdefault(nsr_id, {})[order_id] = {task_name: task}
        return task

    async def read_kafka(self):
        """
        Main loop: reads (command, params) pairs from topic "ns" of the message bus and serves them until
        the "exit" command is received. "create" and "delete" are launched as background tasks
        :return: None
        """
        self.logger.debug("kafka task Enter")
        order_id = 1
        # future = asyncio.Future()

        while True:
            command, params = await self.msg.aioread(self.loop, "ns")
            order_id += 1
            if command == "exit":
                print("Bye!")
                break
            elif command.startswith("#"):
                # commented line, ignore it
                continue
            elif command == "echo":
                print(params)
            elif command == "test":
                asyncio.ensure_future(self.test(params), loop=self.loop)
            elif command == "break":
                print("put a break in this line of code")
            elif command == "create":
                nsr_id = params.strip()
                self.logger.debug("Deploying NS {}".format(nsr_id))
                self._register_task(nsr_id, order_id, "create_ns", self.create_ns(nsr_id))
            elif command == "delete":
                nsr_id = params.strip()
                self.logger.debug("Deleting NS {}".format(nsr_id))
                # stop whatever is still running for this ns before removing it
                self.cancel_tasks(nsr_id)
                self._register_task(nsr_id, order_id, "delete_ns", self.delete_ns(nsr_id))
            elif command == "show":
                nsr_id = params.strip()
                nsr_lcm = self.db.get_one("nsr_lcm", {"id": nsr_id})
                print("nsr_lcm", nsr_lcm)
                print("self.lcm_tasks", self.lcm_tasks.get(nsr_id))
            else:
                self.logger.debug("unknown command '{}'".format(command))
                print("Usage:\n   echo: <>\n   create: <ns1|ns2>\n   delete: <ns1|ns2>\n   show: <ns1|ns2>")
        self.logger.debug("kafka task Exit")

    def start(self):
        """
        Run read_kafka in an event loop until it finishes. Blocks the caller meanwhile
        :return: None
        """
        # an explicit new loop, because asyncio.get_event_loop() does not create one implicitly anymore
        # at recent python versions
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            self.loop.run_until_complete(self.read_kafka())
        finally:
            # release the loop also when read_kafka ends with an exception
            self.loop.close()
            self.loop = None
338
339
def read_config_file(config_file):
    """
    Read the configuration from a yaml file
    :param config_file: path of the file
    :return: the parsed content of the file
    :raises LcmException: if the file cannot be read or it is not valid yaml
    """
    # TODO make a [ini] + yaml inside parser
    # the configparser library is not suitable, because it does not admit comments at the end of line,
    # and not parse integer or boolean
    try:
        with open(config_file) as f:
            # safe_load: a configuration file only needs plain yaml types, and yaml.load() without an
            # explicit Loader is rejected by recent PyYAML versions
            conf = yaml.safe_load(f)
        # TODO insert environment
        #  for k, v in environ.items():
        #      if k.startswith("OSMLCM_"):
        #          split _ lower add to config
        return conf
    except (OSError, yaml.YAMLError) as e:
        # a module level function has no 'self': use the same 'lcm' logger as the Lcm class
        logging.getLogger('lcm').critical("At config file '{}': {}".format(config_file, e))
        raise LcmException("At config file '{}': {}".format(config_file, e)) from e
354
355
356
if __name__ == '__main__':

    config_file = "lcm.cfg"
    conf = read_config_file(config_file)
    lcm = Lcm(conf)

    # FOR TEST
    RO_VIM = "OST2_MRT"

    # FILL DATABASE with the descriptors of the ping-pong example: (item, package folder, descriptor file)
    descriptor_packages = "/home/atierno/OSM/osm/devops/descriptor-packages"
    test_descriptors = (
        ("vnfd", "ping_vnf", "ping_vnfd.yaml"),
        ("vnfd", "pong_vnf", "pong_vnfd.yaml"),
        ("nsd", "ping_pong_ns", "ping_pong_nsd.yaml"),
    )
    nsd_id = None
    for item, package, descriptor_file in test_descriptors:
        storage = "{}/{}/{}".format(descriptor_packages, item, package)
        with open("{}/src/{}".format(storage, descriptor_file)) as f:
            # safe_load: yaml.load() without an explicit Loader is rejected by recent PyYAML versions
            descriptor = yaml.safe_load(f)
        descriptor_clean, _ = ROclient.remove_envelop(item, descriptor)
        # create_ns reads the charm location from this field
        descriptor_clean["_admin"] = {"storage": storage}
        lcm.db.create(item, descriptor_clean)
        if item == "nsd":
            nsd_id = descriptor_clean["id"]  # nsd_ping_pong

    # two requests over the same nsd, served by commands 'create: ns1' and 'create: ns2'
    for request_id, request_name in (("ns1", "pingpongOne"), ("ns2", "pingpongTwo")):
        ns_request = {
            "id": request_id,
            "nsr_id": request_id,
            "name": request_name,
            "vim": RO_VIM,
            "nsd_id": nsd_id,
        }
        lcm.db.create("ns_request", ns_request)

    lcm.start()
401
402
403