I use spring boot, bucket4j+redis to build my app. and i deploy it in k8s cluster with 2 replicas.
Here is maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.17.0</version>
</dependency>
<!-- Bucket4J starter = Bucket4J + JCache -->
<dependency>
<groupId>com.giffing.bucket4j.spring.boot.starter</groupId>
<artifactId>bucket4j-spring-boot-starter</artifactId>
<version>0.5.2</version>
<exclusions>
<exclusion>
<groupId>org.ehcache</groupId>
<artifactId>ehcache</artifactId>
</exclusion>
</exclusions>
</dependency>
Here is config for redis and proxyManager
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public Config config() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port);
config.useSingleServer().setPassword(password);
config.useSingleServer().setConnectionPoolSize(2);
config.useSingleServer().setConnectionMinimumIdleSize(2);
return config;
}
@Bean("tokenCache")
public CacheManager cacheManager(Config config) {
CacheManager manager = Caching.getCachingProvider().getCacheManager();
manager.createCache(RedisKeyConstant.TOKEN_CONFIG_CACHE, RedissonConfiguration.fromConfig(config));
return manager;
}
@Bean
ProxyManager<String> proxyManager(CacheManager cacheManager) {
return new JCacheProxyManager<>(cacheManager.getCache(RedisKeyConstant.TOKEN_CONFIG_CACHE));
}
}
Here is my rate limiter
@Service
public class RateLimiter implements RateLimiter {
@Autowired
private ProxyManager<String> proxyManager;
private Bucket bucket = null;
public Bucket getBucket() {
Supplier<BucketConfiguration> configSupplier = this.getConfigSupplierForUser();
this.bucket = proxyManager.builder().build(ApiConstant.BUCKET, configSupplier);
return this.bucket;
}
private Supplier<BucketConfiguration> getConfigSupplierForUser() {
Refill refill = Refill.greedy(1, Duration.ofSeconds(1));
Bandwidth limit = Bandwidth.classic(4, refill).withInitialTokens(4);
return () -> (BucketConfiguration.builder()
.addLimit(limit).build());
}
}
Here is where i use rate limiter, I use it to limit the consumption speed for rabbitMq to limit Limit the rate of calling third-party APIs.
@Component
public class ConsumerMethod {
private static final Logger logger = LoggerFactory.getLogger(ConsumerMethod.class);
@Autowired
private RateLimiter rateLimiter;
@Bean
Consumer<Message<String>> Consumer() {
logger.info("init Consumer");
return message -> onMessage(message);
}
public void onMessage(Message<String> message) {
Channel channel = message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class);
Long l = message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class);
DTO dto = JSON.parseObject(message.getPayload(), DTO.class);
Bucket bucket = rateLimiter.getBucket();
long availableTokens = bucket.getAvailableTokens();
logger.info("-----------Before Available Tokens are " + availableTokens);
try {
if (bucket.asBlocking().tryConsume(1, Duration.ofSeconds(1))) {
long availableTokens1 = bucket.getAvailableTokens();
logger.info("-----------Token consumed successfully, after Available Tokens are " + availableTokens1);
RabbitMqUtils.manualAck(channel, l);
// Call third-party api
} else {
logger.error("-----------Token consumed failed.");
RabbitMqUtils.manualAck(channel, l);
// Do some failure methods.
}
} catch (InterruptedException e) {
RabbitMqUtils.manualAck(channel, l);
e.printStackTrace();
}
}
where is the log after processing the msg from rabbitMQ. And the starting number of each line is indicates which replica it belongs to.
1 2023-01-05 11:12:09.391 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are 4
2 2023-01-05 11:12:09.472 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are 4
1 2023-01-05 11:12:09.499 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Token consumed successfully, after Available Tokens are 3
1 2023-01-05 11:12:09.579 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are 2
2 2023-01-05 11:12:09.608 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Token consumed successfully, after Available Tokens are 2
2 2023-01-05 11:12:09.695 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are 1
1 2023-01-05 11:12:09.754 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Token consumed successfully, after Available Tokens are 1
1 2023-01-05 11:12:09.811 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are 0
2 2023-01-05 11:12:09.831 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Token consumed successfully, after Available Tokens are 0
2 2023-01-05 11:12:09.896 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
2 2023-01-05 11:12:09.928 [n_group-1:28480] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
2 2023-01-05 11:12:10.054 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
2 2023-01-05 11:12:10.092 [n_group-1:28480] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
2 2023-01-05 11:12:10.155 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
2 2023-01-05 11:12:10.194 [n_group-1:28480] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
2 2023-01-05 11:12:10.254 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
2 2023-01-05 11:12:10.290 [n_group-1:28480] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
2 2023-01-05 11:12:10.351 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
2 2023-01-05 11:12:10.379 [n_group-1:28480] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
2 2023-01-05 11:12:10.442 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are 0
1 2023-01-05 11:12:10.493 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Token consumed successfully, after Available Tokens are 0
1 2023-01-05 11:12:10.552 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
1 2023-01-05 11:12:10.618 [n_group-1:28433] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
1 2023-01-05 11:12:10.708 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
1 2023-01-05 11:12:10.740 [n_group-1:28433] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
1 2023-01-05 11:12:10.824 [n_group-1:28433] INFO c.a.c.s.stream.ConsumerMethod - -----------Before Available Tokens are -1
1 2023-01-05 11:12:10.851 [n_group-1:28433] ERROR c.a.c.s.stream.ConsumerMethod - -----------Token consumed failed.
2 2023-01-05 11:12:11.497 [n_group-1:28480] INFO c.a.c.s.stream.ConsumerMethod - -----------Token consumed successfully, after Available Tokens are 0
I wonder why the number of token in bucket can be -1, and is it the correct way to use bucket4j in multiple replicas case?
How can i to optimize my app to limit the calling third-party api rate from multiple replicas?