update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_core.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import uuid
18 import collections
19 import asyncio
20 import concurrent.futures
21
22 import gi
23 gi.require_version('RwDts', '1.0')
24 gi.require_version('RwYang', '1.0')
25 gi.require_version('RwResourceMgrYang', '1.0')
26 gi.require_version('RwLaunchpadYang', '1.0')
27 gi.require_version('RwcalYang', '1.0')
28 from gi.repository import (
29 RwDts as rwdts,
30 RwYang,
31 RwResourceMgrYang,
32 RwLaunchpadYang,
33 RwcalYang,
34 )
35
36 from gi.repository.RwTypes import RwStatus
37
38 class ResMgrCALNotPresent(Exception):
39 pass
40
41 class ResMgrCloudAccountNotFound(Exception):
42 pass
43
44 class ResMgrCloudAccountExists(Exception):
45 pass
46
47 class ResMgrCloudAccountInUse(Exception):
48 pass
49
50 class ResMgrDuplicatePool(Exception):
51 pass
52
53 class ResMgrPoolNotAvailable(Exception):
54 pass
55
56 class ResMgrPoolOperationFailed(Exception):
57 pass
58
59 class ResMgrDuplicateEventId(Exception):
60 pass
61
62 class ResMgrUnknownEventId(Exception):
63 pass
64
65 class ResMgrUnknownResourceId(Exception):
66 pass
67
68 class ResMgrResourceIdBusy(Exception):
69 pass
70
71 class ResMgrResourceIdNotAllocated(Exception):
72 pass
73
74 class ResMgrNoResourcesAvailable(Exception):
75 pass
76
77 class ResMgrResourcesInitFailed(Exception):
78 pass
79
80 class ResMgrCALOperationFailure(Exception):
81 pass
82
83
84
85 class ResourceMgrCALHandler(object):
86 def __init__(self, loop, executor, log, log_hdl, account):
87 self._log = log
88 self._loop = loop
89 self._executor = executor
90 self._account = account.cal_account_msg
91 self._rwcal = account.cal
92 if account.account_type == 'aws':
93 self._subnets = ["172.31.97.0/24", "172.31.98.0/24", "172.31.99.0/24", "172.31.100.0/24", "172.31.101.0/24"]
94 else:
95 self._subnets = ["11.0.0.0/24",
96 "12.0.0.0/24",
97 "13.0.0.0/24",
98 "14.0.0.0/24",
99 "15.0.0.0/24",
100 "16.0.0.0/24",
101 "17.0.0.0/24",
102 "18.0.0.0/24",
103 "19.0.0.0/24",
104 "20.0.0.0/24",
105 "21.0.0.0/24",
106 "22.0.0.0/24",
107 "23.0.0.0/24",
108 "24.0.0.0/24",
109 "25.0.0.0/24",
110 "26.0.0.0/24",
111 "27.0.0.0/24",
112 "28.0.0.0/24",
113 "29.0.0.0/24",
114 "30.0.0.0/24",
115 "31.0.0.0/24",
116 "32.0.0.0/24",
117 "33.0.0.0/24",
118 "34.0.0.0/24",
119 "35.0.0.0/24",
120 "36.0.0.0/24",
121 "37.0.0.0/24",
122 "38.0.0.0/24"]
123 self._subnet_ptr = 0
124 self._boot_cache = {'compute': []}
125 self._lock = asyncio.Lock(loop=self._loop)
126
127 def get_cloud_account(self):
128 return self._account
129
130 def _select_link_subnet(self):
131 subnet = self._subnets[self._subnet_ptr]
132 self._subnet_ptr += 1
133 if self._subnet_ptr == len(self._subnets):
134 self._subnet_ptr = 0
135 return subnet
136
137 @asyncio.coroutine
138 def create_virtual_network(self, req_params):
139 rc, link = yield from self._loop.run_in_executor(self._executor,
140 self._rwcal.get_virtual_link_by_name,
141 self._account,
142 req_params.name)
143 if link:
144 self._log.debug("Found existing virtual-network with matching name in cloud. Reusing the virtual-network with id: %s" %(link.virtual_link_id))
145 if req_params.vim_network_name:
146 resource_type = 'precreated'
147 else:
148 # This is case of realloc
149 resource_type = 'dynamic'
150 return (resource_type, link.virtual_link_id)
151 elif req_params.vim_network_name:
152 self._log.error("Virtual-network-allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist",
153 self._account.name, req_params.vim_network_name)
154 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist"
155 %(self._account.name, req_params.vim_network_name))
156
157 params = RwcalYang.YangData_RwProject_Project_VirtualLinkReqParams()
158 params.from_dict(req_params.as_dict())
159 params.subnet = self._select_link_subnet()
160 #rc, rs = self._rwcal.create_virtual_link(self._account, params)
161 self._log.debug("Calling create_virtual_link API with params: %s" %(str(req_params)))
162 rc, rs = yield from self._loop.run_in_executor(self._executor,
163 self._rwcal.create_virtual_link,
164 self._account,
165 params)
166 if rc.status != RwStatus.SUCCESS:
167 self._log.error("Virtual-network-allocate operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
168 self._account.name, rc.error_msg, rc.traceback)
169 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s (%s)"
170 %(self._account.name, rc.error_msg))
171
172 return ('dynamic',rs)
173
174 @asyncio.coroutine
175 def delete_virtual_network(self, network_id):
176 #rc = self._rwcal.delete_virtual_link(self._account, network_id)
177 self._log.debug("Calling delete_virtual_link API with id: %s" %(network_id))
178 rc = yield from self._loop.run_in_executor(self._executor,
179 self._rwcal.delete_virtual_link,
180 self._account,
181 network_id)
182 if rc != RwStatus.SUCCESS:
183 self._log.error("Virtual-network-release operation failed for cloud account: %s. ResourceID: %s",
184 self._account.name,
185 network_id)
186 raise ResMgrCALOperationFailure("Virtual-network release operation failed for cloud account: %s. ResourceId: %s" %(self._account.name, network_id))
187
188 @asyncio.coroutine
189 def get_virtual_network_info(self, network_id):
190 #rc, rs = self._rwcal.get_virtual_link(self._account, network_id)
191 self._log.debug("Calling get_virtual_link_info API with id: %s" %(network_id))
192 rc, rs = yield from self._loop.run_in_executor(self._executor,
193 self._rwcal.get_virtual_link,
194 self._account,
195 network_id)
196 if rc != RwStatus.SUCCESS:
197 self._log.error("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s",
198 self._account.name,
199 network_id)
200 raise ResMgrCALOperationFailure("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, network_id))
201 return rs
202
203 @asyncio.coroutine
204 def create_virtual_compute(self, req_params):
205 if not self._boot_cache['compute']:
206 self._log.debug("Calling get_vdu_list API")
207 yield from self._lock.acquire()
208 try:
209 self._log.debug("Populating compute cache ")
210 rc, rsp = yield from self._loop.run_in_executor(self._executor,
211 self._rwcal.get_vdu_list,
212 self._account)
213
214 if rc.status != RwStatus.SUCCESS:
215 self._log.error("Virtual-compute-info operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
216 self._account.name, rc.error_msg, rc.traceback)
217 raise ResMgrCALOperationFailure("Virtual-compute-info operation failed for cloud account: %s, Error (%s)"
218 % (self._account.name, rc.error_msg))
219 self._boot_cache['compute'] = rsp.vdu_info_list
220 finally:
221 self._lock.release()
222 else:
223 self._log.debug("!!!!!!!! Found compute cache ")
224
225 vdus = [vm for vm in self._boot_cache['compute'] if vm.name == req_params.name]
226
227 if vdus:
228 self._log.debug("Found existing virtual-compute with matching name in cloud. Reusing the virtual-compute element with id: %s" %(vdus[0].vdu_id))
229 return vdus[0].vdu_id
230
231 params = RwcalYang.YangData_RwProject_Project_VduInitParams()
232 params.from_dict(req_params.as_dict())
233
234 if 'image_name' in req_params:
235 image_checksum = req_params.image_checksum if req_params.has_field("image_checksum") else None
236 params.image_id = yield from self.get_image_id_from_image_info(req_params.image_name, image_checksum)
237
238 self._log.debug("Calling create_vdu API with params %s" %(str(params)))
239 rc, rs = yield from self._loop.run_in_executor(self._executor,
240 self._rwcal.create_vdu,
241 self._account,
242 params)
243
244 if rc.status != RwStatus.SUCCESS:
245 self._log.error("Virtual-compute-create operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
246 self._account.name, rc.error_msg, rc.traceback)
247 raise ResMgrCALOperationFailure("Virtual-compute-create operation failed for cloud account: %s (%s)"
248 %(self._account.name, rc.error_msg))
249
250 return rs
251
252 @asyncio.coroutine
253 def modify_virtual_compute(self, req_params):
254 #rc = self._rwcal.modify_vdu(self._account, req_params)
255 self._log.debug("Calling modify_vdu API with params: %s" %(str(req_params)))
256 rc = yield from self._loop.run_in_executor(self._executor,
257 self._rwcal.modify_vdu,
258 self._account,
259 req_params)
260 if rc != RwStatus.SUCCESS:
261 self._log.error("Virtual-compute-modify operation failed for cloud account: %s", self._account.name)
262 raise ResMgrCALOperationFailure("Virtual-compute-modify operation failed for cloud account: %s" %(self._account.name))
263
264 @asyncio.coroutine
265 def delete_virtual_compute(self, compute_id):
266 #rc = self._rwcal.delete_vdu(self._account, compute_id)
267 self._log.debug("Calling delete_vdu API with id: %s" %(compute_id))
268 # Delete the cache
269 self._boot_cache['compute'] = list()
270
271 rc = yield from self._loop.run_in_executor(self._executor,
272 self._rwcal.delete_vdu,
273 self._account,
274 compute_id)
275 if rc != RwStatus.SUCCESS:
276 self._log.error("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s",
277 self._account.name,
278 compute_id)
279 raise ResMgrCALOperationFailure("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
280
281 @asyncio.coroutine
282 def get_virtual_compute_info(self, compute_id, mgmt_network=""):
283 #rc, rs = self._rwcal.get_vdu(self._account, compute_id, None)
284 self._log.debug("Calling get_vdu API with id: %s" %(compute_id))
285 rc, rs = yield from self._loop.run_in_executor(self._executor,
286 self._rwcal.get_vdu,
287 self._account,
288 compute_id,
289 mgmt_network)
290 if rc.status != RwStatus.SUCCESS:
291 self._log.error("Virtual-compute-info operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
292 self._account.name, rc.error_msg, rc.traceback)
293 raise ResMgrCALOperationFailure("Virtual-compute-info operation failed for cloud account: %s, ResourceID: %s, Error (%s)"
294 %(self._account.name, compute_id, rc.error_msg))
295 return rs
296
297 @asyncio.coroutine
298 def get_compute_flavor_info_list(self):
299 #rc, rs = self._rwcal.get_flavor_list(self._account)
300 self._log.debug("Calling get_flavor_list API")
301 rc, rs = yield from self._loop.run_in_executor(self._executor,
302 self._rwcal.get_flavor_list,
303 self._account)
304 if rc != RwStatus.SUCCESS:
305 self._log.error("Get-flavor-info-list operation failed for cloud account: %s",
306 self._account.name)
307 raise ResMgrCALOperationFailure("Get-flavor-info-list operation failed for cloud account: %s" %(self._account.name))
308 return rs.flavorinfo_list
309
310 @asyncio.coroutine
311 def create_compute_flavor(self, request):
312 flavor = RwcalYang.YangData_RwProject_Project_VimResources_FlavorinfoList()
313 flavor.name = str(uuid.uuid4())
314 epa_types = ['vm_flavor', 'guest_epa', 'host_epa', 'host_aggregate']
315 epa_dict = {k: v for k, v in request.as_dict().items() if k in epa_types}
316 flavor.from_dict(epa_dict)
317
318 self._log.info("Creating flavor: %s", flavor)
319 #rc, rs = self._rwcal.create_flavor(self._account, flavor)
320 self._log.debug("Calling create_flavor API")
321 rc, rs = yield from self._loop.run_in_executor(self._executor,
322 self._rwcal.create_flavor,
323 self._account,
324 flavor)
325 if rc != RwStatus.SUCCESS:
326 self._log.error("Create-flavor operation failed for cloud account: %s",
327 self._account.name)
328 raise ResMgrCALOperationFailure("Create-flavor operation failed for cloud account: %s" %(self._account.name))
329 return rs
330
331 @asyncio.coroutine
332 def get_image_info_list(self):
333 #rc, rs = self._rwcal.get_image_list(self._account)
334 self._log.debug("Calling get_image_list API")
335 rc, rs = yield from self._loop.run_in_executor(self._executor,
336 self._rwcal.get_image_list,
337 self._account)
338 if rc != RwStatus.SUCCESS:
339 self._log.error("Get-image-info-list operation failed for cloud account: %s",
340 self._account.name)
341 raise ResMgrCALOperationFailure("Get-image-info-list operation failed for cloud account: %s" %(self._account.name))
342 return rs.imageinfo_list
343
344 @asyncio.coroutine
345 def get_image_id_from_image_info(self, image_name, image_checksum=None):
346 self._log.debug("Looking up image id for image name %s and checksum %s on cloud account: %s",
347 image_name, image_checksum, self._account.name
348 )
349
350 image_list = yield from self.get_image_info_list()
351 matching_images = [i for i in image_list if i.name == image_name]
352
353 # If the image checksum was filled in then further filter the images by the checksum
354 if image_checksum is not None:
355 matching_images = [i for i in matching_images if i.checksum == image_checksum]
356 else:
357 self._log.warning("Image checksum not provided. Lookup using image name (%s) only.",
358 image_name)
359
360 if len(matching_images) == 0:
361 raise ResMgrCALOperationFailure("Could not find image name {} (using checksum: {}) for cloud account: {}".format(
362 image_name, image_checksum, self._account.name
363 ))
364
365 elif len(matching_images) > 1:
366 unique_checksums = {i.checksum for i in matching_images}
367 if len(unique_checksums) > 1:
368 msg = ("Too many images with different checksums matched "
369 "image name of %s for cloud account: %s" % (image_name, self._account.name))
370 raise ResMgrCALOperationFailure(msg)
371
372 return matching_images[0].id
373
374 @asyncio.coroutine
375 def get_image_info(self, image_id):
376 #rc, rs = self._rwcal.get_image(self._account, image_id)
377 self._log.debug("Calling get_image API for id: %s" %(image_id))
378 rc, rs = yield from self._loop.run_in_executor(self._executor,
379 self._rwcal.get_image,
380 self._account,
381 image_id)
382 if rc != RwStatus.SUCCESS:
383 self._log.error("Get-image-info-list operation failed for cloud account: %s",
384 self._account.name)
385 raise ResMgrCALOperationFailure("Get-image-info operation failed for cloud account: %s" %(self._account.name))
386 return rs.imageinfo_list
387
388 def dynamic_flavor_supported(self):
389 return getattr(self._account, self._account.account_type).dynamic_flavor_support
390
391
392 class Resource(object):
393 def __init__(self, resource_id, resource_type, request):
394 self._id = resource_id
395 self._type = resource_type
396 self._request = request
397
398 @property
399 def resource_id(self):
400 return self._id
401
402 @property
403 def resource_type(self):
404 return self._type
405
406 @property
407 def request(self):
408 return self._request
409
410 def cleanup(self):
411 pass
412
413
414 class ComputeResource(Resource):
415 pass
416
417
418 class NetworkResource(Resource):
419 pass
420
421
422 class ResourcePoolInfo(object):
423 def __init__(self, name, pool_type, resource_type, max_size):
424 self.name = name
425 self.pool_type = pool_type
426 self.resource_type = resource_type
427 self.max_size = max_size
428
429 @classmethod
430 def from_dict(cls, pool_dict):
431 return cls(
432 pool_dict["name"],
433 pool_dict["pool_type"],
434 pool_dict["resource_type"],
435 pool_dict["max_size"],
436 )
437
438
439 class ResourcePool(object):
440 def __init__(self, log, loop, pool_info, resource_class, cal):
441 self._log = log
442 self._loop = loop
443 self._name = pool_info.name
444 self._pool_type = pool_info.pool_type
445 self._resource_type = pool_info.resource_type
446 self._cal = cal
447 self._resource_class = resource_class
448
449 self._max_size = pool_info.max_size
450
451 self._status = 'unlocked'
452 ### A Dictionary of all the resources in this pool, keyed by CAL resource-id
453 self._all_resources = {}
454 ### A List of free resources in this pool
455 self._free_resources = []
456 ### A Dictionary of all the allocated resources in this pool, keyed by CAL resource-id
457 self._allocated_resources = {}
458
459 @property
460 def name(self):
461 return self._name
462
463 @property
464 def cal(self):
465 """ This instance's ResourceMgrCALHandler """
466 return self._cal
467
468 @property
469 def pool_type(self):
470 return self._pool_type
471
472 @property
473 def resource_type(self):
474 return self._resource_type
475
476 @property
477 def max_size(self):
478 return self._max_size
479
480 @property
481 def status(self):
482 return self._status
483
484 def in_use(self):
485 if len(self._allocated_resources) != 0:
486 return True
487 else:
488 return False
489
490 def update_cal_handler(self, cal):
491 if self.in_use():
492 raise ResMgrPoolOperationFailed(
493 "Cannot update CAL plugin for in use pool"
494 )
495
496 self._cal = cal
497
498 def lock_pool(self):
499 self._log.info("Locking the pool :%s", self.name)
500 self._status = 'locked'
501
502 def unlock_pool(self):
503 self._log.info("Unlocking the pool :%s", self.name)
504 self._status = 'unlocked'
505
506 def add_resource(self, resource_info):
507 self._log.info("Adding static resource to Pool: %s, Resource-id: %s Resource-Type: %s",
508 self.name,
509 resource_info.resource_id,
510 self.resource_type)
511
512 ### Add static resources to pool
513 resource = self._resource_class(resource_info.resource_id, 'static')
514 assert resource.resource_id == resource_info.resource_id
515 self._all_resources[resource.resource_id] = resource
516 self._free_resources.append(resource)
517
518 def delete_resource(self, resource_id):
519 if resource_id not in self._all_resources:
520 self._log.error("Resource Id: %s not present in pool: %s. Delete operation failed", resource_id, self.name)
521 raise ResMgrUnknownResourceId("Resource Id: %s requested for release is not found" %(resource_id))
522
523 if resource_id in self._allocated_resources:
524 self._log.error("Resource Id: %s in use. Delete operation failed", resource_id)
525 raise ResMgrResourceIdBusy("Resource Id: %s requested for release is in use" %(resource_id))
526
527 self._log.info("Deleting resource: %s from pool: %s, Resource-Type",
528 resource_id,
529 self.name,
530 self.resource_type)
531
532 resource = self._all_resources.pop(resource_id)
533 self._free_resources.remove(resource)
534 resource.cleanup()
535 del resource
536
537 @asyncio.coroutine
538 def read_resource_info(self, resource_id):
539 if resource_id not in self._all_resources:
540 self._log.error("Resource Id: %s not present in pool: %s. Read operation failed", resource_id, self.name)
541 raise ResMgrUnknownResourceId("Resource Id: %s requested for read is not found" %(resource_id))
542
543 if resource_id not in self._allocated_resources:
544 self._log.error("Resource Id: %s not in use. Read operation failed", resource_id)
545 raise ResMgrResourceIdNotAllocated("Resource Id: %s not in use. Read operation failed" %(resource_id))
546
547 resource = self._allocated_resources[resource_id]
548 resource_info = yield from self.get_resource_info(resource)
549 return resource_info
550
551 def get_pool_info(self):
552 info = RwResourceMgrYang.YangData_RwProject_Project_ResourcePoolRecords_CloudAccount_Records()
553 self._log.info("Providing info for pool: %s", self.name)
554 info.name = self.name
555 if self.pool_type:
556 info.pool_type = self.pool_type
557 if self.resource_type:
558 info.resource_type = self.resource_type
559 if self.status:
560 info.pool_status = self.status
561
562 info.total_resources = len(self._all_resources)
563 info.free_resources = len(self._free_resources)
564 info.allocated_resources = len(self._allocated_resources)
565 return info
566
567 def cleanup(self):
568 for _, v in self._all_resources.items():
569 v.cleanup()
570
571 @asyncio.coroutine
572 def _allocate_static_resource(self, request, resource_type):
573 unit_type = {'compute': 'VDU', 'network':'VirtualLink'}
574 match_found = False
575 resource = None
576 self._log.info("Doing resource match from pool :%s", self._free_resources)
577 for resource in self._free_resources:
578 resource_info = yield from self.get_resource_info(resource)
579 self._log.info("Attempting to match %s-requirements for %s: %s with resource-id :%s",
580 resource_type, unit_type[resource_type],request.name, resource.resource_id)
581 if self.match_epa_params(resource_info, request):
582 if self.match_image_params(resource_info, request):
583 match_found = True
584 self._log.info("%s-requirements matched for %s: %s with resource-id :%s",
585 resource_type, unit_type[resource_type],request.name, resource.resource_id)
586 yield from self.initialize_resource_in_cal(resource, request)
587 break
588
589 if not match_found:
590 self._log.error("No match found for %s-requirements for %s: %s in pool: %s. %s instantiation failed",
591 resource_type,
592 unit_type[resource_type],
593 request.name,
594 self.name,
595 unit_type[resource_type])
596 return None
597 else:
598 ### Move resource from free-list into allocated-list
599 self._log.info("Allocating the static resource with resource-id: %s for %s: %s",
600 resource.resource_id,
601 unit_type[resource_type],request.name)
602 self._free_resources.remove(resource)
603 self._allocated_resources[resource.resource_id] = resource
604
605 return resource
606
607 @asyncio.coroutine
608 def allocate_resource(self, request):
609 resource = yield from self.allocate_resource_in_cal(request)
610 resource_info = yield from self.get_resource_info(resource)
611 return resource.resource_id, resource_info
612
613 @asyncio.coroutine
614 def release_resource(self, resource_id):
615 self._log.debug("Releasing resource_id %s in pool %s", resource_id, self.name)
616 if resource_id not in self._allocated_resources:
617 self._log.error("Failed to release a resource with resource-id: %s in pool: %s. Resource not known",
618 resource_id,
619 self.name)
620 raise ResMgrUnknownResourceId("Failed to release resource with resource-id: %s. Unknown resource-id" %(resource_id))
621
622 ### Get resource object
623 resource = self._allocated_resources.pop(resource_id)
624 yield from self.uninitialize_resource_in_cal(resource)
625 yield from self.release_cal_resource(resource)
626
627
628 class NetworkPool(ResourcePool):
629 def __init__(self, log, loop, pool_info, cal):
630 super(NetworkPool, self).__init__(log, loop, pool_info, NetworkResource, cal)
631
632 @asyncio.coroutine
633 def allocate_resource_in_cal(self, request):
634 resource = None
635 if self.pool_type == 'static':
636 self._log.info("Attempting network resource allocation from static pool: %s", self.name)
637 ### Attempt resource allocation from static pool
638 resource = yield from self._allocate_static_resource(request, 'network')
639 elif self.pool_type == 'dynamic':
640 ### Attempt resource allocation from dynamic pool
641 self._log.info("Attempting network resource allocation from dynamic pool: %s", self.name)
642 if len(self._free_resources) != 0:
643 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
644 self.name, len(self._free_resources))
645 resource = yield from self._allocate_static_resource(request, 'network')
646 if resource is None:
647 self._log.info("Could not resource from static resources. Going for dynamic resource allocation")
648 ## Not static resource available. Attempt dynamic resource from pool
649 resource = yield from self.allocate_dynamic_resource(request)
650 if resource is None:
651 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
652 return resource
653
654 @asyncio.coroutine
655 def allocate_dynamic_resource(self, request):
656 resource_type, resource_id = yield from self._cal.create_virtual_network(request)
657 # Removing the following check (RIFT-15144 MANO fails to attach to existing VIM network)
658 #if resource_id in self._all_resources:
659 # self._log.error("Resource with id %s name %s of type %s is already used", resource_id, request.name, resource_type)
660 # raise ResMgrNoResourcesAvailable("Resource with name %s of type network is already used" %(resource_id))
661 resource = self._resource_class(resource_id, resource_type, request)
662 self._all_resources[resource_id] = resource
663 self._allocated_resources[resource_id] = resource
664 self._log.info("Successfully allocated virtual-network resource from CAL with resource-id: %s resource type %s", resource_id, resource_type)
665 return resource
666
667 @asyncio.coroutine
668 def release_cal_resource(self, resource):
669 if resource.resource_type == 'dynamic':
670 self._log.debug("Deleting virtual network with network_id: %s", resource.resource_id)
671 yield from self._cal.delete_virtual_network(resource.resource_id)
672 self._all_resources.pop(resource.resource_id)
673 self._log.info("Successfully released virtual-network resource in CAL with resource-id: %s", resource.resource_id)
674 elif resource.resource_type == 'precreated':
675 self._all_resources.pop(resource.resource_id)
676 self._log.info("Successfully removed precreated virtual-network resource from allocated list: %s", resource.resource_id)
677 else:
678 self._log.info("Successfully released virtual-network resource with resource-id: %s into available-list", resource.resource_id)
679 self._free_resources.append(resource)
680
681 @asyncio.coroutine
682 def get_resource_info(self, resource):
683 info = yield from self._cal.get_virtual_network_info(resource.resource_id)
684 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
685 resource.resource_id, str(info))
686 response = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VlinkEvent_VlinkEventData_ResourceInfo()
687 response.from_dict(info.as_dict())
688 response.pool_name = self.name
689 response.resource_state = 'active'
690 return response
691
692 @asyncio.coroutine
693 def get_info_by_id(self, resource_id):
694 info = yield from self._cal.get_virtual_network_info(resource_id)
695 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
696 resource_id, str(info))
697 return info
698
699 def match_image_params(self, resource_info, request_params):
700 return True
701
702 def match_epa_params(self, resource_info, request_params):
703 if not hasattr(request_params, 'provider_network'):
704 ### Its a match if nothing is requested
705 return True
706 else:
707 required = getattr(request_params, 'provider_network')
708
709 if not hasattr(resource_info, 'provider_network'):
710 ### Its no match
711 return False
712 else:
713 available = getattr(resource_info, 'provider_network')
714
715 self._log.debug("Matching Network EPA params. Required: %s, Available: %s", required, available)
716
717 if required.has_field('name') and required.name!= available.name:
718 self._log.debug("Provider Network mismatch. Required: %s, Available: %s",
719 required.name,
720 available.name)
721 return False
722
723 self._log.debug("Matching EPA params physical network name")
724
725 if required.has_field('physical_network') and required.physical_network != available.physical_network:
726 self._log.debug("Physical Network mismatch. Required: %s, Available: %s",
727 required.physical_network,
728 available.physical_network)
729 return False
730
731 self._log.debug("Matching EPA params overlay type")
732 if required.has_field('overlay_type') and required.overlay_type != available.overlay_type:
733 self._log.debug("Overlay type mismatch. Required: %s, Available: %s",
734 required.overlay_type,
735 available.overlay_type)
736 return False
737
738 self._log.debug("Matching EPA params SegmentationID")
739 if required.has_field('segmentation_id') and required.segmentation_id != available.segmentation_id:
740 self._log.debug("Segmentation-Id mismatch. Required: %s, Available: %s",
741 required.segmentation_id,
742 available.segmentation_id)
743 return False
744 return True
745
746 @asyncio.coroutine
747 def initialize_resource_in_cal(self, resource, request):
748 pass
749
750 @asyncio.coroutine
751 def uninitialize_resource_in_cal(self, resource):
752 pass
753
754
755 class ComputePool(ResourcePool):
756 def __init__(self, log, loop, pool_info, cal):
757 super(ComputePool, self).__init__(log, loop, pool_info, ComputeResource, cal)
758
759 @asyncio.coroutine
760 def allocate_resource_in_cal(self, request):
761 resource = None
762 if self.pool_type == 'static':
763 self._log.info("Attempting compute resource allocation from static pool: %s", self.name)
764 ### Attempt resource allocation from static pool
765 resource = yield from self._allocate_static_resource(request, 'compute')
766 elif self.pool_type == 'dynamic':
767 ### Attempt resource allocation from dynamic pool
768 self._log.info("Attempting compute resource allocation from dynamic pool: %s", self.name)
769 if len(self._free_resources) != 0:
770 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
771 len(self._free_resources),
772 self.name)
773 resource = yield from self._allocate_static_resource(request, 'compute')
774 if resource is None:
775 self._log.info("Attempting for dynamic resource allocation")
776 resource = yield from self.allocate_dynamic_resource(request)
777 if resource is None:
778 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
779
780 requested_params = RwcalYang.YangData_RwProject_Project_VduInitParams()
781 requested_params.from_dict(request.as_dict())
782 resource.requested_params = requested_params
783 return resource
784
785 @asyncio.coroutine
786 def allocate_dynamic_resource(self, request):
787 #request.flavor_id = yield from self.select_resource_flavor(request)
788 resource_id = yield from self._cal.create_virtual_compute(request)
789 resource = self._resource_class(resource_id, 'dynamic', request)
790 self._all_resources[resource_id] = resource
791 self._allocated_resources[resource_id] = resource
792 self._log.info("Successfully allocated virtual-compute resource from CAL with resource-id: %s", resource_id)
793 return resource
794
795 @asyncio.coroutine
796 def release_cal_resource(self, resource):
797 if hasattr(resource, 'requested_params'):
798 delattr(resource, 'requested_params')
799 if resource.resource_type == 'dynamic':
800 yield from self._cal.delete_virtual_compute(resource.resource_id)
801 self._all_resources.pop(resource.resource_id)
802 self._log.info("Successfully released virtual-compute resource in CAL with resource-id: %s", resource.resource_id)
803 else:
804 self._log.info("Successfully released virtual-compute resource with resource-id: %s into available-list", resource.resource_id)
805 self._free_resources.append(resource)
806
807 @asyncio.coroutine
808 def get_resource_info(self, resource):
809 mgmt_network = ""
810 if resource.request.mgmt_network is not None:
811 mgmt_network = resource.request.mgmt_network
812 info = yield from self._cal.get_virtual_compute_info(resource.resource_id, mgmt_network=mgmt_network)
813
814 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
815 resource.resource_id, str(info))
816 response = RwResourceMgrYang.YangData_RwProject_Project_ResourceMgmt_VduEvent_VduEventData_ResourceInfo()
817 response.from_dict(info.as_dict())
818 response.pool_name = self.name
819 response.resource_state = self._get_resource_state(info, resource.requested_params)
820 return response
821
822 @asyncio.coroutine
823 def get_info_by_id(self, resource_id):
824 info = yield from self._cal.get_virtual_compute_info(resource_id)
825 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
826 resource_id, str(info))
827 return info
828
829 def _get_resource_state(self, resource_info, requested_params):
830
831
832 def conn_pts_len_equal():
833 # if explicit mgmt network is defined then the allocated ports might
834 # one more than the expected.
835 allocated_ports = len(resource_info.connection_points)
836 requested_ports = len(requested_params.connection_points)
837
838 if not requested_params.mgmt_network:
839 allocated_ports -= 1
840
841 return allocated_ports == requested_ports
842
843 if resource_info.state == 'failed':
844 self._log.error("<Compute-Resource: %s> Reached failed state.",
845 resource_info.name)
846 self._log.error("<Compute-Resource: {}> info at the time of failure: {}".format(resource_info.name, str(resource_info)))
847 return 'failed'
848
849 if resource_info.state != 'active':
850 self._log.info("<Compute-Resource: %s> Not reached active state.",
851 resource_info.name)
852 return 'pending'
853
854 if not resource_info.has_field('management_ip') or resource_info.management_ip == '':
855 self._log.info("<Compute-Resource: %s> Management IP not assigned.",
856 resource_info.name)
857 return 'pending'
858
859 if (requested_params.has_field('allocate_public_address')) and (requested_params.allocate_public_address == True):
860 if not resource_info.has_field('public_ip'):
861 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for public ip, %s",
862 resource_info.name, requested_params)
863 return 'pending'
864
865 if not conn_pts_len_equal():
866 self._log.warning("<Compute-Resource: %s> Waiting for requested number of ports to be assigned to virtual-compute, requested: %d, assigned: %d",
867 resource_info.name,
868 len(requested_params.connection_points),
869 len(resource_info.connection_points))
870 return 'pending'
871
872 #not_active = [c for c in resource_info.connection_points
873 # if c.state != 'active']
874
875 #if not_active:
876 # self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
877 # resource_info.name, resource_info)
878 # return 'pending'
879
880 ## Find the connection_points which are in active state but does not have IP address
881 no_address = [c for c in resource_info.connection_points
882 if (c.state == 'active') and (not c.has_field('ip_address'))]
883
884 if no_address:
885 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
886 resource_info.name, resource_info)
887 return 'pending'
888
889 return 'active'
890
891 @asyncio.coroutine
892 def select_resource_flavor(self, request):
893 flavors = yield from self._cal.get_compute_flavor_info_list()
894 self._log.debug("Received %d flavor information from RW.CAL", len(flavors))
895 flavor_id = None
896 match_found = False
897 for flv in flavors:
898 self._log.info("Attempting to match compute requirement for VDU: %s with flavor %s",
899 request.name, flv)
900 if self.match_epa_params(flv, request):
901 self._log.info("Flavor match found for compute requirements for VDU: %s with flavor name: %s, flavor-id: %s",
902 request.name, flv.name, flv.id)
903 match_found = True
904 flavor_id = flv.id
905 break
906
907 if not match_found:
908 ### Check if CAL account allows dynamic flavor creation
909 if self._cal.dynamic_flavor_supported():
910 self._log.info("Attempting to create a new flavor for required compute-requirement for VDU: %s", request.name)
911 flavor_id = yield from self._cal.create_compute_flavor(request)
912 else:
913 ### No match with existing flavors and CAL does not support dynamic flavor creation
914 self._log.error("Unable to create flavor for compute requirement for VDU: %s. VDU instantiation failed", request.name)
915 raise ResMgrNoResourcesAvailable("No resource available with matching EPA attributes")
916 else:
917 ### Found flavor
918 self._log.info("Found flavor with id: %s for compute requirement for VDU: %s",
919 flavor_id, request.name)
920 return flavor_id
921
922 def _match_vm_flavor(self, required, available):
923 self._log.info("Matching VM Flavor attributes")
924 if available.vcpu_count != required.vcpu_count:
925 self._log.debug("VCPU requirement mismatch. Required: %d, Available: %d",
926 required.vcpu_count,
927 available.vcpu_count)
928 return False
929 if available.memory_mb != required.memory_mb:
930 self._log.debug("Memory requirement mismatch. Required: %d MB, Available: %d MB",
931 required.memory_mb,
932 available.memory_mb)
933 return False
934 if available.storage_gb != required.storage_gb:
935 self._log.debug("Storage requirement mismatch. Required: %d GB, Available: %d GB",
936 required.storage_gb,
937 available.storage_gb)
938 return False
939 self._log.debug("VM Flavor match found")
940 return True
941
942 def _match_guest_epa(self, required, available):
943 self._log.info("Matching Guest EPA attributes")
944 if required.has_field('pcie_device'):
945 self._log.debug("Matching pcie_device")
946 if available.has_field('pcie_device') == False:
947 self._log.debug("Matching pcie_device failed. Not available in flavor")
948 return False
949 else:
950 for dev in required.pcie_device:
951 if not [ d for d in available.pcie_device
952 if ((d.device_id == dev.device_id) and (d.count == dev.count)) ]:
953 self._log.debug("Matching pcie_device failed. Required: %s, Available: %s", required.pcie_device, available.pcie_device)
954 return False
955 elif available.has_field('pcie_device'):
956 self._log.debug("Rejecting available flavor because pcie_device not required but available")
957 return False
958
959
960 if required.has_field('mempage_size'):
961 self._log.debug("Matching mempage_size")
962 if available.has_field('mempage_size') == False:
963 self._log.debug("Matching mempage_size failed. Not available in flavor")
964 return False
965 else:
966 if required.mempage_size != available.mempage_size:
967 self._log.debug("Matching mempage_size failed. Required: %s, Available: %s", required.mempage_size, available.mempage_size)
968 return False
969 elif available.has_field('mempage_size'):
970 self._log.debug("Rejecting available flavor because mempage_size not required but available")
971 return False
972
973 if required.has_field('cpu_pinning_policy'):
974 self._log.debug("Matching cpu_pinning_policy")
975 if required.cpu_pinning_policy != 'ANY':
976 if available.has_field('cpu_pinning_policy') == False:
977 self._log.debug("Matching cpu_pinning_policy failed. Not available in flavor")
978 return False
979 else:
980 if required.cpu_pinning_policy != available.cpu_pinning_policy:
981 self._log.debug("Matching cpu_pinning_policy failed. Required: %s, Available: %s", required.cpu_pinning_policy, available.cpu_pinning_policy)
982 return False
983 elif available.has_field('cpu_pinning_policy'):
984 self._log.debug("Rejecting available flavor because cpu_pinning_policy not required but available")
985 return False
986
987 if required.has_field('cpu_thread_pinning_policy'):
988 self._log.debug("Matching cpu_thread_pinning_policy")
989 if available.has_field('cpu_thread_pinning_policy') == False:
990 self._log.debug("Matching cpu_thread_pinning_policy failed. Not available in flavor")
991 return False
992 else:
993 if required.cpu_thread_pinning_policy != available.cpu_thread_pinning_policy:
994 self._log.debug("Matching cpu_thread_pinning_policy failed. Required: %s, Available: %s", required.cpu_thread_pinning_policy, available.cpu_thread_pinning_policy)
995 return False
996 elif available.has_field('cpu_thread_pinning_policy'):
997 self._log.debug("Rejecting available flavor because cpu_thread_pinning_policy not required but available")
998 return False
999
1000 if required.has_field('trusted_execution'):
1001 self._log.debug("Matching trusted_execution")
1002 if required.trusted_execution == True:
1003 if available.has_field('trusted_execution') == False:
1004 self._log.debug("Matching trusted_execution failed. Not available in flavor")
1005 return False
1006 else:
1007 if required.trusted_execution != available.trusted_execution:
1008 self._log.debug("Matching trusted_execution failed. Required: %s, Available: %s", required.trusted_execution, available.trusted_execution)
1009 return False
1010 elif available.has_field('trusted_execution'):
1011 self._log.debug("Rejecting available flavor because trusted_execution not required but available")
1012 return False
1013
1014 if required.has_field('numa_node_policy'):
1015 self._log.debug("Matching numa_node_policy")
1016 if available.has_field('numa_node_policy') == False:
1017 self._log.debug("Matching numa_node_policy failed. Not available in flavor")
1018 return False
1019 else:
1020 if required.numa_node_policy.has_field('node_cnt'):
1021 self._log.debug("Matching numa_node_policy node_cnt")
1022 if available.numa_node_policy.has_field('node_cnt') == False:
1023 self._log.debug("Matching numa_node_policy node_cnt failed. Not available in flavor")
1024 return False
1025 else:
1026 if required.numa_node_policy.node_cnt != available.numa_node_policy.node_cnt:
1027 self._log.debug("Matching numa_node_policy node_cnt failed. Required: %s, Available: %s",required.numa_node_policy.node_cnt, available.numa_node_policy.node_cnt)
1028 return False
1029 elif available.numa_node_policy.has_field('node_cnt'):
1030 self._log.debug("Rejecting available flavor because numa node count not required but available")
1031 return False
1032
1033 if required.numa_node_policy.has_field('mem_policy'):
1034 self._log.debug("Matching numa_node_policy mem_policy")
1035 if available.numa_node_policy.has_field('mem_policy') == False:
1036 self._log.debug("Matching numa_node_policy mem_policy failed. Not available in flavor")
1037 return False
1038 else:
1039 if required.numa_node_policy.mem_policy != available.numa_node_policy.mem_policy:
1040 self._log.debug("Matching numa_node_policy mem_policy failed. Required: %s, Available: %s", required.numa_node_policy.mem_policy, available.numa_node_policy.mem_policy)
1041 return False
1042 elif available.numa_node_policy.has_field('mem_policy'):
1043 self._log.debug("Rejecting available flavor because num node mem_policy not required but available")
1044 return False
1045
1046 if required.numa_node_policy.has_field('node'):
1047 self._log.debug("Matching numa_node_policy nodes configuration")
1048 if available.numa_node_policy.has_field('node') == False:
1049 self._log.debug("Matching numa_node_policy nodes configuration failed. Not available in flavor")
1050 return False
1051 for required_node in required.numa_node_policy.node:
1052 self._log.debug("Matching numa_node_policy nodes configuration for node %s", required_node)
1053 numa_match = False
1054 for available_node in available.numa_node_policy.node:
1055 if required_node.id != available_node.id:
1056 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1057 continue
1058 if required_node.vcpu != available_node.vcpu:
1059 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1060 continue
1061 if required_node.memory_mb != available_node.memory_mb:
1062 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1063 continue
1064 numa_match = True
1065 if numa_match == False:
1066 return False
1067 elif available.numa_node_policy.has_field('node'):
1068 self._log.debug("Rejecting available flavor because numa nodes not required but available")
1069 return False
1070 elif available.has_field('numa_node_policy'):
1071 self._log.debug("Rejecting available flavor because numa_node_policy not required but available")
1072 return False
1073 self._log.info("Successful match for Guest EPA attributes")
1074 return True
1075
1076 def _match_vswitch_epa(self, required, available):
1077 self._log.debug("VSwitch EPA match found")
1078 return True
1079
1080 def _match_hypervisor_epa(self, required, available):
1081 self._log.debug("Hypervisor EPA match found")
1082 return True
1083
1084 def _match_host_epa(self, required, available):
1085 self._log.info("Matching Host EPA attributes")
1086 if required.has_field('cpu_model'):
1087 self._log.debug("Matching CPU model")
1088 if available.has_field('cpu_model') == False:
1089 self._log.debug("Matching CPU model failed. Not available in flavor")
1090 return False
1091 else:
1092 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1093 if required.cpu_model.replace('PREFER', 'REQUIRE') != available.cpu_model:
1094 self._log.debug("Matching CPU model failed. Required: %s, Available: %s", required.cpu_model, available.cpu_model)
1095 return False
1096 elif available.has_field('cpu_model'):
1097 self._log.debug("Rejecting available flavor because cpu_model not required but available")
1098 return False
1099
1100 if required.has_field('cpu_arch'):
1101 self._log.debug("Matching CPU architecture")
1102 if available.has_field('cpu_arch') == False:
1103 self._log.debug("Matching CPU architecture failed. Not available in flavor")
1104 return False
1105 else:
1106 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1107 if required.cpu_arch.replace('PREFER', 'REQUIRE') != available.cpu_arch:
1108 self._log.debug("Matching CPU architecture failed. Required: %s, Available: %s", required.cpu_arch, available.cpu_arch)
1109 return False
1110 elif available.has_field('cpu_arch'):
1111 self._log.debug("Rejecting available flavor because cpu_arch not required but available")
1112 return False
1113
1114 if required.has_field('cpu_vendor'):
1115 self._log.debug("Matching CPU vendor")
1116 if available.has_field('cpu_vendor') == False:
1117 self._log.debug("Matching CPU vendor failed. Not available in flavor")
1118 return False
1119 else:
1120 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1121 if required.cpu_vendor.replace('PREFER', 'REQUIRE') != available.cpu_vendor:
1122 self._log.debug("Matching CPU vendor failed. Required: %s, Available: %s", required.cpu_vendor, available.cpu_vendor)
1123 return False
1124 elif available.has_field('cpu_vendor'):
1125 self._log.debug("Rejecting available flavor because cpu_vendor not required but available")
1126 return False
1127
1128 if required.has_field('cpu_socket_count'):
1129 self._log.debug("Matching CPU socket count")
1130 if available.has_field('cpu_socket_count') == False:
1131 self._log.debug("Matching CPU socket count failed. Not available in flavor")
1132 return False
1133 else:
1134 if required.cpu_socket_count != available.cpu_socket_count:
1135 self._log.debug("Matching CPU socket count failed. Required: %s, Available: %s", required.cpu_socket_count, available.cpu_socket_count)
1136 return False
1137 elif available.has_field('cpu_socket_count'):
1138 self._log.debug("Rejecting available flavor because cpu_socket_count not required but available")
1139 return False
1140
1141 if required.has_field('cpu_core_count'):
1142 self._log.debug("Matching CPU core count")
1143 if available.has_field('cpu_core_count') == False:
1144 self._log.debug("Matching CPU core count failed. Not available in flavor")
1145 return False
1146 else:
1147 if required.cpu_core_count != available.cpu_core_count:
1148 self._log.debug("Matching CPU core count failed. Required: %s, Available: %s", required.cpu_core_count, available.cpu_core_count)
1149 return False
1150 elif available.has_field('cpu_core_count'):
1151 self._log.debug("Rejecting available flavor because cpu_core_count not required but available")
1152 return False
1153
1154 if required.has_field('cpu_core_thread_count'):
1155 self._log.debug("Matching CPU core thread count")
1156 if available.has_field('cpu_core_thread_count') == False:
1157 self._log.debug("Matching CPU core thread count failed. Not available in flavor")
1158 return False
1159 else:
1160 if required.cpu_core_thread_count != available.cpu_core_thread_count:
1161 self._log.debug("Matching CPU core thread count failed. Required: %s, Available: %s", required.cpu_core_thread_count, available.cpu_core_thread_count)
1162 return False
1163 elif available.has_field('cpu_core_thread_count'):
1164 self._log.debug("Rejecting available flavor because cpu_core_thread_count not required but available")
1165 return False
1166
1167 if required.has_field('cpu_feature'):
1168 self._log.debug("Matching CPU feature list")
1169 if available.has_field('cpu_feature') == False:
1170 self._log.debug("Matching CPU feature list failed. Not available in flavor")
1171 return False
1172 else:
1173 for feature in required.cpu_feature:
1174 if feature not in available.cpu_feature:
1175 self._log.debug("Matching CPU feature list failed. Required feature: %s is not present. Available features: %s", feature, available.cpu_feature)
1176 return False
1177 elif available.has_field('cpu_feature'):
1178 self._log.debug("Rejecting available flavor because cpu_feature not required but available")
1179 return False
1180 self._log.info("Successful match for Host EPA attributes")
1181 return True
1182
1183
1184 def _match_placement_group_inputs(self, required, available):
1185 self._log.info("Matching Host aggregate attributes")
1186
1187 if not required and not available:
1188 # Host aggregate not required and not available => success
1189 self._log.info("Successful match for Host Aggregate attributes")
1190 return True
1191 if required and available:
1192 # Host aggregate requested and available => Do a match and decide
1193 xx = [ x.as_dict() for x in required ]
1194 yy = [ y.as_dict() for y in available ]
1195 for i in xx:
1196 if i not in yy:
1197 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1198 return False
1199 self._log.info("Successful match for Host Aggregate attributes")
1200 return True
1201 else:
1202 # Either of following conditions => Failure
1203 # - Host aggregate required but not available
1204 # - Host aggregate not required but available
1205 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1206 return False
1207
1208
1209 def match_image_params(self, resource_info, request_params):
1210 return True
1211
1212 def match_epa_params(self, resource_info, request_params):
1213 result = self._match_vm_flavor(getattr(request_params, 'vm_flavor'),
1214 getattr(resource_info, 'vm_flavor'))
1215 if result == False:
1216 self._log.debug("VM Flavor mismatched")
1217 return False
1218
1219 result = self._match_guest_epa(getattr(request_params, 'guest_epa'),
1220 getattr(resource_info, 'guest_epa'))
1221 if result == False:
1222 self._log.debug("Guest EPA mismatched")
1223 return False
1224
1225 result = self._match_vswitch_epa(getattr(request_params, 'vswitch_epa'),
1226 getattr(resource_info, 'vswitch_epa'))
1227 if result == False:
1228 self._log.debug("Vswitch EPA mismatched")
1229 return False
1230
1231 result = self._match_hypervisor_epa(getattr(request_params, 'hypervisor_epa'),
1232 getattr(resource_info, 'hypervisor_epa'))
1233 if result == False:
1234 self._log.debug("Hypervisor EPA mismatched")
1235 return False
1236
1237 result = self._match_host_epa(getattr(request_params, 'host_epa'),
1238 getattr(resource_info, 'host_epa'))
1239 if result == False:
1240 self._log.debug("Host EPA mismatched")
1241 return False
1242
1243 result = self._match_placement_group_inputs(getattr(request_params, 'host_aggregate'),
1244 getattr(resource_info, 'host_aggregate'))
1245
1246 if result == False:
1247 self._log.debug("Host Aggregate mismatched")
1248 return False
1249
1250 return True
1251
1252 @asyncio.coroutine
1253 def initialize_resource_in_cal(self, resource, request):
1254 self._log.info("Initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1255 modify_params = RwcalYang.YangData_RwProject_Project_VduModifyParams()
1256 modify_params.vdu_id = resource.resource_id
1257 modify_params.image_id = request.image_id
1258
1259 for c_point in request.connection_points:
1260 self._log.debug("Adding connection point for VDU: %s to virtual-compute with id: %s Connection point Name: %s",
1261 request.name,resource.resource_id,c_point.name)
1262 point = modify_params.connection_points_add.add()
1263 point.name = c_point.name
1264 point.virtual_link_id = c_point.virtual_link_id
1265 yield from self._cal.modify_virtual_compute(modify_params)
1266
1267 @asyncio.coroutine
1268 def uninitialize_resource_in_cal(self, resource):
1269 self._log.info("Un-initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1270 modify_params = RwcalYang.YangData_RwProject_Project_VduModifyParams()
1271 modify_params.vdu_id = resource.resource_id
1272 resource_info = yield from self.get_resource_info(resource)
1273 for c_point in resource_info.connection_points:
1274 self._log.debug("Removing connection point: %s from VDU: %s ",
1275 c_point.name,resource_info.name)
1276 point = modify_params.connection_points_remove.add()
1277 point.connection_point_id = c_point.connection_point_id
1278 yield from self._cal.modify_virtual_compute(modify_params)
1279
1280
1281 class ResourceMgrCore(object):
1282 def __init__(self, dts, log, log_hdl, loop, parent):
1283 self._log = log
1284 self._log_hdl = log_hdl
1285 self._dts = dts
1286 self._loop = loop
1287 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
1288 self._parent = parent
1289 self._cloud_cals = {}
1290 # Dictionary of pool objects keyed by name
1291 self._cloud_pool_table = {}
1292 # Dictionary of tuples (resource_id, cloud_account_name, pool_name) keyed by event_id
1293 self._resource_table = {}
1294 self._pool_class = {'compute': ComputePool,
1295 'network': NetworkPool}
1296
1297 def _get_cloud_pool_table(self, cloud_account_name):
1298 if cloud_account_name not in self._cloud_pool_table:
1299 msg = "Cloud account %s not found" % cloud_account_name
1300 self._log.error(msg)
1301 raise ResMgrCloudAccountNotFound(msg)
1302
1303 return self._cloud_pool_table[cloud_account_name]
1304
1305 def _get_cloud_cal_plugin(self, cloud_account_name):
1306 if cloud_account_name not in self._cloud_cals:
1307 msg = "Cloud account %s not found" % cloud_account_name
1308 self._log.error(msg)
1309 raise ResMgrCloudAccountNotFound(msg)
1310
1311 return self._cloud_cals[cloud_account_name]
1312
1313 def _add_default_cloud_pools(self, cloud_account_name):
1314 self._log.debug("Adding default compute and network pools for cloud account %s",
1315 cloud_account_name)
1316 default_pools = [
1317 {
1318 'name': '____default_compute_pool',
1319 'resource_type': 'compute',
1320 'pool_type': 'dynamic',
1321 'max_size': 128,
1322 },
1323 {
1324 'name': '____default_network_pool',
1325 'resource_type': 'network',
1326 'pool_type': 'dynamic',
1327 'max_size': 128,
1328 },
1329 ]
1330
1331 for pool_dict in default_pools:
1332 pool_info = ResourcePoolInfo.from_dict(pool_dict)
1333 self._log.info("Applying configuration for cloud account %s pool: %s",
1334 cloud_account_name, pool_info.name)
1335
1336 self.add_resource_pool(cloud_account_name, pool_info)
1337 self.unlock_resource_pool(cloud_account_name, pool_info.name)
1338
1339 def get_cloud_account_names(self):
1340 """ Returns a list of configured cloud account names """
1341 return self._cloud_cals.keys()
1342
1343 def get_cloud_account_detail(self, account_name):
1344 """ Returns the cloud detail message"""
1345 cloud_account = self._cloud_cals[account_name]
1346 return cloud_account.get_cloud_account()
1347
1348 def add_cloud_account(self, account):
1349 self._log.debug("Received CAL account. Account Name: %s, Account Type: %s",
1350 account.name, account.account_type)
1351
1352 ### Add cal handler to all the pools
1353 if account.name in self._cloud_cals:
1354 raise ResMgrCloudAccountExists("Cloud account already exists in res mgr: %s",
1355 account.name)
1356
1357 self._cloud_pool_table[account.name] = {}
1358
1359 cal = ResourceMgrCALHandler(self._loop, self._executor, self._log, self._log_hdl, account)
1360 self._cloud_cals[account.name] = cal
1361
1362 self._add_default_cloud_pools(account.name)
1363
1364 def update_cloud_account(self, account):
1365 raise NotImplementedError("Update cloud account not implemented")
1366
1367 def delete_cloud_account(self, account_name, dry_run=False):
1368 cloud_pool_table = self._get_cloud_pool_table(account_name)
1369 for pool in cloud_pool_table.values():
1370 if pool.in_use():
1371 raise ResMgrCloudAccountInUse("Cannot delete cloud which is currently in use")
1372
1373 # If dry_run is specified, do not actually delete the cloud account
1374 if dry_run:
1375 return
1376
1377 for pool in list(cloud_pool_table):
1378 self.delete_resource_pool(account_name, pool)
1379
1380 del self._cloud_pool_table[account_name]
1381 del self._cloud_cals[account_name]
1382
1383 def add_resource_pool(self, cloud_account_name, pool_info):
1384 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1385 if pool_info.name in cloud_pool_table:
1386 raise ResMgrDuplicatePool("Pool with name: %s already exists", pool_info.name)
1387
1388 cloud_cal = self._get_cloud_cal_plugin(cloud_account_name)
1389 pool = self._pool_class[pool_info.resource_type](self._log, self._loop, pool_info, cloud_cal)
1390
1391 cloud_pool_table[pool_info.name] = pool
1392
1393 def delete_resource_pool(self, cloud_account_name, pool_name):
1394 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1395 if pool_name not in cloud_pool_table:
1396 self._log.error("Pool: %s not found for deletion", pool_name)
1397 return
1398 pool = cloud_pool_table[pool_name]
1399
1400 if pool.in_use():
1401 # Can't delete a pool in use
1402 self._log.error("Pool: %s in use. Can not delete in-use pool", pool.name)
1403 return
1404
1405 pool.cleanup()
1406 del cloud_pool_table[pool_name]
1407 self._log.info("Resource Pool: %s successfully deleted", pool_name)
1408
1409 def modify_resource_pool(self, cloud_account_name, pool):
1410 pass
1411
1412 def lock_resource_pool(self, cloud_account_name, pool_name):
1413 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1414 if pool_name not in cloud_pool_table:
1415 self._log.info("Pool: %s is not available for lock operation")
1416 return
1417
1418 pool = cloud_pool_table[pool_name]
1419 pool.lock_pool()
1420
1421 def unlock_resource_pool(self, cloud_account_name, pool_name):
1422 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1423 if pool_name not in cloud_pool_table:
1424 self._log.info("Pool: %s is not available for unlock operation")
1425 return
1426
1427 pool = cloud_pool_table[pool_name]
1428 pool.unlock_pool()
1429
1430 def get_resource_pool_info(self, cloud_account_name, pool_name):
1431 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1432 if pool_name in cloud_pool_table:
1433 pool = cloud_pool_table[pool_name]
1434 return pool.get_pool_info()
1435 else:
1436 return None
1437
1438 def get_resource_pool_list(self, cloud_account_name):
1439 return [v for _, v in self._get_cloud_pool_table(cloud_account_name).items()]
1440
1441 def _select_resource_pools(self, cloud_account_name, resource_type):
1442 pools = [pool for pool in self.get_resource_pool_list(cloud_account_name) if pool.resource_type == resource_type and pool.status == 'unlocked']
1443 if not pools:
1444 raise ResMgrPoolNotAvailable("No %s pool found for resource allocation", resource_type)
1445
1446 return pools[0]
1447
1448 @asyncio.coroutine
1449 def allocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type):
1450 ### Check if event_id is unique or already in use
1451 if event_id in self._resource_table:
1452 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1453 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1454 event_id, pool_name)
1455 # If resource-type matches then return the same resource
1456 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1457 pool = cloud_pool_table[pool_name]
1458 if pool.resource_type == resource_type:
1459
1460 info = yield from pool.read_resource_info(r_id)
1461 return info
1462 else:
1463 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1464 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1465
1466 ### All-OK, lets go ahead with resource allocation
1467 pool = self._select_resource_pools(cloud_account_name, resource_type)
1468 self._log.info("Selected pool %s for resource allocation", pool.name)
1469
1470 r_id, r_info = yield from pool.allocate_resource(request)
1471
1472 self._resource_table[event_id] = (r_id, cloud_account_name, pool.name)
1473 return r_info
1474
1475 @asyncio.coroutine
1476 def reallocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type, resource):
1477 ### Check if event_id is unique or already in use
1478 if event_id in self._resource_table:
1479 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1480 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1481 event_id, pool_name)
1482 # If resource-type matches then return the same resource
1483 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1484 pool = cloud_pool_table[pool_name]
1485 if pool.resource_type == resource_type:
1486 info = yield from pool.read_resource_info(r_id)
1487 return info
1488 else:
1489 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1490 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1491
1492 self._log.debug("Re-allocate virtual resource. resource type %s", resource_type)
1493 r_info = None
1494 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1495 pool = cloud_pool_table[resource.pool_name]
1496 if pool.resource_type == resource_type:
1497 if resource_type == 'network':
1498 r_id = resource.virtual_link_id
1499 r_info = yield from pool.get_info_by_id(resource.virtual_link_id)
1500 elif resource_type == 'compute':
1501 r_id = resource.vdu_id
1502 r_info = yield from pool.get_info_by_id(resource.vdu_id)
1503
1504 if r_info is None:
1505 r_id, r_info = yield from pool.allocate_resource(request)
1506 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1507 return r_info
1508
1509 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1510 new_resource = pool._resource_class(r_id, 'dynamic', request)
1511 if resource_type == 'compute':
1512 requested_params = RwcalYang.YangData_RwProject_Project_VduInitParams()
1513 requested_params.from_dict(request.as_dict())
1514 new_resource.requested_params = requested_params
1515 pool._all_resources[r_id] = new_resource
1516 pool._allocated_resources[r_id] = new_resource
1517 return r_info
1518
1519 @asyncio.coroutine
1520 def release_virtual_resource(self, event_id, resource_type):
1521 ### Check if event_id exists
1522 if event_id not in self._resource_table:
1523 self._log.error("Received resource-release-request with unknown Event-id :%s", event_id)
1524 raise ResMgrUnknownEventId("Received resource-release-request with unknown Event-id :%s" %(event_id))
1525
1526 ## All-OK, lets proceed with resource release
1527 r_id, cloud_account_name, pool_name = self._resource_table.pop(event_id)
1528 self._log.debug("Attempting to release virtual resource id %s from pool %s",
1529 r_id, pool_name)
1530
1531 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1532 pool = cloud_pool_table[pool_name]
1533 yield from pool.release_resource(r_id)
1534
1535 @asyncio.coroutine
1536 def read_virtual_resource(self, event_id, resource_type):
1537 ### Check if event_id exists
1538 if event_id not in self._resource_table:
1539 self._log.error("Received resource-read-request with unknown Event-id :%s", event_id)
1540 raise ResMgrUnknownEventId("Received resource-read-request with unknown Event-id :%s" %(event_id))
1541
1542 ## All-OK, lets proceed
1543 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1544 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1545 pool = cloud_pool_table[pool_name]
1546 info = yield from pool.read_resource_info(r_id)
1547 return info