Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1422,5 +1422,10 @@ public final class DataNodeQueryMessages {

public static final String CANT_CONNECT_TO_NODE_PREFIX = "can't connect to node ";
public static final String REMOVE_AINODE_FAILED = "Remove AINode failed: ";

public static final String QUERY_TIMEOUT_IN_FETCH_SCHEMA = "Query execution is time out while fetching schema";

public static final String QUERY_EXECUTION_MISSING = "Query execution %s is missing during fetching device schema";

private DataNodeQueryMessages() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,4 +511,8 @@ private StorageEngineMessages() {}
public static final String STRING_NOT_LEGAL_REPAIR_LOG = "String '%s' is not a legal repair log";

public static final String WRONG_LOAD_COMMAND_S = "Wrong load command %s.";

public static final String FAILED_TO_FIND_DATA_REGION = "Failed to create state machine for consensus group %s, because data region does not exist";

public static final String DATA_REGION_IS_NULL = "Data region is null";
}
Original file line number Diff line number Diff line change
Expand Up @@ -1421,5 +1421,11 @@ public final class DataNodeQueryMessages {

public static final String CANT_CONNECT_TO_NODE_PREFIX = "无法连接到节点 ";
public static final String REMOVE_AINODE_FAILED = "移除 AINode 失败:";

public static final String QUERY_TIMEOUT_IN_FETCH_SCHEMA = "查询在拉取元数据时,执行超时";

public static final String QUERY_EXECUTION_MISSING = "查询执行实体 %s 在拉取元数据期间丢失";


private DataNodeQueryMessages() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,4 +511,8 @@ private StorageEngineMessages() {}
public static final String STRING_NOT_LEGAL_REPAIR_LOG = "字符串 '%s' 不是合法的修复日志";

public static final String WRONG_LOAD_COMMAND_S = "错误的 load 命令 %s。";

public static final String FAILED_TO_FIND_DATA_REGION = "共识组 %s 底层状态机创建失败, 因为 DataRegion 没找到。";

public static final String DATA_REGION_IS_NULL = "Data region 是空";
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine;
import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
Expand All @@ -51,6 +52,8 @@

import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

Expand All @@ -60,6 +63,8 @@
*/
public class DataRegionConsensusImpl {

private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionConsensusImpl.class);

private DataRegionConsensusImpl() {
// do nothing
}
Expand Down Expand Up @@ -113,6 +118,11 @@ private static void reinitializeStatics() {

private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) {
DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) gid);
if (dataRegion == null) {
String errorMsg = String.format(StorageEngineMessages.FAILED_TO_FIND_DATA_REGION, gid);
LOGGER.error(errorMsg);
throw new IllegalArgumentException(errorMsg);
}
if (ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
return new IoTConsensusDataRegionStateMachine(dataRegion);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine;
import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
Expand Down Expand Up @@ -248,6 +249,10 @@ protected TSStatus write(PlanNode planNode) {

@Override
public DataSet read(IConsensusRequest request) {
if (region == null) {
logger.error(StorageEngineMessages.DATA_REGION_IS_NULL);
return null;
}
if (request instanceof GetConsensusReqReaderPlan) {
return region.getWALNode().orElseThrow(UnsupportedOperationException::new);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@

package org.apache.iotdb.db.exception.query;

import org.apache.iotdb.commons.exception.IoTDBRuntimeException;

import static org.apache.iotdb.rpc.TSStatusCode.QUERY_TIMEOUT;

/** This class is used to throw run time exception when query is time out. */
public class QueryTimeoutRuntimeException extends RuntimeException {
public class QueryTimeoutRuntimeException extends IoTDBRuntimeException {
public static final String QUERY_TIMEOUT_EXCEPTION_MESSAGE =
"Current query is time out, query start time is %d, ddl is %d, current time is %d, please check your statement or modify timeout parameter.";

public QueryTimeoutRuntimeException(long startTime, long currentTime, long timeout) {
super(
String.format(
QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, currentTime));
String.format(QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, currentTime),
QUERY_TIMEOUT.getStatusCode(),
true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException;
import org.apache.iotdb.commons.path.MeasurementPath;
Expand All @@ -30,6 +31,7 @@
import org.apache.iotdb.commons.schema.template.Template;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
Expand Down Expand Up @@ -294,8 +296,10 @@ private ClusterSchemaTree executeSchemaFetchQuery(
}
}
} else {
throw new RuntimeException(
String.format("Fetch Schema failed, because queryExecution is null for %s", queryId));
throw new IoTDBRuntimeException(
String.format(
DataNodeQueryMessages.QUERY_EXECUTION_MISSING, executionResult.queryId.getId()),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}

result.setDatabases(databaseSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.QueryTimeoutException;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
Expand All @@ -30,10 +31,14 @@
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
Expand Down Expand Up @@ -75,6 +80,8 @@

public class TableDeviceSchemaFetcher {

private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();

private final SqlParser relationSqlParser = new SqlParser();

private final Coordinator coordinator = Coordinator.getInstance();
Expand Down Expand Up @@ -102,7 +109,7 @@
return attributeGuard;
}

Map<IDeviceID, Map<String, Binary>> fetchMissingDeviceSchemaForDataInsertion(

Check failure on line 112 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 22 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5O9mTrCmNIQl9Zbphy&open=AZ5O9mTrCmNIQl9Zbphy&pullRequest=17749
final FetchDevice statement, final MPPQueryContext context) {
DeviceSchemaRequestCache.FetchMissingDeviceSchema schema =
requestCache.getOrCreatePendingRequest(statement);
Expand Down Expand Up @@ -143,43 +150,54 @@
executionResult.status.getMessage(), executionResult.status.getCode());
}

final List<ColumnHeader> columnHeaderList =
coordinator.getQueryExecution(queryId).getDatasetHeader().getColumnHeaders();
final int tagLength = DataNodeTableCache.getInstance().getTable(database, table).getTagNum();
final Map<IDeviceID, Map<String, Binary>> fetchedDeviceSchema = new HashMap<>();

while (coordinator.getQueryExecution(queryId).hasNextResult()) {
final Optional<TsBlock> tsBlock;
try {
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
} catch (final IoTDBException e) {
t = e;
throw AsyncSendPlanNodeHandler.needRetry(e)
? new IoTDBRuntimeException(
e.getCause(), TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode())
: new IoTDBRuntimeException(
String.format("Fetch Table Device Schema failed because %s", e.getMessage()),
e.getErrorCode(),
e.isUserException());
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
final Column[] columns = tsBlock.get().getValueColumns();
for (int i = 0; i < tsBlock.get().getPositionCount(); i++) {
final String[] nodes = new String[tagLength + 1];
final Map<String, Binary> attributeMap = new HashMap<>();
constructNodesArrayAndAttributeMap(
attributeMap, nodes, table, columnHeaderList, columns, tableInstance, i);

fetchedDeviceSchema.put(IDeviceID.Factory.DEFAULT_FACTORY.create(nodes), attributeMap);
IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);

if (queryExecution != null) {
final List<ColumnHeader> columnHeaderList =
queryExecution.getDatasetHeader().getColumnHeaders();
final int tagLength =
DataNodeTableCache.getInstance().getTable(database, table).getTagNum();
final Map<IDeviceID, Map<String, Binary>> fetchedDeviceSchema = new HashMap<>();

while (queryExecution.hasNextResult()) {
final Optional<TsBlock> tsBlock;
try {
tsBlock = queryExecution.getBatchResult();
} catch (final IoTDBException e) {
t = e;
throw AsyncSendPlanNodeHandler.needRetry(e)
? new IoTDBRuntimeException(
e.getCause(), TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode())
: new IoTDBRuntimeException(
String.format("Fetch Table Device Schema failed because %s", e.getMessage()),
e.getErrorCode(),
e.isUserException());
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
final Column[] columns = tsBlock.get().getValueColumns();
for (int i = 0; i < tsBlock.get().getPositionCount(); i++) {
final String[] nodes = new String[tagLength + 1];
final Map<String, Binary> attributeMap = new HashMap<>();
constructNodesArrayAndAttributeMap(
attributeMap, nodes, table, columnHeaderList, columns, tableInstance, i);

fetchedDeviceSchema.put(IDeviceID.Factory.DEFAULT_FACTORY.create(nodes), attributeMap);
}
}
}

schema.setResult(fetchedDeviceSchema);
fetchedDeviceSchema.forEach((key, value) -> cache.putAttributes(database, key, value));
schema.setResult(fetchedDeviceSchema);
fetchedDeviceSchema.forEach((key, value) -> cache.putAttributes(database, key, value));

return fetchedDeviceSchema;
} else {
throw new IoTDBRuntimeException(
String.format(
DataNodeQueryMessages.QUERY_EXECUTION_MISSING, executionResult.queryId.getId()),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}

return fetchedDeviceSchema;
} catch (final Throwable throwable) {
t = throwable;
throw throwable;
Expand Down Expand Up @@ -234,7 +252,7 @@

// Used by show/count device and update device.
// Update / Delete device will not access cache
public boolean parseFilter4TraverseDevice(

Check warning on line 255 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 95 to 64, Complexity from 19 to 14, Nesting Level from 5 to 2, Number of Variables from 30 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5O9mTrCmNIQl9Zbphz&open=AZ5O9mTrCmNIQl9Zbphz&pullRequest=17749
final TsTable tableInstance,
final List<Expression> expressionList,
final AbstractTraverseDevice statement,
Expand Down Expand Up @@ -468,7 +486,7 @@
return IDeviceID.Factory.DEFAULT_FACTORY.create(deviceIdNodes);
}

private void fetchMissingDeviceSchemaForQuery(

Check warning on line 489 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 89 to 64, Complexity from 16 to 14, Nesting Level from 4 to 2, Number of Variables from 17 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5O9mTrCmNIQl9Zbph0&open=AZ5O9mTrCmNIQl9Zbph0&pullRequest=17749
final String database,
final TsTable tableInstance,
final List<String> attributeColumns,
Expand All @@ -486,6 +504,11 @@
}

try {
long start = System.currentTimeMillis();
long timeoutDuration =
mppQueryContext == null
? CONFIG.getQueryTimeoutThreshold()
: (mppQueryContext.getTimeOut() - (start - mppQueryContext.getStartTime()));
final ExecutionResult executionResult =
coordinator.executeForTableModel(
statement,
Expand All @@ -501,45 +524,57 @@
mppQueryContext == null ? "unknown" : mppQueryContext.getQueryId(),
mppQueryContext == null ? "unknown" : mppQueryContext.getSql()),
LocalExecutionPlanner.getInstance().metadata,
mppQueryContext.getTimeOut()
- (System.currentTimeMillis() - mppQueryContext.getStartTime()),
timeoutDuration,
false,
mppQueryContext.isDebug());
mppQueryContext != null && mppQueryContext.isDebug());

if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new IoTDBRuntimeException(
executionResult.status.getMessage(), executionResult.status.getCode());
}

final List<ColumnHeader> columnHeaderList =
coordinator.getQueryExecution(queryId).getDatasetHeader().getColumnHeaders();

while (coordinator.getQueryExecution(queryId).hasNextResult()) {
final Optional<TsBlock> tsBlock;
try {
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
} catch (final IoTDBException e) {
t = e;
throw new IoTDBRuntimeException(
String.format("Fetch Table Device Schema failed because %s", e.getMessage()),
e.getErrorCode(),
e.isUserException());
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);

if (queryExecution != null) {
final List<ColumnHeader> columnHeaderList =
queryExecution.getDatasetHeader().getColumnHeaders();

while (queryExecution.hasNextResult()) {
final Optional<TsBlock> tsBlock;
try {
tsBlock = queryExecution.getBatchResult();
} catch (final IoTDBException e) {
t = e;
throw new IoTDBRuntimeException(
String.format("Fetch Table Device Schema failed because %s", e.getMessage()),
e.getErrorCode(),
e.isUserException());
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
}
if (!TreeViewSchema.isTreeViewTable(tableInstance)) {
constructTableResults(
tsBlock.get(),
columnHeaderList,
tableInstance,
statement,
mppQueryContext,
attributeColumns,
deviceEntryMap.get(database));
} else {
constructTreeResults(
tsBlock.get(), columnHeaderList, tableInstance, mppQueryContext, deviceEntryMap);
}
}
if (!TreeViewSchema.isTreeViewTable(tableInstance)) {
constructTableResults(
tsBlock.get(),
columnHeaderList,
tableInstance,
statement,
mppQueryContext,
attributeColumns,
deviceEntryMap.get(database));
} else {
if (System.currentTimeMillis() - start > timeoutDuration) {
throw new QueryTimeoutException(DataNodeQueryMessages.QUERY_TIMEOUT_IN_FETCH_SCHEMA);
} else {
constructTreeResults(
tsBlock.get(), columnHeaderList, tableInstance, mppQueryContext, deviceEntryMap);
throw new IoTDBRuntimeException(
String.format(
DataNodeQueryMessages.QUERY_EXECUTION_MISSING, executionResult.queryId.getId()),
TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
}
} catch (final Throwable throwable) {
Expand Down
Loading