Skip to content

Commit

Permalink
framework/db: introduce a new MySQL table based distributed lock
Browse files Browse the repository at this point in the history
This introduces a MySQL InnoDB table-based distributed lock which can
be used by one or more management servers and their threads. This removes
usage of the MySQL server-provided locking functions (GET_LOCK,
RELEASE_LOCK), which are not replicated or supported currently by any
MySQL clustering solution. This would be the first main step toward getting
CloudStack to work with a MySQL clustering solution such as InnoDB
Cluster, Percona XtraDB Cluster, or MariaDB Galera Cluster. There may be
other changes required, which can be found in due course if this feature
works at scale.

Signed-off-by: Rohit Yadav <[email protected]>
  • Loading branch information
rohityadavcloud committed Apr 26, 2024
1 parent f965118 commit 9830bbe
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import com.cloud.utils.FileUtil;
import org.apache.cloudstack.utils.CloudStackVersion;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -398,6 +399,7 @@ protected void executeViewScripts() {

@Override
public void check() {
initDistributedLock();
GlobalLock lock = GlobalLock.getInternLock("DatabaseUpgrade");
try {
s_logger.info("Grabbing lock to check for database upgrade.");
Expand Down Expand Up @@ -441,6 +443,39 @@ public void check() {
}
}

/**
 * Prepares the table-based distributed lock before the upgrade check runs.
 *
 * <p>Creates {@code cloud.distributed_lock} if it does not exist yet, then deletes every row whose
 * {@code ms_id} equals this management server's id (presumably locks left over from an earlier
 * run of this server -- confirm against the lock acquisition code).
 *
 * @throws CloudRuntimeException if the connection cannot be obtained or either statement fails;
 *         the message names the step that failed and the original exception is kept as the cause
 */
private void initDistributedLock() {
    s_logger.info("Setting up distributed lock table if not created.");
    TransactionLegacy txn = TransactionLegacy.open("initDistributedLock");
    txn.start();
    // Re-assigned before every step so that a failure is attributed to the step that really failed.
    String errorMessage = "Unable to get the database connections";
    try {
        Connection conn = txn.getConnection();

        errorMessage = "Unable to create distributed_lock table in the 'cloud' database";
        // NOTE(review): UNIQUE KEY `name` covers the same column as the PRIMARY KEY and looks
        // redundant -- left untouched here so the created schema stays identical.
        String sql = "CREATE TABLE IF NOT EXISTS `cloud`.`distributed_lock` (" +
                "  `name` varchar(1024) NOT NULL," +
                "  `thread` varchar(1024) NOT NULL," +
                "  `ms_id` bigint NOT NULL, `pid` int NOT NULL," +
                "  `created` datetime DEFAULT NULL," +
                "  PRIMARY KEY (`name`)," +
                "  UNIQUE KEY `name` (`name`)" +
                ") ENGINE=InnoDB DEFAULT CHARSET=utf8";
        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
            pstmt.execute();
        }

        errorMessage = "Unable to remove this management server's rows from the distributed_lock table in the 'cloud' database";
        try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE ms_id=?")) {
            pstmt.setLong(1, ManagementServerNode.getManagementServerId());
            pstmt.execute();
        }
        txn.commit();
    } catch (CloudRuntimeException | SQLException e) {
        errorMessage = String.format("%s due to %s.", errorMessage, e.getMessage());
        // Log the contextual message together with the exception so the stack trace is not lost.
        s_logger.error(errorMessage, e);
        throw new CloudRuntimeException(errorMessage, e);
    } finally {
        txn.close();
    }
}

