Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/utils.py: 90.83%

109 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2020 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15""" Holds constants and utility functions for the SQL scheduler. """ 

16 

17 

18import operator 

19from datetime import datetime 

20from typing import Any, Dict, List, Tuple, cast 

21 

22from sqlalchemy.sql.expression import ClauseElement, and_, or_ 

23from sqlalchemy.sql.operators import ColumnOperators 

24from sqlalchemy.sql.schema import Column 

25 

26from buildgrid._exceptions import InvalidArgumentError 

27from buildgrid.server.operations.filtering import OperationFilter, SortKey 

28from buildgrid.server.persistence.sql.models import JobEntry, OperationEntry, PlatformEntry 

29 

# strftime/strptime pattern used to write datetime values into page tokens
# and to parse them back out again.
DATETIME_FORMAT = "%Y-%m-%d-%H-%M-%S-%f"


# Maps each parameter name accepted in list-operations filters and sort keys
# to the model attribute it refers to. The cast only tells the type checker
# to treat the model attributes as Column objects; it does nothing at runtime.
LIST_OPERATIONS_PARAMETER_MODEL_MAP = cast(
    Dict[str, Column[Any]],
    {
        "stage": JobEntry.stage,
        "name": OperationEntry.name,
        "queued_time": JobEntry.queued_timestamp,
        "start_time": JobEntry.worker_start_timestamp,
        "completed_time": JobEntry.worker_completed_timestamp,
        "invocation_id": OperationEntry.invocation_id,
        "correlated_invocations_id": OperationEntry.correlated_invocations_id,
        "tool_name": OperationEntry.tool_name,
        "tool_version": OperationEntry.tool_version,
        "action_digest": JobEntry.action_digest,
        "command": JobEntry.command,
        # "platform" is the property key column; "platform-value" is the
        # matching value column, looked up when a platform filter is converted.
        "platform": PlatformEntry.key,
        "platform-value": PlatformEntry.value,
    },
)

51 

52 

def strtobool(val: str) -> bool:
    """Interpret a textual truth value as a bool.

    The comparison is case-insensitive. 'y', 'yes', 't', 'true', 'on' and '1'
    give True; 'n', 'no', 'f', 'false', 'off' and '0' give False.

    Raises:
        ValueError: if 'val' is none of the spellings above. The message
            contains the lower-cased input.
    """
    lowered = val.lower()
    if lowered in {"y", "yes", "t", "true", "on", "1"}:
        return True
    if lowered in {"n", "no", "f", "false", "off", "0"}:
        return False
    raise ValueError(f"invalid truth value {lowered}")

66 

67 

def parse_list_operations_sort_value(value: str, column: Column[Any]) -> Any:
    """Parse the string form of a sort value into the Python type of its column.

    The target type is the python_type reported by the column's SQL type.
    Datetimes are parsed with DATETIME_FORMAT, booleans with strtobool, and
    every other type by calling the type on the string.
    """
    target_type = column.property.columns[0].type.python_type
    if target_type == datetime:
        return datetime.strptime(value, DATETIME_FORMAT)
    if target_type == bool:
        # bool("false") would be True, so go through strtobool, which accepts
        # several textual spellings of true and false.
        return strtobool(value)
    return target_type(value)

78 

79 

def dump_list_operations_token_value(token_value: Any) -> str:
    """Render a single sort value as the text placed in a page_token.

    Datetimes are formatted with DATETIME_FORMAT; anything else is passed
    through str().
    """
    if not isinstance(token_value, datetime):
        return str(token_value)
    return datetime.strftime(token_value, DATETIME_FORMAT)

86 

87 

