161d5a49323bc6e436c788d2b88056135b8ceafe
[osm/SO.git] / rwlaunchpad / plugins / rwresmgr / rift / tasklets / rwresmgrtasklet / rwresmgr_core.py
1 #
2 # Copyright 2016 RIFT.IO Inc
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16
17 import uuid
18 import collections
19 import asyncio
20 import concurrent.futures
21
22 import gi
23 gi.require_version('RwDts', '1.0')
24 gi.require_version('RwYang', '1.0')
25 gi.require_version('RwResourceMgrYang', '1.0')
26 gi.require_version('RwLaunchpadYang', '1.0')
27 gi.require_version('RwcalYang', '1.0')
28 from gi.repository import (
29 RwDts as rwdts,
30 RwYang,
31 RwResourceMgrYang,
32 RwLaunchpadYang,
33 RwcalYang,
34 )
35
36 from gi.repository.RwTypes import RwStatus
37
38 class ResMgrCALNotPresent(Exception):
39 pass
40
41 class ResMgrCloudAccountNotFound(Exception):
42 pass
43
44 class ResMgrCloudAccountExists(Exception):
45 pass
46
47 class ResMgrCloudAccountInUse(Exception):
48 pass
49
50 class ResMgrDuplicatePool(Exception):
51 pass
52
53 class ResMgrPoolNotAvailable(Exception):
54 pass
55
56 class ResMgrPoolOperationFailed(Exception):
57 pass
58
59 class ResMgrDuplicateEventId(Exception):
60 pass
61
62 class ResMgrUnknownEventId(Exception):
63 pass
64
65 class ResMgrUnknownResourceId(Exception):
66 pass
67
68 class ResMgrResourceIdBusy(Exception):
69 pass
70
71 class ResMgrResourceIdNotAllocated(Exception):
72 pass
73
74 class ResMgrNoResourcesAvailable(Exception):
75 pass
76
77 class ResMgrResourcesInitFailed(Exception):
78 pass
79
80 class ResMgrCALOperationFailure(Exception):
81 pass
82
83
84
85 class ResourceMgrCALHandler(object):
86 def __init__(self, loop, executor, log, log_hdl, account):
87 self._log = log
88 self._loop = loop
89 self._executor = executor
90 self._account = account.cal_account_msg
91 self._rwcal = account.cal
92 if account.account_type == 'aws':
93 self._subnets = ["172.31.97.0/24", "172.31.98.0/24", "172.31.99.0/24", "172.31.100.0/24", "172.31.101.0/24"]
94 else:
95 self._subnets = ["11.0.0.0/24",
96 "12.0.0.0/24",
97 "13.0.0.0/24",
98 "14.0.0.0/24",
99 "15.0.0.0/24",
100 "16.0.0.0/24",
101 "17.0.0.0/24",
102 "18.0.0.0/24",
103 "19.0.0.0/24",
104 "20.0.0.0/24",
105 "21.0.0.0/24",
106 "22.0.0.0/24",]
107 self._subnet_ptr = 0
108
109 def _select_link_subnet(self):
110 subnet = self._subnets[self._subnet_ptr]
111 self._subnet_ptr += 1
112 if self._subnet_ptr == len(self._subnets):
113 self._subnet_ptr = 0
114 return subnet
115
116 @asyncio.coroutine
117 def create_virtual_network(self, req_params):
118 #rc, rsp = self._rwcal.get_virtual_link_list(self._account)
119 self._log.debug("Calling get_virtual_link_list API")
120 rc, rsp = yield from self._loop.run_in_executor(self._executor,
121 self._rwcal.get_virtual_link_list,
122 self._account)
123
124 assert rc == RwStatus.SUCCESS
125
126 links = [vlink for vlink in rsp.virtual_link_info_list if vlink.name == req_params.name]
127 if links:
128 self._log.debug("Found existing virtual-network with matching name in cloud. Reusing the virtual-network with id: %s" %(links[0].virtual_link_id))
129 return ('precreated', links[0].virtual_link_id)
130 elif req_params.vim_network_name:
131 self._log.error("Virtual-network-allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist",
132 self._account.name, req_params.vim_network_name)
133 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s Vim Network with name %s does not pre-exist"
134 %(self._account.name, req_params.vim_network_name))
135
136 params = RwcalYang.VirtualLinkReqParams()
137 params.from_dict(req_params.as_dict())
138 params.subnet = self._select_link_subnet()
139 #rc, rs = self._rwcal.create_virtual_link(self._account, params)
140 self._log.debug("Calling create_virtual_link API with params: %s" %(str(req_params)))
141 rc, rs = yield from self._loop.run_in_executor(self._executor,
142 self._rwcal.create_virtual_link,
143 self._account,
144 params)
145 if rc.status != RwStatus.SUCCESS:
146 self._log.error("Virtual-network-allocate operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
147 self._account.name, rc.error_msg, rc.traceback)
148 raise ResMgrCALOperationFailure("Virtual-network allocate operation failed for cloud account: %s (%s)"
149 %(self._account.name, rc.error_msg))
150
151 return ('dynamic',rs)
152
153 @asyncio.coroutine
154 def delete_virtual_network(self, network_id):
155 #rc = self._rwcal.delete_virtual_link(self._account, network_id)
156 self._log.debug("Calling delete_virtual_link API with id: %s" %(network_id))
157 rc = yield from self._loop.run_in_executor(self._executor,
158 self._rwcal.delete_virtual_link,
159 self._account,
160 network_id)
161 if rc != RwStatus.SUCCESS:
162 self._log.error("Virtual-network-release operation failed for cloud account: %s. ResourceID: %s",
163 self._account.name,
164 network_id)
165 raise ResMgrCALOperationFailure("Virtual-network release operation failed for cloud account: %s. ResourceId: %s" %(self._account.name, network_id))
166
167 @asyncio.coroutine
168 def get_virtual_network_info(self, network_id):
169 #rc, rs = self._rwcal.get_virtual_link(self._account, network_id)
170 self._log.debug("Calling get_virtual_link_info API with id: %s" %(network_id))
171 rc, rs = yield from self._loop.run_in_executor(self._executor,
172 self._rwcal.get_virtual_link,
173 self._account,
174 network_id)
175 if rc != RwStatus.SUCCESS:
176 self._log.error("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s",
177 self._account.name,
178 network_id)
179 raise ResMgrCALOperationFailure("Virtual-network-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, network_id))
180 return rs
181
182 @asyncio.coroutine
183 def create_virtual_compute(self, req_params):
184 #rc, rsp = self._rwcal.get_vdu_list(self._account)
185 self._log.debug("Calling get_vdu_list API")
186
187 rc, rsp = yield from self._loop.run_in_executor(self._executor,
188 self._rwcal.get_vdu_list,
189 self._account)
190 assert rc == RwStatus.SUCCESS
191 vdus = [vm for vm in rsp.vdu_info_list if vm.name == req_params.name]
192 if vdus:
193 self._log.debug("Found existing virtual-compute with matching name in cloud. Reusing the virtual-compute element with id: %s" %(vdus[0].vdu_id))
194 return vdus[0].vdu_id
195
196 params = RwcalYang.VDUInitParams()
197 params.from_dict(req_params.as_dict())
198
199 image_checksum = req_params.image_checksum if req_params.has_field("image_checksum") else None
200 params.image_id = yield from self.get_image_id_from_image_info(req_params.image_name, image_checksum)
201
202 #rc, rs = self._rwcal.create_vdu(self._account, params)
203 self._log.debug("Calling create_vdu API with params %s" %(str(params)))
204 rc, rs = yield from self._loop.run_in_executor(self._executor,
205 self._rwcal.create_vdu,
206 self._account,
207 params)
208
209 if rc.status != RwStatus.SUCCESS:
210 self._log.error("Virtual-compute-create operation failed for cloud account: %s - error_msg: %s, Traceback: %s",
211 self._account.name, rc.error_msg, rc.traceback)
212 raise ResMgrCALOperationFailure("Virtual-compute-create operation failed for cloud account: %s (%s)"
213 %(self._account.name, rc.error_msg))
214
215 return rs
216
217 @asyncio.coroutine
218 def modify_virtual_compute(self, req_params):
219 #rc = self._rwcal.modify_vdu(self._account, req_params)
220 self._log.debug("Calling modify_vdu API with params: %s" %(str(req_params)))
221 rc = yield from self._loop.run_in_executor(self._executor,
222 self._rwcal.modify_vdu,
223 self._account,
224 req_params)
225 if rc != RwStatus.SUCCESS:
226 self._log.error("Virtual-compute-modify operation failed for cloud account: %s", self._account.name)
227 raise ResMgrCALOperationFailure("Virtual-compute-modify operation failed for cloud account: %s" %(self._account.name))
228
229 @asyncio.coroutine
230 def delete_virtual_compute(self, compute_id):
231 #rc = self._rwcal.delete_vdu(self._account, compute_id)
232 self._log.debug("Calling delete_vdu API with id: %s" %(compute_id))
233 rc = yield from self._loop.run_in_executor(self._executor,
234 self._rwcal.delete_vdu,
235 self._account,
236 compute_id)
237 if rc != RwStatus.SUCCESS:
238 self._log.error("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s",
239 self._account.name,
240 compute_id)
241 raise ResMgrCALOperationFailure("Virtual-compute-release operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
242
243 @asyncio.coroutine
244 def get_virtual_compute_info(self, compute_id):
245 #rc, rs = self._rwcal.get_vdu(self._account, compute_id)
246 self._log.debug("Calling get_vdu API with id: %s" %(compute_id))
247 rc, rs = yield from self._loop.run_in_executor(self._executor,
248 self._rwcal.get_vdu,
249 self._account,
250 compute_id)
251 if rc != RwStatus.SUCCESS:
252 self._log.error("Virtual-compute-info operation failed for cloud account: %s. ResourceID: %s",
253 self._account.name,
254 compute_id)
255 raise ResMgrCALOperationFailure("Virtual-compute-info operation failed for cloud account: %s. ResourceID: %s" %(self._account.name, compute_id))
256 return rs
257
258 @asyncio.coroutine
259 def get_compute_flavor_info_list(self):
260 #rc, rs = self._rwcal.get_flavor_list(self._account)
261 self._log.debug("Calling get_flavor_list API")
262 rc, rs = yield from self._loop.run_in_executor(self._executor,
263 self._rwcal.get_flavor_list,
264 self._account)
265 if rc != RwStatus.SUCCESS:
266 self._log.error("Get-flavor-info-list operation failed for cloud account: %s",
267 self._account.name)
268 raise ResMgrCALOperationFailure("Get-flavor-info-list operation failed for cloud account: %s" %(self._account.name))
269 return rs.flavorinfo_list
270
271 @asyncio.coroutine
272 def create_compute_flavor(self, request):
273 flavor = RwcalYang.FlavorInfoItem()
274 flavor.name = str(uuid.uuid4())
275 epa_types = ['vm_flavor', 'guest_epa', 'host_epa', 'host_aggregate']
276 epa_dict = {k: v for k, v in request.as_dict().items() if k in epa_types}
277 flavor.from_dict(epa_dict)
278
279 self._log.info("Creating flavor: %s", flavor)
280 #rc, rs = self._rwcal.create_flavor(self._account, flavor)
281 self._log.debug("Calling create_flavor API")
282 rc, rs = yield from self._loop.run_in_executor(self._executor,
283 self._rwcal.create_flavor,
284 self._account,
285 flavor)
286 if rc != RwStatus.SUCCESS:
287 self._log.error("Create-flavor operation failed for cloud account: %s",
288 self._account.name)
289 raise ResMgrCALOperationFailure("Create-flavor operation failed for cloud account: %s" %(self._account.name))
290 return rs
291
292 @asyncio.coroutine
293 def get_image_info_list(self):
294 #rc, rs = self._rwcal.get_image_list(self._account)
295 self._log.debug("Calling get_image_list API")
296 rc, rs = yield from self._loop.run_in_executor(self._executor,
297 self._rwcal.get_image_list,
298 self._account)
299 if rc != RwStatus.SUCCESS:
300 self._log.error("Get-image-info-list operation failed for cloud account: %s",
301 self._account.name)
302 raise ResMgrCALOperationFailure("Get-image-info-list operation failed for cloud account: %s" %(self._account.name))
303 return rs.imageinfo_list
304
305 @asyncio.coroutine
306 def get_image_id_from_image_info(self, image_name, image_checksum=None):
307 self._log.debug("Looking up image id for image name %s and checksum %s on cloud account: %s",
308 image_name, image_checksum, self._account.name
309 )
310
311 image_list = yield from self.get_image_info_list()
312 matching_images = [i for i in image_list if i.name == image_name]
313
314 # If the image checksum was filled in then further filter the images by the checksum
315 if image_checksum is not None:
316 matching_images = [i for i in matching_images if i.checksum == image_checksum]
317 else:
318 self._log.warning("Image checksum not provided. Lookup using image name (%s) only.",
319 image_name)
320
321 if len(matching_images) == 0:
322 raise ResMgrCALOperationFailure("Could not find image name {} (using checksum: {}) for cloud account: {}".format(
323 image_name, image_checksum, self._account.name
324 ))
325
326 elif len(matching_images) > 1:
327 unique_checksums = {i.checksum for i in matching_images}
328 if len(unique_checksums) > 1:
329 msg = ("Too many images with different checksums matched "
330 "image name of %s for cloud account: %s" % (image_name, self._account.name))
331 raise ResMgrCALOperationFailure(msg)
332
333 return matching_images[0].id
334
335 @asyncio.coroutine
336 def get_image_info(self, image_id):
337 #rc, rs = self._rwcal.get_image(self._account, image_id)
338 self._log.debug("Calling get_image API for id: %s" %(image_id))
339 rc, rs = yield from self._loop.run_in_executor(self._executor,
340 self._rwcal.get_image,
341 self._account,
342 image_id)
343 if rc != RwStatus.SUCCESS:
344 self._log.error("Get-image-info-list operation failed for cloud account: %s",
345 self._account.name)
346 raise ResMgrCALOperationFailure("Get-image-info operation failed for cloud account: %s" %(self._account.name))
347 return rs.imageinfo_list
348
349 def dynamic_flavor_supported(self):
350 return getattr(self._account, self._account.account_type).dynamic_flavor_support
351
352
353 class Resource(object):
354 def __init__(self, resource_id, resource_type, request):
355 self._id = resource_id
356 self._type = resource_type
357 self._request = request
358
359 @property
360 def resource_id(self):
361 return self._id
362
363 @property
364 def resource_type(self):
365 return self._type
366
367 @property
368 def request(self):
369 return self._request
370
371 def cleanup(self):
372 pass
373
374
375 class ComputeResource(Resource):
376 pass
377
378
379 class NetworkResource(Resource):
380 pass
381
382
383 class ResourcePoolInfo(object):
384 def __init__(self, name, pool_type, resource_type, max_size):
385 self.name = name
386 self.pool_type = pool_type
387 self.resource_type = resource_type
388 self.max_size = max_size
389
390 @classmethod
391 def from_dict(cls, pool_dict):
392 return cls(
393 pool_dict["name"],
394 pool_dict["pool_type"],
395 pool_dict["resource_type"],
396 pool_dict["max_size"],
397 )
398
399
400 class ResourcePool(object):
401 def __init__(self, log, loop, pool_info, resource_class, cal):
402 self._log = log
403 self._loop = loop
404 self._name = pool_info.name
405 self._pool_type = pool_info.pool_type
406 self._resource_type = pool_info.resource_type
407 self._cal = cal
408 self._resource_class = resource_class
409
410 self._max_size = pool_info.max_size
411
412 self._status = 'unlocked'
413 ### A Dictionary of all the resources in this pool, keyed by CAL resource-id
414 self._all_resources = {}
415 ### A List of free resources in this pool
416 self._free_resources = []
417 ### A Dictionary of all the allocated resources in this pool, keyed by CAL resource-id
418 self._allocated_resources = {}
419
420 @property
421 def name(self):
422 return self._name
423
424 @property
425 def cal(self):
426 """ This instance's ResourceMgrCALHandler """
427 return self._cal
428
429 @property
430 def pool_type(self):
431 return self._pool_type
432
433 @property
434 def resource_type(self):
435 return self._resource_type
436
437 @property
438 def max_size(self):
439 return self._max_size
440
441 @property
442 def status(self):
443 return self._status
444
445 def in_use(self):
446 if len(self._allocated_resources) != 0:
447 return True
448 else:
449 return False
450
451 def update_cal_handler(self, cal):
452 if self.in_use():
453 raise ResMgrPoolOperationFailed(
454 "Cannot update CAL plugin for in use pool"
455 )
456
457 self._cal = cal
458
459 def lock_pool(self):
460 self._log.info("Locking the pool :%s", self.name)
461 self._status = 'locked'
462
463 def unlock_pool(self):
464 self._log.info("Unlocking the pool :%s", self.name)
465 self._status = 'unlocked'
466
467 def add_resource(self, resource_info):
468 self._log.info("Adding static resource to Pool: %s, Resource-id: %s Resource-Type: %s",
469 self.name,
470 resource_info.resource_id,
471 self.resource_type)
472
473 ### Add static resources to pool
474 resource = self._resource_class(resource_info.resource_id, 'static')
475 assert resource.resource_id == resource_info.resource_id
476 self._all_resources[resource.resource_id] = resource
477 self._free_resources.append(resource)
478
479 def delete_resource(self, resource_id):
480 if resource_id not in self._all_resources:
481 self._log.error("Resource Id: %s not present in pool: %s. Delete operation failed", resource_id, self.name)
482 raise ResMgrUnknownResourceId("Resource Id: %s requested for release is not found" %(resource_id))
483
484 if resource_id in self._allocated_resources:
485 self._log.error("Resource Id: %s in use. Delete operation failed", resource_id)
486 raise ResMgrResourceIdBusy("Resource Id: %s requested for release is in use" %(resource_id))
487
488 self._log.info("Deleting resource: %s from pool: %s, Resource-Type",
489 resource_id,
490 self.name,
491 self.resource_type)
492
493 resource = self._all_resources.pop(resource_id)
494 self._free_resources.remove(resource)
495 resource.cleanup()
496 del resource
497
498 @asyncio.coroutine
499 def read_resource_info(self, resource_id):
500 if resource_id not in self._all_resources:
501 self._log.error("Resource Id: %s not present in pool: %s. Read operation failed", resource_id, self.name)
502 raise ResMgrUnknownResourceId("Resource Id: %s requested for read is not found" %(resource_id))
503
504 if resource_id not in self._allocated_resources:
505 self._log.error("Resource Id: %s not in use. Read operation failed", resource_id)
506 raise ResMgrResourceIdNotAllocated("Resource Id: %s not in use. Read operation failed" %(resource_id))
507
508 resource = self._allocated_resources[resource_id]
509 resource_info = yield from self.get_resource_info(resource)
510 return resource_info
511
512 def get_pool_info(self):
513 info = RwResourceMgrYang.ResourceRecordInfo()
514 self._log.info("Providing info for pool: %s", self.name)
515 info.name = self.name
516 if self.pool_type:
517 info.pool_type = self.pool_type
518 if self.resource_type:
519 info.resource_type = self.resource_type
520 if self.status:
521 info.pool_status = self.status
522
523 info.total_resources = len(self._all_resources)
524 info.free_resources = len(self._free_resources)
525 info.allocated_resources = len(self._allocated_resources)
526 return info
527
528 def cleanup(self):
529 for _, v in self._all_resources.items():
530 v.cleanup()
531
532 @asyncio.coroutine
533 def _allocate_static_resource(self, request, resource_type):
534 unit_type = {'compute': 'VDU', 'network':'VirtualLink'}
535 match_found = False
536 resource = None
537 self._log.info("Doing resource match from pool :%s", self._free_resources)
538 for resource in self._free_resources:
539 resource_info = yield from self.get_resource_info(resource)
540 self._log.info("Attempting to match %s-requirements for %s: %s with resource-id :%s",
541 resource_type, unit_type[resource_type],request.name, resource.resource_id)
542 if self.match_epa_params(resource_info, request):
543 if self.match_image_params(resource_info, request):
544 match_found = True
545 self._log.info("%s-requirements matched for %s: %s with resource-id :%s",
546 resource_type, unit_type[resource_type],request.name, resource.resource_id)
547 yield from self.initialize_resource_in_cal(resource, request)
548 break
549
550 if not match_found:
551 self._log.error("No match found for %s-requirements for %s: %s in pool: %s. %s instantiation failed",
552 resource_type,
553 unit_type[resource_type],
554 request.name,
555 self.name,
556 unit_type[resource_type])
557 return None
558 else:
559 ### Move resource from free-list into allocated-list
560 self._log.info("Allocating the static resource with resource-id: %s for %s: %s",
561 resource.resource_id,
562 unit_type[resource_type],request.name)
563 self._free_resources.remove(resource)
564 self._allocated_resources[resource.resource_id] = resource
565
566 return resource
567
568 @asyncio.coroutine
569 def allocate_resource(self, request):
570 resource = yield from self.allocate_resource_in_cal(request)
571 resource_info = yield from self.get_resource_info(resource)
572 return resource.resource_id, resource_info
573
574 @asyncio.coroutine
575 def release_resource(self, resource_id):
576 self._log.debug("Releasing resource_id %s in pool %s", resource_id, self.name)
577 if resource_id not in self._allocated_resources:
578 self._log.error("Failed to release a resource with resource-id: %s in pool: %s. Resource not known",
579 resource_id,
580 self.name)
581 raise ResMgrUnknownResourceId("Failed to release resource with resource-id: %s. Unknown resource-id" %(resource_id))
582
583 ### Get resource object
584 resource = self._allocated_resources.pop(resource_id)
585 yield from self.uninitialize_resource_in_cal(resource)
586 yield from self.release_cal_resource(resource)
587
588
589 class NetworkPool(ResourcePool):
590 def __init__(self, log, loop, pool_info, cal):
591 super(NetworkPool, self).__init__(log, loop, pool_info, NetworkResource, cal)
592
593 @asyncio.coroutine
594 def allocate_resource_in_cal(self, request):
595 resource = None
596 if self.pool_type == 'static':
597 self._log.info("Attempting network resource allocation from static pool: %s", self.name)
598 ### Attempt resource allocation from static pool
599 resource = yield from self._allocate_static_resource(request, 'network')
600 elif self.pool_type == 'dynamic':
601 ### Attempt resource allocation from dynamic pool
602 self._log.info("Attempting network resource allocation from dynamic pool: %s", self.name)
603 if len(self._free_resources) != 0:
604 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
605 self.name, len(self._free_resources))
606 resource = yield from self._allocate_static_resource(request, 'network')
607 if resource is None:
608 self._log.info("Could not resource from static resources. Going for dynamic resource allocation")
609 ## Not static resource available. Attempt dynamic resource from pool
610 resource = yield from self.allocate_dynamic_resource(request)
611 if resource is None:
612 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
613 return resource
614
615 @asyncio.coroutine
616 def allocate_dynamic_resource(self, request):
617 resource_type, resource_id = yield from self._cal.create_virtual_network(request)
618 if resource_id in self._all_resources:
619 self._log.error("Resource with id %s name %s of type %s is already used", resource_id, request.name, resource_type)
620 raise ResMgrNoResourcesAvailable("Resource with name %s of type network is already used" %(resource_id))
621 resource = self._resource_class(resource_id, resource_type, request)
622 self._all_resources[resource_id] = resource
623 self._allocated_resources[resource_id] = resource
624 self._log.info("Successfully allocated virtual-network resource from CAL with resource-id: %s", resource_id)
625 return resource
626
627 @asyncio.coroutine
628 def release_cal_resource(self, resource):
629 if resource.resource_type == 'dynamic':
630 self._log.debug("Deleting virtual network with network_id: %s", resource.resource_id)
631 yield from self._cal.delete_virtual_network(resource.resource_id)
632 self._all_resources.pop(resource.resource_id)
633 self._log.info("Successfully released virtual-network resource in CAL with resource-id: %s", resource.resource_id)
634 elif resource.resource_type == 'precreated':
635 self._all_resources.pop(resource.resource_id)
636 self._log.info("Successfully removed precreated virtual-network resource from allocated list: %s", resource.resource_id)
637 else:
638 self._log.info("Successfully released virtual-network resource with resource-id: %s into available-list", resource.resource_id)
639 self._free_resources.append(resource)
640
641 @asyncio.coroutine
642 def get_resource_info(self, resource):
643 info = yield from self._cal.get_virtual_network_info(resource.resource_id)
644 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
645 resource.resource_id, str(info))
646 response = RwResourceMgrYang.VirtualLinkEventData_ResourceInfo()
647 response.from_dict(info.as_dict())
648 response.pool_name = self.name
649 response.resource_state = 'active'
650 return response
651
652 @asyncio.coroutine
653 def get_info_by_id(self, resource_id):
654 info = yield from self._cal.get_virtual_network_info(resource_id)
655 self._log.info("Successfully retrieved virtual-network information from CAL with resource-id: %s. Info: %s",
656 resource_id, str(info))
657 return info
658
659 def match_image_params(self, resource_info, request_params):
660 return True
661
662 def match_epa_params(self, resource_info, request_params):
663 if not hasattr(request_params, 'provider_network'):
664 ### Its a match if nothing is requested
665 return True
666 else:
667 required = getattr(request_params, 'provider_network')
668
669 if not hasattr(resource_info, 'provider_network'):
670 ### Its no match
671 return False
672 else:
673 available = getattr(resource_info, 'provider_network')
674
675 self._log.debug("Matching Network EPA params. Required: %s, Available: %s", required, available)
676
677 if required.has_field('name') and required.name!= available.name:
678 self._log.debug("Provider Network mismatch. Required: %s, Available: %s",
679 required.name,
680 available.name)
681 return False
682
683 self._log.debug("Matching EPA params physical network name")
684
685 if required.has_field('physical_network') and required.physical_network != available.physical_network:
686 self._log.debug("Physical Network mismatch. Required: %s, Available: %s",
687 required.physical_network,
688 available.physical_network)
689 return False
690
691 self._log.debug("Matching EPA params overlay type")
692 if required.has_field('overlay_type') and required.overlay_type != available.overlay_type:
693 self._log.debug("Overlay type mismatch. Required: %s, Available: %s",
694 required.overlay_type,
695 available.overlay_type)
696 return False
697
698 self._log.debug("Matching EPA params SegmentationID")
699 if required.has_field('segmentation_id') and required.segmentation_id != available.segmentation_id:
700 self._log.debug("Segmentation-Id mismatch. Required: %s, Available: %s",
701 required.segmentation_id,
702 available.segmentation_id)
703 return False
704 return True
705
706 @asyncio.coroutine
707 def initialize_resource_in_cal(self, resource, request):
708 pass
709
710 @asyncio.coroutine
711 def uninitialize_resource_in_cal(self, resource):
712 pass
713
714
715 class ComputePool(ResourcePool):
716 def __init__(self, log, loop, pool_info, cal):
717 super(ComputePool, self).__init__(log, loop, pool_info, ComputeResource, cal)
718
719 @asyncio.coroutine
720 def allocate_resource_in_cal(self, request):
721 resource = None
722 if self.pool_type == 'static':
723 self._log.info("Attempting compute resource allocation from static pool: %s", self.name)
724 ### Attempt resource allocation from static pool
725 resource = yield from self._allocate_static_resource(request, 'compute')
726 elif self.pool_type == 'dynamic':
727 ### Attempt resource allocation from dynamic pool
728 self._log.info("Attempting compute resource allocation from dynamic pool: %s", self.name)
729 if len(self._free_resources) != 0:
730 self._log.info("Dynamic pool: %s has %d static resources, Attempting resource allocation from static resources",
731 len(self._free_resources),
732 self.name)
733 resource = yield from self._allocate_static_resource(request, 'compute')
734 if resource is None:
735 self._log.info("Attempting for dynamic resource allocation")
736 resource = yield from self.allocate_dynamic_resource(request)
737 if resource is None:
738 raise ResMgrNoResourcesAvailable("No matching resource available for allocation from pool: %s" %(self.name))
739
740 requested_params = RwcalYang.VDUInitParams()
741 requested_params.from_dict(request.as_dict())
742 resource.requested_params = requested_params
743 return resource
744
745 @asyncio.coroutine
746 def allocate_dynamic_resource(self, request):
747 #request.flavor_id = yield from self.select_resource_flavor(request)
748 resource_id = yield from self._cal.create_virtual_compute(request)
749 resource = self._resource_class(resource_id, 'dynamic', request)
750 self._all_resources[resource_id] = resource
751 self._allocated_resources[resource_id] = resource
752 self._log.info("Successfully allocated virtual-compute resource from CAL with resource-id: %s", resource_id)
753 return resource
754
755 @asyncio.coroutine
756 def release_cal_resource(self, resource):
757 if hasattr(resource, 'requested_params'):
758 delattr(resource, 'requested_params')
759 if resource.resource_type == 'dynamic':
760 yield from self._cal.delete_virtual_compute(resource.resource_id)
761 self._all_resources.pop(resource.resource_id)
762 self._log.info("Successfully released virtual-compute resource in CAL with resource-id: %s", resource.resource_id)
763 else:
764 self._log.info("Successfully released virtual-compute resource with resource-id: %s into available-list", resource.resource_id)
765 self._free_resources.append(resource)
766
767 @asyncio.coroutine
768 def get_resource_info(self, resource):
769 info = yield from self._cal.get_virtual_compute_info(resource.resource_id)
770
771 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
772 resource.resource_id, str(info))
773 response = RwResourceMgrYang.VDUEventData_ResourceInfo()
774 response.from_dict(info.as_dict())
775 response.pool_name = self.name
776 response.resource_state = self._get_resource_state(info, resource.requested_params)
777 return response
778
779 @asyncio.coroutine
780 def get_info_by_id(self, resource_id):
781 info = yield from self._cal.get_virtual_compute_info(resource_id)
782 self._log.info("Successfully retrieved virtual-compute information from CAL with resource-id: %s. Info: %s",
783 resource_id, str(info))
784 return info
785
786 def _get_resource_state(self, resource_info, requested_params):
787
788
789 def conn_pts_len_equal():
790 # if explicit mgmt network is defined then the allocated ports might
791 # one more than the expected.
792 allocated_ports = len(resource_info.connection_points)
793 requested_ports = len(requested_params.connection_points)
794
795 if not requested_params.mgmt_network:
796 allocated_ports -= 1
797
798 return allocated_ports == requested_ports
799
800 if resource_info.state == 'failed':
801 self._log.error("<Compute-Resource: %s> Reached failed state.",
802 resource_info.name)
803 return 'failed'
804
805 if resource_info.state != 'active':
806 self._log.info("<Compute-Resource: %s> Not reached active state.",
807 resource_info.name)
808 return 'pending'
809
810 if not resource_info.has_field('management_ip') or resource_info.management_ip == '':
811 self._log.info("<Compute-Resource: %s> Management IP not assigned.",
812 resource_info.name)
813 return 'pending'
814
815 if (requested_params.has_field('allocate_public_address')) and (requested_params.allocate_public_address == True):
816 if not resource_info.has_field('public_ip'):
817 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for public ip, %s",
818 resource_info.name, requested_params)
819 return 'pending'
820
821 if not conn_pts_len_equal():
822 self._log.warning("<Compute-Resource: %s> Waiting for requested number of ports to be assigned to virtual-compute, requested: %d, assigned: %d",
823 resource_info.name,
824 len(requested_params.connection_points),
825 len(resource_info.connection_points))
826 return 'pending'
827
828 #not_active = [c for c in resource_info.connection_points
829 # if c.state != 'active']
830
831 #if not_active:
832 # self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
833 # resource_info.name, resource_info)
834 # return 'pending'
835
836 ## Find the connection_points which are in active state but does not have IP address
837 no_address = [c for c in resource_info.connection_points
838 if (c.state == 'active') and (not c.has_field('ip_address'))]
839
840 if no_address:
841 self._log.warning("<Compute-Resource: %s> Management IP not assigned- waiting for connection_points , %s",
842 resource_info.name, resource_info)
843 return 'pending'
844
845 return 'active'
846
847 @asyncio.coroutine
848 def select_resource_flavor(self, request):
849 flavors = yield from self._cal.get_compute_flavor_info_list()
850 self._log.debug("Received %d flavor information from RW.CAL", len(flavors))
851 flavor_id = None
852 match_found = False
853 for flv in flavors:
854 self._log.info("Attempting to match compute requirement for VDU: %s with flavor %s",
855 request.name, flv)
856 if self.match_epa_params(flv, request):
857 self._log.info("Flavor match found for compute requirements for VDU: %s with flavor name: %s, flavor-id: %s",
858 request.name, flv.name, flv.id)
859 match_found = True
860 flavor_id = flv.id
861 break
862
863 if not match_found:
864 ### Check if CAL account allows dynamic flavor creation
865 if self._cal.dynamic_flavor_supported():
866 self._log.info("Attempting to create a new flavor for required compute-requirement for VDU: %s", request.name)
867 flavor_id = yield from self._cal.create_compute_flavor(request)
868 else:
869 ### No match with existing flavors and CAL does not support dynamic flavor creation
870 self._log.error("Unable to create flavor for compute requirement for VDU: %s. VDU instantiation failed", request.name)
871 raise ResMgrNoResourcesAvailable("No resource available with matching EPA attributes")
872 else:
873 ### Found flavor
874 self._log.info("Found flavor with id: %s for compute requirement for VDU: %s",
875 flavor_id, request.name)
876 return flavor_id
877
878 def _match_vm_flavor(self, required, available):
879 self._log.info("Matching VM Flavor attributes")
880 if available.vcpu_count != required.vcpu_count:
881 self._log.debug("VCPU requirement mismatch. Required: %d, Available: %d",
882 required.vcpu_count,
883 available.vcpu_count)
884 return False
885 if available.memory_mb != required.memory_mb:
886 self._log.debug("Memory requirement mismatch. Required: %d MB, Available: %d MB",
887 required.memory_mb,
888 available.memory_mb)
889 return False
890 if available.storage_gb != required.storage_gb:
891 self._log.debug("Storage requirement mismatch. Required: %d GB, Available: %d GB",
892 required.storage_gb,
893 available.storage_gb)
894 return False
895 self._log.debug("VM Flavor match found")
896 return True
897
898 def _match_guest_epa(self, required, available):
899 self._log.info("Matching Guest EPA attributes")
900 if required.has_field('pcie_device'):
901 self._log.debug("Matching pcie_device")
902 if available.has_field('pcie_device') == False:
903 self._log.debug("Matching pcie_device failed. Not available in flavor")
904 return False
905 else:
906 for dev in required.pcie_device:
907 if not [ d for d in available.pcie_device
908 if ((d.device_id == dev.device_id) and (d.count == dev.count)) ]:
909 self._log.debug("Matching pcie_device failed. Required: %s, Available: %s", required.pcie_device, available.pcie_device)
910 return False
911 elif available.has_field('pcie_device'):
912 self._log.debug("Rejecting available flavor because pcie_device not required but available")
913 return False
914
915
916 if required.has_field('mempage_size'):
917 self._log.debug("Matching mempage_size")
918 if available.has_field('mempage_size') == False:
919 self._log.debug("Matching mempage_size failed. Not available in flavor")
920 return False
921 else:
922 if required.mempage_size != available.mempage_size:
923 self._log.debug("Matching mempage_size failed. Required: %s, Available: %s", required.mempage_size, available.mempage_size)
924 return False
925 elif available.has_field('mempage_size'):
926 self._log.debug("Rejecting available flavor because mempage_size not required but available")
927 return False
928
929 if required.has_field('cpu_pinning_policy'):
930 self._log.debug("Matching cpu_pinning_policy")
931 if required.cpu_pinning_policy != 'ANY':
932 if available.has_field('cpu_pinning_policy') == False:
933 self._log.debug("Matching cpu_pinning_policy failed. Not available in flavor")
934 return False
935 else:
936 if required.cpu_pinning_policy != available.cpu_pinning_policy:
937 self._log.debug("Matching cpu_pinning_policy failed. Required: %s, Available: %s", required.cpu_pinning_policy, available.cpu_pinning_policy)
938 return False
939 elif available.has_field('cpu_pinning_policy'):
940 self._log.debug("Rejecting available flavor because cpu_pinning_policy not required but available")
941 return False
942
943 if required.has_field('cpu_thread_pinning_policy'):
944 self._log.debug("Matching cpu_thread_pinning_policy")
945 if available.has_field('cpu_thread_pinning_policy') == False:
946 self._log.debug("Matching cpu_thread_pinning_policy failed. Not available in flavor")
947 return False
948 else:
949 if required.cpu_thread_pinning_policy != available.cpu_thread_pinning_policy:
950 self._log.debug("Matching cpu_thread_pinning_policy failed. Required: %s, Available: %s", required.cpu_thread_pinning_policy, available.cpu_thread_pinning_policy)
951 return False
952 elif available.has_field('cpu_thread_pinning_policy'):
953 self._log.debug("Rejecting available flavor because cpu_thread_pinning_policy not required but available")
954 return False
955
956 if required.has_field('trusted_execution'):
957 self._log.debug("Matching trusted_execution")
958 if required.trusted_execution == True:
959 if available.has_field('trusted_execution') == False:
960 self._log.debug("Matching trusted_execution failed. Not available in flavor")
961 return False
962 else:
963 if required.trusted_execution != available.trusted_execution:
964 self._log.debug("Matching trusted_execution failed. Required: %s, Available: %s", required.trusted_execution, available.trusted_execution)
965 return False
966 elif available.has_field('trusted_execution'):
967 self._log.debug("Rejecting available flavor because trusted_execution not required but available")
968 return False
969
970 if required.has_field('numa_node_policy'):
971 self._log.debug("Matching numa_node_policy")
972 if available.has_field('numa_node_policy') == False:
973 self._log.debug("Matching numa_node_policy failed. Not available in flavor")
974 return False
975 else:
976 if required.numa_node_policy.has_field('node_cnt'):
977 self._log.debug("Matching numa_node_policy node_cnt")
978 if available.numa_node_policy.has_field('node_cnt') == False:
979 self._log.debug("Matching numa_node_policy node_cnt failed. Not available in flavor")
980 return False
981 else:
982 if required.numa_node_policy.node_cnt != available.numa_node_policy.node_cnt:
983 self._log.debug("Matching numa_node_policy node_cnt failed. Required: %s, Available: %s",required.numa_node_policy.node_cnt, available.numa_node_policy.node_cnt)
984 return False
985 elif available.numa_node_policy.has_field('node_cnt'):
986 self._log.debug("Rejecting available flavor because numa node count not required but available")
987 return False
988
989 if required.numa_node_policy.has_field('mem_policy'):
990 self._log.debug("Matching numa_node_policy mem_policy")
991 if available.numa_node_policy.has_field('mem_policy') == False:
992 self._log.debug("Matching numa_node_policy mem_policy failed. Not available in flavor")
993 return False
994 else:
995 if required.numa_node_policy.mem_policy != available.numa_node_policy.mem_policy:
996 self._log.debug("Matching numa_node_policy mem_policy failed. Required: %s, Available: %s", required.numa_node_policy.mem_policy, available.numa_node_policy.mem_policy)
997 return False
998 elif available.numa_node_policy.has_field('mem_policy'):
999 self._log.debug("Rejecting available flavor because num node mem_policy not required but available")
1000 return False
1001
1002 if required.numa_node_policy.has_field('node'):
1003 self._log.debug("Matching numa_node_policy nodes configuration")
1004 if available.numa_node_policy.has_field('node') == False:
1005 self._log.debug("Matching numa_node_policy nodes configuration failed. Not available in flavor")
1006 return False
1007 for required_node in required.numa_node_policy.node:
1008 self._log.debug("Matching numa_node_policy nodes configuration for node %s", required_node)
1009 numa_match = False
1010 for available_node in available.numa_node_policy.node:
1011 if required_node.id != available_node.id:
1012 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1013 continue
1014 if required_node.vcpu != available_node.vcpu:
1015 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1016 continue
1017 if required_node.memory_mb != available_node.memory_mb:
1018 self._log.debug("Matching numa_node_policy nodes configuration failed. Required: %s, Available: %s", required_node, available_node)
1019 continue
1020 numa_match = True
1021 if numa_match == False:
1022 return False
1023 elif available.numa_node_policy.has_field('node'):
1024 self._log.debug("Rejecting available flavor because numa nodes not required but available")
1025 return False
1026 elif available.has_field('numa_node_policy'):
1027 self._log.debug("Rejecting available flavor because numa_node_policy not required but available")
1028 return False
1029 self._log.info("Successful match for Guest EPA attributes")
1030 return True
1031
1032 def _match_vswitch_epa(self, required, available):
1033 self._log.debug("VSwitch EPA match found")
1034 return True
1035
1036 def _match_hypervisor_epa(self, required, available):
1037 self._log.debug("Hypervisor EPA match found")
1038 return True
1039
1040 def _match_host_epa(self, required, available):
1041 self._log.info("Matching Host EPA attributes")
1042 if required.has_field('cpu_model'):
1043 self._log.debug("Matching CPU model")
1044 if available.has_field('cpu_model') == False:
1045 self._log.debug("Matching CPU model failed. Not available in flavor")
1046 return False
1047 else:
1048 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1049 if required.cpu_model.replace('PREFER', 'REQUIRE') != available.cpu_model:
1050 self._log.debug("Matching CPU model failed. Required: %s, Available: %s", required.cpu_model, available.cpu_model)
1051 return False
1052 elif available.has_field('cpu_model'):
1053 self._log.debug("Rejecting available flavor because cpu_model not required but available")
1054 return False
1055
1056 if required.has_field('cpu_arch'):
1057 self._log.debug("Matching CPU architecture")
1058 if available.has_field('cpu_arch') == False:
1059 self._log.debug("Matching CPU architecture failed. Not available in flavor")
1060 return False
1061 else:
1062 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1063 if required.cpu_arch.replace('PREFER', 'REQUIRE') != available.cpu_arch:
1064 self._log.debug("Matching CPU architecture failed. Required: %s, Available: %s", required.cpu_arch, available.cpu_arch)
1065 return False
1066 elif available.has_field('cpu_arch'):
1067 self._log.debug("Rejecting available flavor because cpu_arch not required but available")
1068 return False
1069
1070 if required.has_field('cpu_vendor'):
1071 self._log.debug("Matching CPU vendor")
1072 if available.has_field('cpu_vendor') == False:
1073 self._log.debug("Matching CPU vendor failed. Not available in flavor")
1074 return False
1075 else:
1076 #### Convert all PREFER to REQUIRE since flavor will only have REQUIRE attributes
1077 if required.cpu_vendor.replace('PREFER', 'REQUIRE') != available.cpu_vendor:
1078 self._log.debug("Matching CPU vendor failed. Required: %s, Available: %s", required.cpu_vendor, available.cpu_vendor)
1079 return False
1080 elif available.has_field('cpu_vendor'):
1081 self._log.debug("Rejecting available flavor because cpu_vendor not required but available")
1082 return False
1083
1084 if required.has_field('cpu_socket_count'):
1085 self._log.debug("Matching CPU socket count")
1086 if available.has_field('cpu_socket_count') == False:
1087 self._log.debug("Matching CPU socket count failed. Not available in flavor")
1088 return False
1089 else:
1090 if required.cpu_socket_count != available.cpu_socket_count:
1091 self._log.debug("Matching CPU socket count failed. Required: %s, Available: %s", required.cpu_socket_count, available.cpu_socket_count)
1092 return False
1093 elif available.has_field('cpu_socket_count'):
1094 self._log.debug("Rejecting available flavor because cpu_socket_count not required but available")
1095 return False
1096
1097 if required.has_field('cpu_core_count'):
1098 self._log.debug("Matching CPU core count")
1099 if available.has_field('cpu_core_count') == False:
1100 self._log.debug("Matching CPU core count failed. Not available in flavor")
1101 return False
1102 else:
1103 if required.cpu_core_count != available.cpu_core_count:
1104 self._log.debug("Matching CPU core count failed. Required: %s, Available: %s", required.cpu_core_count, available.cpu_core_count)
1105 return False
1106 elif available.has_field('cpu_core_count'):
1107 self._log.debug("Rejecting available flavor because cpu_core_count not required but available")
1108 return False
1109
1110 if required.has_field('cpu_core_thread_count'):
1111 self._log.debug("Matching CPU core thread count")
1112 if available.has_field('cpu_core_thread_count') == False:
1113 self._log.debug("Matching CPU core thread count failed. Not available in flavor")
1114 return False
1115 else:
1116 if required.cpu_core_thread_count != available.cpu_core_thread_count:
1117 self._log.debug("Matching CPU core thread count failed. Required: %s, Available: %s", required.cpu_core_thread_count, available.cpu_core_thread_count)
1118 return False
1119 elif available.has_field('cpu_core_thread_count'):
1120 self._log.debug("Rejecting available flavor because cpu_core_thread_count not required but available")
1121 return False
1122
1123 if required.has_field('cpu_feature'):
1124 self._log.debug("Matching CPU feature list")
1125 if available.has_field('cpu_feature') == False:
1126 self._log.debug("Matching CPU feature list failed. Not available in flavor")
1127 return False
1128 else:
1129 for feature in required.cpu_feature:
1130 if feature not in available.cpu_feature:
1131 self._log.debug("Matching CPU feature list failed. Required feature: %s is not present. Available features: %s", feature, available.cpu_feature)
1132 return False
1133 elif available.has_field('cpu_feature'):
1134 self._log.debug("Rejecting available flavor because cpu_feature not required but available")
1135 return False
1136 self._log.info("Successful match for Host EPA attributes")
1137 return True
1138
1139
1140 def _match_placement_group_inputs(self, required, available):
1141 self._log.info("Matching Host aggregate attributes")
1142
1143 if not required and not available:
1144 # Host aggregate not required and not available => success
1145 self._log.info("Successful match for Host Aggregate attributes")
1146 return True
1147 if required and available:
1148 # Host aggregate requested and available => Do a match and decide
1149 xx = [ x.as_dict() for x in required ]
1150 yy = [ y.as_dict() for y in available ]
1151 for i in xx:
1152 if i not in yy:
1153 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1154 return False
1155 self._log.info("Successful match for Host Aggregate attributes")
1156 return True
1157 else:
1158 # Either of following conditions => Failure
1159 # - Host aggregate required but not available
1160 # - Host aggregate not required but available
1161 self._log.debug("Rejecting available flavor because host Aggregate mismatch. Required: %s, Available: %s ", required, available)
1162 return False
1163
1164
1165 def match_image_params(self, resource_info, request_params):
1166 return True
1167
1168 def match_epa_params(self, resource_info, request_params):
1169 result = self._match_vm_flavor(getattr(request_params, 'vm_flavor'),
1170 getattr(resource_info, 'vm_flavor'))
1171 if result == False:
1172 self._log.debug("VM Flavor mismatched")
1173 return False
1174
1175 result = self._match_guest_epa(getattr(request_params, 'guest_epa'),
1176 getattr(resource_info, 'guest_epa'))
1177 if result == False:
1178 self._log.debug("Guest EPA mismatched")
1179 return False
1180
1181 result = self._match_vswitch_epa(getattr(request_params, 'vswitch_epa'),
1182 getattr(resource_info, 'vswitch_epa'))
1183 if result == False:
1184 self._log.debug("Vswitch EPA mismatched")
1185 return False
1186
1187 result = self._match_hypervisor_epa(getattr(request_params, 'hypervisor_epa'),
1188 getattr(resource_info, 'hypervisor_epa'))
1189 if result == False:
1190 self._log.debug("Hypervisor EPA mismatched")
1191 return False
1192
1193 result = self._match_host_epa(getattr(request_params, 'host_epa'),
1194 getattr(resource_info, 'host_epa'))
1195 if result == False:
1196 self._log.debug("Host EPA mismatched")
1197 return False
1198
1199 result = self._match_placement_group_inputs(getattr(request_params, 'host_aggregate'),
1200 getattr(resource_info, 'host_aggregate'))
1201
1202 if result == False:
1203 self._log.debug("Host Aggregate mismatched")
1204 return False
1205
1206 return True
1207
1208 @asyncio.coroutine
1209 def initialize_resource_in_cal(self, resource, request):
1210 self._log.info("Initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1211 modify_params = RwcalYang.VDUModifyParams()
1212 modify_params.vdu_id = resource.resource_id
1213 modify_params.image_id = request.image_id
1214
1215 for c_point in request.connection_points:
1216 self._log.debug("Adding connection point for VDU: %s to virtual-compute with id: %s Connection point Name: %s",
1217 request.name,resource.resource_id,c_point.name)
1218 point = modify_params.connection_points_add.add()
1219 point.name = c_point.name
1220 point.virtual_link_id = c_point.virtual_link_id
1221 yield from self._cal.modify_virtual_compute(modify_params)
1222
1223 @asyncio.coroutine
1224 def uninitialize_resource_in_cal(self, resource):
1225 self._log.info("Un-initializing the compute-resource with id: %s in RW.CAL", resource.resource_id)
1226 modify_params = RwcalYang.VDUModifyParams()
1227 modify_params.vdu_id = resource.resource_id
1228 resource_info = yield from self.get_resource_info(resource)
1229 for c_point in resource_info.connection_points:
1230 self._log.debug("Removing connection point: %s from VDU: %s ",
1231 c_point.name,resource_info.name)
1232 point = modify_params.connection_points_remove.add()
1233 point.connection_point_id = c_point.connection_point_id
1234 yield from self._cal.modify_virtual_compute(modify_params)
1235
1236
1237 class ResourceMgrCore(object):
1238 def __init__(self, dts, log, log_hdl, loop, parent):
1239 self._log = log
1240 self._log_hdl = log_hdl
1241 self._dts = dts
1242 self._loop = loop
1243 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
1244 self._parent = parent
1245 self._cloud_cals = {}
1246 # Dictionary of pool objects keyed by name
1247 self._cloud_pool_table = {}
1248 # Dictionary of tuples (resource_id, cloud_account_name, pool_name) keyed by event_id
1249 self._resource_table = {}
1250 self._pool_class = {'compute': ComputePool,
1251 'network': NetworkPool}
1252
1253 def _get_cloud_pool_table(self, cloud_account_name):
1254 if cloud_account_name not in self._cloud_pool_table:
1255 msg = "Cloud account %s not found" % cloud_account_name
1256 self._log.error(msg)
1257 raise ResMgrCloudAccountNotFound(msg)
1258
1259 return self._cloud_pool_table[cloud_account_name]
1260
1261 def _get_cloud_cal_plugin(self, cloud_account_name):
1262 if cloud_account_name not in self._cloud_cals:
1263 msg = "Cloud account %s not found" % cloud_account_name
1264 self._log.error(msg)
1265 raise ResMgrCloudAccountNotFound(msg)
1266
1267 return self._cloud_cals[cloud_account_name]
1268
1269 def _add_default_cloud_pools(self, cloud_account_name):
1270 self._log.debug("Adding default compute and network pools for cloud account %s",
1271 cloud_account_name)
1272 default_pools = [
1273 {
1274 'name': '____default_compute_pool',
1275 'resource_type': 'compute',
1276 'pool_type': 'dynamic',
1277 'max_size': 128,
1278 },
1279 {
1280 'name': '____default_network_pool',
1281 'resource_type': 'network',
1282 'pool_type': 'dynamic',
1283 'max_size': 128,
1284 },
1285 ]
1286
1287 for pool_dict in default_pools:
1288 pool_info = ResourcePoolInfo.from_dict(pool_dict)
1289 self._log.info("Applying configuration for cloud account %s pool: %s",
1290 cloud_account_name, pool_info.name)
1291
1292 self.add_resource_pool(cloud_account_name, pool_info)
1293 self.unlock_resource_pool(cloud_account_name, pool_info.name)
1294
1295 def get_cloud_account_names(self):
1296 """ Returns a list of configured cloud account names """
1297 return self._cloud_cals.keys()
1298
1299 def add_cloud_account(self, account):
1300 self._log.debug("Received CAL account. Account Name: %s, Account Type: %s",
1301 account.name, account.account_type)
1302
1303 ### Add cal handler to all the pools
1304 if account.name in self._cloud_cals:
1305 raise ResMgrCloudAccountExists("Cloud account already exists in res mgr: %s",
1306 account.name)
1307
1308 self._cloud_pool_table[account.name] = {}
1309
1310 cal = ResourceMgrCALHandler(self._loop, self._executor, self._log, self._log_hdl, account)
1311 self._cloud_cals[account.name] = cal
1312
1313 self._add_default_cloud_pools(account.name)
1314
1315 def update_cloud_account(self, account):
1316 raise NotImplementedError("Update cloud account not implemented")
1317
1318 def delete_cloud_account(self, account_name, dry_run=False):
1319 cloud_pool_table = self._get_cloud_pool_table(account_name)
1320 for pool in cloud_pool_table.values():
1321 if pool.in_use():
1322 raise ResMgrCloudAccountInUse("Cannot delete cloud which is currently in use")
1323
1324 # If dry_run is specified, do not actually delete the cloud account
1325 if dry_run:
1326 return
1327
1328 for pool in list(cloud_pool_table):
1329 self.delete_resource_pool(account_name, pool)
1330
1331 del self._cloud_pool_table[account_name]
1332 del self._cloud_cals[account_name]
1333
1334 def add_resource_pool(self, cloud_account_name, pool_info):
1335 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1336 if pool_info.name in cloud_pool_table:
1337 raise ResMgrDuplicatePool("Pool with name: %s already exists", pool_info.name)
1338
1339 cloud_cal = self._get_cloud_cal_plugin(cloud_account_name)
1340 pool = self._pool_class[pool_info.resource_type](self._log, self._loop, pool_info, cloud_cal)
1341
1342 cloud_pool_table[pool_info.name] = pool
1343
1344 def delete_resource_pool(self, cloud_account_name, pool_name):
1345 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1346 if pool_name not in cloud_pool_table:
1347 self._log.error("Pool: %s not found for deletion", pool_name)
1348 return
1349 pool = cloud_pool_table[pool_name]
1350
1351 if pool.in_use():
1352 # Can't delete a pool in use
1353 self._log.error("Pool: %s in use. Can not delete in-use pool", pool.name)
1354 return
1355
1356 pool.cleanup()
1357 del cloud_pool_table[pool_name]
1358 self._log.info("Resource Pool: %s successfully deleted", pool_name)
1359
1360 def modify_resource_pool(self, cloud_account_name, pool):
1361 pass
1362
1363 def lock_resource_pool(self, cloud_account_name, pool_name):
1364 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1365 if pool_name not in cloud_pool_table:
1366 self._log.info("Pool: %s is not available for lock operation")
1367 return
1368
1369 pool = cloud_pool_table[pool_name]
1370 pool.lock_pool()
1371
1372 def unlock_resource_pool(self, cloud_account_name, pool_name):
1373 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1374 if pool_name not in cloud_pool_table:
1375 self._log.info("Pool: %s is not available for unlock operation")
1376 return
1377
1378 pool = cloud_pool_table[pool_name]
1379 pool.unlock_pool()
1380
1381 def get_resource_pool_info(self, cloud_account_name, pool_name):
1382 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1383 if pool_name in cloud_pool_table:
1384 pool = cloud_pool_table[pool_name]
1385 return pool.get_pool_info()
1386 else:
1387 return None
1388
1389 def get_resource_pool_list(self, cloud_account_name):
1390 return [v for _, v in self._get_cloud_pool_table(cloud_account_name).items()]
1391
1392 def _select_resource_pools(self, cloud_account_name, resource_type):
1393 pools = [pool for pool in self.get_resource_pool_list(cloud_account_name) if pool.resource_type == resource_type and pool.status == 'unlocked']
1394 if not pools:
1395 raise ResMgrPoolNotAvailable("No %s pool found for resource allocation", resource_type)
1396
1397 return pools[0]
1398
1399 @asyncio.coroutine
1400 def allocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type):
1401 ### Check if event_id is unique or already in use
1402 if event_id in self._resource_table:
1403 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1404 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1405 event_id, pool_name)
1406 # If resource-type matches then return the same resource
1407 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1408 pool = cloud_pool_table[pool_name]
1409 if pool.resource_type == resource_type:
1410
1411 info = yield from pool.read_resource_info(r_id)
1412 return info
1413 else:
1414 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1415 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1416
1417 ### All-OK, lets go ahead with resource allocation
1418 pool = self._select_resource_pools(cloud_account_name, resource_type)
1419 self._log.info("Selected pool %s for resource allocation", pool.name)
1420
1421 r_id, r_info = yield from pool.allocate_resource(request)
1422
1423 self._resource_table[event_id] = (r_id, cloud_account_name, pool.name)
1424 return r_info
1425
1426 @asyncio.coroutine
1427 def reallocate_virtual_resource(self, event_id, cloud_account_name, request, resource_type, resource):
1428 ### Check if event_id is unique or already in use
1429 if event_id in self._resource_table:
1430 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1431 self._log.warning("Requested event-id :%s for resource-allocation already active with pool: %s",
1432 event_id, pool_name)
1433 # If resource-type matches then return the same resource
1434 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1435 pool = cloud_pool_table[pool_name]
1436 if pool.resource_type == resource_type:
1437 info = yield from pool.read_resource_info(r_id)
1438 return info
1439 else:
1440 self._log.error("Event-id conflict. Duplicate event-id: %s", event_id)
1441 raise ResMgrDuplicateEventId("Requested event-id :%s already active with pool: %s" %(event_id, pool_name))
1442
1443 r_info = None
1444 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1445 pool = cloud_pool_table[resource.pool_name]
1446 if pool.resource_type == resource_type:
1447 if resource_type == 'network':
1448 r_id = resource.virtual_link_id
1449 r_info = yield from pool.get_info_by_id(resource.virtual_link_id)
1450 elif resource_type == 'compute':
1451 r_id = resource.vdu_id
1452 r_info = yield from pool.get_info_by_id(resource.vdu_id)
1453
1454 if r_info is None:
1455 r_id, r_info = yield from pool.allocate_resource(request)
1456 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1457 return r_info
1458
1459 self._resource_table[event_id] = (r_id, cloud_account_name, resource.pool_name)
1460 new_resource = pool._resource_class(r_id, 'dynamic', request)
1461 if resource_type == 'compute':
1462 requested_params = RwcalYang.VDUInitParams()
1463 requested_params.from_dict(request.as_dict())
1464 new_resource.requested_params = requested_params
1465 pool._all_resources[r_id] = new_resource
1466 pool._allocated_resources[r_id] = new_resource
1467 return r_info
1468
1469 @asyncio.coroutine
1470 def release_virtual_resource(self, event_id, resource_type):
1471 ### Check if event_id exists
1472 if event_id not in self._resource_table:
1473 self._log.error("Received resource-release-request with unknown Event-id :%s", event_id)
1474 raise ResMgrUnknownEventId("Received resource-release-request with unknown Event-id :%s" %(event_id))
1475
1476 ## All-OK, lets proceed with resource release
1477 r_id, cloud_account_name, pool_name = self._resource_table.pop(event_id)
1478 self._log.debug("Attempting to release virtual resource id %s from pool %s",
1479 r_id, pool_name)
1480
1481 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1482 pool = cloud_pool_table[pool_name]
1483 yield from pool.release_resource(r_id)
1484
1485 @asyncio.coroutine
1486 def read_virtual_resource(self, event_id, resource_type):
1487 ### Check if event_id exists
1488 if event_id not in self._resource_table:
1489 self._log.error("Received resource-read-request with unknown Event-id :%s", event_id)
1490 raise ResMgrUnknownEventId("Received resource-read-request with unknown Event-id :%s" %(event_id))
1491
1492 ## All-OK, lets proceed
1493 r_id, cloud_account_name, pool_name = self._resource_table[event_id]
1494 cloud_pool_table = self._get_cloud_pool_table(cloud_account_name)
1495 pool = cloud_pool_table[pool_name]
1496 info = yield from pool.read_resource_info(r_id)
1497 return info