3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 import concurrent
.futures
27 gi
.require_version('RwVnfrYang', '1.0')
28 gi
.require_version('RwMon', '1.0')
29 from gi
.repository
import (
37 class VdurMissingVimIdError(Exception):
38 def __init__(self
, vdur_id
):
39 super().__init
__("VDUR:{} is has no VIM ID".format(vdur_id
))
42 class VdurAlreadyRegisteredError(Exception):
43 def __init__(self
, vdur_id
):
44 super().__init
__("VDUR:{} is already registered".format(vdur_id
))
47 class AccountInUseError(Exception):
51 class UnknownAccountError(Exception):
55 class AccountAlreadyRegisteredError(Exception):
56 def __init__(self
, account_name
):
57 msg
= "'{}' already registered".format(account_name
)
58 super().__init
__(account_name
)
61 class PluginUnavailableError(Exception):
65 class PluginNotSupportedError(PluginUnavailableError
):
69 class AlarmCreateError(Exception):
71 super().__init
__("failed to create alarm")
74 class AlarmDestroyError(Exception):
76 super().__init
__("failed to destroy alarm")
79 class PluginFactory(object):
80 __metaclass__
= abc
.ABCMeta
83 def create(self
, cloud_account
, plugin_name
):
88 return self
.__class
__.PLUGIN_NAME
93 return list(self
.__class
__.FALLBACKS
)
98 class MonascaPluginFactory(PluginFactory
):
99 PLUGIN_NAME
= "monasca"
100 FALLBACKS
= ["ceilometer",]
102 def create(self
, cloud_account
):
103 raise PluginUnavailableError()
106 class CeilometerPluginFactory(PluginFactory
):
107 PLUGIN_NAME
= "ceilometer"
108 FALLBACKS
= ["unavailable",]
110 def create(self
, cloud_account
):
111 plugin
= rw_peas
.PeasPlugin("rwmon_ceilometer", 'RwMon-1.0')
112 impl
= plugin
.get_interface("Monitoring")
114 # Check that the plugin is available on the platform associated with
116 _
, available
= impl
.nfvi_metrics_available(cloud_account
)
118 raise PluginUnavailableError()
123 class UnavailablePluginFactory(PluginFactory
):
124 PLUGIN_NAME
= "unavailable"
126 class UnavailablePlugin(object):
127 def nfvi_metrics_available(self
, cloud_account
):
130 def create(self
, cloud_account
):
131 return UnavailablePluginFactory
.UnavailablePlugin()
134 class MockPluginFactory(PluginFactory
):
136 FALLBACKS
= ["unavailable",]
138 def create(self
, cloud_account
):
139 plugin
= rw_peas
.PeasPlugin("rwmon_mock", 'RwMon-1.0')
140 impl
= plugin
.get_interface("Monitoring")
142 # Check that the plugin is available on the platform associated with
144 _
, available
= impl
.nfvi_metrics_available(cloud_account
)
146 raise PluginUnavailableError()
151 class NfviMetricsPluginManager(object):
152 def __init__(self
, log
):
153 self
._plugins
= dict()
155 self
._factories
= dict()
157 self
.register_plugin_factory(MockPluginFactory())
158 self
.register_plugin_factory(CeilometerPluginFactory())
159 self
.register_plugin_factory(MonascaPluginFactory())
160 self
.register_plugin_factory(UnavailablePluginFactory())
166 def register_plugin_factory(self
, factory
):
167 self
._factories
[factory
.name
] = factory
169 def plugin(self
, account_name
):
170 return self
._plugins
[account_name
]
172 def register(self
, cloud_account
, plugin_name
):
173 # Check to see if the cloud account has already been registered
174 if cloud_account
.name
in self
._plugins
:
175 raise AccountAlreadyRegisteredError(cloud_account
.name
)
177 if plugin_name
not in self
._factories
:
178 raise PluginNotSupportedError(plugin_name
)
180 # Create a plugin from one of the factories
181 fallbacks
= [plugin_name
,]
184 name
= fallbacks
.pop(0)
186 factory
= self
._factories
[name
]
187 plugin
= factory
.create(cloud_account
)
188 self
._plugins
[cloud_account
.name
] = plugin
191 except PluginUnavailableError
as e
:
192 self
.log
.warning("plugin for {} unavailable".format(name
))
193 fallbacks
.extend(factory
.fallbacks
)
195 raise PluginUnavailableError()
197 def unregister(self
, account_name
):
198 if account_name
in self
._plugins
:
199 del self
._plugins
[account_name
]
202 class NfviMetrics(object):
204 The NfviMetrics class contains the logic to retrieve NFVI metrics for a
205 particular VDUR. Of particular importance is that this object caches the
206 metrics until the data become stale so that it does not create excessive
207 load upon the underlying data-source.
210 # The sample interval defines the maximum time (secs) that metrics will be
211 # cached for. This duration should coincide with the sampling interval used
212 # by the underlying data-source to capture metrics.
215 # The maximum time (secs) an instance will wait for a request to the data
216 # source to be completed
219 def __init__(self
, log
, loop
, account
, plugin
, vdur
):
220 """Creates an instance of NfviMetrics
223 manager - a NfviInterface instance
224 account - a CloudAccount instance
225 plugin - an NFVI plugin
226 vdur - a VDUR instance
231 self
._account
= account
232 self
._plugin
= plugin
234 self
._metrics
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
236 self
._vim
_id
= vdur
.vim_id
237 self
._updating
= None
241 """The logger used by NfviMetrics"""
246 """The current asyncio loop"""
251 """The VDUR that these metrics are associated with"""
255 """Return the NFVI metrics for this VDUR
257 This function will immediately return the current, known NFVI metrics
258 for the associated VDUR. It will also, if the data are stale, schedule
259 a call to the data-source to retrieve new data.
262 if self
.should_update():
263 self
._updating
= self
.loop
.create_task(self
.update())
267 def should_update(self
):
268 """Return a boolean indicating whether an update should be performed"""
269 running
= self
._updating
is not None and not self
._updating
.done()
270 overdue
= time
.time() > self
._timestamp
+ NfviMetrics
.SAMPLE_INTERVAL
272 return overdue
and not running
276 """Update the NFVI metrics for the associated VDUR
278 This coroutine will request new metrics from the data-source and update
284 # Make the request to the plugin in a separate thread and do
285 # not exceed the timeout
286 _
, metrics
= yield from asyncio
.wait_for(
287 self
.loop
.run_in_executor(
289 self
._plugin
.nfvi_metrics
,
293 timeout
=NfviMetrics
.TIMEOUT
,
297 except asyncio
.TimeoutError
:
298 msg
= "timeout on request for nfvi metrics (vim-id = {})"
299 self
.log
.warning(msg
.format(self
._vim
_id
))
302 except Exception as e
:
303 self
.log
.exception(e
)
307 # Create uninitialized metric structure
308 vdu_metrics
= RwVnfrYang
.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
311 vdu_metrics
.vcpu
.total
= self
.vdur
.vm_flavor
.vcpu_count
312 vdu_metrics
.vcpu
.utilization
= metrics
.vcpu
.utilization
315 vdu_metrics
.memory
.used
= metrics
.memory
.used
316 vdu_metrics
.memory
.total
= self
.vdur
.vm_flavor
.memory_mb
317 vdu_metrics
.memory
.utilization
= 100 * vdu_metrics
.memory
.used
/ vdu_metrics
.memory
.total
321 vdu_metrics
.storage
.used
= metrics
.storage
.used
322 if self
.vdur
.has_field('volumes'):
323 for volume
in self
.vdur
.volumes
:
324 if vdu_metrics
.storage
.total
is None:
325 vdu_metrics
.storage
.total
= 1e9
* volume
.size
327 vdu_metrics
.storage
.total
+= (1e9
* volume
.size
)
329 vdu_metrics
.storage
.total
= 1e9
* self
.vdur
.vm_flavor
.storage_gb
330 utilization
= 100 * vdu_metrics
.storage
.used
/ vdu_metrics
.storage
.total
331 if utilization
> 100:
333 vdu_metrics
.storage
.utilization
= utilization
334 except ZeroDivisionError:
335 vdu_metrics
.storage
.utilization
= 0
338 vdu_metrics
.network
.incoming
.packets
= metrics
.network
.incoming
.packets
339 vdu_metrics
.network
.incoming
.packet_rate
= metrics
.network
.incoming
.packet_rate
340 vdu_metrics
.network
.incoming
.bytes
= metrics
.network
.incoming
.bytes
341 vdu_metrics
.network
.incoming
.byte_rate
= metrics
.network
.incoming
.byte_rate
344 vdu_metrics
.network
.outgoing
.packets
= metrics
.network
.outgoing
.packets
345 vdu_metrics
.network
.outgoing
.packet_rate
= metrics
.network
.outgoing
.packet_rate
346 vdu_metrics
.network
.outgoing
.bytes
= metrics
.network
.outgoing
.bytes
347 vdu_metrics
.network
.outgoing
.byte_rate
= metrics
.network
.outgoing
.byte_rate
350 vdu_metrics
.external_ports
.total
= len(self
.vdur
.external_interface
)
353 vdu_metrics
.internal_ports
.total
= len(self
.vdur
.internal_interface
)
355 self
._metrics
= vdu_metrics
357 except Exception as e
:
358 self
.log
.exception(e
)
361 # Regardless of the result of the query, we want to make sure that
362 # we do not poll the data source until another sample duration has
364 self
._timestamp
= time
.time()
367 class NfviMetricsCache(object):
368 def __init__(self
, log
, loop
, plugin_manager
):
371 self
._plugin
_manager
= plugin_manager
372 self
._nfvi
_metrics
= dict()
374 self
._vim
_to
_vdur
= dict()
375 self
._vdur
_to
_vim
= dict()
377 def create_entry(self
, account
, vdur
):
378 plugin
= self
._plugin
_manager
.plugin(account
.name
)
379 metrics
= NfviMetrics(self
._log
, self
._loop
, account
, plugin
, vdur
)
380 self
._nfvi
_metrics
[vdur
.vim_id
] = metrics
382 self
._vim
_to
_vdur
[vdur
.vim_id
] = vdur
.id
383 self
._vdur
_to
_vim
[vdur
.id] = vdur
.vim_id
385 def destroy_entry(self
, vdur_id
):
386 vim_id
= self
._vdur
_to
_vim
[vdur_id
]
388 del self
._nfvi
_metrics
[vim_id
]
389 del self
._vdur
_to
_vim
[vdur_id
]
390 del self
._vim
_to
_vdur
[vim_id
]
392 def retrieve(self
, vim_id
):
393 return self
._nfvi
_metrics
[vim_id
].retrieve()
395 def to_vim_id(self
, vdur_id
):
396 return self
._vdur
_to
_vim
[vdur_id
]
398 def to_vdur_id(self
, vim_id
):
399 return self
._vim
_to
_vdur
[vim_id
]
401 def contains_vdur_id(self
, vdur_id
):
402 return vdur_id
in self
._vdur
_to
_vim
404 def contains_vim_id(self
, vim_id
):
405 return vim_id
in self
._vim
_to
_vdur
408 class NfviInterface(object):
410 The NfviInterface serves as an interface for communicating with the
411 underlying infrastructure, i.e. retrieving metrics for VDURs that have been
412 registered with it and managing alarms.
414 The NfviInterface should only need to be invoked using a cloud account and
415 optionally a VIM ID; It should not need to handle mapping from VDUR ID to
419 def __init__(self
, loop
, log
, plugin_manager
, cache
):
420 """Creates an NfviInterface instance
425 plugin_manager - an instance of NfviMetricsPluginManager
426 cache - an instance of NfviMetricsCache
429 self
._executor
= concurrent
.futures
.ThreadPoolExecutor(max_workers
=16)
430 self
._plugin
_manager
= plugin_manager
437 """The event loop used by this NfviInterface"""
442 """The event log used by this NfviInterface"""
447 """The list of metrics contained in this NfviInterface"""
448 return list(self
._cache
._nfvi
_metrics
.values())
450 def nfvi_metrics_available(self
, account
):
451 plugin
= self
._plugin
_manager
.plugin(account
.name
)
452 _
, available
= plugin
.nfvi_metrics_available(account
)
455 def retrieve(self
, vdur_id
):
456 """Returns the NFVI metrics for the specified VDUR
458 Note, a VDUR must be registered with a NfviInterface before
459 metrics can be retrieved for it.
462 vdur_id - the ID of the VDUR to whose metrics should be retrieve
465 An NfviMetrics object for the specified VDUR
468 return self
._cache
.retrieve(self
._cache
.to_vim_id(vdur_id
))
471 def alarm_create(self
, account
, vim_id
, alarm
, timeout
=5):
472 """Create a new alarm
475 account - a CloudAccount instance
476 vim_id - the VM to associate with this alarm
477 alarm - an alarm structure
478 timeout - the request timeout (sec)
481 If the data source does not respond in a timely manner, an
482 asyncio.TimeoutError will be raised.
485 plugin
= self
._plugin
_manager
.plugin(account
.name
)
486 status
= yield from asyncio
.wait_for(
487 self
.loop
.run_in_executor(
489 plugin
.do_alarm_create
,
498 if status
== RwTypes
.RwStatus
.FAILURE
:
499 raise AlarmCreateError()
502 def alarm_destroy(self
, account
, alarm_id
, timeout
=5):
503 """Destroy an existing alarm
506 account - a CloudAccount instance
507 alarm_id - the identifier of the alarm to destroy
508 timeout - the request timeout (sec)
511 If the data source does not respond in a timely manner, an
512 asyncio.TimeoutError will be raised.
515 plugin
= self
._plugin
_manager
.plugin(account
.name
)
516 status
= yield from asyncio
.wait_for(
517 self
.loop
.run_in_executor(
519 plugin
.do_alarm_delete
,
527 if status
== RwTypes
.RwStatus
.FAILURE
:
528 raise AlarmDestroyError()
531 class InstanceConfiguration(object):
533 The InstanceConfiguration class represents configuration information that
534 affects the behavior of the monitor. Essentially this class should contain
535 not functional behavior but serve as a convenient way to share data amongst
536 the components of the monitoring system.
540 self
.polling_period
= None
541 self
.max_polling_frequency
= None
542 self
.min_cache_lifetime
= None
543 self
.public_ip
= None
546 class Monitor(object):
548 The Monitor class is intended to act as a unifying interface for the
549 different sub-systems that are used to monitor the NFVI.
552 def __init__(self
, loop
, log
, config
, project
):
553 """Create a Monitor object
557 log - the logger used by this object
558 config - an instance of InstanceConfiguration
559 project - an instance of the project
564 self
._project
= project
566 self
._cloud
_accounts
= dict()
567 self
._nfvi
_plugins
= NfviMetricsPluginManager(log
)
568 self
._cache
= NfviMetricsCache(log
, loop
, self
._nfvi
_plugins
)
569 self
._nfvi
_interface
= NfviInterface(loop
, log
, self
._nfvi
_plugins
, self
._cache
)
570 self
._config
= config
572 self
._vnfr
_to
_vdurs
= collections
.defaultdict(set)
573 self
._alarms
= collections
.defaultdict(list)
577 """The event loop used by this object"""
582 """The event log used by this object"""
591 """The NFVI metrics cache"""
596 """The list of metrics contained in this Monitor"""
597 return self
._nfvi
_interface
.metrics
599 def nfvi_metrics_available(self
, account
):
600 """Returns a boolean indicating whether NFVI metrics are available
603 account - the name of the cloud account to check
606 a boolean indicating availability of NFVI metrics
609 if account
not in self
._cloud
_accounts
:
612 cloud_account
= self
._cloud
_accounts
[account
]
613 return self
._nfvi
_interface
.nfvi_metrics_available(cloud_account
)
615 def add_cloud_account(self
, account
):
616 """Add a cloud account to the monitor
619 account - a cloud account object
622 If the cloud account has already been added to the monitor, an
623 AccountAlreadyRegisteredError is raised.
626 if account
.name
in self
._cloud
_accounts
:
627 raise AccountAlreadyRegisteredError(account
.name
)
629 self
._cloud
_accounts
[account
.name
] = account
631 if account
.account_type
== "openstack":
632 self
.register_cloud_account(account
, "monasca")
634 self
.register_cloud_account(account
, "mock")
636 def remove_cloud_account(self
, account_name
):
637 """Remove a cloud account from the monitor
640 account_name - removes the cloud account that has this name
643 If the specified cloud account cannot be found, an
644 UnknownAccountError is raised.
647 if account_name
not in self
._cloud
_accounts
:
648 raise UnknownAccountError()
650 # Make sure that there are no VNFRs associated with this account
651 for vnfr
in self
._vnfrs
.values():
652 if vnfr
.cloud_account
== account_name
:
653 raise AccountInUseError()
655 del self
._cloud
_accounts
[account_name
]
656 self
._nfvi
_plugins
.unregister(account_name
)
658 def get_cloud_account(self
, account_name
):
659 """Returns a cloud account by name
662 account_name - the name of the account to return
665 An UnknownAccountError is raised if there is not account object
666 associated with the provided name
669 A cloud account object
672 if account_name
not in self
._cloud
_accounts
:
673 raise UnknownAccountError()
675 return self
._cloud
_accounts
[account_name
]
677 def register_cloud_account(self
, account
, plugin_name
):
678 """Register a cloud account with an NFVI plugin
680 Note that a cloud account can only be registered for one plugin at a
684 account - the cloud account to associate with the plugin
685 plugin_name - the name of the plugin to use
688 self
._nfvi
_plugins
.register(account
, plugin_name
)
690 def add_vnfr(self
, vnfr
):
691 """Add a VNFR to the monitor
697 An UnknownAccountError is raised if the account name contained in
698 the VNFR does not reference a cloud account that has been added to
702 if vnfr
.cloud_account
not in self
._cloud
_accounts
:
703 raise UnknownAccountError()
705 account
= self
._cloud
_accounts
[vnfr
.cloud_account
]
707 for vdur
in vnfr
.vdur
:
709 self
.add_vdur(account
, vdur
)
710 self
._vnfr
_to
_vdurs
[vnfr
.id].add(vdur
.id)
711 except (VdurMissingVimIdError
, VdurAlreadyRegisteredError
):
714 self
._vnfrs
[vnfr
.id] = vnfr
716 def update_vnfr(self
, vnfr
):
717 """Updates the VNFR information in the monitor
723 An UnknownAccountError is raised if the account name contained in
724 the VNFR does not reference a cloud account that has been added to
728 if vnfr
.cloud_account
not in self
._cloud
_accounts
:
729 raise UnknownAccountError()
731 account
= self
._cloud
_accounts
[vnfr
.cloud_account
]
733 for vdur
in vnfr
.vdur
:
735 self
.add_vdur(account
, vdur
)
736 self
._vnfr
_to
_vdurs
[vnfr
.id].add(vdur
.id)
737 except (VdurMissingVimIdError
, VdurAlreadyRegisteredError
):
740 def remove_vnfr(self
, vnfr_id
):
741 """Remove a VNFR from the monitor
744 vnfr_id - the ID of the VNFR to remove
747 vdur_ids
= self
._vnfr
_to
_vdurs
[vnfr_id
]
749 for vdur_id
in vdur_ids
:
750 self
.remove_vdur(vdur_id
)
752 del self
._vnfrs
[vnfr_id
]
753 del self
._vnfr
_to
_vdurs
[vnfr_id
]
755 def add_vdur(self
, account
, vdur
):
756 """Adds a VDUR to the monitor
758 Adding a VDUR to the monitor will automatically create a NFVI metrics
759 object that is associated with the VDUR so that the monitor cane
760 provide the NFVI metrics associated with the VDUR.
763 account - the cloud account associated with the VNFR that contains
768 A VdurMissingVimIdError is raised if the provided VDUR does not
769 contain a VIM ID. A VdurAlreadyRegisteredError is raised if the ID
770 associated with the VDUR has already been registered.
774 raise VdurMissingVimIdError(vdur
.id)
776 if self
.is_registered_vdur(vdur
.id):
777 raise VdurAlreadyRegisteredError(vdur
.id)
779 self
.cache
.create_entry(account
, vdur
)
781 def remove_vdur(self
, vdur_id
):
782 """Removes a VDUR from the monitor
785 vdur_id - the ID of the VDUR to remove
788 self
.cache
.destroy_entry(vdur_id
)
790 # Schedule any alarms associated with the VDUR for destruction
791 for account_name
, alarm_id
in self
._alarms
[vdur_id
]:
792 self
.loop
.create_task(self
.destroy_alarm(account_name
, alarm_id
))
794 del self
._alarms
[vdur_id
]
796 def list_vdur(self
, vnfr_id
):
797 """Returns a list of VDURs
800 vnfr_id - the identifier of the VNFR contains the VDURs
806 return self
._vnfrs
[vnfr_id
].vdur
808 def is_registered_vnfr(self
, vnfr_id
):
809 """Returns True if the VNFR is registered with the monitor
812 vnfr_id - the ID of the VNFR to check
815 True if the VNFR is registered and False otherwise.
818 return vnfr_id
in self
._vnfrs
820 def is_registered_vdur(self
, vdur_id
):
821 """Returns True if the VDUR is registered with the monitor
824 vnfr_id - the ID of the VDUR to check
827 True if the VDUR is registered and False otherwise.
830 return self
.cache
.contains_vdur_id(vdur_id
)
832 def retrieve_nfvi_metrics(self
, vdur_id
):
833 """Retrieves the NFVI metrics associated with a VDUR
836 vdur_id - the ID of the VDUR whose metrics are to be retrieved
839 NFVI metrics for a VDUR
842 return self
._nfvi
_interface
.retrieve(vdur_id
)
845 def create_alarm(self
, account_name
, vdur_id
, alarm
):
846 """Create a new alarm
848 This function create an alarm and augments the provided endpoints with
849 endpoints to the launchpad if the launchpad has a public IP. The added
850 endpoints are of the form,
852 http://{host}:4568/{platform}/{vdur_id}/{action}
854 where the 'action' is one of 'ok', 'alarm', or 'insufficient_data'. The
855 messages that are pushed to the launchpad are not defined by RIFT so
856 we need to know which platform an alarm is sent from in order to
861 account_name - the name of the account to use to create the alarm
862 vdur_id - the identifier of the VDUR to associated with the
863 alarm. If the identifier is None, the alarm is not
864 associated with a specific VDUR.
865 alarm - the alarm data
868 account
= self
.get_cloud_account(account_name
)
869 vim_id
= self
.cache
.to_vim_id(vdur_id
)
871 # If the launchpad has a public IP, augment the action webhooks to
872 # include the launchpad so that the alarms can be broadcast as event
874 if self
._config
.public_ip
is not None:
875 url
= "http://{host}:4568/{platform}/{vdur_id}".format(
876 host
=self
._config
.public_ip
,
877 platform
=account
.account_type
,
880 alarm
.actions
.ok
.add().url
= url
+ "/ok"
881 alarm
.actions
.alarm
.add().url
= url
+ "/alarm"
882 alarm
.actions
.alarm
.add().url
= url
+ "/insufficient_data"
884 yield from self
._nfvi
_interface
.alarm_create(account
, vim_id
, alarm
)
886 # Associate the VDUR ID with the alarm ID
887 self
._alarms
[vdur_id
].append((account_name
, alarm
.alarm_id
))
890 def destroy_alarm(self
, account_name
, alarm_id
):
891 """Destroy an existing alarm
894 account_name - the name of the account that owns the alert
895 alarm_id - the identifier of the alarm to destroy
898 account
= self
.get_cloud_account(account_name
)
899 yield from self
._nfvi
_interface
.alarm_destroy(account
, alarm_id
)