def build_pagination_clause_for_sort_key(
    sort_value: Any, previous_sort_values: List[Any], sort_keys: List[SortKey]
) -> ClauseElement:
    """Build one AND-ed term of the page filter described in build_page_filter.

    The term requires each leading sort column to equal the corresponding
    entry of previous_sort_values, and the next sort column to lie strictly
    past sort_value: below it when that key is descending, above it otherwise.

    Raises:
        ValueError: if sort_keys is not longer than previous_sort_values, so
            there is no sort key left for sort_value.
    """
    pinned_count = len(previous_sort_values)
    if pinned_count >= len(sort_keys):
        raise ValueError("Not enough keys to unpack")

    # zip stops at the shorter list, so only the leading sort keys (those that
    # already have a value) are pinned with an equality test.
    conditions = [
        LIST_OPERATIONS_PARAMETER_MODEL_MAP[pinned_key.name] == pinned_value
        for pinned_key, pinned_value in zip(sort_keys, previous_sort_values)
    ]

    current_key = sort_keys[pinned_count]
    current_col = LIST_OPERATIONS_PARAMETER_MODEL_MAP[current_key.name]
    past_cursor = current_col < sort_value if current_key.descending else current_col > sort_value
    conditions.append(past_cursor)
    return and_(*conditions)

107 

108 

def build_page_filter(page_token: str, sort_keys: List[SortKey]) -> ClauseElement:
    """Turn a page_token into the filter selecting the rows that follow it.

    The token acts as a cursor tied to the sort order. Its format is
    Xval|Yval|Zval|..., one value per sort key, in sort-key order. With sort
    columns X, Y and Z (X primary, Y breaking ties on X, Z breaking ties on
    X and Y) the resulting filter is:

    (X > Xval) OR (X == Xval AND Y > Yval) OR (X == Xval AND Y == Yval AND Z > Zval) ...

    For a descending sort key the strict comparison on that column is "<"
    instead of ">".

    Raises:
        InvalidArgumentError: if the token does not hold exactly one
            "|"-separated element per sort key.
    """
    token_elements = page_token.split("|")
    if len(token_elements) != len(sort_keys):
        # It is possible that an extra "|" was in the input
        # TODO: Handle extra "|"s somehow? Or at least allow escaping them
        raise InvalidArgumentError(
            f'Wrong number of "|"-separated elements in page token [{page_token}]. '
            f"Expected {len(sort_keys)}, got {len(token_elements)}."
        )

    or_terms = []
    # Values parsed so far; each new term pins all of them with equality.
    parsed_so_far: List[Any] = []
    for sort_key, raw_element in zip(sort_keys, token_elements):
        column = LIST_OPERATIONS_PARAMETER_MODEL_MAP[sort_key.name]
        parsed_value = parse_list_operations_sort_value(raw_element, column)
        or_terms.append(build_pagination_clause_for_sort_key(parsed_value, parsed_so_far, sort_keys))
        parsed_so_far.append(parsed_value)

    return or_(*or_terms)

143 

144 

def build_page_token(operation: Column[Any], sort_keys: List[SortKey]) -> str:
    """Use the sort keys to build a page token from the given operation.

    For each sort key, in order, the mapped column's value is read from the
    operation itself (columns of the "operations" table) or from
    operation.job (columns of the "jobs" table), converted to text with
    dump_list_operations_token_value, and the pieces are joined with "|".

    Raises:
        ValueError: if a sort key maps to a column of any other table.
    """
    token_values = []
    for sort_key in sort_keys:
        col = LIST_OPERATIONS_PARAMETER_MODEL_MAP[sort_key.name]
        col_properties = col.property.columns[0]
        column_name = col_properties.name
        table_name = col_properties.table.name
        if table_name == "operations":
            value_holder = operation
        elif table_name == "jobs":
            value_holder = operation.job
        else:
            # The f prefix must sit outside the quotes, otherwise the
            # placeholders are emitted literally instead of being filled in.
            raise ValueError(
                f"Got invalid table {table_name} for sort key {sort_key.name} while building page_token"
            )

        token_values.append(dump_list_operations_token_value(getattr(value_holder, column_name)))

    return "|".join(token_values)

164 

165 

