Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/utils.py: 90.83%
109 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15""" Holds constants and utility functions for the SQL scheduler. """
18import operator
19from datetime import datetime
20from typing import Any, Dict, List, Tuple, cast
22from sqlalchemy.sql.expression import ClauseElement, and_, or_
23from sqlalchemy.sql.operators import ColumnOperators
24from sqlalchemy.sql.schema import Column
26from buildgrid._exceptions import InvalidArgumentError
27from buildgrid.server.operations.filtering import OperationFilter, SortKey
28from buildgrid.server.persistence.sql.models import JobEntry, OperationEntry, PlatformEntry
# strptime/strftime pattern shared by parse_list_operations_sort_value and
# dump_list_operations_token_value, so a datetime written into a page token
# can be parsed back out of it.
DATETIME_FORMAT = "%Y-%m-%d-%H-%M-%S-%f"


# Maps the parameter names accepted in ListOperations filters and sort keys to
# the model column each one refers to. "platform" and "platform-value" point at
# the key and value columns of PlatformEntry respectively; the "platform"
# filter handling in convert_filter_to_sql_filter uses both together.
LIST_OPERATIONS_PARAMETER_MODEL_MAP = cast(
    Dict[str, Column[Any]],
    {
        "stage": JobEntry.stage,
        "name": OperationEntry.name,
        "queued_time": JobEntry.queued_timestamp,
        "start_time": JobEntry.worker_start_timestamp,
        "completed_time": JobEntry.worker_completed_timestamp,
        "invocation_id": OperationEntry.invocation_id,
        "correlated_invocations_id": OperationEntry.correlated_invocations_id,
        "tool_name": OperationEntry.tool_name,
        "tool_version": OperationEntry.tool_version,
        "action_digest": JobEntry.action_digest,
        "command": JobEntry.command,
        "platform": PlatformEntry.key,
        "platform-value": PlatformEntry.value,
    },
)
def strtobool(val: str) -> bool:
    """Interpret a textual truth value as a ``bool``.

    Matching is case-insensitive. The spellings 'y', 'yes', 't', 'true',
    'on' and '1' give ``True``; 'n', 'no', 'f', 'false', 'off' and '0'
    give ``False``.

    Raises:
        ValueError: if ``val`` is not one of the spellings above. The
            message contains the lowercased input.
    """
    lowered = val.lower()
    if lowered in ("y", "yes", "t", "true", "on", "1"):
        return True
    if lowered in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError(f"invalid truth value {lowered}")
def parse_list_operations_sort_value(value: str, column: Column[Any]) -> Any:
    """Turn the string form of a sort value into the Python type of ``column``.

    The target type is read from the column's SQLAlchemy type. Datetimes are
    parsed with DATETIME_FORMAT, booleans go through ``strtobool`` so that
    several spellings of true/false are accepted, and every other type is
    built by calling the type itself on the string.
    """
    target_type = column.property.columns[0].type.python_type
    if target_type == datetime:
        return datetime.strptime(value, DATETIME_FORMAT)
    if target_type == bool:
        # bool("false") would be True, so delegate to the tolerant string parser.
        return strtobool(value)
    return target_type(value)
def dump_list_operations_token_value(token_value: Any) -> str:
    """Render a single sort value as the string stored in a page_token.

    Datetimes are formatted with DATETIME_FORMAT; anything else is passed
    through ``str``.
    """
    if not isinstance(token_value, datetime):
        return str(token_value)
    return token_value.strftime(DATETIME_FORMAT)
