I'm cutting and re-establishing the network connection between a client and a server. It works for a little while but fails at some point. The client never reconnects.
Here is the test case. It uses a Toxiproxy server launched locally with default parameters.
In build.gradle:
testCompile 'eu.rekawek.toxiproxy:toxiproxy-java:2.1.2'
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import org.ehcache.Cache;
import org.ehcache.PersistentCacheManager;
import org.ehcache.clustered.client.config.builders.ClusteredResourcePoolBuilder;
import org.ehcache.clustered.client.config.builders.ClusteredStoreConfigurationBuilder;
import org.ehcache.clustered.client.config.builders.ClusteringServiceConfigurationBuilder;
import org.ehcache.clustered.client.config.builders.TimeoutsBuilder;
import org.ehcache.clustered.common.Consistency;
import org.ehcache.config.builders.CacheConfigurationBuilder;
import org.ehcache.config.builders.CacheManagerBuilder;
import org.ehcache.config.builders.ResourcePoolsBuilder;
import org.ehcache.config.units.MemoryUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.terracotta.testing.rules.Cluster;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicReference;
import static eu.rekawek.toxiproxy.model.ToxicDirection.DOWNSTREAM;
import static org.assertj.core.api.Assertions.assertThat;
import static org.terracotta.testing.rules.BasicExternalClusterBuilder.newCluster;
/**
 * Repeatedly cuts and restores the network link between a clustered cache client and the
 * server, by routing the client connection through a Toxiproxy proxy and toggling a
 * {@code timeout} toxic on it.
 *
 * <p>The test expects a Toxiproxy server to be reachable with the default
 * {@link ToxiproxyClient} settings.
 */
public class NetworkFailureTest extends ClusteredTests {

  private static final String RESOURCE_CONFIG =
    "<config xmlns:ohr='http://www.terracotta.org/config/offheap-resource'>"
    + "<ohr:offheap-resources>"
    + "<ohr:resource name=\"primary-server-resource\" unit=\"MB\">512</ohr:resource>"
    + "</ohr:offheap-resources>"
    + "</config>"
    + "<service xmlns:lease='http://www.terracotta.org/service/lease'>"
    + "<lease:connection-leasing>"
    + "<lease:lease-length unit='seconds'>5</lease:lease-length>"
    + "</lease:connection-leasing>"
    + "</service>\n";

  /** Name under which the proxy is registered in Toxiproxy. */
  private static final String PROXY_NAME = "NetworkFailureTest.proxy";

  /** Local port the proxy listens on; the cache manager connects to this port, not to the server's. */
  private static final int PROXY_PORT = 21212;

  private PersistentCacheManager cacheManager;
  private ToxiproxyClient client;
  private Proxy activeProxy;

  @ClassRule
  public static Cluster CLUSTER = newCluster(1)
    .in(new File("build/cluster"))
    .withServiceFragment(RESOURCE_CONFIG)
    .build();

  @Rule
  public TestName name = new TestName();

  /**
   * Starts the cluster, (re)creates the proxy in front of it and builds a cache manager that
   * connects through the proxy.
   *
   * @throws Exception if the cluster, the proxy or the cache manager cannot be set up
   */
  @Before
  public void startServers() throws Exception {
    CLUSTER.getClusterControl().startAllServers();
    CLUSTER.getClusterControl().waitForActive();
    URI clusterUri = CLUSTER.getConnectionURI();

    client = new ToxiproxyClient();
    try {
      // Remove a proxy possibly left behind by a previous run. The stale handle is deliberately
      // not stored in activeProxy, so that tearDown never tries to delete it a second time if
      // createProxy below fails.
      client.getProxy(PROXY_NAME).delete();
    } catch (IOException ignored) {
      // Taken to mean the proxy isn't there. Just ignore
    }
    activeProxy = client.createProxy(PROXY_NAME, "0.0.0.0:" + PROXY_PORT, "localhost" + ":" + clusterUri.getPort());

    URI proxyUri = URI.create("terracotta://localhost:" + PROXY_PORT);
    CacheManagerBuilder<PersistentCacheManager> builder = CacheManagerBuilder.newCacheManagerBuilder()
      .with(ClusteringServiceConfigurationBuilder.cluster(proxyUri)
        .timeouts(TimeoutsBuilder.timeouts().write(Duration.ofSeconds(20)))
        .autoCreate())
      .withCache("cache", CacheConfigurationBuilder.newCacheConfigurationBuilder(Integer.class, String.class,
        ResourcePoolsBuilder.newResourcePoolsBuilder()
          .with(ClusteredResourcePoolBuilder.clusteredDedicated("primary-server-resource", 32, MemoryUnit.MB)))
        .add(ClusteredStoreConfigurationBuilder.withConsistency(Consistency.STRONG)));
    cacheManager = builder.build(true);
  }

