3 # Copyright 2016 RIFT.IO Inc
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
22 import concurrent
.futures
27 gi
.require_version('RwVnfrYang', '1.0')
28 gi
.require_version('RwMon', '1.0')
29 from gi
.repository
import (
class VdurMissingVimIdError(Exception):
    """Raised when a VDUR is handed to the monitor without a VIM ID."""

    def __init__(self, vdur_id):
        """Build the error message from the offending VDUR ID."""
        message = "VDUR:{} is has no VIM ID".format(vdur_id)
        super().__init__(message)
class VdurAlreadyRegisteredError(Exception):
    """Raised when a VDUR ID is registered with the monitor a second time."""

    def __init__(self, vdur_id):
        """Build the error message from the duplicate VDUR ID."""
        message = "VDUR:{} is already registered".format(vdur_id)
        super().__init__(message)
class AccountInUseError(Exception):
    """Raised when a cloud account is still referenced by a VNFR."""
class UnknownAccountError(Exception):
    """Raised when a cloud account name is not known to the monitor."""
class AccountAlreadyRegisteredError(Exception):
    """Raised when a cloud account name is registered more than once."""

    def __init__(self, account_name):
        """Build the error message from the duplicate account name.

        Arguments:
            account_name - the name of the account that is already registered

        """
        # The formatted message used to be built and then discarded, leaving
        # the exception text as the bare account name.
        msg = "'{}' already registered".format(account_name)
        super().__init__(msg)
class PluginUnavailableError(Exception):
    """Raised when a monitoring plugin cannot be created for an account."""
class PluginNotSupportedError(PluginUnavailableError):
    """Raised when no factory is registered under the requested plugin name."""
class AlarmCreateError(Exception):
    """Raised when the plugin reports that an alarm could not be created."""

    def __init__(self):
        """Initialise the error with its fixed message."""
        message = "failed to create alarm"
        super().__init__(message)
class AlarmDestroyError(Exception):
    """Raised when the plugin reports that an alarm could not be destroyed."""

    def __init__(self):
        """Initialise the error with its fixed message."""
        message = "failed to destroy alarm"
        super().__init__(message)
class PluginFactory(metaclass=abc.ABCMeta):
    """Abstract base class for factories that build NFVI monitoring plugins.

    A concrete factory defines a PLUGIN_NAME class attribute and may define a
    FALLBACKS class attribute listing the names of the factories to try when
    this one cannot provide a plugin.

    The metaclass is declared with Python 3 syntax; the former
    `__metaclass__` class attribute is ignored by Python 3, so the abstract
    method was never enforced.
    """

    @abc.abstractmethod
    def create(self, cloud_account, plugin_name=None):
        """Create a plugin instance for the given cloud account

        Arguments:
            cloud_account - the cloud account the plugin will serve
            plugin_name   - unused; kept optional for backward compatibility
                            because the concrete factories and the plugin
                            manager only pass the cloud account

        Returns:
            A plugin object

        """
        pass

    @property
    def name(self):
        """The name under which this factory is registered"""
        return type(self).PLUGIN_NAME

    @property
    def fallbacks(self):
        """A fresh list of fallback plugin names (empty if none are defined)"""
        # Return a copy so callers can extend it without mutating the class
        # attribute shared by every instance.
        return list(getattr(type(self), "FALLBACKS", ()))
class MonascaPluginFactory(PluginFactory):
    """Factory for the monasca plugin, which always reports unavailable."""

    PLUGIN_NAME = "monasca"
    FALLBACKS = ["ceilometer"]

    def create(self, cloud_account):
        """Always raise PluginUnavailableError so the fallbacks are tried."""
        raise PluginUnavailableError()
class CeilometerPluginFactory(PluginFactory):
    """Factory for the ceilometer monitoring plugin."""

    PLUGIN_NAME = "ceilometer"
    FALLBACKS = ["unavailable"]

    def create(self, cloud_account):
        """Load the ceilometer plugin and return its Monitoring interface

        Arguments:
            cloud_account - the cloud account the plugin will serve

        Raises:
            A PluginUnavailableError is raised if the plugin reports that
            NFVI metrics are not available for the cloud account.

        Returns:
            The plugin's "Monitoring" interface

        """
        peas_plugin = rw_peas.PeasPlugin("rwmon_ceilometer", 'RwMon-1.0')
        monitoring = peas_plugin.get_interface("Monitoring")

        # Only hand the plugin out if it is usable on the platform that the
        # cloud account belongs to.
        _status, is_available = monitoring.nfvi_metrics_available(cloud_account)
        if is_available:
            return monitoring

        raise PluginUnavailableError()
class UnavailablePluginFactory(PluginFactory):
    """Factory of last resort: its plugin reports that metrics are unavailable."""

    PLUGIN_NAME = "unavailable"

    class UnavailablePlugin(object):
        """Stand-in plugin for accounts that have no working data source."""

        def nfvi_metrics_available(self, cloud_account):
            """Return a (status, available) pair with available set to False"""
            # NOTE(review): this return line was missing from the reviewed
            # text; reconstructed from callers that unpack
            # `_, available = plugin.nfvi_metrics_available(...)`.
            return None, False

    def create(self, cloud_account):
        """Return a new UnavailablePlugin; the cloud account is not inspected"""
        stub = UnavailablePluginFactory.UnavailablePlugin()
        return stub
class MockPluginFactory(PluginFactory):
    """Factory for the mock monitoring plugin."""

    PLUGIN_NAME = "mock"
    FALLBACKS = ["unavailable"]

    def create(self, cloud_account):
        """Load the mock plugin and return its Monitoring interface

        Arguments:
            cloud_account - the cloud account the plugin will serve

        Raises:
            A PluginUnavailableError is raised if the plugin reports that
            NFVI metrics are not available for the cloud account.

        Returns:
            The plugin's "Monitoring" interface

        """
        peas_plugin = rw_peas.PeasPlugin("rwmon_mock", 'RwMon-1.0')
        monitoring = peas_plugin.get_interface("Monitoring")

        # Only hand the plugin out if it is usable on the platform that the
        # cloud account belongs to.
        _status, is_available = monitoring.nfvi_metrics_available(cloud_account)
        if is_available:
            return monitoring

        raise PluginUnavailableError()
class NfviMetricsPluginManager(object):
    """Keeps track of which monitoring plugin serves which cloud account."""

    def __init__(self, log):
        """Create the manager and register the built-in plugin factories

        Arguments:
            log - the logger used to report unavailable plugins

        """
        self._plugins = dict()
        self._log = log
        self._factories = dict()

        self.register_plugin_factory(MockPluginFactory())
        self.register_plugin_factory(CeilometerPluginFactory())
        self.register_plugin_factory(MonascaPluginFactory())
        self.register_plugin_factory(UnavailablePluginFactory())

    @property
    def log(self):
        """The logger used by this manager"""
        return self._log

    def register_plugin_factory(self, factory):
        """Make a factory available under its own name

        Arguments:
            factory - an object exposing `name`, `fallbacks` and `create`

        """
        self._factories[factory.name] = factory

    def plugin(self, account_name):
        """Return the plugin registered for an account

        Arguments:
            account_name - the name of the cloud account

        Raises:
            A KeyError is raised if the account has no registered plugin.

        """
        return self._plugins[account_name]

    def register(self, cloud_account, plugin_name):
        """Create a plugin for a cloud account, following fallbacks if needed

        Arguments:
            cloud_account - the cloud account to create a plugin for
            plugin_name   - the name of the preferred plugin factory

        Raises:
            An AccountAlreadyRegisteredError is raised if the account already
            has a plugin. A PluginNotSupportedError is raised if there is no
            factory called plugin_name. A PluginUnavailableError is raised if
            neither the preferred factory nor any of its fallbacks can
            provide a plugin.

        """
        # Check to see if the cloud account has already been registered
        if cloud_account.name in self._plugins:
            raise AccountAlreadyRegisteredError(cloud_account.name)

        if plugin_name not in self._factories:
            raise PluginNotSupportedError(plugin_name)

        # Walk the fallback chain breadth-first. Each factory is tried at
        # most once so that a cyclic FALLBACKS declaration cannot loop
        # forever.
        pending = collections.deque([plugin_name])
        tried = set()

        while pending:
            name = pending.popleft()
            if name in tried:
                continue
            tried.add(name)

            factory = self._factories.get(name)
            if factory is None:
                # A fallback that names an unregistered factory is skipped
                # instead of surfacing as a bare KeyError.
                self.log.warning("no factory registered for plugin {}".format(name))
                continue

            try:
                plugin = factory.create(cloud_account)
            except PluginUnavailableError:
                self.log.warning("plugin for {} unavailable".format(name))
                pending.extend(factory.fallbacks)
            else:
                self._plugins[cloud_account.name] = plugin
                return

        raise PluginUnavailableError()

    def unregister(self, account_name):
        """Forget the plugin of an account; unknown accounts are ignored

        Arguments:
            account_name - the name of the cloud account

        """
        self._plugins.pop(account_name, None)
class NfviMetrics(object):
    """
    The NfviMetrics class contains the logic to retrieve NFVI metrics for a
    particular VDUR. Of particular importance is that this object caches the
    metrics until the data become stale so that it does not create excessive
    load upon the underlying data-source.
    """

    # The sample interval defines the maximum time (secs) that metrics will be
    # cached for. This duration should coincide with the sampling interval used
    # by the underlying data-source to capture metrics.
    # NOTE(review): the value was not visible in the reviewed text - confirm.
    SAMPLE_INTERVAL = 15

    # The maximum time (secs) an instance will wait for a request to the data
    # source to be completed
    # NOTE(review): the value was not visible in the reviewed text - confirm.
    TIMEOUT = 2

    def __init__(self, log, loop, account, plugin, vdur):
        """Creates an instance of NfviMetrics

        Arguments:
            log     - the logger used to report failed requests
            loop    - the asyncio event loop used to schedule updates
            account - a CloudAccount instance
            plugin  - an NFVI plugin
            vdur    - a VDUR instance

        """
        self._log = log
        self._loop = loop
        self._account = account
        self._plugin = plugin
        # NOTE(review): the initial timestamp was not visible in the reviewed
        # text; zero makes the first call to retrieve() schedule an update.
        self._timestamp = 0
        self._metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()
        self._vdur = vdur
        self._vim_id = vdur.vim_id
        self._updating = None

    @property
    def log(self):
        """The logger used by NfviMetrics"""
        return self._log

    @property
    def loop(self):
        """The current asyncio loop"""
        return self._loop

    @property
    def vdur(self):
        """The VDUR that these metrics are associated with"""
        return self._vdur

    def retrieve(self):
        """Return the NFVI metrics for this VDUR

        This function will immediately return the current, known NFVI metrics
        for the associated VDUR. It will also, if the data are stale, schedule
        a call to the data-source to retrieve new data.

        """
        if self.should_update():
            self._updating = self.loop.create_task(self.update())

        return self._metrics

    def should_update(self):
        """Return a boolean indicating whether an update should be performed"""
        running = self._updating is not None and not self._updating.done()
        overdue = time.time() > self._timestamp + NfviMetrics.SAMPLE_INTERVAL

        return overdue and not running

    @staticmethod
    def _utilization(used, total):
        """Return used/total as a percentage, or 0 if there is no capacity"""
        # A flavor with no declared capacity used to raise ZeroDivisionError,
        # which discarded every other metric in the sample as well.
        if not total:
            return 0
        return 100 * used / total

    @asyncio.coroutine
    def update(self):
        """Update the NFVI metrics for the associated VDUR

        This coroutine will request new metrics from the data-source and update
        the current metrics. Failures are logged and leave the previously
        cached metrics in place.

        """
        try:
            try:
                # Make the request to the plugin in a separate thread and do
                # not exceed the timeout
                # NOTE(review): the executor and plugin arguments were not
                # visible in the reviewed text - confirm.
                _, metrics = yield from asyncio.wait_for(
                        self.loop.run_in_executor(
                            None,
                            self._plugin.nfvi_metrics,
                            self._account,
                            self._vim_id,
                            ),
                        timeout=NfviMetrics.TIMEOUT,
                        loop=self.loop,
                        )

            except asyncio.TimeoutError:
                msg = "timeout on request for nfvi metrics (vim-id = {})"
                self.log.warning(msg.format(self._vim_id))
                return

            except Exception as e:
                self.log.exception(e)
                return

            try:
                flavor = self.vdur.vm_flavor

                # Create uninitialized metric structure
                vdu_metrics = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_Vdur_NfviMetrics()

                # VCPU
                vdu_metrics.vcpu.total = flavor.vcpu_count
                vdu_metrics.vcpu.utilization = metrics.vcpu.utilization

                # Memory
                vdu_metrics.memory.used = metrics.memory.used
                vdu_metrics.memory.total = flavor.memory_mb
                vdu_metrics.memory.utilization = self._utilization(
                        vdu_metrics.memory.used,
                        vdu_metrics.memory.total,
                        )

                # Storage
                vdu_metrics.storage.used = metrics.storage.used
                vdu_metrics.storage.total = 1e9 * flavor.storage_gb
                vdu_metrics.storage.utilization = self._utilization(
                        vdu_metrics.storage.used,
                        vdu_metrics.storage.total,
                        )

                # Network (incoming)
                vdu_metrics.network.incoming.packets = metrics.network.incoming.packets
                vdu_metrics.network.incoming.packet_rate = metrics.network.incoming.packet_rate
                vdu_metrics.network.incoming.bytes = metrics.network.incoming.bytes
                vdu_metrics.network.incoming.byte_rate = metrics.network.incoming.byte_rate

                # Network (outgoing)
                vdu_metrics.network.outgoing.packets = metrics.network.outgoing.packets
                vdu_metrics.network.outgoing.packet_rate = metrics.network.outgoing.packet_rate
                vdu_metrics.network.outgoing.bytes = metrics.network.outgoing.bytes
                vdu_metrics.network.outgoing.byte_rate = metrics.network.outgoing.byte_rate

                # External ports
                vdu_metrics.external_ports.total = len(self.vdur.external_interface)

                # Internal ports
                vdu_metrics.internal_ports.total = len(self.vdur.internal_interface)

                self._metrics = vdu_metrics

            except Exception as e:
                self.log.exception(e)

        finally:
            # Regardless of the result of the query, we want to make sure that
            # we do not poll the data source until another sample duration has
            # passed.
            self._timestamp = time.time()
class NfviMetricsCache(object):
    """Stores one NfviMetrics object per VDUR, indexed by VIM ID.

    The cache also keeps the two-way mapping between VDUR IDs and VIM IDs.
    """

    def __init__(self, log, loop, plugin_manager):
        """Create an empty cache

        Arguments:
            log            - the logger handed to each NfviMetrics object
            loop           - the event loop handed to each NfviMetrics object
            plugin_manager - the object used to look up an account's plugin

        """
        self._log = log
        self._loop = loop
        self._plugin_manager = plugin_manager
        self._nfvi_metrics = {}

        self._vim_to_vdur = {}
        self._vdur_to_vim = {}

    def create_entry(self, account, vdur):
        """Create and store the NfviMetrics object for a VDUR

        Arguments:
            account - the cloud account that the VDUR belongs to
            vdur    - the VDUR to create an entry for

        """
        account_plugin = self._plugin_manager.plugin(account.name)
        entry = NfviMetrics(self._log, self._loop, account, account_plugin, vdur)
        self._nfvi_metrics[vdur.vim_id] = entry

        self._vim_to_vdur[vdur.vim_id] = vdur.id
        self._vdur_to_vim[vdur.id] = vdur.vim_id

    def destroy_entry(self, vdur_id):
        """Remove the metrics and both ID mappings of a VDUR

        Arguments:
            vdur_id - the ID of the VDUR to remove

        Raises:
            A KeyError is raised if the VDUR ID is not in the cache.

        """
        vim_id = self._vdur_to_vim[vdur_id]

        self._nfvi_metrics.pop(vim_id)
        self._vdur_to_vim.pop(vdur_id)
        self._vim_to_vdur.pop(vim_id)

    def retrieve(self, vim_id):
        """Return the result of retrieve() on the entry stored for a VIM ID"""
        entry = self._nfvi_metrics[vim_id]
        return entry.retrieve()

    def to_vim_id(self, vdur_id):
        """Return the VIM ID that corresponds to a VDUR ID"""
        return self._vdur_to_vim[vdur_id]

    def to_vdur_id(self, vim_id):
        """Return the VDUR ID that corresponds to a VIM ID"""
        return self._vim_to_vdur[vim_id]

    def contains_vdur_id(self, vdur_id):
        """Return True if the VDUR ID has an entry in the cache"""
        return vdur_id in self._vdur_to_vim

    def contains_vim_id(self, vim_id):
        """Return True if the VIM ID has an entry in the cache"""
        return vim_id in self._vim_to_vdur
class NfviInterface(object):
    """
    The NfviInterface serves as an interface for communicating with the
    underlying infrastructure, i.e. retrieving metrics for VDURs that have been
    registered with it and managing alarms.

    The NfviInterface should only need to be invoked using a cloud account and
    optionally a VIM ID; It should not need to handle mapping from VDUR ID to
    VIM ID.
    """

    def __init__(self, loop, log, plugin_manager, cache):
        """Creates an NfviInterface instance

        Arguments:
            loop           - an event loop
            log            - a logger
            plugin_manager - an instance of NfviMetricsPluginManager
            cache          - an instance of NfviMetricsCache

        """
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=16)
        self._plugin_manager = plugin_manager
        self._cache = cache
        self._log = log
        self._loop = loop

    @property
    def loop(self):
        """The event loop used by this NfviInterface"""
        return self._loop

    @property
    def log(self):
        """The event log used by this NfviInterface"""
        return self._log

    @property
    def metrics(self):
        """The list of metrics contained in this NfviInterface"""
        entries = self._cache._nfvi_metrics.values()
        return list(entries)

    def nfvi_metrics_available(self, account):
        """Ask the account's plugin whether NFVI metrics are available

        Arguments:
            account - a CloudAccount instance

        Returns:
            The availability flag reported by the plugin

        """
        account_plugin = self._plugin_manager.plugin(account.name)
        _status, is_available = account_plugin.nfvi_metrics_available(account)
        return is_available

    def retrieve(self, vdur_id):
        """Returns the NFVI metrics for the specified VDUR

        Note, a VDUR must be registered with a NfviInterface before
        metrics can be retrieved for it.

        Arguments:
            vdur_id - the ID of the VDUR whose metrics should be retrieved

        Returns:
            An NfviMetrics object for the specified VDUR

        """
        vim_id = self._cache.to_vim_id(vdur_id)
        return self._cache.retrieve(vim_id)

    @asyncio.coroutine
    def alarm_create(self, account, vim_id, alarm, timeout=5):
        """Create a new alarm

        Arguments:
            account - a CloudAccount instance
            vim_id  - the VM to associate with this alarm
            alarm   - an alarm structure
            timeout - the request timeout (sec)

        Raises:
            If the data source does not respond in a timely manner, an
            asyncio.TimeoutError will be raised. An AlarmCreateError is
            raised if the plugin reports a failure status.

        """
        account_plugin = self._plugin_manager.plugin(account.name)

        # NOTE(review): the executor and the arguments passed to the plugin
        # were not visible in the reviewed text - confirm.
        request = self.loop.run_in_executor(
                None,
                account_plugin.do_alarm_create,
                account,
                vim_id,
                alarm,
                )
        status = yield from asyncio.wait_for(
                request,
                timeout=timeout,
                loop=self.loop,
                )

        if status == RwTypes.RwStatus.FAILURE:
            raise AlarmCreateError()

    @asyncio.coroutine
    def alarm_destroy(self, account, alarm_id, timeout=5):
        """Destroy an existing alarm

        Arguments:
            account  - a CloudAccount instance
            alarm_id - the identifier of the alarm to destroy
            timeout  - the request timeout (sec)

        Raises:
            If the data source does not respond in a timely manner, an
            asyncio.TimeoutError will be raised. An AlarmDestroyError is
            raised if the plugin reports a failure status.

        """
        account_plugin = self._plugin_manager.plugin(account.name)

        # NOTE(review): the executor and the arguments passed to the plugin
        # were not visible in the reviewed text - confirm.
        request = self.loop.run_in_executor(
                None,
                account_plugin.do_alarm_delete,
                account,
                alarm_id,
                )
        status = yield from asyncio.wait_for(
                request,
                timeout=timeout,
                loop=self.loop,
                )

        if status == RwTypes.RwStatus.FAILURE:
            raise AlarmDestroyError()
