diff --git a/examples/bigquery/README.md b/examples/bigquery/README.md
new file mode 100644
index 0000000..035d088
--- /dev/null
+++ b/examples/bigquery/README.md
@@ -0,0 +1,22 @@
+## Schema
+
+See [schema.sql](schema.sql) for the example schema.
+
+## Usage
+
+Define BigQuery connection details via environment variables:
+
+```bash
+export BIGQUERY_PROJECT_ID=...
+export BIGQUERY_DATASET=...
+export BIGQUERY_VAULT_SECRET=...
+```
+or set them in `hunter.yaml`.
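+
+For example (the values below are only placeholders; the keys are the ones the
+`bigquery` connector section in `hunter/config.py` reads):
+
+```yaml
+bigquery:
+  project_id: my-gcp-project
+  dataset: my_dataset
+  credentials: bigquery_credentials.json
+```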
+
+Also configure the credentials. See [config_credentials.sh](config_credentials.sh) for an example.
+
+The following command shows results for a single test `aggregate_mem` and updates the database with newly found change points:
+
+```bash
+$ BRANCH=trunk HUNTER_CONFIG=hunter.yaml hunter analyze aggregate_mem --update-bigquery
+```
diff --git a/examples/bigquery/config_credentials.sh b/examples/bigquery/config_credentials.sh
new file mode 100644
index 0000000..dfe8e67
--- /dev/null
+++ b/examples/bigquery/config_credentials.sh
@@ -0,0 +1,8 @@
+# Configure the GCP BigQuery key.
+touch bigquery_credentials.json
+export BIGQUERY_CREDENTIALS=$(readlink -f bigquery_credentials.json)
+echo "Loading ${BIGQUERY_CREDENTIALS} to export analysis summaries to BigQuery/Metabase."
+# e.g.: export BIGQUERY_VAULT_SECRET=v1/ci/kv/gcp/flink_sql_bigquery
+vault kv get -field=json "${BIGQUERY_VAULT_SECRET}" > "${BIGQUERY_CREDENTIALS}"
+# Alternatively, copy your credentials JSON directly into bigquery_credentials.json.
+chmod 600 "${BIGQUERY_CREDENTIALS}"
\ No newline at end of file
diff --git a/examples/bigquery/hunter.yaml b/examples/bigquery/hunter.yaml
new file mode 100644
index 0000000..3c4a533
--- /dev/null
+++ b/examples/bigquery/hunter.yaml
@@ -0,0 +1,53 @@
+# External systems connectors configuration:
+bigquery:
+  project_id: ${BIGQUERY_PROJECT_ID}
+  dataset: ${BIGQUERY_DATASET}
+  credentials: ${BIGQUERY_CREDENTIALS}
+
+# Templates define common bits shared between test definitions:
+templates:
+  common:
+    type: bigquery
+    time_column: commit_ts
+    attributes: [experiment_id, config_id, commit]
+    # required for --update-bigquery to work; filled in with str.format, so use {named} placeholders
+    update_statement: |
+      UPDATE ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results
+      SET {metric}_rel_forward_change={forward_change_percent},
+          {metric}_rel_backward_change={backward_change_percent},
+          {metric}_p_value={p_value}
+      WHERE experiment_id = '{experiment_id}' AND config_id = {config_id}
+    metrics:
+      process_cumulative_rate_mean:
+        direction: 1
+        scale: 1
+      process_cumulative_rate_stderr:
+        direction: -1
+        scale: 1
+      process_cumulative_rate_diff:
+        direction: -1
+        scale: 1
+
+# Define your tests here:
+tests:
+  aggregate_mem:
+    inherit: [ common ]  # avoids repeating metrics definitions and BigQuery-related config
+    query: |
+      SELECT e.commit,
+             e.commit_ts,
+             r.process_cumulative_rate_mean,
+             r.process_cumulative_rate_stderr,
+             r.process_cumulative_rate_diff,
+             r.experiment_id,
+             r.config_id
+      FROM ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results r
+      INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.configs c ON r.config_id = c.id
+      INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.experiments e ON r.experiment_id = e.id
+      WHERE e.exclude_from_analysis = false AND
+            e.branch = 'test-branch' AND
+            e.username = 'ci' AND
+            c.store = 'test-store' AND
+            c.cache = true AND
+            c.benchmark = 'tpcds' AND
+            c.instance_type = 'test-instance'
+      ORDER BY e.commit_ts ASC;
\ No newline at end of file
diff --git a/examples/bigquery/schema.sql b/examples/bigquery/schema.sql
new file mode 100644
index 0000000..15a710f
--- /dev/null
+++ b/examples/bigquery/schema.sql
@@ -0,0 +1,43 @@
+CREATE TABLE IF NOT EXISTS configs (
+    id BIGINT PRIMARY KEY NOT ENFORCED,
+    benchmark STRING NOT NULL,
+    scenario STRING NOT NULL,
+    store STRING NOT NULL,
+    instance_type STRING NOT NULL,
+    cache BOOLEAN NOT NULL
+);
+
+CREATE TABLE IF NOT EXISTS experiments (
+    id BIGINT PRIMARY KEY NOT ENFORCED,
+    ts TIMESTAMP NOT NULL,
+    branch STRING NOT NULL,
+    commit STRING NOT NULL,
+    commit_ts TIMESTAMP NOT NULL,
+    username STRING NOT NULL,
+    details_url STRING NOT NULL,
+    exclude_from_analysis BOOLEAN DEFAULT false NOT NULL,
+    exclude_reason STRING
+);
+
+CREATE TABLE IF NOT EXISTS results (
+    experiment_id BIGINT NOT NULL REFERENCES experiments(id) NOT ENFORCED,
+    config_id BIGINT NOT NULL REFERENCES configs(id) NOT ENFORCED,
+
+    process_cumulative_rate_mean BIGINT NOT NULL,
+    process_cumulative_rate_stderr BIGINT NOT NULL,
+    process_cumulative_rate_diff BIGINT NOT NULL,
+
+    process_cumulative_rate_mean_rel_forward_change FLOAT64,
+    process_cumulative_rate_mean_rel_backward_change FLOAT64,
+    process_cumulative_rate_mean_p_value DECIMAL,
+
+    process_cumulative_rate_stderr_rel_forward_change FLOAT64,
+    process_cumulative_rate_stderr_rel_backward_change FLOAT64,
+    process_cumulative_rate_stderr_p_value DECIMAL,
+
+    process_cumulative_rate_diff_rel_forward_change FLOAT64,
+    process_cumulative_rate_diff_rel_backward_change FLOAT64,
+    process_cumulative_rate_diff_p_value DECIMAL,
+
+    PRIMARY KEY (experiment_id, config_id) NOT ENFORCED
+);
\ No newline at end of file
diff --git a/hunter/bigquery.py b/hunter/bigquery.py
new file mode 100644
index 0000000..b43aed1
--- /dev/null
+++ b/hunter/bigquery.py
@@ -0,0 +1,68 @@
+from dataclasses import dataclass
+from datetime import datetime
+from typing import Dict
+
+from google.cloud import bigquery
+from google.oauth2 import service_account
+
+from hunter.analysis import ChangePoint
+from hunter.test_config import BigQueryTestConfig
+
+
+@dataclass
+class BigQueryConfig:
+    project_id: str
+    dataset: str
+    credentials: str
+
+
+@dataclass
+class BigQueryError(Exception):
+    message: str
+
+
+class BigQuery:
+    __client = None
+    __config = None
+
+    def __init__(self, config: BigQueryConfig):
+        self.__config = config
+
+    @property
+    def client(self) -> bigquery.Client:
+        if self.__client is None:
+            credentials = service_account.Credentials.from_service_account_file(
+                self.__config.credentials,
+                scopes=["https://www.googleapis.com/auth/cloud-platform"],
+            )
+            self.__client = bigquery.Client(credentials=credentials, project=credentials.project_id)
+        return self.__client
+
+    def fetch_data(self, query: str):
+        query_job = self.client.query(query)  # API request
+        results = query_job.result()
+        columns = [field.name for field in results.schema]
+        return (columns, results)
+
+    def insert_change_point(
+        self,
+        test: BigQueryTestConfig,
+        metric_name: str,
+        attributes: Dict,
+        change_point: ChangePoint,
+    ):
+        kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}}
+        update_stmt = test.update_stmt.format(
+            metric=metric_name,
+            forward_change_percent=change_point.forward_change_percent(),
+            backward_change_percent=change_point.backward_change_percent(),
+            p_value=change_point.stats.pvalue,
+            **kwargs
+        )
+        query_job = self.client.query(update_stmt)
+
+        # Wait for the query to finish
+        query_job.result()
+
+        # Output the number of rows affected
+        print("Affected rows: {}".format(query_job.num_dml_affected_rows))
diff --git a/hunter/config.py b/hunter/config.py
index d961457..a65a4f6 100644
--- a/hunter/config.py
+++ b/hunter/config.py
@@ -6,6 +6,7 @@
 from expandvars import expandvars
 from ruamel.yaml import YAML
 
+from hunter.bigquery import BigQueryConfig
 from hunter.grafana import GrafanaConfig
 from hunter.graphite import GraphiteConfig
 from hunter.postgres import PostgresConfig
@@ -22,6 +23,7 @@ class Config:
     test_groups: Dict[str, List[TestConfig]]
     slack: SlackConfig
     postgres: PostgresConfig
+    bigquery: BigQueryConfig
 
 
 @dataclass
@@ -133,6 +135,14 @@ def load_config_from(config_file: Path) -> Config:
             database=config["postgres"]["database"],
         )
 
+    bigquery_config = None
+    if config.get("bigquery") is not None:
+        bigquery_config = BigQueryConfig(
+            project_id=config["bigquery"]["project_id"],
+            dataset=config["bigquery"]["dataset"],
+            credentials=config["bigquery"]["credentials"],
+        )
+
     templates = load_templates(config)
     tests = load_tests(config, templates)
     groups = load_test_groups(config, tests)
@@ -142,6 +152,7 @@ def load_config_from(config_file: Path) -> Config:
         grafana=grafana_config,
         slack=slack_config,
         postgres=postgres_config,
+        bigquery=bigquery_config,
         tests=tests,
         test_groups=groups,
     )
diff --git a/hunter/importer.py b/hunter/importer.py
index 0ae3f30..40677b7 100644
--- a/hunter/importer.py
+++ b/hunter/importer.py
@@ -7,12 +7,15 @@
 from pathlib import Path
 from typing import Dict, List, Optional
 
+from hunter.bigquery import BigQuery
 from hunter.config import Config
 from hunter.data_selector import DataSelector
 from hunter.graphite import DataPoint, Graphite, GraphiteError
 from hunter.postgres import Postgres
 from hunter.series import Metric, Series
 from hunter.test_config import (
+    BigQueryMetric,
+    BigQueryTestConfig,
     CsvMetric,
     CsvTestConfig,
     GraphiteTestConfig,
@@ -651,6 +654,111 @@ def fetch_all_attribute_names(self, test_conf: JsonTestConfig) -> List[str]:
         return [m for m in attr_names]
 
+
+class BigQueryImporter(Importer):
+    __bigquery: BigQuery
+
+    def __init__(self, bigquery: BigQuery):
+        self.__bigquery = bigquery
+
+    @staticmethod
+    def __selected_metrics(
+        defined_metrics: Dict[str, BigQueryMetric], selected_metrics: Optional[List[str]]
+    ) -> Dict[str, BigQueryMetric]:
+
+        if selected_metrics is not None:
+            return {name: defined_metrics[name] for name in selected_metrics}
+        else:
+            return defined_metrics
+
+    def fetch_data(
+        self, test_conf: BigQueryTestConfig, selector: DataSelector = DataSelector()
+    ) -> Series:
+        if not isinstance(test_conf, BigQueryTestConfig):
+            raise ValueError("Expected BigQueryTestConfig")
+
+        if selector.branch:
+            raise ValueError("BigQuery tests don't support branching yet")
+
+        since_time = selector.since_time
+        until_time = selector.until_time
+        if since_time.timestamp() > until_time.timestamp():
+            raise DataImportError(
+                f"Invalid time range: ["
+                f"{format_timestamp(int(since_time.timestamp()))}, "
+                f"{format_timestamp(int(until_time.timestamp()))}]"
+            )
+        metrics = self.__selected_metrics(test_conf.metrics, selector.metrics)
+
+        columns, rows = self.__bigquery.fetch_data(test_conf.query)
+
+        # Decide which columns to fetch into which components of the result:
+        try:
+            time_index: int = columns.index(test_conf.time_column)
+            attr_indexes: List[int] = [columns.index(c) for c in test_conf.attributes]
+            metric_names = [m.name for m in metrics.values()]
+            metric_columns = [m.column for m in metrics.values()]
+            metric_indexes: List[int] = [columns.index(c) for c in metric_columns]
+        except ValueError as err:
+            raise DataImportError(f"Column not found {err.args[0]}")
+
+        time: List[float] = []
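+        # One list of values per selected metric and one list of raw values per
+        # attribute column; all are filled row by row in the loop below.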
+        data: Dict[str, List[float]] = {}
+        for n in metric_names:
+            data[n] = []
+        attributes: Dict[str, List[str]] = {}
+        for i in attr_indexes:
+            attributes[columns[i]] = []
+
+        for row in rows:
+            ts: datetime = row[time_index]
+            if since_time is not None and ts < since_time:
+                continue
+            if until_time is not None and ts >= until_time:
+                continue
+            time.append(ts.timestamp())
+
+            # Read metric values. Note we can still fail on conversion to float,
+            # because the user is free to override the column selection and thus
+            # they may select a column that contains non-numeric data:
+            for name, i in zip(metric_names, metric_indexes):
+                try:
+                    data[name].append(float(row[i]))
+                except ValueError as err:
+                    raise DataImportError(
+                        "Could not convert value in column " + columns[i] + ": " + err.args[0]
+                    )
+
+            # Attributes are just copied as-is, with no conversion:
+            for i in attr_indexes:
+                attributes[columns[i]].append(row[i])
+
+        # Convert metrics to series.Metrics
+        metrics = {m.name: Metric(m.direction, m.scale) for m in metrics.values()}
+
+        # Leave last n points:
+        time = time[-selector.last_n_points :]
+        tmp = data
+        data = {}
+        for k, v in tmp.items():
+            data[k] = v[-selector.last_n_points :]
+        tmp = attributes
+        attributes = {}
+        for k, v in tmp.items():
+            attributes[k] = v[-selector.last_n_points :]
+
+        return Series(
+            test_conf.name,
+            branch=None,
+            time=time,
+            metrics=metrics,
+            data=data,
+            attributes=attributes,
+        )
+
+    def fetch_all_metric_names(self, test_conf: BigQueryTestConfig) -> List[str]:
+        return [m for m in test_conf.metrics.keys()]
+
+
 class Importers:
     __config: Config
     __csv_importer: Optional[CsvImporter]
@@ -658,6 +766,7 @@ class Importers:
     __histostat_importer: Optional[HistoStatImporter]
     __postgres_importer: Optional[PostgresImporter]
     __json_importer: Optional[JsonImporter]
+    __bigquery_importer: Optional[BigQueryImporter]
 
     def __init__(self, config: Config):
         self.__config = config
@@ -666,6 +775,7 @@ def __init__(self, config: Config):
        self.__histostat_importer = None
         self.__postgres_importer = None
         self.__json_importer = None
+        self.__bigquery_importer = None
 
     def csv_importer(self) -> CsvImporter:
         if self.__csv_importer is None:
@@ -692,6 +802,11 @@ def json_importer(self) -> JsonImporter:
             self.__json_importer = JsonImporter()
         return self.__json_importer
 
+    def bigquery_importer(self) -> BigQueryImporter:
+        if self.__bigquery_importer is None:
+            self.__bigquery_importer = BigQueryImporter(BigQuery(self.__config.bigquery))
+        return self.__bigquery_importer
+
     def get(self, test: TestConfig) -> Importer:
         if isinstance(test, CsvTestConfig):
             return self.csv_importer()
@@ -703,5 +818,7 @@ def get(self, test: TestConfig) -> Importer:
             return self.postgres_importer()
         elif isinstance(test, JsonTestConfig):
             return self.json_importer()
+        elif isinstance(test, BigQueryTestConfig):
+            return self.bigquery_importer()
         else:
             raise ValueError(f"Unsupported test type {type(test)}")
diff --git a/hunter/main.py b/hunter/main.py
index eb167ee..6ffbda0 100644
--- a/hunter/main.py
+++ b/hunter/main.py
@@ -11,6 +11,7 @@
 from hunter import config
 from hunter.attributes import get_back_links
+from hunter.bigquery import BigQuery, BigQueryError
 from hunter.config import Config, ConfigError
 from hunter.data_selector import DataSelector
 from hunter.grafana import Annotation, Grafana, GrafanaError
@@ -21,6 +22,7 @@
 from hunter.series import AnalysisOptions, AnalyzedSeries, compare
 from hunter.slack import NotificationError, SlackNotifier
 from hunter.test_config import (
+    BigQueryTestConfig,
     GraphiteTestConfig,
     PostgresTestConfig,
     TestConfig,
@@ -40,6 +42,7 @@ class Hunter:
     __grafana: Optional[Grafana]
     __slack: Optional[SlackNotifier]
     __postgres: Optional[Postgres]
+    __bigquery: Optional[BigQuery]
 
     def __init__(self, conf: Config):
         self.__conf = conf
@@ -47,6 +50,7 @@ def __init__(self, conf: Config):
         self.__grafana = None
         self.__slack = self.__maybe_create_slack_notifier()
         self.__postgres = None
+        self.__bigquery = None
 
     def list_tests(self, group_names: Optional[List[str]]):
         if group_names is not None:
@@ -216,6 +220,11 @@ def __get_postgres(self) -> Postgres:
             self.__postgres = Postgres(self.__conf.postgres)
         return self.__postgres
 
+    def __get_bigquery(self) -> BigQuery:
+        if self.__bigquery is None:
+            self.__bigquery = BigQuery(self.__conf.bigquery)
+        return self.__bigquery
+
     def update_postgres(self, test: PostgresTestConfig, series: AnalyzedSeries):
         postgres = self.__get_postgres()
         for metric_name, change_points in series.change_points.items():
@@ -223,6 +232,13 @@ def update_postgres(self, test: PostgresTestConfig, series: AnalyzedSeries):
             attributes = series.attributes_at(cp.index)
             postgres.insert_change_point(test, metric_name, attributes, cp)
 
+    def update_bigquery(self, test: BigQueryTestConfig, series: AnalyzedSeries):
+        bigquery = self.__get_bigquery()
+        for metric_name, change_points in series.change_points.items():
+            for cp in change_points:
+                attributes = series.attributes_at(cp.index)
+                bigquery.insert_change_point(test, metric_name, attributes, cp)
+
     def regressions(
         self, test: TestConfig, selector: DataSelector, options: AnalysisOptions
     ) -> bool:
@@ -487,7 +503,6 @@ def main():
     except ConfigError as err:
         logging.error(err.message)
         exit(1)
-
     script_main(conf)
 
 
@@ -523,6 +538,11 @@ def script_main(conf: Config, args: List[str] = None):
         help="Update PostgreSQL database results with change points",
         action="store_true",
     )
+    analyze_parser.add_argument(
+        "--update-bigquery",
+        help="Update BigQuery database results with change points",
+        action="store_true",
+    )
     analyze_parser.add_argument(
         "--notify-slack",
         help="Send notification containing a summary of change points to given Slack channels",
@@ -582,6 +602,7 @@ def script_main(conf: Config, args: List[str] = None):
     if args.command == "analyze":
         update_grafana_flag = args.update_grafana
         update_postgres_flag = args.update_postgres
+        update_bigquery_flag = args.update_bigquery
         slack_notification_channels = args.notify_slack
         slack_cph_since = parse_datetime(args.cph_report_since)
         data_selector = data_selector_from_args(args)
@@ -602,6 +623,10 @@ def script_main(conf: Config, args: List[str] = None):
                 if not isinstance(test, PostgresTestConfig):
                     raise PostgresError("Not a Postgres test")
                 hunter.update_postgres(test, analyzed_series)
+            if update_bigquery_flag:
+                if not isinstance(test, BigQueryTestConfig):
+                    raise BigQueryError("Not a BigQuery test")
+                hunter.update_bigquery(test, analyzed_series)
             if slack_notification_channels:
                 tests_analyzed_series[test.name] = analyzed_series
         except DataImportError as err:
diff --git a/hunter/test_config.py b/hunter/test_config.py
index 7e01b79..7f881ef 100644
--- a/hunter/test_config.py
+++ b/hunter/test_config.py
@@ -157,6 +157,42 @@ def fully_qualified_metric_names(self) -> List[str]:
         return list(self.metrics.keys())
 
+
+@dataclass
+class BigQueryMetric:
+    name: str
+    direction: int
+    scale: float
+    column: str
+
+
+@dataclass
+class BigQueryTestConfig(TestConfig):
+    query: str
+    update_stmt: str
+    time_column: str
+    attributes: List[str]
+    metrics: Dict[str, BigQueryMetric]
+
+    def __init__(
+        self,
+        name: str,
+        query: str,
+        update_stmt: str = "",
+        time_column: str = "time",
+        metrics: List[BigQueryMetric] = None,
+        attributes: List[str] = None,
+    ):
+        self.name = name
+        self.query = query
+        self.time_column = time_column
+        self.metrics = {m.name: m for m in metrics} if metrics else {}
+        self.attributes = attributes
+        self.update_stmt = update_stmt
+
+    def fully_qualified_metric_names(self) -> List[str]:
+        return list(self.metrics.keys())
+
+
 def create_test_config(name: str, config: Dict) -> TestConfig:
     """
     Loads properties of a test from a dictionary read from hunter's config file
@@ -173,6 +209,8 @@ def create_test_config(name: str, config: Dict) -> TestConfig:
         return create_histostat_test_config(name, config)
     elif test_type == "postgres":
         return create_postgres_test_config(name, config)
+    elif test_type == "bigquery":
+        return create_bigquery_test_config(name, config)
     elif test_type == "json":
         return create_json_test_config(name, config)
     elif test_type is None:
@@ -298,6 +336,36 @@ def create_postgres_test_config(test_name: str, test_info: Dict) -> PostgresTest
         raise TestConfigError(f"Configuration key not found in test {test_name}: {e.args[0]}")
 
+
+def create_bigquery_test_config(test_name: str, test_info: Dict) -> BigQueryTestConfig:
+    try:
+        time_column = test_info.get("time_column", "time")
+        attributes = test_info.get("attributes", [])
+        metrics_info = test_info.get("metrics")
+        query = test_info["query"]
+        update_stmt = test_info.get("update_statement", "")
+
+        metrics = []
+        if isinstance(metrics_info, List):
+            for name in metrics_info:
+                metrics.append(BigQueryMetric(name, 1, 1.0, name))
+        elif isinstance(metrics_info, Dict):
+            for metric_name, metric_conf in metrics_info.items():
+                metrics.append(
+                    BigQueryMetric(
+                        name=metric_name,
+                        column=metric_conf.get("column", metric_name),
+                        direction=int(metric_conf.get("direction", "1")),
+                        scale=float(metric_conf.get("scale", "1")),
+                    )
+                )
+        else:
+            raise TestConfigError(f"Metrics of the test {test_name} must be a list or dictionary")
+
+        return BigQueryTestConfig(test_name, query, update_stmt, time_column, metrics, attributes)
+    except KeyError as e:
+        raise TestConfigError(f"Configuration key not found in test {test_name}: {e.args[0]}")
+
+
 @dataclass
 class JsonTestConfig(TestConfig):
     name: str
diff --git a/poetry.lock b/poetry.lock
index 5e2057a..c04cfc2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -67,6 +67,14 @@ d = ["aiohttp (>=3.7.4)"]
 jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"]
 uvloop = ["uvloop (>=0.15.2)"]
+
+[[package]]
+name = "cachetools"
+version = "5.5.0"
+description = "Extensible memoizing collections and decorators"
+category = "main"
+optional = false
+python-versions = ">=3.7"
 
 [[package]]
 name = "certifi"
 version = "2024.6.2"
@@ -172,6 +180,161 @@ mccabe = ">=0.6.0,<0.7.0"
 pycodestyle = ">=2.8.0,<2.9.0"
 pyflakes = ">=2.4.0,<2.5.0"
+
+[[package]]
+name = "google-api-core"
+version = "2.19.2"
+description = "Google API client core library"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+google-auth = ">=2.14.1,<3.0.dev0"
+googleapis-common-protos = ">=1.56.2,<2.0.dev0"
+grpcio = [
+    {version = ">=1.33.2,<2.0dev", optional = true, markers = "extra == \"grpc\""},
+    {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" or python_version >= \"3.11\" and extra == \"grpc\""},
+]
+grpcio-status = [
+    {version = ">=1.33.2,<2.0.dev0", optional = true, markers = "extra == \"grpc\""},
+    {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" or python_version >= \"3.11\" and extra == \"grpc\""},
+]
+proto-plus = ">=1.22.3,<2.0.0dev"
+protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0.dev0"
+requests = ">=2.18.0,<3.0.0.dev0"
+
+[package.extras]
+grpc = ["grpcio (>=1.33.2,<2.0dev)", "grpcio-status (>=1.33.2,<2.0.dev0)", "grpcio (>=1.49.1,<2.0dev)", "grpcio-status (>=1.49.1,<2.0.dev0)"]
+grpcgcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"]
+grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"]
+
+[[package]]
+name = "google-auth"
+version = "2.34.0"
+description = "Google Authentication Library"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+cachetools = ">=2.0.0,<6.0"
+pyasn1-modules = ">=0.2.1"
+rsa = ">=3.1.4,<5"
+
+[package.extras]
+aiohttp = ["aiohttp (>=3.6.2,<4.0.0.dev0)", "requests (>=2.20.0,<3.0.0.dev0)"]
+enterprise-cert = ["cryptography", "pyopenssl"]
+pyopenssl = ["pyopenssl (>=20.0.0)", "cryptography (>=38.0.3)"]
+reauth = ["pyu2f (>=0.1.5)"]
+requests = ["requests (>=2.20.0,<3.0.0.dev0)"]
+
+[[package]]
+name = "google-cloud-bigquery"
+version = "3.25.0"
+description = "Google BigQuery API client library"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+google-api-core = {version = ">=1.34.1,<2.0.0 || >=2.11.0,<3.0.0dev", extras = ["grpc"]}
+google-auth = ">=2.14.1,<3.0.0dev"
+google-cloud-core = ">=1.6.0,<3.0.0dev"
+google-resumable-media = ">=0.6.0,<3.0dev"
+packaging = ">=20.0.0"
+python-dateutil = ">=2.7.2,<3.0dev"
+requests = ">=2.21.0,<3.0.0dev"
+
+[package.extras]
+all = ["google-cloud-bigquery-storage (>=2.6.0,<3.0.0dev)", "grpcio (>=1.47.0,<2.0dev)", "pyarrow (>=3.0.0)", "pandas (>=1.1.0)", "db-dtypes (>=0.3.0,<2.0.0dev)", "ipywidgets (>=7.7.0)", "ipykernel (>=6.0.0)", "geopandas (>=0.9.0,<1.0dev)", "Shapely (>=1.8.4,<3.0.0dev)", "ipython (>=7.23.1,!=8.1.0)", "tqdm (>=4.7.4,<5.0.0dev)", "opentelemetry-api (>=1.1.0)", "opentelemetry-sdk (>=1.1.0)", "opentelemetry-instrumentation (>=0.20b0)", "proto-plus (>=1.15.0,<2.0.0dev)", "protobuf (>=3.19.5,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev)", "importlib-metadata (>=1.0.0)", "grpcio (>=1.49.1,<2.0dev)"]
+bigquery-v2 = ["proto-plus (>=1.15.0,<2.0.0dev)", "protobuf (>=3.19.5,!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev)"]
+bqstorage = ["google-cloud-bigquery-storage (>=2.6.0,<3.0.0dev)", "grpcio (>=1.47.0,<2.0dev)", "pyarrow (>=3.0.0)", "grpcio (>=1.49.1,<2.0dev)"]
+geopandas = ["geopandas (>=0.9.0,<1.0dev)", "Shapely (>=1.8.4,<3.0.0dev)"]
+ipython = ["ipython (>=7.23.1,!=8.1.0)", "ipykernel (>=6.0.0)"]
+ipywidgets = ["ipywidgets (>=7.7.0)", "ipykernel (>=6.0.0)"]
+opentelemetry = ["opentelemetry-api (>=1.1.0)", "opentelemetry-sdk (>=1.1.0)", "opentelemetry-instrumentation (>=0.20b0)"]
+pandas = ["pandas (>=1.1.0)", "pyarrow (>=3.0.0)", "db-dtypes (>=0.3.0,<2.0.0dev)", "importlib-metadata (>=1.0.0)"]
+tqdm = ["tqdm (>=4.7.4,<5.0.0dev)"]
+
+[[package]]
+name = "google-cloud-core"
+version = "2.4.1"
+description = "Google Cloud API client core library"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+google-api-core = ">=1.31.6,<2.0.0 || >2.3.0,<3.0.0dev"
+google-auth = ">=1.25.0,<3.0dev"
+
+[package.extras]
+grpc = ["grpcio (>=1.38.0,<2.0dev)", "grpcio-status (>=1.38.0,<2.0.dev0)"]
+
+[[package]]
+name = "google-crc32c" +version = "1.5.0" +description = "A python wrapper of the C library 'Google CRC32C'" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.extras] +testing = ["pytest"] + +[[package]] +name = "google-resumable-media" +version = "2.7.2" +description = "Utilities for Google Media Downloads and Resumable Uploads" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +google-crc32c = ">=1.0,<2.0dev" + +[package.extras] +aiohttp = ["aiohttp (>=3.6.2,<4.0.0dev)", "google-auth (>=1.22.0,<2.0dev)"] +requests = ["requests (>=2.18.0,<3.0.0dev)"] + +[[package]] +name = "googleapis-common-protos" +version = "1.65.0" +description = "Common protobufs used in Google APIs" +category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +protobuf = ">=3.20.2,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0.dev0" + +[package.extras] +grpc = ["grpcio (>=1.44.0,<2.0.0.dev0)"] + +[[package]] +name = "grpcio" +version = "1.66.1" +description = "HTTP/2-based RPC framework" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.extras] +protobuf = ["grpcio-tools (>=1.66.1)"] + +[[package]] +name = "grpcio-status" +version = "1.66.1" +description = "Status proto mapping for gRPC" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +googleapis-common-protos = ">=1.5.5" +grpcio = ">=1.66.1" +protobuf = ">=5.26.1,<6.0dev" + [[package]] name = "idna" version = "3.7" @@ -251,7 +414,7 @@ python-versions = ">=3.8" name = "packaging" version = "24.1" description = "Core utilities for Python packages" -category = "dev" +category = "main" optional = false python-versions = ">=3.8" @@ -288,6 +451,28 @@ python-versions = ">=3.8" testing = ["pytest-benchmark", "pytest"] dev = ["tox", "pre-commit"] +[[package]] +name = "proto-plus" +version = "1.24.0" +description = "Beautiful, Pythonic protocol buffers." 
+category = "main" +optional = false +python-versions = ">=3.7" + +[package.dependencies] +protobuf = ">=3.19.0,<6.0.0dev" + +[package.extras] +testing = ["google-api-core (>=1.31.5)"] + +[[package]] +name = "protobuf" +version = "5.28.0" +description = "" +category = "main" +optional = false +python-versions = ">=3.8" + [[package]] name = "psycopg2-binary" version = "2.9.9" @@ -304,6 +489,25 @@ category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "pyasn1" +version = "0.6.0" +description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)" +category = "main" +optional = false +python-versions = ">=3.8" + +[[package]] +name = "pyasn1-modules" +version = "0.4.0" +description = "A collection of ASN.1-based protocols modules" +category = "main" +optional = false +python-versions = ">=3.8" + +[package.dependencies] +pyasn1 = ">=0.4.6,<0.7.0" + [[package]] name = "pycodestyle" version = "2.8.0" @@ -402,6 +606,17 @@ urllib3 = ">=1.21.1,<3" socks = ["PySocks (>=1.5.6,!=1.5.7)"] use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"] +[[package]] +name = "rsa" +version = "4.9" +description = "Pure-Python RSA implementation" +category = "main" +optional = false +python-versions = ">=3.6,<4" + +[package.dependencies] +pyasn1 = ">=0.1.3" + [[package]] name = "ruamel.yaml" version = "0.17.21" @@ -632,7 +847,7 @@ test = ["pytest (>=6,<8.1.0 || >=8.2.0)", "pytest-checkdocs (>=2.4)", "pytest-co [metadata] lock-version = "1.1" python-versions = ">=3.8,<3.13" -content-hash = "89df43ac35ccf05fe7c0c132dc9ebbf1d0158c297763935d8c7ae5a028e68a6f" +content-hash = "5cc46e5e57ac49496685cee40d5155a9c22992832162d252db0565a363499e87" [metadata.files] atomicwrites = [] @@ -640,6 +855,7 @@ attrs = [] autoflake = [] "backports.zoneinfo" = [] black = [] +cachetools = [] certifi = [] charset-normalizer = [] click = [] @@ -650,6 +866,15 @@ distlib = [] expandvars = [] filelock = [] flake8 = [] +google-api-core = [] +google-auth = [] +google-cloud-bigquery = [] +google-cloud-core = [] +google-crc32c = [] +google-resumable-media = [] +googleapis-common-protos = [] +grpcio = [] +grpcio-status = [] idna = [] importlib-metadata = [] iniconfig = [] @@ -662,8 +887,12 @@ packaging = [] pathspec = [] platformdirs = [] pluggy = [] +proto-plus = [] +protobuf = [] psycopg2-binary = [] py = [] +pyasn1 = [] +pyasn1-modules = [] pycodestyle = [] pyflakes = [] pystache = [] @@ -672,6 +901,7 @@ python-dateutil = [] pytz = [] regex = [] requests = [] +rsa = [] "ruamel.yaml" = [] "ruamel.yaml.clib" = [] scipy = [] diff --git a/pyproject.toml b/pyproject.toml index 8cbc7b4..d40be0c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ pystache = "^0.6.0" tabulate = "^0.8.7" validators = "^0.18.2" slack-sdk = "^3.4.2" +google-cloud-bigquery = "^3.25.0" [tool.poetry.dev-dependencies] diff --git a/tests/importer_test.py b/tests/importer_test.py index 600f06a..47ff752 100644 --- a/tests/importer_test.py +++ b/tests/importer_test.py @@ -4,8 +4,15 @@ from hunter.csv_options import CsvOptions from hunter.graphite import DataSelector -from hunter.importer import CsvImporter, HistoStatImporter, PostgresImporter +from hunter.importer import ( + BigQueryImporter, + CsvImporter, + HistoStatImporter, + PostgresImporter, +) from hunter.test_config import ( + BigQueryMetric, + BigQueryTestConfig, CsvMetric, CsvTestConfig, HistoStatTestConfig, @@ -198,3 +205,79 @@ def test_import_postgres_last_n_points(): assert len(series.time) == 5 assert 
     assert len(series.data["m2"]) == 5
     assert len(series.attributes["commit"]) == 5
+
+
+class MockBigQuery:
+    def fetch_data(self, query: str):
+        return (
+            ["time", "metric1", "metric2", "commit"],
+            [
+                (datetime(2022, 7, 1, 15, 11, tzinfo=pytz.UTC), 2, 3, "aaabbb"),
+                (datetime(2022, 7, 2, 16, 22, tzinfo=pytz.UTC), 5, 6, "cccddd"),
+                (datetime(2022, 7, 3, 17, 13, tzinfo=pytz.UTC), 2, 3, "aaaccc"),
+                (datetime(2022, 7, 4, 18, 24, tzinfo=pytz.UTC), 5, 6, "ccc123"),
+                (datetime(2022, 7, 5, 19, 15, tzinfo=pytz.UTC), 2, 3, "aaa493"),
+                (datetime(2022, 7, 6, 20, 26, tzinfo=pytz.UTC), 5, 6, "cccfgl"),
+                (datetime(2022, 7, 7, 21, 17, tzinfo=pytz.UTC), 2, 3, "aaalll"),
+                (datetime(2022, 7, 8, 22, 28, tzinfo=pytz.UTC), 5, 6, "cccccc"),
+                (datetime(2022, 7, 9, 23, 19, tzinfo=pytz.UTC), 2, 3, "aadddd"),
+                (datetime(2022, 7, 10, 9, 29, tzinfo=pytz.UTC), 5, 6, "cciiii"),
+            ],
+        )
+
+
+def test_import_bigquery():
+    test = BigQueryTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[BigQueryMetric("m1", 1, 1.0, "metric1"), BigQueryMetric("m2", 1, 5.0, "metric2")],
+        attributes=["commit"],
+    )
+    importer = BigQueryImporter(MockBigQuery())
+    series = importer.fetch_data(test_conf=test, selector=data_selector())
+    assert len(series.data.keys()) == 2
+    assert len(series.time) == 10
+    assert len(series.data["m1"]) == 10
+    assert len(series.data["m2"]) == 10
+    assert len(series.attributes["commit"]) == 10
+    assert series.metrics["m2"].scale == 5.0
+
+
+def test_import_bigquery_with_time_filter():
+    test = BigQueryTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[BigQueryMetric("m1", 1, 1.0, "metric1"), BigQueryMetric("m2", 1, 5.0, "metric2")],
+        attributes=["commit"],
+    )
+
+    importer = BigQueryImporter(MockBigQuery())
+    selector = DataSelector()
+    tz = pytz.timezone("Etc/GMT+1")
+    selector.since_time = datetime(2022, 7, 8, 0, 0, 0, tzinfo=tz)
+    selector.until_time = datetime(2022, 7, 10, 0, 0, 0, tzinfo=tz)
+    series = importer.fetch_data(test, selector=selector)
+    assert len(series.data.keys()) == 2
+    assert len(series.time) == 2
+    assert len(series.data["m1"]) == 2
+    assert len(series.data["m2"]) == 2
+
+
+def test_import_bigquery_last_n_points():
+    test = BigQueryTestConfig(
+        name="test",
+        query="SELECT * FROM sample;",
+        time_column="time",
+        metrics=[BigQueryMetric("m1", 1, 1.0, "metric1"), BigQueryMetric("m2", 1, 5.0, "metric2")],
+        attributes=["commit"],
+    )
+
+    importer = BigQueryImporter(MockBigQuery())
+    selector = data_selector()
+    selector.last_n_points = 5
+    series = importer.fetch_data(test, selector=selector)
+    assert len(series.time) == 5
+    assert len(series.data["m2"]) == 5
+    assert len(series.attributes["commit"]) == 5
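+
+
+# Illustrative check (an editor's sketch, not part of the original change set):
+# it documents the contract between the `update_statement` template in
+# hunter.yaml and BigQuery.insert_change_point, which fills the template with
+# plain str.format -- so the template must use {named} placeholders rather
+# than driver-style %s. The statement text below is a trimmed stand-in.
+def test_bigquery_update_stmt_placeholder_formatting():
+    update_stmt = (
+        "UPDATE results"
+        " SET {metric}_rel_forward_change={forward_change_percent},"
+        " {metric}_p_value={p_value}"
+        " WHERE experiment_id = '{experiment_id}' AND config_id = {config_id}"
+    )
+    # insert_change_point passes the metric name, the change-point statistics,
+    # and one keyword per attribute column of the analyzed series:
+    rendered = update_stmt.format(
+        metric="m1",
+        forward_change_percent=12.5,
+        p_value=0.001,
+        experiment_id="exp-1",
+        config_id=7,
+    )
+    assert rendered == (
+        "UPDATE results"
+        " SET m1_rel_forward_change=12.5,"
+        " m1_p_value=0.001"
+        " WHERE experiment_id = 'exp-1' AND config_id = 7"
+    )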