  /**
   * Releases everything {@link #startServers()} created.
   *
   * <p>Each resource is null-checked because this method also runs when {@code startServers}
   * failed part-way, and the proxy deletion and server shutdown sit in {@code finally} blocks
   * so that an earlier cleanup failure does not leave them behind.
   *
   * @throws Exception if any cleanup step fails
   */
  @After
  public void tearDown() throws Exception {
    System.out.println("*************************************************** TearDown ***************************************************");
    try {
      if (client != null) {
        client.reset();
      }
      if (cacheManager != null) {
        cacheManager.close();
        cacheManager.destroy();
      }
    } finally {
      try {
        if (activeProxy != null) {
          activeProxy.delete();
        }
      } finally {
        CLUSTER.getClusterControl().terminateAllServers();
      }
    }
  }

  /**
   * Runs 100 consecutive disconnect/reconnect cycles against the same cache.
   *
   * @throws Exception if a cycle is interrupted or fails
   */
  @Test
  public void proxy() throws Exception {
    Cache<Integer, String> cache = cacheManager.getCache("cache", Integer.class, String.class);
    for (int i = 0; i < 100; i++) {
      System.out.println("***************************************** Loop: " + i);
      upAndDown(cache);
    }
  }

  /**
   * Performs one cycle: a helper thread waits 5 seconds, adds a {@code timeout} toxic to the
   * proxy, waits 10 seconds and removes it again, while the calling thread polls the cache until
   * reads start returning {@code null} and then until a value can be read back.
   *
   * @param cache the clustered cache to exercise
   * @throws InterruptedException if the calling thread is interrupted while sleeping or joining
   */
  private void upAndDown(Cache<Integer, String> cache) throws InterruptedException {
    Phaser phaser = new Phaser(2);
    AtomicReference<Throwable> exception = new AtomicReference<>();

    Thread thread = new Thread(() -> {
      try {
        // Everything is alright phase
        phaser.arriveAndAwaitAdvance();
        Thread.sleep(5_000);

        // Losing connection phase
        phaser.arriveAndAwaitAdvance();
        activeProxy.toxics().timeout("failing", DOWNSTREAM, 5_000);
        Thread.sleep(10_000);

        // Getting back connection phase
        phaser.arriveAndAwaitAdvance();
        activeProxy.toxics().get("failing").remove();
      } catch (Throwable e) {
        // Handed over to the calling thread, which asserts on it at the end of the cycle.
        exception.set(e);
      } finally {
        phaser.arriveAndDeregister();
      }
    });
    thread.start();

    try {
      cache.put(1, "a");

      // Everything is alright phase
      System.out.println("************************** Alright");
      phaser.arriveAndDeregister();
      // NOTE(review): this thread deregisters right after its first arrival, so phase 0 ends as
      // soon as the helper thread reaches its first barrier and this loop likely runs at most
      // once; the following loop is what actually spans the healthy period. Confirm the intent.
      while (phaser.getPhase() == 0) {
        assertThat(cache.get(1)).isEqualTo("a");
        Thread.sleep(1_000);
      }

      // Losing connection phase
      System.out.println("************************** Loosing");
      while (cache.get(1) != null) {
        Thread.sleep(1_000);
      }

      // Getting back connection phase
      System.out.println("************************** Back to our feet");
      while (cache.get(1) == null) {
        System.out.println("Trying to get connection back");
        cache.put(1, "a");
        Thread.sleep(1_000);
      }

      // Block until the helper thread is done instead of spinning on the phaser state.
      thread.join();
    } finally {
      if (thread.isAlive()) {
        // Only reached when this thread is failing: make sure the helper does not outlive the
        // cycle and touch the proxy later. forceTermination releases it if it is parked on the
        // phaser (which does not respond to interruption); interrupt cuts its sleeps short.
        phaser.forceTermination();
        thread.interrupt();
        thread.join();
      }
    }

    assertThat(exception.get()).isNull();
    System.out.println("************************** All done");
  }
}
The full code is here: https://github.com/henri-tremblay/ehcache3/tree/networkingtest