Skip to content

Commit

Permalink
增加异步count支持,全局配置asyncCount,默认false,单次设置:PageHelper.startPage(1, 10).e…
Browse files Browse the repository at this point in the history
…nableAsyncCount();

异步使用独立连接(事务)查询,有增删改操作影响查询时不适合开启异步查询。closed #334
  • Loading branch information
pagehelper committed Nov 5, 2023
1 parent c7d7d00 commit 19043df
Show file tree
Hide file tree
Showing 22 changed files with 580 additions and 92 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.29</version>
<version>8.0.33</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/github/pagehelper/Dialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

/**
* 数据库方言,针对不同数据库进行实现
Expand All @@ -48,6 +51,26 @@ public interface Dialect {
*/
boolean skip(MappedStatement ms, Object parameterObject, RowBounds rowBounds);

/**
* 是否使用异步 count 查询,使用异步后不会根据返回的 count 数来判断是否有必要进行分页查询
*
* @return true 异步,false 同步
*/
default boolean isAsyncCount() {
return false;
}

/**
* 执行异步 count 查询
*
* @param task 异步查询任务
* @param <T>
* @return
*/
default <T> Future<T> asyncCountTask(Callable<T> task) {
return ForkJoinPool.commonPool().submit(task);
}

/**
* 执行分页前,返回 true 会进行 count 查询,false 会继续下面的 beforePage 判断
*
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/com/github/pagehelper/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public class Page<E> extends ArrayList<E> implements Closeable {
* 转换count查询时保留子查询的 order by 排序
*/
private Boolean keepSubSelectOrderBy;
/**
* 异步count查询
*/
private Boolean asyncCount;