def build_pagination_clause_for_sort_key(
    sort_value: Any, previous_sort_values: List[Any], sort_keys: List[SortKey]
) -> ClauseElement:
    """Build one AND-ed term of the page filter described in build_page_filter.

    The term pins every earlier sort column to its value from the page token
    (equality) and then requires the next sort column to lie strictly past
    ``sort_value``: less-than for a descending key, greater-than otherwise.

    Raises:
        ValueError: if there is no sort key left for ``sort_value``, i.e.
            ``previous_sort_values`` is at least as long as ``sort_keys``.
    """
    depth = len(previous_sort_values)
    if depth >= len(sort_keys):
        raise ValueError("Not enough keys to unpack")

    # zip stops at the shorter input, which the check above guarantees is
    # previous_sort_values, so only the already-consumed keys are paired here.
    clauses = [
        LIST_OPERATIONS_PARAMETER_MODEL_MAP[earlier_key.name] == earlier_value
        for earlier_key, earlier_value in zip(sort_keys, previous_sort_values)
    ]

    current_key = sort_keys[depth]
    current_col = LIST_OPERATIONS_PARAMETER_MODEL_MAP[current_key.name]
    clauses.append(current_col < sort_value if current_key.descending else current_col > sort_value)
    return and_(*clauses)
def build_page_filter(page_token: str, sort_keys: List[SortKey]) -> ClauseElement:
    """Build the filter that selects the rows following the given page_token.

    The page_token acts as a "cursor" tied to the sort order. It has the form
    Xval|Yval|Zval|..., with one "|"-separated value per sort key, in sort-key
    order. If the sort columns are X, Y and Z, then X is the primary sort key,
    Y breaks ties in X, and Z breaks ties in X and Y. For ascending keys the
    resulting clause is:

        (X > Xval) OR (X == Xval AND Y > Yval) OR (X == Xval AND Y == Yval AND Z > Zval) ...

    For a descending key the strict comparison on that key is "<" instead.

    Raises:
        InvalidArgumentError: if the number of "|"-separated elements in the
            token differs from the number of sort keys.
    """
    token_elements = page_token.split("|")
    expected_count, actual_count = len(sort_keys), len(token_elements)
    if actual_count != expected_count:
        # It is possible that an extra "|" was in the input
        # TODO: Handle extra "|"s somehow? Or at least allow escaping them
        raise InvalidArgumentError(
            f'Wrong number of "|"-separated elements in page token [{page_token}]. '
            f"Expected {expected_count}, got {actual_count}."
        )

    alternatives = []
    parsed_so_far: List[Any] = []
    # One OR-ed alternative per sort key; each fixes all earlier keys to their
    # token values and moves strictly past the token value on its own key.
    for sort_key, raw_value in zip(sort_keys, token_elements):
        column = LIST_OPERATIONS_PARAMETER_MODEL_MAP[sort_key.name]
        parsed_value = parse_list_operations_sort_value(raw_value, column)
        alternatives.append(build_pagination_clause_for_sort_key(parsed_value, parsed_so_far, sort_keys))
        parsed_so_far.append(parsed_value)

    return or_(*alternatives)
def build_page_token(operation: Column[Any], sort_keys: List[SortKey]) -> str:
    """Use the sort keys to build a page token from the given operation.

    For each sort key, the mapped column's value is read from ``operation``
    (columns on the "operations" table) or from ``operation.job`` (columns on
    the "jobs" table), converted to a string, and the results are joined with
    "|" in sort-key order.

    Raises:
        ValueError: if a sort key maps to a column on any other table.
    """
    token_values = []
    for sort_key in sort_keys:
        col = LIST_OPERATIONS_PARAMETER_MODEL_MAP[sort_key.name]
        col_properties = col.property.columns[0]
        column_name = col_properties.name
        table_name = col_properties.table.name
        if table_name == "operations":
            token_value = getattr(operation, column_name)
        elif table_name == "jobs":
            token_value = getattr(operation.job, column_name)
        else:
            # This must be an f-string so the table and key names are
            # actually interpolated into the message.
            raise ValueError(
                f"Got invalid table {table_name} for sort key {sort_key.name} while building page_token"
            )

        token_values.append(dump_list_operations_token_value(token_value))

    return "|".join(token_values)
