06393e5f7d3aa3a463327796a4648e2d20981306
2 # -*- coding: utf-8 -*-
12 from dbbase
import DbException
13 from fsbase
import FsException
14 from msgbase
import MsgException
15 from os
import environ
18 #streamformat = "%(asctime)s %(name)s %(levelname)s: %(message)s"
19 streamformat
= "%(name)s %(levelname)s: %(message)s"
20 logging
.basicConfig(format
=streamformat
, level
=logging
.DEBUG
)
23 class LcmException(Exception):
29 def __init__(self
, config_file
):
31 Init, Connect to database, filesystem storage, and messaging
32 :param config: two level dictionary with configuration. Top level should contain 'database', 'storage',
35 # contains created tasks/futures to be able to cancel
38 self
.logger
= logging
.getLogger('lcm')
40 config
= self
.read_config_file(config_file
)
43 self
.ro_url
= "http://{}:{}/openmano".format(config
["RO"]["host"], config
["RO"]["port"])
44 self
.ro_tenant
= config
["RO"]["tenant"]
45 self
.vca
= config
["VCA"] # TODO VCA
48 if config
["database"]["driver"] == "mongo":
49 self
.db
= dbmongo
.dbmongo()
50 self
.db
.db_connect(config
["database"])
51 elif config
["database"]["driver"] == "memory":
52 self
.db
= dbmemory
.dbmemory()
53 self
.db
.db_connect(config
["database"])
55 raise LcmException("Invalid configuration param '{}' at '[database]':'driver'".format(
56 config
["database"]["driver"]))
58 if config
["storage"]["driver"] == "local":
59 self
.fs
= fslocal
.FsLocal()
60 self
.fs
.fs_connect(config
["storage"])
62 raise LcmException("Invalid configuration param '{}' at '[storage]':'driver'".format(
63 config
["storage"]["driver"]))
65 if config
["message"]["driver"] == "local":
66 self
.msg
= msglocal
.msgLocal()
67 self
.msg
.connect(config
["message"])
68 elif config
["message"]["driver"] == "kafka":
69 self
.msg
= msgkafka
.MsgKafka()
70 self
.msg
.connect(config
["message"])
72 raise LcmException("Invalid configuration param '{}' at '[message]':'driver'".format(
73 config
["storage"]["driver"]))
74 except (DbException
, FsException
, MsgException
) as e
:
75 self
.logger
.critical(str(e
), exc_info
=True)
76 raise LcmException(str(e
))
78 # def update_nsr_db(self, nsr_id, nsr_desc):
79 # self.db.replace("nsrs", nsr_id, nsr_desc)
81 async def create_ns(self
, nsr_id
):
82 self
.logger
.debug("create_ns task nsr_id={} Enter".format(nsr_id
))
83 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
86 "RO": {"vnfd_id": {}, "nsd_id": None, "nsr_id": None, "nsr_status": "SCHEDULED"},
90 db_nsr
["_admin"]["deploy"] = nsr_lcm
91 db_nsr
["detailed-status"] = "creating"
92 db_nsr
["operational-status"] = "init"
94 deloyment_timeout
= 120
97 RO
= ROclient
.ROClient(self
.loop
, endpoint_url
=self
.ro_url
, tenant
=self
.ro_tenant
,
98 datacenter
=db_nsr
["datacenter"])
100 # get vnfds, instantiate at RO
101 for c_vnf
in nsd
["constituent-vnfd"]:
102 vnfd_id
= c_vnf
["vnfd-id-ref"]
103 self
.logger
.debug("create_ns task nsr_id={} RO vnfd={} creating".format(nsr_id
, vnfd_id
))
104 db_nsr
["detailed-status"] = "Creating vnfd {} at RO".format(vnfd_id
)
105 vnfd
= self
.db
.get_one("vnfds", {"id": vnfd_id
})
106 vnfd
.pop("_admin", None)
107 vnfd
.pop("_id", None)
110 vnfd_list
= await RO
.get_list("vnfd", filter_by
={"osm_id": vnfd_id
})
112 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = vnfd_list
[0]["uuid"]
113 self
.logger
.debug("create_ns task nsr_id={} RO vnfd={} exist. Using RO_id={}".format(
114 nsr_id
, vnfd_id
, vnfd_list
[0]["uuid"]))
116 desc
= await RO
.create("vnfd", descriptor
=vnfd
)
117 nsr_lcm
["RO"]["vnfd_id"][vnfd_id
] = desc
["uuid"]
118 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
120 # db_new("vnfr", vnfr)
121 # db_update("ns_request", nsr_id, ns_request)
124 nsd_id
= db_nsr
["nsd"]["id"]
125 self
.logger
.debug("create_ns task nsr_id={} RO nsd={} creating".format(nsr_id
, nsd_id
))
126 db_nsr
["detailed-status"] = "Creating nsd {} at RO".format(nsd_id
)
127 nsd
= self
.db
.get_one("nsds", {"id": nsd_id
})
128 nsd
.pop("_admin", None)
131 nsd_list
= await RO
.get_list("nsd", filter_by
={"osm_id": nsd_id
})
133 nsr_lcm
["RO"]["nsd_id"] = nsd_list
[0]["uuid"]
134 self
.logger
.debug("create_ns task nsr_id={} RO nsd={} exist. Using RO_id={}".format(
135 nsr_id
, nsd_id
, nsd_list
[0]["uuid"]))
137 desc
= await RO
.create("nsd", descriptor
=nsd
)
138 nsr_lcm
["RO"]["nsd_id"] = desc
["uuid"]
139 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
142 self
.logger
.debug("create_ns task nsr_id={} RO ns creating".format(nsr_id
))
143 db_nsr
["detailed-status"] = "Creating ns at RO"
144 desc
= await RO
.create("ns", name
=db_nsr
["name"], datacenter
=db_nsr
["datacenter"],
145 scenario
=nsr_lcm
["RO"]["nsd_id"])
146 RO_nsr_id
= desc
["uuid"]
147 nsr_lcm
["RO"]["nsr_id"] = RO_nsr_id
148 nsr_lcm
["RO"]["nsr_status"] = "BUILD"
149 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
151 # wait until NS is ready
152 self
.logger
.debug("create_ns task nsr_id={} RO ns_id={} waiting to be ready".format(nsr_id
, RO_nsr_id
))
153 deloyment_timeout
= 600
154 while deloyment_timeout
> 0:
155 ns_status_detailed
= "Waiting ns ready at RO"
156 db_nsr
["detailed-status"] = ns_status_detailed
157 desc
= await RO
.show("ns", RO_nsr_id
)
158 ns_status
, ns_status_info
= RO
.check_ns_status(desc
)
159 nsr_lcm
["RO"]["nsr_status"] = ns_status
160 if ns_status
== "ERROR":
161 raise ROclient
.ROClientException(ns_status_info
)
162 elif ns_status
== "BUILD":
163 db_nsr
["detailed-status"] = ns_status_detailed
+ "; nsr_id: '{}', {}".format(nsr_id
, ns_status_info
)
164 elif ns_status
== "ACTIVE":
165 nsr_lcm
["nsr_ip"] = RO
.get_ns_vnf_ip(desc
)
168 assert False, "ROclient.check_ns_status returns unknown {}".format(ns_status
)
170 await asyncio
.sleep(5, loop
=self
.loop
)
171 deloyment_timeout
-= 5
172 if deloyment_timeout
<= 0:
173 raise ROclient
.ROClientException("Timeot wating ns to be ready")
174 db_nsr
["detailed-status"] = "Configuring vnfr"
175 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
177 #for nsd in nsr_lcm["descriptors"]["nsd"]:
179 self
.logger
.debug("create_ns task nsr_id={} VCA look for".format(nsr_id
))
180 for c_vnf
in nsd
["constituent-vnfd"]:
181 vnfd_id
= c_vnf
["vnfd-id-ref"]
182 vnfd_index
= str(c_vnf
["member-vnf-index"])
183 vnfd
= self
.db
.get_one("vnfds", {"id": vnfd_id
})
184 db_nsr
["config-status"] = "config_not_needed"
185 if vnfd
.get("vnf-configuration") and vnfd
["vnf-configuration"].get("juju"):
186 db_nsr
["config-status"] = "configuring"
187 proxy_charm
= vnfd
["vnf-configuration"]["juju"]["charm"]
188 config_primitive
= vnfd
["vnf-configuration"].get("config-primitive")
189 # get parameters for juju charm
190 base_folder
= vnfd
["_admin"]["storage"]
191 path
= "{}{}/{}/charms".format(base_folder
["path"], base_folder
["folder"], base_folder
["file"],
193 mgmt_ip
= nsr_lcm
['nsr_ip'][vnfd_index
]
194 # TODO launch VCA charm
195 # task = asyncio.ensure_future(DeployCharm(self.loop, path, mgmt_ip, config_primitive))
196 db_nsr
["detailed-status"] = "Done"
197 db_nsr
["operational-status"] = "running"
198 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
200 self
.logger
.debug("create_ns task nsr_id={} Exit Ok".format(nsr_id
))
203 except (ROclient
.ROClientException
, Exception) as e
:
204 db_nsr
["operational-status"] = "failed"
205 db_nsr
["detailed-status"] += ": ERROR {}".format(e
)
206 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
208 "create_ns nsr_id={} Exit Exception on '{}': {}".format(nsr_id
, db_nsr
["detailed-status"], e
),
211 async def delete_ns(self
, nsr_id
):
212 self
.logger
.debug("delete_ns task nsr_id={}, Delete_ns task nsr_id={} Enter".format(nsr_id
, nsr_id
))
213 db_nsr
= self
.db
.get_one("nsrs", {"_id": nsr_id
})
214 nsr_lcm
= db_nsr
["_admin"]["deploy"]
216 db_nsr
["operational-status"] = "terminate"
217 db_nsr
["config-status"] = "terminate"
218 db_nsr
["detailed-status"] = "Deleting charms"
219 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
220 # TODO destroy VCA charm
223 RO
= ROclient
.ROClient(self
.loop
, endpoint_url
=self
.ro_url
, tenant
=self
.ro_tenant
,
224 datacenter
=db_nsr
["datacenter"])
226 RO_nsr_id
= nsr_lcm
["RO"]["nsr_id"]
229 db_nsr
["detailed-status"] = "Deleting ns at RO"
230 desc
= await RO
.delete("ns", RO_nsr_id
)
231 self
.logger
.debug("delete_ns task nsr_id={} RO ns={} deleted".format(nsr_id
, RO_nsr_id
))
232 nsr_lcm
["RO"]["nsr_id"] = None
233 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
234 except ROclient
.ROClientException
as e
:
235 if e
.http_code
== 404: # not found
236 nsr_lcm
["RO"]["nsr_id"] = None
237 nsr_lcm
["RO"]["nsr_status"] = "DELETED"
238 self
.logger
.debug("delete_ns task nsr_id={} RO ns={} already deleted".format(nsr_id
, RO_nsr_id
))
239 elif e
.http_code
== 409: #conflict
240 self
.logger
.debug("delete_ns task nsr_id={} RO ns={} delete conflict: {}".format(nsr_id
, RO_nsr_id
,
243 self
.logger
.error("delete_ns task nsr_id={} RO ns={} delete error: {}".format(nsr_id
, RO_nsr_id
, e
))
244 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
247 RO_nsd_id
= nsr_lcm
["RO"]["nsd_id"]
250 db_nsr
["detailed-status"] = "Deleting nsd at RO"
251 desc
= await RO
.delete("nsd", RO_nsd_id
)
252 self
.logger
.debug("delete_ns task nsr_id={} RO nsd={} deleted".format(nsr_id
, RO_nsd_id
))
253 nsr_lcm
["RO"]["nsd_id"] = None
254 except ROclient
.ROClientException
as e
:
255 if e
.http_code
== 404: # not found
256 nsr_lcm
["RO"]["nsd_id"] = None
257 self
.logger
.debug("delete_ns task nsr_id={} RO nsd={} already deleted".format(nsr_id
, RO_nsd_id
))
258 elif e
.http_code
== 409: #conflict
259 self
.logger
.debug("delete_ns task nsr_id={} RO nsd={} delete conflict: {}".format(nsr_id
, RO_nsd_id
,
262 self
.logger
.error("delete_ns task nsr_id={} RO nsd={} delete error: {}".format(nsr_id
, RO_nsd_id
,
264 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
266 for vnf_id
, RO_vnfd_id
in nsr_lcm
["RO"]["vnfd_id"].items():
270 db_nsr
["detailed-status"] = "Deleting vnfd {} at RO".format(vnf_id
)
271 desc
= await RO
.delete("vnfd", RO_vnfd_id
)
272 self
.logger
.debug("delete_ns task nsr_id={} RO vnfd={} deleted".format(nsr_id
, RO_vnfd_id
))
273 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
274 except ROclient
.ROClientException
as e
:
275 if e
.http_code
== 404: # not found
276 nsr_lcm
["RO"]["vnfd_id"][vnf_id
] = None
277 self
.logger
.debug("delete_ns task nsr_id={} RO vnfd={} already deleted ".format(nsr_id
, RO_vnfd_id
))
278 elif e
.http_code
== 409: #conflict
279 self
.logger
.debug("delete_ns task nsr_id={} RO vnfd={} delete conflict: {}".format(
280 nsr_id
, RO_vnfd_id
, e
))
282 self
.logger
.error("delete_ns task nsr_id={} RO vnfd={} delete error: {}".format(
283 nsr_id
, RO_vnfd_id
, e
))
284 self
.db
.replace("nsrs", nsr_id
, db_nsr
)
287 # TODO delete from database or mark as deleted???
288 db_nsr
["operational-status"] = "terminated"
289 self
.db
.del_one("nsrs", {"_id": nsr_id
})
290 self
.logger
.debug("delete_ns task nsr_id={} Exit".format(nsr_id
))
292 async def test(self
, param
=None):
293 self
.logger
.debug("Starting/Ending test task: {}".format(param
))
295 def cancel_tasks(self
, nsr_id
):
297 Cancel all active tasks of a concrete nsr identified for nsr_id
298 :param nsr_id: nsr identity
299 :return: None, or raises an exception if not possible
301 if not self
.lcm_tasks
.get(nsr_id
):
303 for order_id
, tasks_set
in self
.lcm_tasks
[nsr_id
].items():
304 for task_name
, task
in tasks_set
.items():
305 result
= task
.cancel()
307 self
.logger
.debug("nsr_id={} order_id={} task={} cancelled".format(nsr_id
, order_id
, task_name
))
308 self
.lcm_tasks
[nsr_id
] = {}
310 async def read_kafka(self
):
311 self
.logger
.debug("kafka task Enter")
313 # future = asyncio.Future()
316 command
, params
= await self
.msg
.aioread("ns", self
.loop
)
318 if command
== "exit":
321 elif command
.startswith("#"):
323 elif command
== "echo":
325 elif command
== "test":
326 asyncio
.Task(self
.test(params
), loop
=self
.loop
)
327 elif command
== "break":
328 print("put a break in this line of code")
329 elif command
== "create":
330 nsr_id
= params
.strip()
331 self
.logger
.debug("Deploying NS {}".format(nsr_id
))
332 task
= asyncio
.ensure_future(self
.create_ns(nsr_id
))
333 if nsr_id
not in self
.lcm_tasks
:
334 self
.lcm_tasks
[nsr_id
] = {}
335 self
.lcm_tasks
[nsr_id
][order_id
] = {"create_ns": task
}
336 elif command
== "delete":
337 nsr_id
= params
.strip()
338 self
.logger
.debug("Deleting NS {}".format(nsr_id
))
339 self
.cancel_tasks(nsr_id
)
340 task
= asyncio
.ensure_future(self
.delete_ns(nsr_id
))
341 if nsr_id
not in self
.lcm_tasks
:
342 self
.lcm_tasks
[nsr_id
] = {}
343 self
.lcm_tasks
[nsr_id
][order_id
] = {"delete_ns": task
}
344 elif command
== "show":
345 nsr_id
= params
.strip()
346 nsr_lcm
= self
.db
.get_one("nsr_lcm", {"id": nsr_id
})
347 print("nsr_lcm", nsr_lcm
)
348 print("self.lcm_tasks", self
.lcm_tasks
.get(nsr_id
))
350 self
.logger
.debug("unknown command '{}'".format(command
))
351 print("Usage:\n echo: <>\n create: <ns1|ns2>\n delete: <ns1|ns2>\n show: <ns1|ns2>")
352 self
.logger
.debug("kafka task Exit")
356 self
.loop
= asyncio
.get_event_loop()
357 self
.loop
.run_until_complete(self
.read_kafka())
362 def read_config_file(self
, config_file
):
363 # TODO make a [ini] + yaml inside parser
364 # the configparser library is not suitable, because it does not admit comments at the end of line,
365 # and not parse integer or boolean
367 with
open(config_file
) as f
:
369 for k
, v
in environ
.items():
370 if not k
.startswith("OSMLCM_"):
372 k_items
= k
.lower().split("_")
375 for k_item
in k_items
[1:-1]:
376 if k_item
in ("ro", "vca"):
377 # put in capital letter
378 k_item
= k_item
.upper()
380 if k_items
[-1] == "port":
381 c
[k_items
[-1]] = int(v
)
384 except Exception as e
:
385 self
.logger
.warn("skipping environ '{}' on exception '{}'".format(k
, e
))
388 except Exception as e
:
389 self
.logger
.critical("At config file '{}': {}".format(config_file
, e
))
393 if __name__
== '__main__':
395 config_file
= "lcm.cfg"
396 lcm
= Lcm(config_file
)
399 # RO_VIM = "OST2_MRT"
402 # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf/src/ping_vnfd.yaml") as f:
403 # vnfd = yaml.load(f)
404 # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd)
405 # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/ping_vnf"}
406 # lcm.db.create("vnfd", vnfd_clean)
407 # with open("/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf/src/pong_vnfd.yaml") as f:
408 # vnfd = yaml.load(f)
409 # vnfd_clean, _ = ROclient.remove_envelop("vnfd", vnfd)
410 # vnfd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/vnfd/pong_vnf"}
411 # lcm.db.create("vnfd", vnfd_clean)
412 # with open("/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns/src/ping_pong_nsd.yaml") as f:
414 # nsd_clean, _ = ROclient.remove_envelop("nsd", nsd)
415 # nsd_clean["_admin"] = {"storage": "/home/atierno/OSM/osm/devops/descriptor-packages/nsd/ping_pong_ns"}
416 # lcm.db.create("nsd", nsd_clean)
421 # "name": "pingpongOne",
423 # "nsd_id": nsd_clean["id"], # nsd_ping_pong
425 # lcm.db.create("ns_request", ns_request)
429 # "name": "pingpongTwo",
431 # "nsd_id": nsd_clean["id"], # nsd_ping_pong
433 # lcm.db.create("ns_request", ns_request)