Skip to content

Commit

Permalink
Added bigquery support (#16)
Browse files Browse the repository at this point in the history
* Added BigQuery support

* Update bigquery.py
  • Loading branch information
lyowang authored and Gerrrr committed Sep 24, 2024
1 parent dc88a38 commit eeacce7
Show file tree
Hide file tree
Showing 12 changed files with 733 additions and 4 deletions.
22 changes: 22 additions & 0 deletions examples/bigquery/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Schema

See [schema.sql](schema.sql) for the example schema.

## Usage

Define BigQuery connection details via environment variables:

```bash
export BIGQUERY_PROJECT_ID=...
export BIGQUERY_DATASET=...
export BIGQUERY_VAULT_SECRET=...
```
or in `hunter.yaml`.

Also configure the credentials. See [config_credentials.sh](config_credentials.sh) for an example.

The following command shows results for a single test `aggregate_mem` and updates the database with newly found change points:

```bash
$ BRANCH=trunk HUNTER_CONFIG=hunter.yaml hunter analyze aggregate_mem --update-bigquery
```
8 changes: 8 additions & 0 deletions examples/bigquery/config_credentials.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Configure the GCP BigQuery key.
touch bigquery_credentials.json
export BIGQUERY_CREDENTIALS=$(readlink -f bigquery_credentials.json)
echo "Loading ${BIGQUERY_CREDENTIALS} to export analysis summaries to BigQuery/Metabase."
# ie: export BIGQUERY_VAULT_SECRET=v1/ci/kv/gcp/flink_sql_bigquery
vault kv get -field=json "${BIGQUERY_VAULT_SECRET}" > "${BIGQUERY_CREDENTIALS}"
# You may also copy your credential json directly to the bigquery_credentials.json for this to work.
chmod 600 "${BIGQUERY_CREDENTIALS}"
53 changes: 53 additions & 0 deletions examples/bigquery/hunter.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# External systems connectors configuration:
bigquery:
project_id: ${BIGQUERY_PROJECT_ID}
dataset: ${BIGQUERY_DATASET}
credentials: ${BIGQUERY_CREDENTIALS}

# Templates define common bits shared between test definitions:
templates:
common:
type: bigquery
time_column: commit_ts
attributes: [experiment_id, config_id, commit]
# required for --update-bigquery to work
update_statement: |
UPDATE ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results
SET {metric}_rel_forward_change=%s,
{metric}_rel_backward_change=%s,
{metric}_p_value=%s
WHERE experiment_id = '{experiment_id}' AND config_id = {config_id}
metrics:
process_cumulative_rate_mean:
direction: 1
scale: 1
process_cumulative_rate_stderr:
direction: -1
scale: 1
process_cumulative_rate_diff:
direction: -1
scale: 1

# Define your tests here:
tests:
aggregate_mem:
inherit: [ common ] # avoids repeating metrics definitions and postgres-related config
query: |
SELECT e.commit,
e.commit_ts,
r.process_cumulative_rate_mean,
r.process_cumulative_rate_stderr,
r.process_cumulative_rate_diff,
r.experiment_id,
r.config_id
FROM ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.results r
INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.configs c ON r.config_id = c.id
INNER JOIN ${BIGQUERY_PROJECT_ID}.${BIGQUERY_DATASET}.experiments e ON r.experiment_id = e.id
WHERE e.exclude_from_analysis = false AND
e.branch = 'test-branch' AND
e.username = 'ci' AND
c.store = 'test-store' AND
c.cache = true AND
c.benchmark = 'tpcds' AND
c.instance_type = 'test-instance'
ORDER BY e.commit_ts ASC;
43 changes: 43 additions & 0 deletions examples/bigquery/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
CREATE TABLE IF NOT EXISTS configs (
id BIGINT PRIMARY KEY NOT ENFORCED,
benchmark STRING NOT NULL,
scenario STRING NOT NULL,
store STRING NOT NULL,
instance_type STRING NOT NULL,
cache BOOLEAN NOT NULL
);

CREATE TABLE IF NOT EXISTS experiments (
id BIGINT PRIMARY KEY NOT ENFORCED,
ts TIMESTAMP NOT NULL,
branch STRING NOT NULL,
commit STRING NOT NULL,
commit_ts TIMESTAMP NOT NULL,
username STRING NOT NULL,
details_url STRING NOT NULL,
exclude_from_analysis BOOLEAN DEFAULT false NOT NULL,
exclude_reason STRING
);

CREATE TABLE IF NOT EXISTS results (
experiment_id BIGINT NOT NULL REFERENCES flink_sql.experiments(id) NOT ENFORCED,
config_id BIGINT NOT NULL REFERENCES flink_sql.configs(id) NOT ENFORCED,

process_cumulative_rate_mean BIGINT NOT NULL,
process_cumulative_rate_stderr BIGINT NOT NULL,
process_cumulative_rate_diff BIGINT NOT NULL,

process_cumulative_rate_mean_rel_forward_change FLOAT64,
process_cumulative_rate_mean_rel_backward_change FLOAT64,
process_cumulative_rate_mean_p_value DECIMAL,

process_cumulative_rate_stderr_rel_forward_change FLOAT64,
process_cumulative_rate_stderr_rel_backward_change FLOAT64,
process_cumulative_rate_stderr_p_value DECIMAL,

process_cumulative_rate_diff_rel_forward_change FLOAT64,
process_cumulative_rate_diff_rel_backward_change FLOAT64,
process_cumulative_rate_diff_p_value DECIMAL,

PRIMARY KEY (experiment_id, config_id) NOT ENFORCED
);
68 changes: 68 additions & 0 deletions hunter/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Dict

from google.cloud import bigquery
from google.oauth2 import service_account

from hunter.analysis import ChangePoint
from hunter.test_config import BigQueryTestConfig


@dataclass
class BigQueryConfig:
project_id: str
dataset: str
credentials: str


@dataclass
class BigQueryError(Exception):
message: str


class BigQuery:
__client = None
__config = None

def __init__(self, config: BigQueryConfig):
self.__config = config

@property
def client(self) -> bigquery.Client:
if self.__client is None:
credentials = service_account.Credentials.from_service_account_file(
self.__config.credentials,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
self.__client = bigquery.Client(credentials=credentials, project=credentials.project_id)
return self.__client

def fetch_data(self, query: str):
query_job = self.client.query(query) # API request
results = query_job.result()
columns = [field.name for field in results.schema]
return (columns, results)

def insert_change_point(
self,
test: BigQueryTestConfig,
metric_name: str,
attributes: Dict,
change_point: ChangePoint,
):
kwargs = {**attributes, **{test.time_column: datetime.utcfromtimestamp(change_point.time)}}
update_stmt = test.update_stmt.format(
metric=metric_name,
forward_change_percent=change_point.forward_change_percent(),
backward_change_percent=change_point.backward_change_percent(),
p_value=change_point.stats.pvalue,
**kwargs
)
query_job = self.client.query(update_stmt)

# Wait for the query to finish
query_job.result()

# Output the number of rows affected
print("Affected rows: {}".format(query_job.num_dml_affected_rows))
11 changes: 11 additions & 0 deletions hunter/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from expandvars import expandvars
from ruamel.yaml import YAML

from hunter.bigquery import BigQueryConfig
from hunter.grafana import GrafanaConfig
from hunter.graphite import GraphiteConfig
from hunter.postgres import PostgresConfig
Expand All @@ -22,6 +23,7 @@ class Config:
test_groups: Dict[str, List[TestConfig]]
slack: SlackConfig
postgres: PostgresConfig
bigquery: BigQueryConfig


@dataclass
Expand Down Expand Up @@ -133,6 +135,14 @@ def load_config_from(config_file: Path) -> Config:
database=config["postgres"]["database"],
)

bigquery_config = None
if config.get("bigquery") is not None:
bigquery_config = BigQueryConfig(
project_id=config["bigquery"]["project_id"],
dataset=config["bigquery"]["dataset"],
credentials=config["bigquery"]["credentials"],
)

templates = load_templates(config)
tests = load_tests(config, templates)
groups = load_test_groups(config, tests)
Expand All @@ -142,6 +152,7 @@ def load_config_from(config_file: Path) -> Config:
grafana=grafana_config,
slack=slack_config,
postgres=postgres_config,
bigquery=bigquery_config,
tests=tests,
test_groups=groups,
)
Expand Down
Loading

0 comments on commit eeacce7

Please sign in to comment.