Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
crystall-bitquill committed Nov 22, 2024
1 parent 574c5c0 commit f30dc78
Show file tree
Hide file tree
Showing 18 changed files with 267 additions and 143 deletions.
3 changes: 3 additions & 0 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { ConnectionProviderManager } from "./connection_provider_manager";
import { DefaultTelemetryFactory } from "./utils/telemetry/default_telemetry_factory";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { SessionStateService } from "./session_state_service";

export abstract class AwsClient extends EventEmitter {
private _defaultPort: number = -1;
Expand All @@ -39,6 +40,7 @@ export abstract class AwsClient extends EventEmitter {
protected isConnected: boolean = false;
protected _connectionUrlParser: ConnectionUrlParser;
readonly properties: Map<string, any>;
readonly sessionStateService: SessionStateService;
config: any;
targetClient?: ClientWrapper;

Expand All @@ -64,6 +66,7 @@ export abstract class AwsClient extends EventEmitter {
new ConnectionProviderManager(new DriverConnectionProvider(), null),
this.telemetryFactory
);
this.sessionStateService = this.pluginService.getSessionStateService();
}

private async setup() {
Expand Down
16 changes: 13 additions & 3 deletions common/lib/database_dialect/database_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,30 @@ import { HostListProvider } from "../host_list_provider/host_list_provider";
import { HostListProviderService } from "../host_list_provider_service";
import { ClientWrapper } from "../client_wrapper";
import { FailoverRestriction } from "../plugins/failover/failover_restriction";
import { AwsPoolClient } from "../aws_pool_client";
import { AwsPoolConfig } from "../aws_pool_config";
import { ErrorHandler } from "../error_handler";
import { SessionState } from "../session_state";

export enum DatabaseType {
MYSQL,
POSTGRES
}

export interface DatabaseDialect {
readonly defaultAutoCommit?: boolean;
readonly defaultReadOnly?: boolean;
readonly defaultTransactionIsolation?: number;
readonly defaultCatalog?: string;
readonly defaultSchema?: string;

getDefaultPort(): number;
getHostAliasQuery(): string;
getHostAliasAndParseResults(targetClient: ClientWrapper): Promise<string>;
getServerVersionQuery(): string;
getSetReadOnlyQuery(readOnly: boolean): string
getSetReadOnlyQuery(readOnly: boolean): string;
getSetAutoCommitQuery(autoCommit: boolean): string;
getSetTransactionIsolationQuery(level: number): string;
getSetCatalogQuery(catalog: string): string;
getSetSchemaQuery(schema: string): string;
getDialectUpdateCandidates(): string[];
getErrorHandler(): ErrorHandler;
isDialect(targetClient: ClientWrapper): Promise<boolean>;
Expand All @@ -46,4 +55,5 @@ export interface DatabaseDialect {
doesStatementSetAutoCommit(statement: string): boolean | undefined;
doesStatementSetSchema(statement: string): string | undefined;
doesStatementSetCatalog(statement: string): string | undefined;
setDefaultSessionState(sessionState: SessionState): void;
}
2 changes: 2 additions & 0 deletions common/lib/driver_dialect/driver_dialect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import { ClientWrapper } from "../client_wrapper";
import { AwsPoolConfig } from "../aws_pool_config";
import { AwsPoolClient } from "../aws_pool_client";
import { HostInfo } from "../host_info";
import { PluginService } from "../plugin_service";
import { SessionState } from "../session_state";

export interface DriverDialect {
getDialectName(): string;
Expand Down
1 change: 1 addition & 0 deletions common/lib/mysql_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export class MySQLClientWrapper implements ClientWrapper {
this.hostInfo = hostInfo;
this.properties = properties;
this.id = uniqueId("MySQLClient_");

this.setSessionStateDefault();
}

Expand Down
2 changes: 1 addition & 1 deletion common/lib/pg_client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ export class PgClientWrapper implements ClientWrapper {
this.client = targetClient;
this.hostInfo = hostInfo;
this.properties = properties;
this.sessionState = new SessionState();
this.id = uniqueId("PgClient_");
this.setSessionStateDefault();
}

query(sql: any): Promise<any> {
Expand Down
14 changes: 7 additions & 7 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import { DatabaseDialectCodes } from "./database_dialect/database_dialect_codes"
import { getWriter } from "./utils/utils";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { SessionState } from "./session_state";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand Down Expand Up @@ -74,10 +75,9 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this.dbDialectProvider = new DatabaseDialectManager(knownDialectsByCode, dbType, this.props);
this.driverDialect = driverDialect;
this.initialHost = props.get(WrapperProperties.HOST.name);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
container.pluginService = this;

this.dialect = this.dbDialectProvider.getDialect(this.props);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
}

isInTransaction(): boolean {
Expand Down Expand Up @@ -430,35 +430,35 @@ export class PluginService implements ErrorHandler, HostListProviderService {
private async updateReadOnly(statements: string[]) {
const updateReadOnly = SqlMethodUtils.doesSetReadOnly(statements, this.getDialect());
if (updateReadOnly !== undefined) {
await this.getCurrentClient().setReadOnly(updateReadOnly);
this.getSessionStateService().setReadOnly(updateReadOnly);
}
}

private async updateAutoCommit(statements: string[]) {
const updateAutoCommit = SqlMethodUtils.doesSetAutoCommit(statements, this.getDialect());
if (updateAutoCommit !== undefined) {
await this.getCurrentClient().setAutoCommit(updateAutoCommit);
this.getSessionStateService().setAutoCommit(updateAutoCommit);
}
}

private async updateCatalog(statements: string[]) {
const updateCatalog = SqlMethodUtils.doesSetCatalog(statements, this.getDialect());
if (updateCatalog !== undefined) {
await this.getCurrentClient().setCatalog(updateCatalog);
this.getSessionStateService().setCatalog(updateCatalog);
}
}

private async updateSchema(statements: string[]) {
const updateSchema = SqlMethodUtils.doesSetSchema(statements, this.getDialect());
if (updateSchema !== undefined) {
await this.getCurrentClient().setSchema(updateSchema);
this.getSessionStateService().setSchema(updateSchema);
}
}

private async updateTransactionIsolation(statements: string[]) {
const updateTransactionIsolation = SqlMethodUtils.doesSetTransactionIsolation(statements, this.getDialect());
if (updateTransactionIsolation !== undefined) {
await this.getCurrentClient().setTransactionIsolation(updateTransactionIsolation);
this.getSessionStateService().setTransactionIsolation(updateTransactionIsolation);
}
}

Expand Down
36 changes: 24 additions & 12 deletions common/lib/session_state_service_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { PluginService } from "./plugin_service";
import { AwsWrapperError, UnsupportedMethodError } from "./utils/errors";
import { logger } from "../logutils";
import { SessionStateTransferHandler } from "./session_state_transfer_handler";
import { DatabaseDialect } from "./database_dialect/database_dialect";

export class SessionStateServiceImpl implements SessionStateService {
protected sessionState: SessionState;
Expand Down Expand Up @@ -61,7 +62,8 @@ export class SessionStateServiceImpl implements SessionStateService {
this.sessionState.autoCommit.resetPristineValue();
this.setupPristineAutoCommit();
try {
await newClient.setAutoCommit(this.sessionState.autoCommit.value);
await newClient.targetClient.query(this.pluginService.getDialect().getSetAutoCommitQuery(this.sessionState.autoCommit.value));
this.setAutoCommit(this.sessionState.autoCommit.value);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -75,7 +77,7 @@ export class SessionStateServiceImpl implements SessionStateService {
this.setupPristineReadOnly();
try {
await newClient.targetClient.query(this.pluginService.getDialect().getSetReadOnlyQuery(this.sessionState.readOnly.value));
this.updateReadOnly(this.sessionState.readOnly.value);
this.setReadOnly(this.sessionState.readOnly.value);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -88,7 +90,8 @@ export class SessionStateServiceImpl implements SessionStateService {
this.sessionState.catalog.resetPristineValue();
this.setupPristineCatalog();
try {
await newClient.setCatalog(this.sessionState.catalog.value);
await newClient.targetClient.query(this.pluginService.getDialect().getSetCatalogQuery(this.sessionState.catalog.value));
this.setCatalog(this.sessionState.catalog.value);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -101,7 +104,8 @@ export class SessionStateServiceImpl implements SessionStateService {
this.sessionState.schema.resetPristineValue();
this.setupPristineSchema();
try {
await newClient.setSchema(this.sessionState.schema.value);
await newClient.targetClient.query(this.pluginService.getDialect().getSetSchemaQuery(this.sessionState.schema.value));
this.setSchema(this.sessionState.schema.value);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -114,7 +118,10 @@ export class SessionStateServiceImpl implements SessionStateService {
this.sessionState.transactionIsolation.resetPristineValue();
this.setupPristineTransactionIsolation();
try {
await newClient.setTransactionIsolation(this.sessionState.transactionIsolation.value);
await newClient.targetClient.query(
this.pluginService.getDialect().getSetTransactionIsolationQuery(this.sessionState.transactionIsolation.value)
);
this.setTransactionIsolation(this.sessionState.transactionIsolation.value);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -140,7 +147,8 @@ export class SessionStateServiceImpl implements SessionStateService {

if (this.copySessionState?.autoCommit.canRestorePristine() && this.copySessionState?.autoCommit.pristineValue !== undefined) {
try {
await client.setAutoCommit(this.copySessionState?.autoCommit.pristineValue);
await client.targetClient.query(this.pluginService.getDialect().getSetAutoCommitQuery(this.sessionState.autoCommit.pristineValue));
this.setAutoCommit(this.sessionState.autoCommit.pristineValue);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -151,8 +159,8 @@ export class SessionStateServiceImpl implements SessionStateService {

if (this.copySessionState?.readOnly.canRestorePristine() && this.copySessionState?.readOnly.pristineValue !== undefined) {
try {
await client.targetClient.query(this.pluginService.getDialect().getSetReadOnlyQuery(this.sessionState.readOnly.value));
this.updateReadOnly(this.sessionState.readOnly.value);
await client.targetClient.query(this.pluginService.getDialect().getSetReadOnlyQuery(this.sessionState.readOnly.pristineValue));
this.setReadOnly(this.sessionState.readOnly.pristineValue);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -163,7 +171,8 @@ export class SessionStateServiceImpl implements SessionStateService {

if (this.copySessionState?.catalog.canRestorePristine() && this.copySessionState?.catalog.pristineValue !== undefined) {
try {
await client.setCatalog(this.copySessionState?.catalog.pristineValue);
await client.targetClient.query(this.pluginService.getDialect().getSetCatalogQuery(this.sessionState.catalog.pristineValue));
this.setCatalog(this.sessionState.catalog.pristineValue);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -174,7 +183,8 @@ export class SessionStateServiceImpl implements SessionStateService {

if (this.copySessionState?.schema.canRestorePristine() && this.copySessionState?.schema.pristineValue !== undefined) {
try {
await client.setSchema(this.copySessionState?.schema.pristineValue);
await client.targetClient.query(this.pluginService.getDialect().getSetSchemaQuery(this.sessionState.schema.pristineValue));
this.setSchema(this.sessionState.schema.pristineValue);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand All @@ -185,7 +195,10 @@ export class SessionStateServiceImpl implements SessionStateService {

if (this.copySessionState?.transactionIsolation.canRestorePristine() && this.copySessionState?.transactionIsolation.pristineValue !== undefined) {
try {
await client.setTransactionIsolation(this.copySessionState?.transactionIsolation.pristineValue);
await client.targetClient.query(
this.pluginService.getDialect().getSetTransactionIsolationQuery(this.sessionState.transactionIsolation.pristineValue)
);
this.setTransactionIsolation(this.sessionState.transactionIsolation.pristineValue);
} catch (error: any) {
if (error instanceof UnsupportedMethodError) {
// ignore
Expand Down Expand Up @@ -289,7 +302,6 @@ export class SessionStateServiceImpl implements SessionStateService {
}

updateReadOnly(readOnly: boolean): void {
this.pluginService.getCurrentClient().targetClient.sessionState.readOnly.value = readOnly;
this.pluginService.getSessionStateService().setupPristineReadOnly();
this.pluginService.getSessionStateService().setReadOnly(readOnly);
}
Expand Down
41 changes: 15 additions & 26 deletions mysql/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class AwsMySQLClient extends AwsClient {
});
}

private async readOnlyQuery(options: QueryOptions, callback?: any): Promise<Query> {
private async queryWithoutUpdate(options: QueryOptions, callback?: any): Promise<Query> {
const host = this.pluginService.getCurrentHostInfo();

return this.pluginManager.execute(
Expand All @@ -94,7 +94,9 @@ export class AwsMySQLClient extends AwsClient {
}

async setReadOnly(readOnly: boolean): Promise<Query | void> {
const result = await this.readOnlyQuery({ sql: `SET SESSION TRANSACTION READ ${readOnly ? "ONLY" : "WRITE"}` });
this.pluginService.getSessionStateService().setupPristineReadOnly();
const result = await this.queryWithoutUpdate({ sql: `SET SESSION TRANSACTION READ ${readOnly ? "ONLY" : "WRITE"}` });
this.targetClient.sessionState.readOnly.value = readOnly;
this.pluginService.getSessionStateService().updateReadOnly(readOnly);
return result;
}
Expand All @@ -104,35 +106,27 @@ export class AwsMySQLClient extends AwsClient {
}

async setAutoCommit(autoCommit: boolean): Promise<Query | void> {
if (autoCommit === this.getAutoCommit()) {
return;
}

this.pluginService.getSessionStateService().setupPristineAutoCommit();
this.pluginService.getSessionStateService().setAutoCommit(autoCommit);

this.targetClient.sessionState.autoCommit.value = autoCommit;
let setting = "1";
if (!autoCommit) {
setting = "0";
}
return await this.query({ sql: `SET AUTOCOMMIT=${setting}` });
const result = await this.queryWithoutUpdate({ sql: `SET AUTOCOMMIT=${setting}` });
this.targetClient.sessionState.autoCommit.value = autoCommit;
this.pluginService.getSessionStateService().setAutoCommit(autoCommit);
return result;
}

getAutoCommit(): boolean {
return this.targetClient.sessionState.autoCommit.value;
}

async setCatalog(catalog: string): Promise<Query | void> {
if (catalog === this.getCatalog()) {
return;
}

this.pluginService.getSessionStateService().setupPristineCatalog();
this.pluginService.getSessionStateService().setCatalog(catalog);

await this.queryWithoutUpdate({ sql: `USE ${catalog}` });
this.targetClient.sessionState.catalog.value = catalog;
await this.query({ sql: `USE ${catalog}` });
this.pluginService.getSessionStateService().setCatalog(catalog);
}

getCatalog(): string {
Expand All @@ -148,32 +142,27 @@ export class AwsMySQLClient extends AwsClient {
}

async setTransactionIsolation(level: TransactionIsolationLevel): Promise<Query | void> {
if (level === this.getTransactionIsolation()) {
return;
}

this.pluginService.getSessionStateService().setupPristineTransactionIsolation();
this.pluginService.getSessionStateService().setTransactionIsolation(level);

this.targetClient.sessionState.transactionIsolation.value = level;
switch (level) {
case 0:
await this.query({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED" });
await this.queryWithoutUpdate({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED" });
break;
case 1:
await this.query({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED" });
await this.queryWithoutUpdate({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED" });
break;
case 2:
await this.query({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ" });
await this.queryWithoutUpdate({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ" });
break;
case 3:
await this.query({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE" });
await this.queryWithoutUpdate({ sql: "SET SESSION TRANSACTION ISOLATION LEVEL SERIALIZABLE" });
break;
default:
throw new AwsWrapperError(Messages.get("Client.invalidTransactionIsolationLevel", String(level)));
}

this.targetClient.sessionState.transactionIsolation.value = level;
this.pluginService.getSessionStateService().setTransactionIsolation(level);
}

getTransactionIsolation(): number {
Expand Down
Loading

0 comments on commit f30dc78

Please sign in to comment.