diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 6b0e1f75b8db..f8f042df5448 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -530,6 +530,10 @@ public void invalidate() public void invalidate(boolean expectMBean) { + if(engine != null) { + engine.invalidate(this); + } + // disable and cancel in-progress compactions before invalidating valid = false; diff --git a/src/java/org/apache/cassandra/engine/StorageEngine.java b/src/java/org/apache/cassandra/engine/StorageEngine.java index f34168334ed2..4dbe2bf6343e 100644 --- a/src/java/org/apache/cassandra/engine/StorageEngine.java +++ b/src/java/org/apache/cassandra/engine/StorageEngine.java @@ -50,6 +50,8 @@ UnfilteredRowIterator queryStorage(ColumnFamilyStore cfs, void close(final ColumnFamilyStore cfs); + void invalidate(final ColumnFamilyStore cfs); + void setCompactionThroughputMbPerSec(int throughputMbPerSec); void forceMajorCompaction(ColumnFamilyStore cfs); diff --git a/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java b/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java index 88faef2bd391..e1ff58fd463e 100644 --- a/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java +++ b/src/java/org/apache/cassandra/rocksdb/RocksDBCF.java @@ -52,7 +52,9 @@ import org.apache.cassandra.rocksdb.tools.SanityCheckUtils; import org.apache.cassandra.rocksdb.tools.StreamingConsistencyCheckUtils; import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Hex; import org.rocksdb.BlockBasedTableConfig; import org.rocksdb.BloomFilter; @@ -82,14 +84,13 @@ public class RocksDBCF implements RocksDBCFMBean { private static final Logger logger = LoggerFactory.getLogger(RocksDBCF.class); - private final UUID cfID; + private final UUID cfId; private final ColumnFamilyStore cfs; private final IPartitioner partitioner; private final RocksDBEngine engine; private final RocksDB rocksDB; private final Statistics stats; private final RocksDBTableMetrics rocksMetrics; - private final String mbeanName; private final CassandraCompactionFilter compactionFilter; private final CassandraValueMergeOperator mergeOperator; @@ -97,14 +98,17 @@ public class RocksDBCF implements RocksDBCFMBean private final WriteOptions disableWAL; private final FlushOptions flushOptions; + private final String rocksDBTableDir; + public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException { this.cfs = cfs; - cfID = cfs.metadata.cfId; + cfId = cfs.metadata.cfId; partitioner = cfs.getPartitioner(); engine = (RocksDBEngine) cfs.engine; - String rocksDBTableDir = ROCKSDB_DIR + "/" + cfs.keyspace.getName() + "/" + cfs.name; + rocksDBTableDir = String.format("%s/%s/%s-%s", + ROCKSDB_DIR, cfs.keyspace.getName(), cfs.name, ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))); FileUtils.createDirectory(ROCKSDB_DIR); FileUtils.createDirectory(rocksDBTableDir); @@ -196,8 +200,12 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException disableWAL = new WriteOptions().setDisableWAL(true); flushOptions = new FlushOptions().setWaitForFlush(true); + registerMBean(); + } + + private void registerMBean() { // Register the mbean. - mbeanName = getMbeanName(cfs.keyspace.getName(), cfs.getTableName()); + String mbeanName = getMbeanName(cfs.keyspace.getName(), cfs.getTableName()); try { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -209,6 +217,25 @@ public RocksDBCF(ColumnFamilyStore cfs) throws RocksDBException } } + private void unregisterMBean() { + String mbeanName = getMbeanName(cfs.keyspace.getName(), cfs.getTableName()); + try + { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mbean = new ObjectName(mbeanName); + if (mbs.isRegistered(mbean)) { + mbs.unregisterMBean(mbean); + } + } + catch (Exception e) + { + JVMStabilityInspector.inspectThrowable(e); + // this shouldn't block anything. + logger.warn("Failed unregistering mbean: {}", mbeanName, e); + } + } + + public static String getMbeanName(String keyspace, String table) { return String.format("org.apache.cassandra.rocksdbcf:keyspace=%s,table=%s", keyspace, table); @@ -296,12 +323,19 @@ protected void close() throws RocksDBException synchronized (engine.rocksDBFamily) { rocksDB.close(); + unregisterMBean(); // remove the rocksdb instance, since it's not usable - engine.rocksDBFamily.remove(cfID); + engine.rocksDBFamily.remove(cfId); } } + protected void destroy() throws RocksDBException { + logger.info("Deleting rocksdb table: " + cfs.name); + rocksDB.destroyDB(rocksDBTableDir, new Options()); + } + + public String dumpPrefix(byte[] rocksKeyPrefix, int limit) { StringBuilder sb = new StringBuilder(); @@ -337,9 +371,9 @@ public String dumpPrefix(byte[] rocksKeyPrefix, int limit) return sb.toString(); } - public UUID getCfID() + public UUID getCfId() { - return cfID; + return cfId; } @Override diff --git a/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java b/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java index 9b0ab6fa8d16..367d436f7f30 100644 --- a/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java +++ b/src/java/org/apache/cassandra/rocksdb/RocksDBEngine.java @@ -142,13 +142,24 @@ public void truncate(ColumnFamilyStore cfs) rocksDBCF.truncate(); else logger.info("Can not find rocksdb table: " + cfs.name); - } + catch (RocksDBException e) { logger.error(e.toString(), e); } } + public void invalidate(ColumnFamilyStore cfs) { + try { + RocksDBCF rocksDBCF = getRocksDBCF(cfs); + rocksDBCF.close(); + rocksDBCF.destroy(); + } + catch(RocksDBException e) { + logger.error(e.toString(), e); + } + } + public void close(ColumnFamilyStore cfs) { try diff --git a/src/java/org/apache/cassandra/rocksdb/streaming/RocksDBStreamWriter.java b/src/java/org/apache/cassandra/rocksdb/streaming/RocksDBStreamWriter.java index 450c940ce9ea..574b01717a23 100644 --- a/src/java/org/apache/cassandra/rocksdb/streaming/RocksDBStreamWriter.java +++ b/src/java/org/apache/cassandra/rocksdb/streaming/RocksDBStreamWriter.java @@ -148,6 +148,6 @@ public long getOutgoingBytes() private void updateProgress(boolean completed) { - RocksDBStreamUtils.rocksDBProgress(session, rocksDBCF.getCfID().toString(), ProgressInfo.Direction.OUT, outgoingBytes, streamedPairs, estimatedTotalKeys, completed); + RocksDBStreamUtils.rocksDBProgress(session, rocksDBCF.getCfId().toString(), ProgressInfo.Direction.OUT, outgoingBytes, streamedPairs, estimatedTotalKeys, completed); } }