def extract_sort_keys(operation_filters: List[OperationFilter]) -> Tuple[List[SortKey], List[OperationFilter]]:
    """Partition the operation filters into sort keys and ordinary filters.

    A filter whose parameter is "sort_order" contributes its value as a sort
    key; every other filter is passed through untouched. Several "sort_order"
    filters may be present, and all of them are collected. Both returned
    lists keep the order of the input.

    Raises:
        InvalidArgumentError: if a "sort_order" filter uses any operator
            other than "=".
    """
    ordering = []
    remaining = []
    for candidate in operation_filters:
        if candidate.parameter != "sort_order":
            remaining.append(candidate)
            continue
        if candidate.operator != operator.eq:
            raise InvalidArgumentError('sort_order must be specified with the "=" operator.')
        ordering.append(candidate.value)

    return ordering, remaining

183 

184 

def build_sort_column_list(sort_keys: List[SortKey]) -> List["ColumnOperators[Any]"]:
    """Convert the list of sort keys into a list of columns that can be
    passed to an order_by.

    Each sort key is looked up in the parameter-model map and turned into a
    descending or ascending ordering of that column, according to the key's
    descending flag. The output keeps the order of the input.

    Raises:
        InvalidArgumentError: if a sort key's name is not in the
            parameter-model map.
    """
    sort_columns: List["ColumnOperators[Any]"] = []
    for sort_key in sort_keys:
        # Only the lookup is guarded, so a KeyError from anywhere else can
        # never be misreported as an invalid sort key.
        try:
            col = LIST_OPERATIONS_PARAMETER_MODEL_MAP[sort_key.name]
        except KeyError as err:
            raise InvalidArgumentError(f"[{sort_key.name}] is not a valid sort key.") from err
        sort_columns.append(col.desc() if sort_key.descending else col.asc())
    return sort_columns

202 

203 

def convert_filter_to_sql_filter(operation_filter: OperationFilter) -> ClauseElement:
    """Translate an internal filter into a clause SQLAlchemy can evaluate.

    The result is a "ColumnElement" in the sense of the SQLAlchemy tutorial:
    https://docs.sqlalchemy.org/en/13/core/tutorial.html#selecting-specific-columns

    The filter value is assumed to have been converted by the parser to a
    Python type that can be compared with the target column.

    Special cases:
      * "command" with "=" or "!=" becomes a substring match (LIKE / NOT LIKE
        with the value wrapped in "%"). Other operators fall through to the
        generic comparison.
      * "platform" expects a "key:value" string, split on the first ":". The
        key must equal the platform key column and the filter's operator is
        applied to the platform value column. A value without ":" makes the
        unpacking raise ValueError.

    Raises:
        InvalidArgumentError: if the filter's parameter is not in the
            parameter-model map.
    """
    parameter = operation_filter.parameter
    try:
        column = LIST_OPERATIONS_PARAMETER_MODEL_MAP[parameter]
    except KeyError:
        raise InvalidArgumentError(f"Invalid parameter: [{operation_filter.parameter}]")

    compare = operation_filter.operator

    if parameter == "command":
        if compare == operator.eq:
            return column.like(f"%{operation_filter.value}%")
        if compare == operator.ne:
            return column.notlike(f"%{operation_filter.value}%")  # type: ignore[no-any-return]
    elif parameter == "platform":
        platform_key, platform_value = operation_filter.value.split(":", 1)
        value_column = LIST_OPERATIONS_PARAMETER_MODEL_MAP["platform-value"]
        return and_(column == platform_key, compare(value_column, platform_value))

    # Better type? Returning Any from function declared to return "ClauseElement"
    return compare(column, operation_filter.value)  # type: ignore[no-any-return]

229 

230 

def build_custom_filters(operation_filters: List[OperationFilter]) -> List[ClauseElement]:
    """Convert the given filters to SQLAlchemy clauses, leaving out "platform" filters.

    Every filter whose parameter is not "platform" is passed through
    convert_filter_to_sql_filter; the output keeps the order of the input.
    """
    sql_filters: List[ClauseElement] = []
    for candidate in operation_filters:
        if candidate.parameter == "platform":
            # NOTE(review): platform filters are skipped here, presumably
            # because the caller applies them separately - confirm.
            continue
        sql_filters.append(convert_filter_to_sql_filter(candidate))
    return sql_filters