private void initializeDatabaseEncryptors() {
TransactionLegacy txn = TransactionLegacy.open("initializeDatabaseEncryptors");
txn.start();
Expand Down
59 changes: 32 additions & 27 deletions framework/db/src/main/java/com/cloud/utils/db/DbUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import javax.persistence.Table;
import javax.persistence.Transient;

import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.log4j.Logger;

import static com.cloud.utils.AutoCloseableUtil.closeAutoCloseable;
Expand Down Expand Up @@ -198,28 +199,36 @@ public static final String getTableName(Class<?> clazz) {
public static boolean getGlobalLock(String name, int timeoutSeconds) {
Connection conn = getConnectionForGlobalLocks(name, true);
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for distributed lock: " + name);
return false;
}

try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(GET_LOCK(?, ?),0)");) {
pstmt.setString(1, name);
pstmt.setInt(2, timeoutSeconds);

try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
if (rs.getInt(1) > 0) {
return true;
} else {
if (LOGGER.isDebugEnabled())
LOGGER.debug("GET_LOCK() timed out on lock : " + name);
}
int remainingTime = timeoutSeconds;
while (remainingTime > 0) {
try (PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO cloud.distributed_lock (name, thread, ms_id, pid, created) " +
"VALUES (?, ?, ?, ?, now()) ON DUPLICATE KEY UPDATE name=name")) {
pstmt.setString(1, name);
pstmt.setString(2, Thread.currentThread().getName());
pstmt.setLong(3, ManagementServerNode.getManagementServerId());
pstmt.setLong(4, ProcessHandle.current().pid());
if (pstmt.executeUpdate() > 0) {
return true;
}
} catch (SQLException e) {
LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e);
} catch (Throwable e) {
LOGGER.error("Inserting to cloud.distributed_lock query threw exception ", e);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Waiting, cloud.distributed_lock already has the lock: " + name);
}
remainingTime = remainingTime - 1;
try {
Thread.sleep(1000L);
} catch (InterruptedException ignore) {
}
} catch (SQLException e) {
LOGGER.error("GET_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("GET_LOCK() throws exception ", e);
}

removeConnectionForGlobalLocks(name);
Expand All @@ -234,24 +243,20 @@ public static Class<?> getEntityBeanType(GenericDao<?, Long> dao) {
public static boolean releaseGlobalLock(String name) {
try (Connection conn = getConnectionForGlobalLocks(name, false);) {
if (conn == null) {
LOGGER.error("Unable to acquire DB connection for global lock system");
LOGGER.error("Unable to acquire DB connection for distributed lock: " + name);
assert (false);
return false;
}

try (PreparedStatement pstmt = conn.prepareStatement("SELECT COALESCE(RELEASE_LOCK(?), 0)");) {
try (PreparedStatement pstmt = conn.prepareStatement("DELETE FROM cloud.distributed_lock WHERE name=?")) {
pstmt.setString(1, name);
try (ResultSet rs = pstmt.executeQuery();) {
if (rs != null && rs.first()) {
return rs.getInt(1) > 0;
}
LOGGER.error("releaseGlobalLock:RELEASE_LOCK() returns unexpected result");
if (pstmt.executeUpdate() > 0) {
return true;
}
LOGGER.warn("releaseGlobalLock: failed to remove cloud.distributed_lock lock which does not exist: " + name);
}
} catch (SQLException e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
} catch (Throwable e) {
LOGGER.error("RELEASE_LOCK() throws exception ", e);
LOGGER.error("Removing cloud.distributed_lock lock threw exception ", e);
}
return false;
}
Expand Down
23 changes: 8 additions & 15 deletions framework/db/src/test/java/com/cloud/utils/DbUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,28 +151,24 @@ public void cleanup() throws SecurityException, NoSuchFieldException, IllegalArg
public void getGlobalLock() throws SQLException {
Mockito.when(dataSource.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(Matchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(1);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(1);
Assert.assertTrue(DbUtil.getGlobalLock("TEST", 600));

Mockito.verify(connection).prepareStatement(Matchers.anyString());
Mockito.verify(preparedStatement).close();
Mockito.verify(resultSet).close();
}

@Test
public void getGlobalLockTimeout() throws SQLException {
Mockito.when(dataSource.getConnection()).thenReturn(connection);
Mockito.when(connection.prepareStatement(Matchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(0);
Assert.assertFalse(DbUtil.getGlobalLock("TEST", 600));
Mockito.when(preparedStatement.executeUpdate()).thenReturn(0);

Mockito.verify(connection).prepareStatement(Matchers.anyString());
Mockito.verify(preparedStatement).close();
Mockito.verify(resultSet).close();
final int tries = 2;
Assert.assertFalse(DbUtil.getGlobalLock("TEST", tries));

Mockito.verify(connection, Mockito.times(tries)).prepareStatement(Matchers.anyString());
Mockito.verify(preparedStatement, Mockito.times(tries)).close();
Mockito.verify(connection).close();

// if any error happens, the connection map must be cleared
Expand Down Expand Up @@ -238,13 +234,10 @@ void releaseGlobalLockNotexisting() throws SQLException {
@Test
public void releaseGlobalLock() throws SQLException {
Mockito.when(connection.prepareStatement(Matchers.anyString())).thenReturn(preparedStatement);
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
Mockito.when(resultSet.first()).thenReturn(true);
Mockito.when(resultSet.getInt(1)).thenReturn(1);
Mockito.when(preparedStatement.executeUpdate()).thenReturn(1);
connectionMap.put("testLock", connection);
Assert.assertTrue(DbUtil.releaseGlobalLock("testLock"));

Mockito.verify(resultSet).close();
Mockito.verify(preparedStatement).close();
Mockito.verify(connection).close();
Assert.assertFalse(connectionMap.containsKey("testLock"));
Expand Down
1 change: 1 addition & 0 deletions setup/db/create-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ DROP TABLE IF EXISTS `cloud`.`image_data_store`;
DROP TABLE IF EXISTS `cloud`.`vm_compute_tags`;
DROP TABLE IF EXISTS `cloud`.`vm_root_disk_tags`;
DROP TABLE IF EXISTS `cloud`.`vm_network_map`;
DROP TABLE IF EXISTS `cloud`.`distributed_lock`;

CREATE TABLE `cloud`.`version` (
`id` bigint unsigned NOT NULL UNIQUE AUTO_INCREMENT COMMENT 'id',
Expand Down

0 comments on commit 9830bbe

Please sign in to comment.