2 # -*- coding: utf-8 -*-
11 from dbbase
import DbException
12 from fsbase
import FsException
13 from msgbase
import MsgException
16 #streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
17 streamformat
= "%(name)s %(levelname)s: %(message)s"
18 logging
.basicConfig(format
=streamformat
, level
=logging
.DEBUG
)
21 class LcmException(Exception):
27 def __init__(self
, config
):
29 Init, Connect to database, filesystem storage, and messaging
30 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
33 # contains created tasks/futures to be able to cancel
38 self
.logger
= logging
.getLogger('lcm')
40 self
.ro_url
= "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"])
41 self
.ro_tenant
= config
["RO"]["tenant"]
42 self
.vca
= config
["VCA"] # TODO VCA
45 if config
["database"]["driver"] == "mongo":
46 self
.db
= dbmongo
.dbmongo()
47 self
.db
.db_connect(config
["database"])
48 elif config
["database"]["driver"] == "memory":
49 self
.db
= dbmemory
.dbmemory()
50 self
.db
.db_connect(config
["database"])
52 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
53 config
["database"]["driver"]))
55 if config
["storage"]["driver"] == "local":
56 self
.fs
= fslocal
.FsLocal()
57 self
.fs
.fs_connect(config
["storage"])
59 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
60 config
["storage"]["driver"]))
62 if config
["message"]["driver"] == "local":
63 self
.msg
= msglocal
.msgLocal()
64 self
.msg
.connect(config
["message"])
66 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
67 config
["storage"]["driver"]))
68 except (DbException
, FsException
, MsgException
) as e
:
69 self
.self
.logger
.critical(str(e
), exc_info
=True)
70 raise LcmException(str(e
))
72 async def create_ns(self
, nsr_id
):
73 self
.logger
.debug("create_ns task nsr_id={} Enter".format(nsr_id
))
76 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
80 "status_detailed": "",
83 deloyment_timeout
= 120
85 ns_request
= self
.db
.get_one("ns_request", {"id": nsr_id
})
86 nsd
= self
.db
.get_one("nsd", {"id": ns_request
["nsd_id"]})
87 RO
= ROclient
.ROClient(self
.loop
, endpoint_url
=self
.ro_url
, tenant
=self
.ro_tenant
,
88 datacenter
=ns_request
["vim"])
89 nsr_lcm
["status_detailed"] = "Creating vnfd at RO"
90 # ns_request["constituent-vnfr-ref"] = []
92 self
.db
.create("nsr_lcm", nsr_lcm
)
94 # get vnfds, instantiate at RO
95 self
.logger
.debug("create_ns task nsr_id={} RO VNFD".format(nsr_id
))
96 for c_vnf
in nsd
["constituent-vnfd"]:
97 vnfd_id
= c_vnf
["vnfd-id-ref"]
98 vnfd
= self
.db
.get_one("vnfd", {"id": vnfd_id
})
99 vnfd
.pop("_admin", None)
100 vnfd
.pop("_id", None)
101 # vnfr = deepcopy(vnfd)
102 # vnfr["member-vnf-index"] = c_vnf["member-vnf-index"]
103 # vnfr["nsr-id"] = nsr_id
104 # vnfr["id"] = uuid4()
105 # vnfr["vnf-id"] = vnfd["id"]
106 # ns_request["constituent-vnfr-ref"],append(vnfd_id)
108 # TODO change id for RO in case it is present
110 desc
= await RO
.create("vnfd", descriptor
=vnfd
)
111 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
112 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
113 except ROclient
.ROClientException
as e
:
114 if e
.http_code
== 409: # conflict, vnfd already present
119 # db_new("vnfr", vnfr)
120 # db_update("ns_request", nsr_id, ns_request)
123 self
.logger
.debug("create_ns task nsr_id={} RO NSD".format(nsr_id
))
124 nsr_lcm
["status_detailed"] = "Creating nsd at RO"
125 nsd_id
= ns_request
["nsd_id"]
126 nsd
= self
.db
.get_one("nsd", {"id": nsd_id
})
127 nsd
.pop("_admin", None)
130 desc
= await RO
.create("nsd", descriptor
=nsd
)
131 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
132 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
133 except ROclient
.ROClientException
as e
:
134 if e
.http_code
== 409: # conflict, nsd already present
140 self
.logger
.debug("create_ns task nsr_id={} RO NS".format(nsr_id
))
141 nsr_lcm
["status_detailed"] = "Creating ns at RO"
142 desc
= await RO
.create("ns", name
=ns_request
["name"], datacenter
=ns_request
["vim"], scenario
=nsr_lcm
["RO"]["nsd_id"])
143 RO_nsr_id
= desc
["uuid"]
144 nsr_lcm
["RO"]["nsr_id"] = RO_nsr_id
145 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
146 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
148 # wait until NS is ready
149 deloyment_timeout
= 600
150 while deloyment_timeout
> 0:
151 ns_status_detailed
= "Waiting ns ready at RO"
152 nsr_lcm
["status_detailed"] = ns_status_detailed
153 desc
= await RO
.show("ns", RO_nsr_id
)
154 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
155 nsr_lcm
["RO"]["nsr_status"] = ns_status
156 if ns_status
== "ERROR":
157 raise ROclient
.ROClientException(ns_status_info
)
158 elif ns_status
== "BUILD":
159 nsr_lcm
["status_detailed"] = ns_status_detailed
+ "; nsr_id: '{}', {}".format(nsr_id
, ns_status_info
)
160 elif ns_status
== "ACTIVE":
161 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
164 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
166 await asyncio
.sleep(5, loop
=self
.loop
)
167 deloyment_timeout
-= 5
168 if deloyment_timeout
<= 0:
169 raise ROclient
.ROClientException("Timeot wating ns to be ready")
170 nsr_lcm
["status_detailed"] = "Configuring vnfr"
171 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
173 #for nsd in nsr_lcm["descriptors"]["nsd"]:
175 self
.logger
.debug("create_ns task nsr_id={} VCA look for".format(nsr_id
))
176 for c_vnf
in nsd
["constituent-vnfd"]:
177 vnfd_id
= c_vnf
["vnfd-id-ref"]
178 vnfd_index
= int(c_vnf
["member-vnf-index"])
179 vnfd
= self
.db
.get_one("vnfd", {"id": vnfd_id
})
180 if vnfd
.get("vnf-configuration") and vnfd
["vnf-configuration"].get("juju"):
181 proxy_charm
= vnfd
["vnf-configuration"]["juju"]["charm"]
182 config_primitive
= vnfd
["vnf-configuration"].get("config-primitive")
183 # get parameters for juju charm
184 base_folder
= vnfd
["_admin"]["storage"]
185 path
= base_folder
+ "/charms/" + proxy_charm
186 mgmt_ip
= nsr_lcm
['nsr_ip'][vnfd_index
]
187 # TODO launch VCA charm
188 # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive))
189 nsr_lcm
["status"] = "DONE"
190 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
194 except (ROclient
.ROClientException
, Exception) as e
:
195 self
.logger
.debug("create_ns nsr_id={} Exception {}".format(nsr_id
, e
), exc_info
=True)
196 nsr_lcm
["status"] = "ERROR"
197 nsr_lcm
["status_detailed"] += ": ERROR {}".format(e
)
199 self
.logger
.debug("create_ns task nsr_id={} Exit".format(nsr_id
))
202 async def delete_ns(self
, nsr_id
):
203 self
.logger
.debug("delete_ns task nsr_id={} Enter".format(nsr_id
))
204 nsr_lcm
= self
.db
.get_one("nsr_lcm", {"id": nsr_id
})
205 ns_request
= self
.db
.get_one("ns_request", {"id": nsr_id
})
207 nsr_lcm
["status"] = "DELETING"
208 nsr_lcm
["status_detailed"] = "Deleting charms"
209 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
210 # TODO destroy VCA charm
213 RO
= ROclient
.ROClient(self
.loop
, endpoint_url
=self
.ro_url
, tenant
=self
.ro_tenant
,
214 datacenter
=ns_request
["vim"])
217 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
219 nsr_lcm
["status_detailed"] = "Deleting ns at RO"
220 desc
= await RO
.delete("ns", RO_nsr_id
)
221 print("debug", "deleted RO ns {}".format(RO_nsr_id
))
222 nsr_lcm
["RO"]["nsr_id"] = None
223 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
224 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
225 except ROclient
.ROClientException
as e
:
226 if e
.http_code
== 404:
227 nsr_lcm
["RO"]["nsr_id"] = None
228 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
229 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
236 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
238 nsr_lcm
["status_detailed"] = "Deleting nsd at RO"
239 desc
= await RO
.delete("nsd", RO_nsd_id
)
240 print("debug", "deleted RO nsd {}".format(RO_nsd_id
))
241 nsr_lcm
["RO"]["nsd_id"] = None
242 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
243 except ROclient
.ROClientException
as e
:
244 if e
.http_code
== 404:
245 nsr_lcm
["RO"]["nsd_id"] = None
250 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
253 nsr_lcm
["status_detailed"] = "Deleting vnfd at RO"
254 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
255 print("debug", "deleted RO vnfd {}".format(RO_vnfd_id
))
256 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
257 self
.db
.replace("nsr_lcm", {"id": nsr_id
}, nsr_lcm
)
258 except ROclient
.ROClientException
as e
:
259 if e
.http_code
== 404:
260 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
264 self
.logger
.debug("delete_ns task nsr_id={} Exit".format(nsr_id
))
267 async def test(self
, param
=None):
268 self
.logger
.debug("Starting/Ending test task: {}".format(param
))
271 def cancel_tasks(self
, nsr_id
):
273 Cancel all active tasks of a concrete nsr identified for nsr_id
274 :param nsr_id: nsr identity
275 :return: None, or raises an exception if not possible
277 if not self
.lcm_tasks
.get(nsr_id
):
279 for order_id
, tasks_set
in self
.lcm_tasks
[nsr_id
].items():
280 for task_name
, task
in tasks_set
.items():
281 result
= task
.cancel()
283 self
.logger
.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id
, order_id
, task_name
))
284 self
.lcm_tasks
[nsr_id
] = {}
288 async def read_kafka(self
):
289 self
.logger
.debug("kafka task Enter")
291 # future = asyncio.Future()
294 command
, params
= await self
.msg
.aioread(self
.loop
, "ns")
296 if command
== "exit":
299 elif command
.startswith("#"):
301 elif command
== "echo":
303 elif command
== "test":
304 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
305 elif command
== "break":
306 print("put a break in this line of code")
307 elif command
== "create":
308 nsr_id
= params
.strip()
309 self
.logger
.debug("Deploying NS {}".format(nsr_id
))
310 task
= asyncio
.ensure_future(self
.create_ns(nsr_id
))
311 if nsr_id
not in self
.lcm_tasks
:
312 self
.lcm_tasks
[nsr_id
] = {}
313 self
.lcm_tasks
[nsr_id
][order_id
] = {"create_ns": task
}
314 elif command
== "delete":
315 nsr_id
= params
.strip()
316 self
.logger
.debug("Deleting NS {}".format(nsr_id
))
317 self
.cancel_tasks(nsr_id
)
318 task
= asyncio
.ensure_future(self
.delete_ns(nsr_id
))
319 if nsr_id
not in self
.lcm_tasks
:
320 self
.lcm_tasks
[nsr_id
] = {}
321 self
.lcm_tasks
[nsr_id
][order_id
] = {"delete_ns": task
}
322 elif command
== "show":
323 nsr_id
= params
.strip()
324 nsr_lcm
= self
.db
.get_one("nsr_lcm", {"id": nsr_id
})
325 print("nsr_lcm", nsr_lcm
)
326 print("self.lcm_tasks", self
.lcm_tasks
.get(nsr_id
))
328 self
.logger
.debug("unknown command '{}'".format(command
))
329 print("Usage:\n echo: <>\n create: <ns1|ns2>\n delete: <ns1|ns2>\n show: <ns1|ns2>")
330 self
.logger
.debug("kafka task Exit")
334 self
.loop
= asyncio
.get_event_loop()
335 self
.loop
.run_until_complete(self
.read_kafka())
340 def read_config_file(config_file
):
341 # TODO make a [ini] + yaml inside parser
342 # the configparser library is not suitable, because it does not admit comments at the end of line,
343 # and not parse integer or boolean
345 with
open(config_file
) as f
:
347 # TODO insert envioronment
348 # for k, v in environ.items():
349 # if k.startswith("OSMLCM_"):
350 # split _ lower add to config
352 except Exception as e
:
353 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
357 if __name__
== '__main__':
359 config_file
= "lcm.cfg"
360 conf
= read_config_file(config_file
)
367 with
open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f
:
369 vnfd_clean
, _
= ROclient
.remove_envelop("vnfd", vnfd
)
370 vnfd_clean
["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"}
371 lcm
.db
.create("vnfd", vnfd_clean
)
372 with
open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f
:
374 vnfd_clean
, _
= ROclient
.remove_envelop("vnfd", vnfd
)
375 vnfd_clean
["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"}
376 lcm
.db
.create("vnfd", vnfd_clean
)
377 with
open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f
:
379 nsd_clean
, _
= ROclient
.remove_envelop("nsd", nsd
)
380 nsd_clean
["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"}
381 lcm
.db
.create("nsd", nsd_clean
)
386 "name": "pingpongOne",
388 "nsd_id": nsd_clean
["id"], # nsd_ping_pong
390 lcm
.db
.create("ns_request", ns_request
)
394 "name": "pingpongTwo",
396 "nsd_id": nsd_clean
["id"], # nsd_ping_pong
398 lcm
.db
.create("ns_request", ns_request
)