class InstanceConfiguration(object):
    """
    The InstanceConfiguration class represents configuration information that
    affects the behavior of the monitor. Essentially this class should contain
    no functional behavior but serve as a convenient way to share data amongst
    the components of the monitoring system.
    """

    def __init__(self):
        """Create a configuration in which every setting is unset (None)"""
        for setting in (
                "polling_period",
                "max_polling_frequency",
                "min_cache_lifetime",
                "public_ip",
                ):
            setattr(self, setting, None)
class Monitor(object):
    """
    The Monitor class is intended to act as a unifying interface for the
    different sub-systems that are used to monitor the NFVI.
    """

    def __init__(self, loop, log, config):
        """Create a Monitor object

        Arguments:
            loop   - the event loop used by this object
            log    - the logger used by this object
            config - an instance of InstanceConfiguration

        """
        self._loop = loop
        self._log = log

        self._cloud_accounts = dict()
        self._nfvi_plugins = NfviMetricsPluginManager(log)
        self._cache = NfviMetricsCache(log, loop, self._nfvi_plugins)
        self._nfvi_interface = NfviInterface(loop, log, self._nfvi_plugins, self._cache)
        self._config = config
        self._vnfrs = dict()
        self._vnfr_to_vdurs = collections.defaultdict(set)
        self._alarms = collections.defaultdict(list)

    @property
    def loop(self):
        """The event loop used by this object"""
        return self._loop

    @property
    def log(self):
        """The event log used by this object"""
        return self._log

    @property
    def cache(self):
        """The NFVI metrics cache"""
        return self._cache

    @property
    def metrics(self):
        """The list of metrics contained in this Monitor"""
        return self._nfvi_interface.metrics

    def nfvi_metrics_available(self, account):
        """Returns a boolean indicating whether NFVI metrics are available

        Arguments:
            account - the name of the cloud account to check

        Returns:
            a boolean indicating availability of NFVI metrics

        """
        if account not in self._cloud_accounts:
            return False

        cloud_account = self._cloud_accounts[account]
        return self._nfvi_interface.nfvi_metrics_available(cloud_account)

    def add_cloud_account(self, account):
        """Add a cloud account to the monitor

        Arguments:
            account - a cloud account object

        Raises:
            If the cloud account has already been added to the monitor, an
            AccountAlreadyRegisteredError is raised.

        """
        if account.name in self._cloud_accounts:
            raise AccountAlreadyRegisteredError(account.name)

        self._cloud_accounts[account.name] = account

        # Openstack accounts start from the monasca plugin (and its
        # fallbacks); every other account type uses the mock plugin.
        if account.account_type == "openstack":
            self.register_cloud_account(account, "monasca")
        else:
            self.register_cloud_account(account, "mock")

    def remove_cloud_account(self, account_name):
        """Remove a cloud account from the monitor

        Arguments:
            account_name - removes the cloud account that has this name

        Raises:
            If the specified cloud account cannot be found, an
            UnknownAccountError is raised. If a VNFR still references the
            account, an AccountInUseError is raised.

        """
        if account_name not in self._cloud_accounts:
            raise UnknownAccountError()

        # Make sure that there are no VNFRs associated with this account
        for vnfr in self._vnfrs.values():
            if vnfr.cloud_account == account_name:
                raise AccountInUseError()

        del self._cloud_accounts[account_name]
        self._nfvi_plugins.unregister(account_name)

    def get_cloud_account(self, account_name):
        """Returns a cloud account by name

        Arguments:
            account_name - the name of the account to return

        Raises:
            An UnknownAccountError is raised if there is no account object
            associated with the provided name

        Returns:
            A cloud account object

        """
        if account_name not in self._cloud_accounts:
            raise UnknownAccountError()

        return self._cloud_accounts[account_name]

    def register_cloud_account(self, account, plugin_name):
        """Register a cloud account with an NFVI plugin

        Note that a cloud account can only be registered for one plugin at a
        time.

        Arguments:
            account     - the cloud account to associate with the plugin
            plugin_name - the name of the plugin to use

        """
        self._nfvi_plugins.register(account, plugin_name)

    def _register_vdurs(self, vnfr, account):
        """Add each VDUR of a VNFR, skipping those that cannot be added"""
        for vdur in vnfr.vdur:
            try:
                self.add_vdur(account, vdur)
                self._vnfr_to_vdurs[vnfr.id].add(vdur.id)
            except (VdurMissingVimIdError, VdurAlreadyRegisteredError):
                # Deliberate best effort: a VDUR without a VIM ID, or one
                # that is already registered, is simply left as it is.
                pass

    def add_vnfr(self, vnfr):
        """Add a VNFR to the monitor

        Arguments:
            vnfr - a VNFR object

        Raises:
            An UnknownAccountError is raised if the account name contained in
            the VNFR does not reference a cloud account that has been added to
            the monitor.

        """
        if vnfr.cloud_account not in self._cloud_accounts:
            raise UnknownAccountError()

        account = self._cloud_accounts[vnfr.cloud_account]
        self._register_vdurs(vnfr, account)

        self._vnfrs[vnfr.id] = vnfr

    def update_vnfr(self, vnfr):
        """Updates the VNFR information in the monitor

        Any VDUR of the VNFR that is not yet registered is added. The stored
        VNFR object itself is not replaced.

        Arguments:
            vnfr - a VNFR object

        Raises:
            An UnknownAccountError is raised if the account name contained in
            the VNFR does not reference a cloud account that has been added to
            the monitor.

        """
        if vnfr.cloud_account not in self._cloud_accounts:
            raise UnknownAccountError()

        account = self._cloud_accounts[vnfr.cloud_account]
        self._register_vdurs(vnfr, account)

    def remove_vnfr(self, vnfr_id):
        """Remove a VNFR from the monitor

        Arguments:
            vnfr_id - the ID of the VNFR to remove

        """
        vdur_ids = self._vnfr_to_vdurs[vnfr_id]

        for vdur_id in vdur_ids:
            self.remove_vdur(vdur_id)

        del self._vnfrs[vnfr_id]
        del self._vnfr_to_vdurs[vnfr_id]

    def add_vdur(self, account, vdur):
        """Adds a VDUR to the monitor

        Adding a VDUR to the monitor will automatically create a NFVI metrics
        object that is associated with the VDUR so that the monitor can
        provide the NFVI metrics associated with the VDUR.

        Arguments:
            account - the cloud account associated with the VNFR that contains
                      the VDUR
            vdur    - a VDUR object

        Raises:
            A VdurMissingVimIdError is raised if the provided VDUR does not
            contain a VIM ID. A VdurAlreadyRegisteredError is raised if the ID
            associated with the VDUR has already been registered.

        """
        if not vdur.vim_id:
            raise VdurMissingVimIdError(vdur.id)

        if self.is_registered_vdur(vdur.id):
            raise VdurAlreadyRegisteredError(vdur.id)

        self.cache.create_entry(account, vdur)

    def remove_vdur(self, vdur_id):
        """Removes a VDUR from the monitor

        Arguments:
            vdur_id - the ID of the VDUR to remove

        """
        self.cache.destroy_entry(vdur_id)

        # Schedule any alarms associated with the VDUR for destruction
        for account_name, alarm_id in self._alarms[vdur_id]:
            self.loop.create_task(self.destroy_alarm(account_name, alarm_id))

        del self._alarms[vdur_id]

    def list_vdur(self, vnfr_id):
        """Returns a list of VDURs

        Arguments:
            vnfr_id - the identifier of the VNFR that contains the VDURs

        Returns:
            The VDURs of the stored VNFR

        """
        return self._vnfrs[vnfr_id].vdur

    def is_registered_vnfr(self, vnfr_id):
        """Returns True if the VNFR is registered with the monitor

        Arguments:
            vnfr_id - the ID of the VNFR to check

        Returns:
            True if the VNFR is registered and False otherwise.

        """
        return vnfr_id in self._vnfrs

    def is_registered_vdur(self, vdur_id):
        """Returns True if the VDUR is registered with the monitor

        Arguments:
            vdur_id - the ID of the VDUR to check

        Returns:
            True if the VDUR is registered and False otherwise.

        """
        return self.cache.contains_vdur_id(vdur_id)

    def retrieve_nfvi_metrics(self, vdur_id):
        """Retrieves the NFVI metrics associated with a VDUR

        Arguments:
            vdur_id - the ID of the VDUR whose metrics are to be retrieved

        Returns:
            NFVI metrics for a VDUR

        """
        return self._nfvi_interface.retrieve(vdur_id)

    @asyncio.coroutine
    def create_alarm(self, account_name, vdur_id, alarm):
        """Create a new alarm

        This function creates an alarm and augments the provided endpoints with
        endpoints to the launchpad if the launchpad has a public IP. The added
        endpoints are of the form,

            http://{host}:4568/{platform}/{vdur_id}/{action}

        where the 'action' is one of 'ok', 'alarm', or 'insufficient_data'. The
        messages that are pushed to the launchpad are not defined by RIFT so
        we need to know which platform an alarm is sent from in order to
        interpret them.

        Arguments:
            account_name - the name of the account to use to create the alarm
            vdur_id      - the identifier of the VDUR to associate with the
                           alarm
            alarm        - the alarm data

        Raises:
            An UnknownAccountError is raised if the account is not known. A
            KeyError is raised if the VDUR ID is not in the metrics cache.

        """
        account = self.get_cloud_account(account_name)
        vim_id = self.cache.to_vim_id(vdur_id)

        # If the launchpad has a public IP, augment the action webhooks to
        # include the launchpad so that the alarms can be broadcast as event
        # notifications.
        if self._config.public_ip is not None:
            url = "http://{host}:4568/{platform}/{vdur_id}".format(
                    host=self._config.public_ip,
                    platform=account.account_type,
                    vdur_id=vdur_id,
                    )
            alarm.actions.ok.add().url = url + "/ok"
            alarm.actions.alarm.add().url = url + "/alarm"
            # The insufficient-data webhook used to be appended to the
            # 'alarm' action list, so it fired on the wrong state change.
            alarm.actions.insufficient_data.add().url = url + "/insufficient_data"

        yield from self._nfvi_interface.alarm_create(account, vim_id, alarm)

        # Associate the VDUR ID with the alarm ID
        self._alarms[vdur_id].append((account_name, alarm.alarm_id))

    @asyncio.coroutine
    def destroy_alarm(self, account_name, alarm_id):
        """Destroy an existing alarm

        Arguments:
            account_name - the name of the account that owns the alarm
            alarm_id     - the identifier of the alarm to destroy

        Raises:
            An UnknownAccountError is raised if the account is not known.

        """
        account = self.get_cloud_account(account_name)
        yield from self._nfvi_interface.alarm_destroy(account, alarm_id)