Skip to content

Commit

Permalink
Fix memory leak in translating logical addresses to SQL queue address…
Browse files Browse the repository at this point in the history
…es (#1281)
  • Loading branch information
DavidBoike authored Feb 1, 2024
1 parent ca80244 commit 1efe66c
Showing 1 changed file with 28 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public QueueAddressTranslator(string defaultCatalog, string defaultSchema, strin

public QueueAddress Generate(Transport.QueueAddress queueAddress)
{
return logicalAddressCache.GetOrAdd(queueAddress, TranslateLogicalAddress);
// AddressKey has a GetHashCode implementation and can be used as a dictionary key, Transport.QueueAddress (from Core) does not (at this time)
var key = AddressKey.Create(queueAddress);
return logicalAddressCache.GetOrAdd(key, key => key.ToSqlQueueAddress());
}

public CanonicalQueueAddress Parse(string address)
Expand Down Expand Up @@ -50,32 +52,37 @@ static string Override(string configuredValue, string addressValue, string defau
return configuredValue ?? addressValue ?? defaultValue;
}

public static QueueAddress TranslateLogicalAddress(Transport.QueueAddress queueAddress)
QueueSchemaAndCatalogOptions queueOptions;
ConcurrentDictionary<string, CanonicalQueueAddress> physicalAddressCache = new();
ConcurrentDictionary<AddressKey, QueueAddress> logicalAddressCache = new();

record struct AddressKey(string BaseAddress, string Discriminator, string Qualifier, string Schema, string Catalog)
{
var nonEmptyParts = new[]
public static AddressKey Create(Transport.QueueAddress a)
{
queueAddress.BaseAddress,
queueAddress.Qualifier,
queueAddress.Discriminator
}.Where(p => !string.IsNullOrEmpty(p));

var tableName = string.Join(".", nonEmptyParts);
string schema = null;
string catalog = null;
if (a.Properties is not null)
{
a.Properties.TryGetValue(SettingsKeys.SchemaPropertyKey, out schema);
a.Properties.TryGetValue(SettingsKeys.CatalogPropertyKey, out catalog);
}
return new AddressKey(a.BaseAddress, a.Discriminator, a.Qualifier, schema, catalog);
}

public QueueAddress ToSqlQueueAddress()
{
var nonEmptyParts = new[]
{
BaseAddress,
Qualifier,
Discriminator
}.Where(p => !string.IsNullOrEmpty(p));

string schemaName = null;
string catalogName = null;
var tableName = string.Join(".", nonEmptyParts);

if (queueAddress.Properties != null)
{
queueAddress?.Properties.TryGetValue(SettingsKeys.SchemaPropertyKey, out schemaName);
queueAddress?.Properties.TryGetValue(SettingsKeys.CatalogPropertyKey, out catalogName);
return new QueueAddress(tableName, Schema, Catalog);
}

return new QueueAddress(tableName, schemaName, catalogName);
}

QueueSchemaAndCatalogOptions queueOptions;
ConcurrentDictionary<string, CanonicalQueueAddress> physicalAddressCache = new ConcurrentDictionary<string, CanonicalQueueAddress>();
ConcurrentDictionary<Transport.QueueAddress, QueueAddress> logicalAddressCache = new ConcurrentDictionary<Transport.QueueAddress, QueueAddress>();
}
}

0 comments on commit 1efe66c

Please sign in to comment.