Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use session state service impl to change readOnly state #322

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
push:
branches:
- main
- refactor/update-read-only
paths-ignore:
- "**/*.md"
- "**/*.jpg"
Expand Down
6 changes: 0 additions & 6 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ export abstract class AwsClient extends EventEmitter {
protected pluginManager: PluginManager;
protected pluginService: PluginService;
protected isConnected: boolean = false;
protected _isReadOnly: boolean = false;
protected _isolationLevel: number = 0;
protected _connectionUrlParser: ConnectionUrlParser;
protected _configurationProfile: ConfigurationProfile | null = null;
readonly properties: Map<string, any>;
Expand Down Expand Up @@ -151,8 +149,6 @@ export abstract class AwsClient extends EventEmitter {
return this._connectionUrlParser;
}

abstract updateSessionStateReadOnly(readOnly: boolean): Promise<any | void>;

abstract setReadOnly(readOnly: boolean): Promise<any | void>;

abstract isReadOnly(): boolean;
Expand All @@ -179,8 +175,6 @@ export abstract class AwsClient extends EventEmitter {

abstract rollback(): Promise<any>;

abstract resetState(): void;

async isValid(): Promise<boolean> {
if (!this.targetClient) {
return Promise.resolve(false);
Expand Down
1 change: 1 addition & 0 deletions common/lib/client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import { HostInfo } from "./host_info";
import { SessionState } from "./session_state";
Copy link
Contributor

@joyc-bq joyc-bq Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this an unneeded import?


export interface ClientWrapper {
readonly client: any;
Expand Down
6 changes: 6 additions & 0 deletions common/lib/database_dialect/database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { HostListProviderService } from "../host_list_provider_service";
import { ClientWrapper } from "../client_wrapper";
import { FailoverRestriction } from "../plugins/failover/failover_restriction";
import { ErrorHandler } from "../error_handler";
import { SessionState } from "../session_state";

export enum DatabaseType {
MYSQL,
Expand All @@ -30,6 +31,11 @@ export interface DatabaseDialect {
getHostAliasQuery(): string;
getHostAliasAndParseResults(targetClient: ClientWrapper): Promise<string>;
getServerVersionQuery(): string;
getSetReadOnlyQuery(readOnly: boolean): string;
getSetAutoCommitQuery(autoCommit: boolean): string;
getSetTransactionIsolationQuery(level: number): string;
getSetCatalogQuery(catalog: string): string;
getSetSchemaQuery(schema: string): string;
getDialectUpdateCandidates(): string[];
getErrorHandler(): ErrorHandler;
isDialect(targetClient: ClientWrapper): Promise<boolean>;
Expand Down
2 changes: 2 additions & 0 deletions common/lib/mysql_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { ClientWrapper } from "./client_wrapper";
import { HostInfo } from "./host_info";
import { ClientUtils } from "./utils/client_utils";
import { uniqueId } from "../logutils";
import { SessionState } from "./session_state";
import { TransactionIsolationLevel } from "./utils/transaction_isolation_level";
import { DriverDialect } from "./driver_dialect/driver_dialect";

/*
Expand Down
4 changes: 4 additions & 0 deletions common/lib/pg_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import { ClientWrapper } from "./client_wrapper";
import { HostInfo } from "./host_info";
import { uniqueId } from "../logutils";
import { SessionState } from "./session_state";
import { TransactionIsolationLevel } from "./utils/transaction_isolation_level";

/*
This an internal wrapper class for a target community driver client created by the NodePostgresPgDriverDialect.
Expand All @@ -26,6 +28,7 @@ export class PgClientWrapper implements ClientWrapper {
readonly hostInfo: HostInfo;
readonly properties: Map<string, string>;
readonly id: string;
readonly sessionState = new SessionState();

/**
* Creates a wrapper for the target community driver client.
Expand All @@ -38,6 +41,7 @@ export class PgClientWrapper implements ClientWrapper {
this.client = targetClient;
this.hostInfo = hostInfo;
this.properties = properties;
this.sessionState = new SessionState();
this.id = uniqueId("PgClient_");
}

Expand Down
14 changes: 7 additions & 7 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { getWriter } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { ConfigurationProfile } from "./profile/configuration_profile";
import { SessionState } from "./session_state";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand Down Expand Up @@ -75,10 +76,10 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this.dbDialectProvider = new DatabaseDialectManager(knownDialectsByCode, dbType, this.props);
this.driverDialect = driverDialect;
this.initialHost = props.get(WrapperProperties.HOST.name);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
container.pluginService = this;

this.dialect = WrapperProperties.CUSTOM_DATABASE_DIALECT.get(this.props) ?? this.dbDialectProvider.getDialect(this.props);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
}

isInTransaction(): boolean {
Expand Down Expand Up @@ -333,7 +334,6 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this.sessionStateService.begin();

try {
this.getCurrentClient().resetState();
this.getCurrentClient().targetClient = newClient;
this._currentHostInfo = hostInfo;
await this.sessionStateService.applyCurrentSessionState(this.getCurrentClient());
Expand Down Expand Up @@ -432,35 +432,35 @@ export class PluginService implements ErrorHandler, HostListProviderService {
private async updateReadOnly(statements: string[]) {
const updateReadOnly = SqlMethodUtils.doesSetReadOnly(statements, this.getDialect());
if (updateReadOnly !== undefined) {
await this.getCurrentClient().setReadOnly(updateReadOnly);
this.getSessionStateService().setReadOnly(updateReadOnly);
}
}

private async updateAutoCommit(statements: string[]) {
const updateAutoCommit = SqlMethodUtils.doesSetAutoCommit(statements, this.getDialect());
if (updateAutoCommit !== undefined) {
await this.getCurrentClient().setAutoCommit(updateAutoCommit);
this.getSessionStateService().setAutoCommit(updateAutoCommit);
}
}

private async updateCatalog(statements: string[]) {
const updateCatalog = SqlMethodUtils.doesSetCatalog(statements, this.getDialect());
if (updateCatalog !== undefined) {
await this.getCurrentClient().setCatalog(updateCatalog);
this.getSessionStateService().setCatalog(updateCatalog);
}
}

private async updateSchema(statements: string[]) {
const updateSchema = SqlMethodUtils.doesSetSchema(statements, this.getDialect());
if (updateSchema !== undefined) {
await this.getCurrentClient().setSchema(updateSchema);
this.getSessionStateService().setSchema(updateSchema);
}
}

private async updateTransactionIsolation(statements: string[]) {
const updateTransactionIsolation = SqlMethodUtils.doesSetTransactionIsolation(statements, this.getDialect());
if (updateTransactionIsolation !== undefined) {
await this.getCurrentClient().setTransactionIsolation(updateTransactionIsolation);
this.getSessionStateService().setTransactionIsolation(updateTransactionIsolation);
}
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/plugins/failover/failover_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,9 @@ export class FailoverPlugin extends AbstractConnectionPlugin {
throw new FailoverFailedError(Messages.get("Failover.unableToConnectToReader"));
}

this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.pluginService.abortCurrentClient();
await this.pluginService.setCurrentClient(result.client, result.newHost);
this.pluginService.getCurrentHostInfo()?.removeAlias(Array.from(oldAliases));
await this.updateTopology(true);
this.failoverReaderSuccessCounter.inc();
} catch (error: any) {
Expand Down
2 changes: 2 additions & 0 deletions common/lib/pool_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ import { ClientWrapper } from "./client_wrapper";
import { HostInfo } from "./host_info";
import { uniqueId } from "../logutils";
import { ClientUtils } from "./utils/client_utils";
import { SessionState } from "./session_state";

export class PoolClientWrapper implements ClientWrapper {
readonly client: any;
readonly hostInfo: HostInfo;
readonly properties: Map<string, string>;
readonly id: string;
readonly sessionState = new SessionState();

constructor(targetClient: any, hostInfo: HostInfo, properties: Map<string, any>) {
this.client = targetClient;
Expand Down
147 changes: 125 additions & 22 deletions common/lib/session_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,28 @@
limitations under the License.
*/

class SessionStateField<Type> {
import { DatabaseDialect } from "./database_dialect/database_dialect";
import { AwsClient } from "./aws_client";

export abstract class SessionStateField<Type> {
value?: Type;
pristineValue?: Type;

constructor(copy?: SessionStateField<Type>) {
if (copy) {
this.value = copy.value;
this.pristineValue = copy.pristineValue;
}
}

abstract setValue(state: SessionState): void;

abstract setPristineValue(state: SessionState): void;

abstract getQuery(dialect: DatabaseDialect, isPristine: boolean): string;

abstract getClientValue(client: AwsClient): Type;

resetValue(): void {
this.value = undefined;
}
Expand Down Expand Up @@ -60,38 +78,123 @@ class SessionStateField<Type> {
return true;
}

copy(): SessionStateField<Type> {
const newField: SessionStateField<Type> = new SessionStateField();
if (this.value !== undefined) {
newField.value = this.value;
}
toString() {
return `${this.pristineValue ?? "(blank)"} => ${this.value ?? "(blank)"}`;
}
}

if (this.pristineValue !== undefined) {
newField.pristineValue = this.pristineValue;
}
class AutoCommitState extends SessionStateField<boolean> {
setValue(state: SessionState) {
this.value = state.autoCommit.value;
}

return newField;
setPristineValue(state: SessionState) {
this.value = state.autoCommit.pristineValue;
}

toString() {
return `${this.pristineValue ?? "(blank)"} => ${this.value ?? "(blank)"}`;
getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetAutoCommitQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): boolean {
return client.getAutoCommit();
}
}

class ReadOnlyState extends SessionStateField<boolean> {
setValue(state: SessionState) {
this.value = state.readOnly.value;
}

setPristineValue(state: SessionState) {
this.value = state.readOnly.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetReadOnlyQuery(this.value);
}

getClientValue(client: AwsClient): boolean {
return client.isReadOnly();
}
}

class CatalogState extends SessionStateField<string> {
setValue(state: SessionState) {
this.value = state.catalog.value;
}

setPristineValue(state: SessionState) {
this.value = state.catalog.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetCatalogQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): string {
return client.getCatalog();
}
}

class SchemaState extends SessionStateField<string> {
setValue(state: SessionState) {
this.value = state.schema.value;
}

setPristineValue(state: SessionState) {
this.value = state.schema.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetSchemaQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): string {
return client.getSchema();
}
}

class TransactionIsolationState extends SessionStateField<number> {
setValue(state: SessionState) {
this.value = state.transactionIsolation.value;
}

setPristineValue(state: SessionState) {
this.value = state.transactionIsolation.pristineValue;
}

getQuery(dialect: DatabaseDialect, isPristine: boolean = false) {
return dialect.getSetTransactionIsolationQuery(isPristine ? this.pristineValue : this.value);
}

getClientValue(client: AwsClient): number {
return client.getTransactionIsolation();
}
}

export class SessionState {
autoCommit: SessionStateField<boolean> = new SessionStateField<boolean>();
readOnly: SessionStateField<boolean> = new SessionStateField<boolean>();
catalog: SessionStateField<string> = new SessionStateField<string>();
schema: SessionStateField<string> = new SessionStateField<string>();
transactionIsolation: SessionStateField<number> = new SessionStateField<number>();
autoCommit: AutoCommitState = new AutoCommitState();
readOnly: ReadOnlyState = new ReadOnlyState();
catalog: CatalogState = new CatalogState();
schema: SchemaState = new SchemaState();
transactionIsolation: TransactionIsolationState = new TransactionIsolationState();

static setState(target: SessionStateField<any>, source: SessionState): void {
target.setValue(source);
}

static setPristineState(target: SessionStateField<any>, source: SessionState): void {
target.setPristineValue(source);
}

copy(): SessionState {
const newSessionState = new SessionState();
newSessionState.autoCommit = this.autoCommit.copy();
newSessionState.readOnly = this.readOnly.copy();
newSessionState.catalog = this.catalog.copy();
newSessionState.schema = this.schema.copy();
newSessionState.transactionIsolation = this.transactionIsolation.copy();
newSessionState.autoCommit = new AutoCommitState(this.autoCommit);
newSessionState.readOnly = new ReadOnlyState(this.readOnly);
newSessionState.catalog = new CatalogState(this.catalog);
newSessionState.schema = new SchemaState(this.schema);
newSessionState.transactionIsolation = new TransactionIsolationState(this.transactionIsolation);

return newSessionState;
}
Expand Down
Loading
Loading