Skip to content
This repository has been archived by the owner on Sep 28, 2023. It is now read-only.

Commit

Permalink
Specify source DCs for bootstrap streaming
Browse files Browse the repository at this point in the history
Summary:
enable user to configure source data centers to do streaming upon bootstrap.
corresponding unit tests added.

Test Plan: all existing tests and newly added tests passed.

Reviewers: wpc, christina1012, #ig-cassandra

Differential Revision: https://phabricator.intern.facebook.com/D7094718

Tasks: T25583201

Tags: bootcamp
  • Loading branch information
Chen Li committed Mar 2, 2018
1 parent 30882b9 commit 4dff91b
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 0 deletions.
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.security.Key;
import java.util.*;

import javax.validation.constraints.NotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -2064,4 +2066,21 @@ public static long getGCWarnThreshold()
return conf.gc_warn_threshold_in_ms;
}

/**
* Get a {@link Set} of source datacenters
* @return a {@link Set} of source datacenters if specified; an empty set if none has been specified
*/
@NotNull
public static Set<String> getBootStrapSourceDCs(){
Set<String> sourceDCs = new HashSet<>();
// assume DC's are space delimitted. and keep current name instead of make it cassandra.bootstrap_source_dcs
String dataCenters = System.getProperty("cassandra.bootstrap_source_dc");
if(dataCenters==null)
{
logger.debug("No source DC has been configured.");
return sourceDCs;
}
sourceDCs.addAll(Arrays.asList(dataCenters.split("\\s+")));
return sourceDCs;
}
}
15 changes: 15 additions & 0 deletions src/java/org/apache/cassandra/dht/BootStrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Keyspace;
Expand All @@ -38,6 +39,7 @@
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.*;
Expand Down Expand Up @@ -79,10 +81,23 @@ public ListenableFuture<StreamState> bootstrap(StreamStateStore stateStore, bool
stateStore);
streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
Set<String> configuredSourceDCs = DatabaseDescriptor.getBootStrapSourceDCs();
if (!configuredSourceDCs.isEmpty())
{
logger.info("Source data centers specified by user: ", configuredSourceDCs);
streamer.addSourceFilter(new RangeStreamer.MultiDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(),
tokenMetadata.getTopology().getDatacenterRacks().keySet(),
configuredSourceDCs));
}

for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces())
{
AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy();
if (!configuredSourceDCs.isEmpty() && !(strategy instanceof NetworkTopologyStrategy))
{
throw new IllegalStateException("User specified source datacenters to stream from, but keyspace "
+ keyspaceName + "(" + strategy + ") is not using NetWorkTopologyStrategy.");
}
streamer.addRanges(keyspaceName, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
}

Expand Down
36 changes: 36 additions & 0 deletions src/java/org/apache/cassandra/dht/RangeStreamer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -111,6 +112,41 @@ public boolean shouldInclude(InetAddress endpoint)
}
}

/**
* Source filter which excludes any endpoints that are not in specified data centers.
*/
public static class MultiDatacenterFilter implements ISourceFilter
{
private final Set<String> desiredSourceDCs;
private final Set<String> realSourceDCs;
private final IEndpointSnitch snitch;

/**
* @param snitch
* @param realSourceDCs current data centers according to {@link org.apache.cassandra.locator.TokenMetadata.Topology}
* @param desiredSourceDCs user configured source data centers
* @throws IllegalArgumentException if desiredSourceDCs is not a subset of realSourceDCs
*
*/
public MultiDatacenterFilter(IEndpointSnitch snitch, Set<String> realSourceDCs, Set<String> desiredSourceDCs)
{
this.desiredSourceDCs = Validate.notNull(desiredSourceDCs);
this.realSourceDCs = Validate.notNull(realSourceDCs);
if (!realSourceDCs.containsAll(desiredSourceDCs))
{
throw new IllegalArgumentException("some of desired DCs " + desiredSourceDCs +
" are not part of running DCs " + realSourceDCs + ".");
}
this.snitch = snitch;
}

public boolean shouldInclude(InetAddress endpoint)
{
String dc = snitch.getDatacenter(endpoint);
return desiredSourceDCs.contains(dc);
}
}

