diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index aadb99add52d0..c1c26550f9da1 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -1422,5 +1422,10 @@ public final class DataNodeQueryMessages { public static final String CANT_CONNECT_TO_NODE_PREFIX = "can't connect to node "; public static final String REMOVE_AINODE_FAILED = "Remove AINode failed: "; + + public static final String QUERY_TIMEOUT_IN_FETCH_SCHEMA = "Query execution is time out while fetching schema"; + + public static final String QUERY_EXECUTION_MISSING = "Query execution %s is missing during fetching device schema"; + private DataNodeQueryMessages() {} } diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java index 506820c34de1a..44a707a36c7eb 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/StorageEngineMessages.java @@ -511,4 +511,8 @@ private StorageEngineMessages() {} public static final String STRING_NOT_LEGAL_REPAIR_LOG = "String '%s' is not a legal repair log"; public static final String WRONG_LOAD_COMMAND_S = "Wrong load command %s."; + + public static final String FAILED_TO_FIND_DATA_REGION = "Failed to create state machine for consensus group %s, because data region does not exist"; + + public static final String DATA_REGION_IS_NULL = "Data region is null"; } diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index a6d0e50f8c0c7..3d2783c81b631 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -1421,5 +1421,11 @@ public final class DataNodeQueryMessages { public static final String CANT_CONNECT_TO_NODE_PREFIX = "无法连接到节点 "; public static final String REMOVE_AINODE_FAILED = "移除 AINode 失败:"; + + public static final String QUERY_TIMEOUT_IN_FETCH_SCHEMA = "查询在拉取元数据时,执行超时"; + + public static final String QUERY_EXECUTION_MISSING = "查询执行实体 %s 在拉取元数据期间丢失"; + + private DataNodeQueryMessages() {} } diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java index 281d21453f32b..69384eb33954e 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/StorageEngineMessages.java @@ -511,4 +511,8 @@ private StorageEngineMessages() {} public static final String STRING_NOT_LEGAL_REPAIR_LOG = "字符串 '%s' 不是合法的修复日志"; public static final String WRONG_LOAD_COMMAND_S = "错误的 load 命令 %s。"; + + public static final String FAILED_TO_FIND_DATA_REGION = "共识组 %s 底层状态机创建失败, 因为 DataRegion 没找到。"; + + public static final String DATA_REGION_IS_NULL = "Data region 是空"; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 8b3eb5ffd2fe4..e96b7495312bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.statemachine.dataregion.DataRegionStateMachine; import org.apache.iotdb.db.consensus.statemachine.dataregion.IoTConsensusDataRegionStateMachine; +import org.apache.iotdb.db.i18n.StorageEngineMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; @@ -51,6 +52,8 @@ import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; @@ -60,6 +63,8 @@ */ public class DataRegionConsensusImpl { + private static final Logger LOGGER = LoggerFactory.getLogger(DataRegionConsensusImpl.class); + private DataRegionConsensusImpl() { // do nothing } @@ -113,6 +118,11 @@ private static void reinitializeStatics() { private static DataRegionStateMachine createDataRegionStateMachine(ConsensusGroupId gid) { DataRegion dataRegion = StorageEngine.getInstance().getDataRegion((DataRegionId) gid); + if (dataRegion == null) { + String errorMsg = String.format(StorageEngineMessages.FAILED_TO_FIND_DATA_REGION, gid); + LOGGER.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } if (ConsensusFactory.IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) { return new IoTConsensusDataRegionStateMachine(dataRegion); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index ab1c4b7799842..5c2c5207db6cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.consensus.statemachine.BaseStateMachine; import org.apache.iotdb.db.i18n.DataNodeMiscMessages; +import org.apache.iotdb.db.i18n.StorageEngineMessages; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; @@ -248,6 +249,10 @@ protected TSStatus write(PlanNode planNode) { @Override public DataSet read(IConsensusRequest request) { + if (region == null) { + logger.error(StorageEngineMessages.DATA_REGION_IS_NULL); + return null; + } if (request instanceof GetConsensusReqReaderPlan) { return region.getWALNode().orElseThrow(UnsupportedOperationException::new); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java index ecabba0542f0e..b333f287a21f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java @@ -19,14 +19,19 @@ package org.apache.iotdb.db.exception.query; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; + +import static org.apache.iotdb.rpc.TSStatusCode.QUERY_TIMEOUT; + /** This class is used to throw run time exception when query is time out. */ -public class QueryTimeoutRuntimeException extends RuntimeException { +public class QueryTimeoutRuntimeException extends IoTDBRuntimeException { public static final String QUERY_TIMEOUT_EXCEPTION_MESSAGE = "Current query is time out, query start time is %d, ddl is %d, current time is %d, please check your statement or modify timeout parameter."; public QueryTimeoutRuntimeException(long startTime, long currentTime, long timeout) { super( - String.format( - QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, currentTime)); + String.format(QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, currentTime), + QUERY_TIMEOUT.getStatusCode(), + true); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 3014213e67cd6..e9b5da059558e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -22,6 +22,7 @@ import org.apache.iotdb.calc.exception.MemoryNotEnoughException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException; import org.apache.iotdb.commons.path.MeasurementPath; @@ -30,6 +31,7 @@ import org.apache.iotdb.commons.schema.template.Template; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; @@ -294,8 +296,10 @@ private ClusterSchemaTree executeSchemaFetchQuery( } } } else { - throw new RuntimeException( - String.format("Fetch Schema failed, because queryExecution is null for %s", queryId)); + throw new IoTDBRuntimeException( + String.format( + DataNodeQueryMessages.QUERY_EXECUTION_MISSING, executionResult.queryId.getId()), + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } result.setDatabases(databaseSet); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java index 9bf402f2b9943..1664c8dfe03b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.commons.schema.column.ColumnHeader; import org.apache.iotdb.commons.schema.filter.SchemaFilter; @@ -30,10 +31,14 @@ import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.schema.ConvertSchemaPredicateToFilterVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry; @@ -75,6 +80,8 @@ public class TableDeviceSchemaFetcher { + private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private final SqlParser relationSqlParser = new SqlParser(); private final Coordinator coordinator = Coordinator.getInstance(); @@ -143,43 +150,54 @@ Map> fetchMissingDeviceSchemaForDataInsertion( executionResult.status.getMessage(), executionResult.status.getCode()); } - final List columnHeaderList = - coordinator.getQueryExecution(queryId).getDatasetHeader().getColumnHeaders(); - final int tagLength = DataNodeTableCache.getInstance().getTable(database, table).getTagNum(); - final Map> fetchedDeviceSchema = new HashMap<>(); - - while (coordinator.getQueryExecution(queryId).hasNextResult()) { - final Optional tsBlock; - try { - tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - } catch (final IoTDBException e) { - t = e; - throw AsyncSendPlanNodeHandler.needRetry(e) - ? new IoTDBRuntimeException( - e.getCause(), TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()) - : new IoTDBRuntimeException( - String.format("Fetch Table Device Schema failed because %s", e.getMessage()), - e.getErrorCode(), - e.isUserException()); - } - if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { - break; - } - final Column[] columns = tsBlock.get().getValueColumns(); - for (int i = 0; i < tsBlock.get().getPositionCount(); i++) { - final String[] nodes = new String[tagLength + 1]; - final Map attributeMap = new HashMap<>(); - constructNodesArrayAndAttributeMap( - attributeMap, nodes, table, columnHeaderList, columns, tableInstance, i); - - fetchedDeviceSchema.put(IDeviceID.Factory.DEFAULT_FACTORY.create(nodes), attributeMap); + IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); + + if (queryExecution != null) { + final List columnHeaderList = + queryExecution.getDatasetHeader().getColumnHeaders(); + final int tagLength = + DataNodeTableCache.getInstance().getTable(database, table).getTagNum(); + final Map> fetchedDeviceSchema = new HashMap<>(); + + while (queryExecution.hasNextResult()) { + final Optional tsBlock; + try { + tsBlock = queryExecution.getBatchResult(); + } catch (final IoTDBException e) { + t = e; + throw AsyncSendPlanNodeHandler.needRetry(e) + ? new IoTDBRuntimeException( + e.getCause(), TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()) + : new IoTDBRuntimeException( + String.format("Fetch Table Device Schema failed because %s", e.getMessage()), + e.getErrorCode(), + e.isUserException()); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + break; + } + final Column[] columns = tsBlock.get().getValueColumns(); + for (int i = 0; i < tsBlock.get().getPositionCount(); i++) { + final String[] nodes = new String[tagLength + 1]; + final Map attributeMap = new HashMap<>(); + constructNodesArrayAndAttributeMap( + attributeMap, nodes, table, columnHeaderList, columns, tableInstance, i); + + fetchedDeviceSchema.put(IDeviceID.Factory.DEFAULT_FACTORY.create(nodes), attributeMap); + } } - } - schema.setResult(fetchedDeviceSchema); - fetchedDeviceSchema.forEach((key, value) -> cache.putAttributes(database, key, value)); + schema.setResult(fetchedDeviceSchema); + fetchedDeviceSchema.forEach((key, value) -> cache.putAttributes(database, key, value)); + + return fetchedDeviceSchema; + } else { + throw new IoTDBRuntimeException( + String.format( + DataNodeQueryMessages.QUERY_EXECUTION_MISSING, executionResult.queryId.getId()), + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + } - return fetchedDeviceSchema; } catch (final Throwable throwable) { t = throwable; throw throwable; @@ -486,6 +504,11 @@ private void fetchMissingDeviceSchemaForQuery( } try { + long start = System.currentTimeMillis(); + long timeoutDuration = + mppQueryContext == null + ? CONFIG.getQueryTimeoutThreshold() + : (mppQueryContext.getTimeOut() - (start - mppQueryContext.getStartTime())); final ExecutionResult executionResult = coordinator.executeForTableModel( statement, @@ -501,45 +524,57 @@ private void fetchMissingDeviceSchemaForQuery( mppQueryContext == null ? "unknown" : mppQueryContext.getQueryId(), mppQueryContext == null ? "unknown" : mppQueryContext.getSql()), LocalExecutionPlanner.getInstance().metadata, - mppQueryContext.getTimeOut() - - (System.currentTimeMillis() - mppQueryContext.getStartTime()), + timeoutDuration, false, - mppQueryContext.isDebug()); + mppQueryContext != null && mppQueryContext.isDebug()); if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new IoTDBRuntimeException( executionResult.status.getMessage(), executionResult.status.getCode()); } - final List columnHeaderList = - coordinator.getQueryExecution(queryId).getDatasetHeader().getColumnHeaders(); - - while (coordinator.getQueryExecution(queryId).hasNextResult()) { - final Optional tsBlock; - try { - tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - } catch (final IoTDBException e) { - t = e; - throw new IoTDBRuntimeException( - String.format("Fetch Table Device Schema failed because %s", e.getMessage()), - e.getErrorCode(), - e.isUserException()); - } - if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { - break; + IQueryExecution queryExecution = coordinator.getQueryExecution(queryId); + + if (queryExecution != null) { + final List columnHeaderList = + queryExecution.getDatasetHeader().getColumnHeaders(); + + while (queryExecution.hasNextResult()) { + final Optional tsBlock; + try { + tsBlock = queryExecution.getBatchResult(); + } catch (final IoTDBException e) { + t = e; + throw new IoTDBRuntimeException( + String.format("Fetch Table Device Schema failed because %s", e.getMessage()), + e.getErrorCode(), + e.isUserException()); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + break; + } + if (!TreeViewSchema.isTreeViewTable(tableInstance)) { + constructTableResults( + tsBlock.get(), + columnHeaderList, + tableInstance, + statement, + mppQueryContext, + attributeColumns, + deviceEntryMap.get(database)); + } else { + constructTreeResults( + tsBlock.get(), columnHeaderList, tableInstance, mppQueryContext, deviceEntryMap); + } } - if (!TreeViewSchema.isTreeViewTable(tableInstance)) { - constructTableResults( - tsBlock.get(), - columnHeaderList, - tableInstance, - statement, - mppQueryContext, - attributeColumns, - deviceEntryMap.get(database)); + } else { + if (System.currentTimeMillis() - start > timeoutDuration) { + throw new QueryTimeoutException(DataNodeQueryMessages.QUERY_TIMEOUT_IN_FETCH_SCHEMA); } else { - constructTreeResults( - tsBlock.get(), columnHeaderList, tableInstance, mppQueryContext, deviceEntryMap); + throw new IoTDBRuntimeException( + String.format( + DataNodeQueryMessages.QUERY_EXECUTION_MISSING, executionResult.queryId.getId()), + TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); } } } catch (final Throwable throwable) {