Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TO Discuss - do not merge (yet) - Encapsulate the ConnectionProviderManager class and its usage in PluginService #286

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions common/lib/aws_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ export abstract class AwsClient extends EventEmitter {

this.telemetryFactory = new DefaultTelemetryFactory(this.properties);
const container = new PluginServiceManagerContainer();
this.pluginService = new PluginService(container, this, dbType, knownDialectsByCode, this.properties, driverDialect);
this.pluginManager = new PluginManager(
this.pluginService = new PluginService(
container,
this,
dbType,
knownDialectsByCode,
this.properties,
new ConnectionProviderManager(new DriverConnectionProvider(), null),
this.telemetryFactory
driverDialect,
new ConnectionProviderManager(new DriverConnectionProvider(), null)
);
this.pluginManager = new PluginManager(container, this.properties, this.telemetryFactory);
}

private async setup() {
Expand Down
9 changes: 2 additions & 7 deletions common/lib/connection_plugin_chain_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import { OktaAuthPluginFactory } from "./plugins/federated_auth/okta_auth_plugin
import { HostMonitoringPluginFactory } from "./plugins/efm/host_monitoring_plugin_factory";
import { AuroraInitialConnectionStrategyFactory } from "./plugins/aurora_initial_connection_strategy_plugin_factory";
import { AuroraConnectionTrackerPluginFactory } from "./plugins/connection_tracker/aurora_connection_tracker_plugin_factory";
import { ConnectionProviderManager } from "./connection_provider_manager";
import { DeveloperConnectionPluginFactory } from "./plugins/dev/developer_connection_plugin_factory";

/*
Expand Down Expand Up @@ -67,11 +66,7 @@ export class ConnectionPluginChainBuilder {
["executeTime", { factory: ExecuteTimePluginFactory, weight: ConnectionPluginChainBuilder.WEIGHT_RELATIVE_TO_PRIOR_PLUGIN }]
]);

static async getPlugins(
pluginService: PluginService,
props: Map<string, any>,
connectionProviderManager: ConnectionProviderManager
): Promise<ConnectionPlugin[]> {
static async getPlugins(pluginService: PluginService, props: Map<string, any>): Promise<ConnectionPlugin[]> {
const plugins: ConnectionPlugin[] = [];
let pluginCodes: string = props.get(WrapperProperties.PLUGINS.name);
if (pluginCodes == null) {
Expand Down Expand Up @@ -119,7 +114,7 @@ export class ConnectionPluginChainBuilder {
}
}

plugins.push(new DefaultPlugin(pluginService, connectionProviderManager));
plugins.push(new DefaultPlugin(pluginService));

return plugins;
}
Expand Down
2 changes: 1 addition & 1 deletion common/lib/connection_provider_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class ConnectionProviderManager {
}
}

if (this.effectiveProvider?.acceptsStrategy(role, strategy)) {
if (!host && this.effectiveProvider?.acceptsStrategy(role, strategy)) {
try {
host = this.effectiveProvider.getHostInfoByStrategy(hosts, role, strategy, props);
} catch {
Expand Down
24 changes: 3 additions & 21 deletions common/lib/plugin_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import { OldConnectionSuggestionAction } from "./old_connection_suggestion_actio
import { HostRole } from "./host_role";
import { ClientWrapper } from "./client_wrapper";
import { CanReleaseResources } from "./can_release_resources";
import { ConnectionProviderManager } from "./connection_provider_manager";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { TelemetryTraceLevel } from "./utils/telemetry/telemetry_trace_level";
import { ConnectionProvider } from "./connection_provider";
Expand Down Expand Up @@ -71,19 +70,12 @@ export class PluginManager {
private static readonly GET_HOST_INFO_BY_STRATEGY_METHOD: string = "getHostInfoByStrategy";
private readonly props: Map<string, any>;
private _plugins: ConnectionPlugin[] = [];
private readonly connectionProviderManager: ConnectionProviderManager;
private pluginServiceManagerContainer: PluginServiceManagerContainer;
protected telemetryFactory: TelemetryFactory;

constructor(
pluginServiceManagerContainer: PluginServiceManagerContainer,
props: Map<string, any>,
connectionProviderManager: ConnectionProviderManager,
telemetryFactory: TelemetryFactory
) {
constructor(pluginServiceManagerContainer: PluginServiceManagerContainer, props: Map<string, any>, telemetryFactory: TelemetryFactory) {
this.pluginServiceManagerContainer = pluginServiceManagerContainer;
this.pluginServiceManagerContainer.pluginManager = this;
this.connectionProviderManager = connectionProviderManager;
this.props = props;
this.telemetryFactory = telemetryFactory;
}
Expand All @@ -95,11 +87,7 @@ export class PluginManager {
if (plugins) {
this._plugins = plugins;
} else {
this._plugins = await ConnectionPluginChainBuilder.getPlugins(
this.pluginServiceManagerContainer.pluginService,
this.props,
this.connectionProviderManager
);
this._plugins = await ConnectionPluginChainBuilder.getPlugins(this.pluginServiceManagerContainer.pluginService, this.props);
}
}
}
Expand Down Expand Up @@ -274,7 +262,7 @@ export class PluginManager {
return false;
}

getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo {
getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo | undefined {
for (const plugin of this._plugins) {
const pluginSubscribedMethods = plugin.getSubscribedMethods();
const isSubscribed =
Expand All @@ -291,8 +279,6 @@ export class PluginManager {
}
}
}

throw new AwsWrapperError("The driver does not support the requested host selection strategy: " + strategy);
}

async releaseResources() {
Expand All @@ -306,10 +292,6 @@ export class PluginManager {
}
}

getConnectionProvider(hostInfo: HostInfo | null, props: Map<string, any>): ConnectionProvider {
return this.connectionProviderManager.getConnectionProvider(hostInfo, props);
}

private implementsCanReleaseResources(plugin: any): plugin is CanReleaseResources {
return plugin.releaseResources !== undefined;
}
Expand Down
33 changes: 26 additions & 7 deletions common/lib/plugin_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { getWriter } from "./utils/utils";
import { ConnectionProvider } from "./connection_provider";
import { TelemetryFactory } from "./utils/telemetry/telemetry_factory";
import { DriverDialect } from "./driver_dialect/driver_dialect";
import { ConnectionProviderManager } from "./connection_provider_manager";

export class PluginService implements ErrorHandler, HostListProviderService {
private readonly _currentClient: AwsClient;
Expand All @@ -52,6 +53,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
private _initialConnectionHostInfo?: HostInfo;
private _isInTransaction: boolean = false;
private pluginServiceManagerContainer: PluginServiceManagerContainer;
private readonly connectionProviderManager: ConnectionProviderManager;
protected hosts: HostInfo[] = [];
private dbDialectProvider: DatabaseDialectProvider;
private readonly initialHost: string;
Expand All @@ -67,7 +69,8 @@ export class PluginService implements ErrorHandler, HostListProviderService {
dbType: DatabaseType,
knownDialectsByCode: Map<DatabaseDialectCodes, DatabaseDialect>,
props: Map<string, any>,
driverDialect: DriverDialect
driverDialect: DriverDialect,
connectionProviderManager: ConnectionProviderManager
) {
this._currentClient = client;
this.pluginServiceManagerContainer = container;
Expand All @@ -76,6 +79,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
this.driverDialect = driverDialect;
this.initialHost = props.get(WrapperProperties.HOST.name);
this.sessionStateService = new SessionStateServiceImpl(this, this.props);
this.connectionProviderManager = connectionProviderManager;
container.pluginService = this;

this.dialect = this.dbDialectProvider.getDialect(this.props);
Expand Down Expand Up @@ -114,8 +118,23 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo | undefined {
if (role === HostRole.UNKNOWN) {
logger.debug("unknown role requested"); // TODO provide message using Messages.get was: DefaultConnectionPlugin.unknownRoleRequested -
return;
}

const pluginManager = this.pluginServiceManagerContainer.pluginManager;
return pluginManager?.getHostInfoByStrategy(role, strategy);
const host = pluginManager?.getHostInfoByStrategy(role, strategy);
if (host) {
return host;
}

if (this.getHosts().length === 0) {
logger.debug("no Hosts Available"); // TODO provide message using Messages.get was: DefaultConnectionPlugin.noHostsAvailable
return;
}

return this.connectionProviderManager.getHostInfoByStrategy(this.getHosts(), role, strategy, this.props);
}

getCurrentHostInfo(): HostInfo | null {
Expand Down Expand Up @@ -152,10 +171,7 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

getConnectionProvider(hostInfo: HostInfo | null, props: Map<string, any>): ConnectionProvider {
if (!this.pluginServiceManagerContainer.pluginManager) {
throw new AwsWrapperError("Plugin manager should not be undefined");
}
return this.pluginServiceManagerContainer.pluginManager.getConnectionProvider(hostInfo, props);
return this.connectionProviderManager.getConnectionProvider(hostInfo, props);
}

getDialect(): DatabaseDialect {
Expand All @@ -175,7 +191,10 @@ export class PluginService implements ErrorHandler, HostListProviderService {
}

acceptsStrategy(role: HostRole, strategy: string): boolean {
return this.pluginServiceManagerContainer.pluginManager?.acceptsStrategy(role, strategy) ?? false;
return (
(this.pluginServiceManagerContainer.pluginManager?.acceptsStrategy(role, strategy) ?? false) ||
this.connectionProviderManager.acceptsStrategy(role, strategy)
);
}

async forceRefreshHostList(): Promise<void>;
Expand Down
34 changes: 4 additions & 30 deletions common/lib/plugins/default_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,23 @@ import { HostInfo } from "../host_info";
import { AbstractConnectionPlugin } from "../abstract_connection_plugin";
import { HostChangeOptions } from "../host_change_options";
import { OldConnectionSuggestionAction } from "../old_connection_suggestion_action";
import { HostRole } from "../host_role";
import { PluginService } from "../plugin_service";
import { ConnectionProviderManager } from "../connection_provider_manager";
import { ConnectionProvider } from "../connection_provider";
import { AwsWrapperError } from "../utils/errors";
import { HostAvailability } from "../host_availability/host_availability";
import { ClientWrapper } from "../client_wrapper";
import { TelemetryTraceLevel } from "../utils/telemetry/telemetry_trace_level";

export class DefaultPlugin extends AbstractConnectionPlugin {
id: string = uniqueId("_defaultPlugin");
private readonly pluginService: PluginService;
private readonly connectionProviderManager: ConnectionProviderManager;

constructor(pluginService: PluginService, connectionProviderManager: ConnectionProviderManager) {
constructor(pluginService: PluginService) {
super();
this.pluginService = pluginService;
this.connectionProviderManager = connectionProviderManager;
}

override getSubscribedMethods(): Set<string> {
return new Set<string>(["*"]);
return new Set<string>(["*"]); // TODO verify Subscribed Methods
}

override async forceConnect<Type>(
Expand All @@ -52,7 +47,7 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
isInitialConnection: boolean,
forceConnectFunc: () => Promise<ClientWrapper>
): Promise<ClientWrapper> {
return await this.connectInternal(hostInfo, props, this.connectionProviderManager.getConnectionProvider(hostInfo, props));
return await this.connectInternal(hostInfo, props, this.pluginService.getConnectionProvider(hostInfo, props));
}

override initHostProvider(
Expand All @@ -70,7 +65,7 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
isInitialConnection: boolean,
connectFunc: () => Promise<ClientWrapper>
): Promise<ClientWrapper> {
return await this.connectInternal(hostInfo, props, this.connectionProviderManager.getConnectionProvider(hostInfo, props));
return await this.connectInternal(hostInfo, props, this.pluginService.getConnectionProvider(hostInfo, props));
}

private async connectInternal(hostInfo: HostInfo, props: Map<string, any>, connProvider: ConnectionProvider): Promise<ClientWrapper> {
Expand Down Expand Up @@ -105,25 +100,4 @@ export class DefaultPlugin extends AbstractConnectionPlugin {
override notifyHostListChanged(changes: Map<string, Set<HostChangeOptions>>): Promise<void> {
return Promise.resolve();
}

override acceptsStrategy(role: HostRole, strategy: string): boolean {
if (role === HostRole.UNKNOWN) {
// Users must request either a writer or a reader role.
return false;
}
return this.connectionProviderManager.acceptsStrategy(role, strategy);
}

override getHostInfoByStrategy(role: HostRole, strategy: string): HostInfo {
if (role === HostRole.UNKNOWN) {
throw new AwsWrapperError(Messages.get("DefaultConnectionPlugin.unknownRoleRequested"));
}

const hosts = this.pluginService.getHosts();
if (hosts.length < 1) {
throw new AwsWrapperError(Messages.get("DefaultConnectionPlugin.noHostsAvailable"));
}

return this.connectionProviderManager.getHostInfoByStrategy(hosts, role, strategy, this.pluginService.props);
}
}
24 changes: 3 additions & 21 deletions tests/plugin_benchmarks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

import { anything, instance, mock, when } from "ts-mockito";
import { ConnectionProvider } from "../common/lib/connection_provider";
import { PluginService } from "../common/lib/plugin_service";
import { PluginServiceManagerContainer } from "../common/lib/plugin_service_manager_container";
import { WrapperProperties } from "../common/lib/wrapper_property";
Expand All @@ -26,10 +25,8 @@ import { HostInfoBuilder } from "../common/lib/host_info_builder";
import { SimpleHostAvailabilityStrategy } from "../common/lib/host_availability/simple_host_availability_strategy";
import { AwsPGClient } from "../pg/lib";
import { NullTelemetryFactory } from "../common/lib/utils/telemetry/null_telemetry_factory";
import { ConnectionProviderManager } from "../common/lib/connection_provider_manager";
import { PgClientWrapper } from "../common/lib/pg_client_wrapper";

const mockConnectionProvider = mock<ConnectionProvider>();
const mockPluginService = mock(PluginService);
const mockClient = mock(AwsPGClient);

Expand Down Expand Up @@ -59,24 +56,9 @@ WrapperProperties.HOST.set(propsExecute, connectionString);
WrapperProperties.HOST.set(propsReadWrite, connectionString);
WrapperProperties.HOST.set(props, connectionString);

const pluginManagerExecute = new PluginManager(
pluginServiceManagerContainer,
propsExecute,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
telemetryFactory
);
const pluginManagerReadWrite = new PluginManager(
pluginServiceManagerContainer,
propsReadWrite,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
telemetryFactory
);
const pluginManager = new PluginManager(
pluginServiceManagerContainer,
props,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
new NullTelemetryFactory()
);
const pluginManagerExecute = new PluginManager(pluginServiceManagerContainer, propsExecute, telemetryFactory);
const pluginManagerReadWrite = new PluginManager(pluginServiceManagerContainer, propsReadWrite, telemetryFactory);
const pluginManager = new PluginManager(pluginServiceManagerContainer, props, new NullTelemetryFactory());

suite(
"Plugin benchmarks",
Expand Down
31 changes: 5 additions & 26 deletions tests/plugin_manager_benchmarks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import { WrapperProperties } from "../common/lib/wrapper_property";
import { DefaultPlugin } from "../common/lib/plugins/default_plugin";
import { BenchmarkPluginFactory } from "./testplugin/benchmark_plugin_factory";
import { NullTelemetryFactory } from "../common/lib/utils/telemetry/null_telemetry_factory";
import { ConnectionProviderManager } from "../common/lib/connection_provider_manager";
import { PgDatabaseDialect } from "../pg/lib/dialect/pg_database_dialect";
import { NodePostgresDriverDialect } from "../pg/lib/dialect/node_postgres_driver_dialect";

Expand All @@ -48,25 +47,15 @@ const propsWithPlugins = new Map<string, any>();

WrapperProperties.PLUGINS.set(propsWithNoPlugins, "");

const pluginManagerWithNoPlugins = new PluginManager(
pluginServiceManagerContainer,
propsWithNoPlugins,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
telemetryFactory
);
const pluginManagerWithPlugins = new PluginManager(
pluginServiceManagerContainer,
propsWithPlugins,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
telemetryFactory
);
const pluginManagerWithNoPlugins = new PluginManager(pluginServiceManagerContainer, propsWithNoPlugins, telemetryFactory);
const pluginManagerWithPlugins = new PluginManager(pluginServiceManagerContainer, propsWithPlugins, telemetryFactory);

async function createPlugins(pluginService: PluginService, connectionProvider: ConnectionProvider, props: Map<string, any>) {
const plugins = new Array<ConnectionPlugin>();
for (let i = 0; i < 10; i++) {
plugins.push(await new BenchmarkPluginFactory().getInstance(pluginService, props));
}
plugins.push(new DefaultPlugin(pluginService, new ConnectionProviderManager(instance(mockConnectionProvider), null)));
plugins.push(new DefaultPlugin(pluginService));
return plugins;
}

Expand All @@ -80,22 +69,12 @@ suite(
}),

add("initPluginManagerWithPlugins", async () => {
const manager = new PluginManager(
pluginServiceManagerContainer,
propsWithPlugins,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
new NullTelemetryFactory()
);
const manager = new PluginManager(pluginServiceManagerContainer, propsWithPlugins, new NullTelemetryFactory());
await manager.init(await createPlugins(instance(mockPluginService), instance(mockConnectionProvider), propsWithPlugins));
}),

add("initPluginManagerWithNoPlugins", async () => {
const manager = new PluginManager(
pluginServiceManagerContainer,
propsWithNoPlugins,
new ConnectionProviderManager(instance(mockConnectionProvider), null),
new NullTelemetryFactory()
);
const manager = new PluginManager(pluginServiceManagerContainer, propsWithNoPlugins, new NullTelemetryFactory());
await manager.init();
}),

Expand Down
Loading