def extract_sort_keys(operation_filters: List[OperationFilter]) -> Tuple[List[SortKey], List[OperationFilter]]:
    """Separate "sort_order" filters from all other operation filters.

    A filter whose parameter is "sort_order" contributes its value to the
    list of sort keys; every other filter is passed through untouched. Several
    "sort_order" filters may be present, and all of them are collected. Both
    returned lists keep the order of the input.

    Raises:
        InvalidArgumentError: if a "sort_order" filter uses an operator other
            than equality.
    """
    sort_keys = []
    remaining_filters = []
    for candidate in operation_filters:
        if candidate.parameter != "sort_order":
            remaining_filters.append(candidate)
            continue
        if candidate.operator != operator.eq:
            raise InvalidArgumentError('sort_order must be specified with the "=" operator.')
        sort_keys.append(candidate.value)

    return sort_keys, remaining_filters
def build_sort_column_list(sort_keys: List[SortKey]) -> List["ColumnOperators[Any]"]:
    """Translate sort keys into column expressions suitable for an order_by.

    Each key is looked up in the parameter-model map and wrapped in
    ``desc()`` or ``asc()`` according to its ``descending`` flag.

    Raises:
        InvalidArgumentError: if a sort key's name is not in the
            parameter-model map.
    """
    ordered_columns: List["ColumnOperators[Any]"] = []
    for key in sort_keys:
        try:
            column = LIST_OPERATIONS_PARAMETER_MODEL_MAP[key.name]
            directed = column.desc() if key.descending else column.asc()
        except KeyError:
            raise InvalidArgumentError(f"[{key.name}] is not a valid sort key.")
        ordered_columns.append(directed)
    return ordered_columns
def convert_filter_to_sql_filter(operation_filter: OperationFilter) -> ClauseElement:
    """Translate an internal OperationFilter into a SQLAlchemy filter clause.

    Special cases:

    * "command" with the equality / inequality operator becomes a
      LIKE / NOT LIKE match with the value wrapped in "%" wildcards. Any
      other operator on "command" falls through to the generic comparison.
    * "platform" expects a value of the form "key:value" (split on the first
      ":"); the key must equal the platform key column and the filter's
      operator is applied to the platform value column.

    Everything else applies the filter's operator directly to the mapped
    column and the filter value. This function assumes the parser has already
    converted the filter value to a Python type comparable with the column.

    Raises:
        InvalidArgumentError: if the filter's parameter is not in the
            parameter-model map.
    """
    parameter = operation_filter.parameter
    try:
        column = LIST_OPERATIONS_PARAMETER_MODEL_MAP[parameter]
    except KeyError:
        raise InvalidArgumentError(f"Invalid parameter: [{parameter}]")

    compare = operation_filter.operator

    if parameter == "command":
        if compare == operator.eq:
            return column.like(f"%{operation_filter.value}%")
        if compare == operator.ne:
            return column.notlike(f"%{operation_filter.value}%")  # type: ignore[no-any-return]

    if parameter == "platform":
        platform_key, platform_value = operation_filter.value.split(":", 1)
        value_column = LIST_OPERATIONS_PARAMETER_MODEL_MAP["platform-value"]
        return and_(column == platform_key, compare(value_column, platform_value))

    # Better type? Returning Any from function declared to return "ClauseElement"
    return compare(column, operation_filter.value)  # type: ignore[no-any-return]
def build_custom_filters(operation_filters: List[OperationFilter]) -> List[ClauseElement]:
    """Convert every filter except "platform" ones into SQLAlchemy clauses.

    Filters whose parameter is "platform" are skipped; the rest are converted
    with convert_filter_to_sql_filter, in input order.
    """
    sql_filters: List[ClauseElement] = []
    for candidate in operation_filters:
        if candidate.parameter == "platform":
            continue
        sql_filters.append(convert_filter_to_sql_filter(candidate))
    return sql_filters