/**
* Source filter which excludes the current node from source calculations
*/
Expand Down
20 changes: 20 additions & 0 deletions test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@RunWith(OrderedJUnit4ClassRunner.class)
public class DatabaseDescriptorTest
Expand Down Expand Up @@ -271,4 +272,23 @@ public void testRpcAddress() throws Exception
DatabaseDescriptor.applyAddressConfig(testConfig);

}

@Test
public void testGetSourceDCs() throws Exception
{
// case 0: no config can be loaded
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().isEmpty());

// case 1: single source DC
System.setProperty("cassandra.bootstrap_source_dc","DC1");
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().size()==1);
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().contains("DC1"));

// case 2: multi source DC's
System.setProperty("cassandra.bootstrap_source_dc","DC1 DC2 DC3");
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().size()==3);
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().contains("DC1"));
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().contains("DC2"));
assertTrue(DatabaseDescriptor.getBootStrapSourceDCs().contains("DC3"));
}
}
87 changes: 87 additions & 0 deletions test/unit/org/apache/cassandra/dht/RangeStreamerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.dht;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.RackInferringSnitch;

import static org.junit.Assert.*;

@RunWith(JUnit4.class)
public class RangeStreamerTest
{
static IEndpointSnitch snitch;

@BeforeClass
public static void setUp(){
snitch = new RackInferringSnitch();
}

/**
*
* Assume we have real DCs {1,2,3} and user configured {2,3}.
* As a result any endpoint from DC 2 or 3 should be included.
*/
@Test
public void testMultiDatacenterFilter() throws Exception
{
Set<String> realDCs = new HashSet<>(Arrays.asList("1", "2", "3"));
Set<String> desiredDCs = new HashSet<>(Arrays.asList("2","3"));
// {2, 3} is a subset of {1, 2, 3}
RangeStreamer.MultiDatacenterFilter multiDatacenterFilter = new RangeStreamer.MultiDatacenterFilter(snitch, realDCs, desiredDCs);
InetAddress endpointFromDC1=InetAddress.getByName("192.1.0.10");
InetAddress endpointFromDC2=InetAddress.getByName("192.2.0.10");
InetAddress endpointFromDC5=InetAddress.getByName("192.5.0.10");
assertFalse(multiDatacenterFilter.shouldInclude(endpointFromDC1));
assertTrue(multiDatacenterFilter.shouldInclude(endpointFromDC2));
assertFalse(multiDatacenterFilter.shouldInclude(endpointFromDC5));
}

@Test(expected = NullPointerException.class)
public void testMultiDatacenterFilterNullDesiredDCs()
{
RangeStreamer.MultiDatacenterFilter multiDatacenterFilter = new RangeStreamer.MultiDatacenterFilter(snitch, null, new HashSet<>());
}

@Test(expected = NullPointerException.class)
public void testMultiDatacenterFilterNullRealDCs()
{
RangeStreamer.MultiDatacenterFilter multiDatacenterFilter = new RangeStreamer.MultiDatacenterFilter(snitch, new HashSet<>(), null);
}

@Test(expected = IllegalArgumentException.class)
public void testMultiDatacenterFilterIllegalArgs()
{
Set<String> realDCs = new HashSet<>(Arrays.asList("1", "2", "3"));
Set<String> desiredDCs = new HashSet<>(Arrays.asList("4","3"));
// {4,3} is not a subset of {1,2,3}
RangeStreamer.MultiDatacenterFilter multiDatacenterFilter = new RangeStreamer.MultiDatacenterFilter(snitch, realDCs, desiredDCs);
}
}

0 comments on commit 4dff91b

Please sign in to comment.