public Page() {
super();
Expand Down Expand Up @@ -329,6 +333,14 @@ public void setKeepSubSelectOrderBy(Boolean keepSubSelectOrderBy) {
this.keepSubSelectOrderBy = keepSubSelectOrderBy;
}

public Boolean getAsyncCount() {
return asyncCount;
}

public void setAsyncCount(Boolean asyncCount) {
this.asyncCount = asyncCount;
}

/**
* 指定使用的分页实现,如果自己使用的很频繁,建议自己增加一层封装再使用
*
Expand Down Expand Up @@ -466,6 +478,40 @@ public boolean keepSubSelectOrderBy() {
return this.keepSubSelectOrderBy != null && this.keepSubSelectOrderBy;
}

/**
* 异步count查询
*
* @param asyncCount
* @return
*/
public Page<E> asyncCount(boolean asyncCount) {
this.asyncCount = asyncCount;
return this;
}

/**
* 使用异步count查询
*
* @return
*/
public Page<E> enableAsyncCount() {
return asyncCount(true);
}

/**
* 不使用异步count查询
*
* @return
*/
public Page<E> disableAsyncCount() {
return asyncCount(false);
}


public boolean asyncCount() {
return this.asyncCount != null && this.asyncCount;
}

public PageInfo<E> toPageInfo() {
return new PageInfo<E>(this);
}
Expand Down
47 changes: 45 additions & 2 deletions src/main/java/com/github/pagehelper/PageHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;

/**
* Mybatis - 通用分页拦截器<br/>
Expand All @@ -48,9 +53,10 @@
* @version 5.0.0
*/
public class PageHelper extends PageMethod implements Dialect, BoundSqlInterceptor.Chain {
private PageParams pageParams;
private PageAutoDialect autoDialect;
private PageParams pageParams;
private PageAutoDialect autoDialect;
private PageBoundSqlInterceptors pageBoundSqlInterceptors;
private ForkJoinPool asyncCountService;

@Override
public boolean skip(MappedStatement ms, Object parameterObject, RowBounds rowBounds) {
Expand All @@ -62,11 +68,39 @@ public boolean skip(MappedStatement ms, Object parameterObject, RowBounds rowBou
if (StringUtil.isEmpty(page.getCountColumn())) {
page.setCountColumn(pageParams.getCountColumn());
}
//设置默认的异步 count 设置
if (page.getAsyncCount() == null) {
page.setAsyncCount(pageParams.isAsyncCount());
}
autoDialect.initDelegateDialect(ms, page.getDialectClass());
return false;
}
}

@Override
public boolean isAsyncCount() {
return getLocalPage().asyncCount();
}

@Override
public <T> Future<T> asyncCountTask(Callable<T> task) {
//异步执行时需要将ThreadLocal值传递,否则会找不到
AbstractHelperDialect dialectThreadLocal = autoDialect.getDialectThreadLocal();
Page<Object> localPage = getLocalPage();
String countId = UUID.randomUUID().toString();
return asyncCountService.submit(() -> {
try {
//设置 ThreadLocal
autoDialect.setDialectThreadLocal(dialectThreadLocal);
setLocalPage(localPage);
return task.call();
} finally {
autoDialect.clearDelegate();
clearPage();
}
});
}

@Override
public boolean beforeCount(MappedStatement ms, Object parameterObject, RowBounds rowBounds) {
return autoDialect.getDelegate().beforeCount(ms, parameterObject, rowBounds);
Expand Down Expand Up @@ -155,5 +189,14 @@ public void setProperties(Properties properties) {
pageBoundSqlInterceptors.setProperties(properties);
//20180902新增 aggregateFunctions, 允许手动添加聚合函数(影响行数)
CountSqlParser.addAggregateFunctions(properties.getProperty("aggregateFunctions"));
// 异步 asyncCountService 并发度设置,这里默认为应用可用的处理器核心数 * 2,更合理的值应该综合考虑数据库服务器的处理能力
int asyncCountParallelism = Integer.parseInt(properties.getProperty("asyncCountParallelism",
"" + (Runtime.getRuntime().availableProcessors() * 2)));
asyncCountService = new ForkJoinPool(asyncCountParallelism,
pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("pagehelper-async-count-" + worker.getPoolIndex());
return worker;
}, null, true);
}
}
79 changes: 56 additions & 23 deletions src/main/java/com/github/pagehelper/PageInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,32 @@
import com.github.pagehelper.cache.Cache;
import com.github.pagehelper.cache.CacheFactory;
import com.github.pagehelper.page.PageMethod;
import com.github.pagehelper.util.ClassUtil;
import com.github.pagehelper.util.ExecutorUtil;
import com.github.pagehelper.util.MSUtils;
import com.github.pagehelper.util.StringUtil;
import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.executor.CachingExecutor;
import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.executor.SimpleExecutor;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.transaction.Transaction;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.managed.ManagedTransactionFactory;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

/**
* Mybatis - 通用分页拦截器
Expand Down Expand Up @@ -134,18 +143,27 @@ public Object intercept(Invocation invocation) throws Throwable {
//开启debug时,输出触发当前分页执行时的PageHelper调用堆栈
// 如果和当前调用堆栈不一致,说明在启用分页后没有消费,当前线程再次执行时消费,调用堆栈显示的方法使用不安全
debugStackTraceLog();
Future<Long> countFuture = null;
//判断是否需要进行 count 查询
if (dialect.beforeCount(ms, parameter, rowBounds)) {
//查询总数
Long count = count(executor, ms, parameter, rowBounds, null, boundSql);
//处理查询总数,返回 true 时继续分页查询,false 时直接返回
if (!dialect.afterCount(count, parameter, rowBounds)) {
//当查询总数为 0 时,直接返回空的结果
return dialect.afterPage(new ArrayList(), parameter, rowBounds);
if (dialect.isAsyncCount()) {
countFuture = asyncCount(ms, boundSql, parameter, rowBounds);
} else {
//查询总数
Long count = count(executor, ms, parameter, rowBounds, null, boundSql);
//处理查询总数,返回 true 时继续分页查询,false 时直接返回
if (!dialect.afterCount(count, parameter, rowBounds)) {
//当查询总数为 0 时,直接返回空的结果
return dialect.afterPage(new ArrayList(), parameter, rowBounds);
}
}
}
resultList = ExecutorUtil.pageQuery(dialect, executor,
ms, parameter, rowBounds, resultHandler, boundSql, cacheKey);
if (countFuture != null) {
Long count = countFuture.get();
dialect.afterCount(count, parameter, rowBounds);
}
} else {
//rowBounds用参数值,不使用分页插件处理时,仍然支持默认的内存分页
resultList = executor.query(ms, parameter, rowBounds, resultHandler, cacheKey, boundSql);
Expand All @@ -158,6 +176,35 @@ public Object intercept(Invocation invocation) throws Throwable {
}
}

/**
* 异步查询总数
*/
private Future<Long> asyncCount(MappedStatement ms, BoundSql boundSql, Object parameter, RowBounds rowBounds) {
Configuration configuration = ms.getConfiguration();
//异步不能复用 BoundSql,因为分页使用时会添加分页参数,这里需要复制一个新的
BoundSql countBoundSql = new BoundSql(configuration, boundSql.getSql(), new ArrayList<>(boundSql.getParameterMappings()), parameter);
//异步想要起作用需要新的数据库连接,需要独立的事务,创建新的Executor,因此异步查询只适合在独立查询中使用,如果混合增删改操作,不能开启异步
Environment environment = configuration.getEnvironment();
TransactionFactory transactionFactory = null;
if (environment == null || environment.getTransactionFactory() == null) {
transactionFactory = new ManagedTransactionFactory();
} else {
transactionFactory = environment.getTransactionFactory();
}
//创建新的事务
Transaction tx = transactionFactory.newTransaction(environment.getDataSource(), null, false);
//使用新的 Executor 执行 count 查询,这里没有加载拦截器,避免递归死循环
Executor countExecutor = new CachingExecutor(new SimpleExecutor(configuration, tx));

return dialect.asyncCountTask(() -> {
try {
return count(countExecutor, ms, parameter, rowBounds, null, countBoundSql);
} finally {
tx.close();
}
});
}

/**
* Spring bean 方式配置时,如果没有配置属性就不会执行下面的 setProperties 方法,就不会初始化
* <p>
Expand Down Expand Up @@ -212,14 +259,8 @@ public void setProperties(Properties properties) {
if (StringUtil.isEmpty(dialectClass)) {
dialectClass = default_dialect_class;
}
Dialect tempDialect = null;
try {
Class<?> aClass = Class.forName(dialectClass);
tempDialect = (Dialect) aClass.newInstance();
tempDialect.setProperties(properties);
} catch (Exception e) {
throw new PageException(e);
}
Dialect tempDialect = ClassUtil.newInstance(dialectClass, properties);
tempDialect.setProperties(properties);

String countSuffix = properties.getProperty("countSuffix");
if (StringUtil.isNotEmpty(countSuffix)) {
Expand All @@ -232,15 +273,7 @@ public void setProperties(Properties properties) {
// 通过 countMsId 配置自定义类
String countMsIdGenClass = properties.getProperty("countMsIdGen");
if (StringUtil.isNotEmpty(countMsIdGenClass)) {
try {
Class<?> aClass = Class.forName(countMsIdGenClass);
countMsIdGen = (CountMsIdGen) aClass.newInstance();
if (countMsIdGen instanceof PageProperties) {
((PageProperties) countMsIdGen).setProperties(properties);
}
} catch (Exception e) {
throw new PageException(e);
}
countMsIdGen = ClassUtil.newInstance(countMsIdGenClass, properties);
}
// 初始化完成后再设置值,保证 dialect 完成初始化
dialect = tempDialect;
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/com/github/pagehelper/dialect/AbstractDialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.github.pagehelper.PageProperties;
import com.github.pagehelper.parser.CountSqlParser;
import com.github.pagehelper.parser.DefaultCountSqlParser;
import com.github.pagehelper.util.ClassUtil;
import com.github.pagehelper.util.StringUtil;
import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.mapping.BoundSql;
Expand Down Expand Up @@ -59,15 +60,7 @@ public void setProperties(Properties properties) {
// 自定义 jsqlparser 的 sql 解析器
String sqlParser = properties.getProperty("sqlParser");
if (StringUtil.isNotEmpty(sqlParser)) {
try {
Class<?> aClass = Class.forName(sqlParser);
jSqlParser = (JSqlParser) aClass.newInstance();
if (jSqlParser instanceof PageProperties) {
((PageProperties) jSqlParser).setProperties(properties);
}
} catch (Exception e) {
throw new PageException(e);
}
jSqlParser = ClassUtil.newInstance(sqlParser, properties);
} else {
jSqlParser = JSqlParser.DEFAULT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
package com.github.pagehelper.dialect.helper;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageProperties;
import com.github.pagehelper.cache.Cache;
import com.github.pagehelper.cache.CacheFactory;
import com.github.pagehelper.dialect.AbstractHelperDialect;
Expand All @@ -34,6 +33,7 @@
import com.github.pagehelper.dialect.replace.SimpleWithNolockReplaceSql;
import com.github.pagehelper.parser.OrderByParser;
import com.github.pagehelper.parser.SqlServerParser;
import com.github.pagehelper.util.ClassUtil;
import com.github.pagehelper.util.StringUtil;
import org.apache.ibatis.cache.CacheKey;
import org.apache.ibatis.mapping.BoundSql;
Expand Down Expand Up @@ -123,15 +123,7 @@ public void setProperties(Properties properties) {
} else if ("simple".equalsIgnoreCase(replaceSql)) {
this.replaceSql = new SimpleWithNolockReplaceSql();
} else {
try {
this.replaceSql = (ReplaceSql) Class.forName(replaceSql).newInstance();
if (this.replaceSql instanceof PageProperties) {
((PageProperties) this.replaceSql).setProperties(properties);
}
} catch (Exception e) {
throw new RuntimeException("The value of the replaceSql parameter configuration does not meet the requirements, and the optional values are simple and regex, or the "
+ ReplaceSql.class.getCanonicalName() + " fully qualified class name of the interface", e);
}
this.replaceSql = ClassUtil.newInstance(replaceSql, properties);
}
String sqlCacheClass = properties.getProperty("sqlCacheClass");
if (StringUtil.isNotEmpty(sqlCacheClass) && !sqlCacheClass.equalsIgnoreCase("false")) {
Expand Down
Loading

0 comments on commit 19043df

Please sign in to comment.