Coverage for osmclient/cli_commands/subscriptions.py: 51%

59 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-22 09:01 +0000

1# Copyright ETSI Contributors and Others. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import click 

17from osmclient.common.exceptions import ClientException 

18from osmclient.cli_commands import utils 

19from prettytable import PrettyTable 

20import json 

21import logging 

22 

23logger = logging.getLogger("osmclient") 

24 

25 

@click.command(
    name="subscription-create",
    short_help="creates a new subscription to a specific event",
)
@click.option(
    "--event_type",
    # Only "ns" is offered for now; a wider choice list
    # ('ns', 'nspkg', 'vnfpkg') is currently disabled.
    type=click.Choice(["ns"], case_sensitive=False),
    help="event type to be subscribed (for the moment, only ns is supported)",
)
@click.option(
    "--event",
    default=None,
    help="specific yaml configuration for the event",
)
@click.option(
    "--event_file",
    default=None,
    help="specific yaml configuration file for the event",
)
@click.pass_context
def subscription_create(ctx, event_type, event, event_file):
    """creates a new subscription to a specific event"""
    # The docstring above is rendered by click as the command help text, so it
    # is kept short; implementation notes live in comments instead.
    #
    # The event definition comes either inline (--event) or from a file
    # (--event_file), never both. Whatever was obtained is handed, together
    # with the event type, to the client's subscription.create().
    logger.debug("")
    utils.check_client_version(ctx.obj, ctx.command.name)

    # Guard clause: the two sources of the event definition are exclusive.
    if event_file and event:
        raise ClientException(
            '"--event" option is incompatible with "--event_file" option'
        )

    if event_file:
        # The file content replaces the (empty) inline event verbatim.
        with open(event_file, "r") as event_fd:
            event = event_fd.read()

    ctx.obj.subscription.create(event_type, event)

53 

54 

@click.command(name="subscription-delete", short_help="deletes a subscription")
@click.option(
    "--event_type",
    # Only "ns" is offered for now; a wider choice list
    # ('ns', 'nspkg', 'vnfpkg') is currently disabled.
    type=click.Choice(["ns"], case_sensitive=False),
    help="event type to be subscribed (for the moment, only ns is supported)",
)
@click.argument("subscription_id")
@click.option(
    "--force",
    is_flag=True,
    help="forces the deletion bypassing pre-conditions",
)
@click.pass_context
def subscription_delete(ctx, event_type, subscription_id, force):
    """deletes a subscription

    SUBSCRIPTION_ID: ID of the subscription to be deleted
    """
    # The docstring above doubles as the click help text; implementation notes
    # are kept in comments so the help output is not affected.
    logger.debug("")
    client = ctx.obj
    # Validate the client version for this command before doing any work.
    utils.check_client_version(client, ctx.command.name)
    # The --force flag is forwarded untouched to the client library.
    client.subscription.delete(event_type, subscription_id, force)

75 

76 

@click.command(name="subscription-list", short_help="list all subscriptions")
@click.option(
    "--event_type",
    # Only "ns" is offered for now; a wider choice list
    # ('ns', 'nspkg', 'vnfpkg') is currently disabled.
    type=click.Choice(["ns"], case_sensitive=False),
    help="event type to be subscribed (for the moment, only ns is supported)",
)
@click.option(
    "--filter",
    default=None,
    multiple=True,
    help="restricts the list to the subscriptions matching the filter",
)
@click.pass_context
def subscription_list(ctx, event_type, filter):
    """list all subscriptions"""
    # The docstring above is what click shows as help text; details are kept
    # in comments. `filter` shadows the builtin on purpose: click derives the
    # parameter name from the "--filter" option.
    logger.debug("")
    utils.check_client_version(ctx.obj, ctx.command.name)

    # Repeated --filter options are merged into one "&"-separated string;
    # when none were given the value is passed through unchanged.
    query = "&".join(filter) if filter else filter
    subscriptions = ctx.obj.subscription.list(event_type, query)

    table = PrettyTable(["id", "filter", "CallbackUri"])
    for entry in subscriptions:
        sub_id = entry["_id"]
        # The filter is pretty-printed as JSON and wrapped at 70 columns.
        wrapped_filter = utils.wrap_text(
            text=json.dumps(entry["filter"], indent=2),
            width=70,
        )
        callback = entry["CallbackUri"]
        table.add_row([sub_id, wrapped_filter, callback])

    table.align = "l"
    print(table)

109 

110 

@click.command(
    name="subscription-show", short_help="shows the details of a subscription"
)
@click.argument("subscription_id")
@click.option(
    "--event_type",
    # Only "ns" is offered for now; a wider choice list
    # ('ns', 'nspkg', 'vnfpkg') is currently disabled.
    type=click.Choice(["ns"], case_sensitive=False),
    help="event type to be subscribed (for the moment, only ns is supported)",
)
@click.option(
    "--filter",
    multiple=True,
    help="restricts the information to the fields in the filter",
)
@click.pass_context
def subscription_show(ctx, event_type, subscription_id, filter):
    """shows the details of a subscription

    SUBSCRIPTION_ID: ID of the subscription
    """
    # The docstring above doubles as the click help text, so implementation
    # notes are kept in comments. `filter` shadows the builtin on purpose:
    # click derives the parameter name from the "--filter" option.
    logger.debug("")

    # Same version guard as subscription-create/-delete/-list: check the
    # client before touching ctx.obj.subscription.
    utils.check_client_version(ctx.obj, ctx.command.name)

    # NOTE(review): event_type is accepted but not forwarded here, unlike the
    # sibling commands -- confirm against the signature of subscription.get().
    resp = ctx.obj.subscription.get(subscription_id)

    table = PrettyTable(["key", "attribute"])
    for key, value in resp.items():
        # With no --filter every field is shown; otherwise only listed keys.
        if not filter or key in filter:
            table.add_row(
                [key, utils.wrap_text(text=json.dumps(value, indent=2), width=100)]
            )
    table.align = "l"
    print(table)