Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions java/vortex-jni/src/main/java/dev/vortex/api/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,57 @@ public OptionalLong asOptional() {
}
}

/**
* Sum of the on-storage byte sizes of all files included in this data source along with the precision of that
* estimate. Mirrors the Rust {@code Option<Precision<u64>>} returned by {@code DataSource::byte_size}:
* {@link ByteSize.Unknown} when no estimate is available (for example when the filesystem listing did not return
* sizes), {@link ByteSize.Estimate} for an inexact hint (some files contribute extrapolated sizes), and
* {@link ByteSize.Exact} when every file has a known size.
*/
public ByteSize byteSize() {
long[] out = new long[2];
NativeDataSource.byteSize(pointer, out);
return switch ((int) out[1]) {
case 1 -> new ByteSize.Estimate(out[0]);
case 2 -> new ByteSize.Exact(out[0]);
default -> ByteSize.Unknown.INSTANCE;
};
}

/** Precision-aware byte size. See {@link #byteSize()}. */
public sealed interface ByteSize {
/** Returns the byte size as a long, or {@code OptionalLong.empty()} when unknown. */
OptionalLong asOptional();

/** Byte size is not known. */
final class Unknown implements ByteSize {
public static final Unknown INSTANCE = new Unknown();

private Unknown() {}

@Override
public OptionalLong asOptional() {
return OptionalLong.empty();
}
}

/** Estimated byte size; the actual value may differ. */
record Estimate(long value) implements ByteSize {
@Override
public OptionalLong asOptional() {
return OptionalLong.of(value);
}
}

/** Exact byte size. */
record Exact(long value) implements ByteSize {
@Override
public OptionalLong asOptional() {
return OptionalLong.of(value);
}
}
}

/** Submit a scan. */
public Scan scan(ScanOptions options) {
Objects.requireNonNull(options, "options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ private NativeDataSource() {}
* {@code 1=estimate}, {@code 2=exact}.
*/
public static native void rowCount(long pointer, long[] out);

/**
* Populate {@code out} with {@code [bytes, precision]}, the sum of on-storage file sizes for the data source.
* Precision is one of {@code 0=unknown}, {@code 1=estimate}, {@code 2=exact}.
*/
public static native void byteSize(long pointer, long[] out);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
import dev.vortex.api.Expression;
import dev.vortex.api.Expression.BinaryOp;
import dev.vortex.api.Expression.TimeUnit;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.AlwaysFalse;
Expand Down Expand Up @@ -33,14 +40,6 @@
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.unsafe.types.UTF8String;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Translates {@link Predicate Spark V2 predicates} into Vortex {@link Expression}s for predicate pushdown.
*
Expand All @@ -50,16 +49,15 @@
*/
final class SparkPredicateToVortexExpression {

private SparkPredicateToVortexExpression() {
}
private SparkPredicateToVortexExpression() {}

/**
* Returns true if the given Spark predicate can be translated to a Vortex expression and every named reference
* resolves to a real field path under {@code dataColumnTypes}.
*
* <p>{@code dataColumnTypes} maps each pushable top-level column name to its top-level Spark {@link DataType};
* partition columns and columns the scan does not project should not appear in the map. For nested references
* (for example {@code info.email}) the validator walks the named reference part by part, descending into
* partition columns and columns the scan does not project should not appear in the map. For nested references (for
* example {@code info.email}) the validator walks the named reference part by part, descending into
* {@link StructType} fields so that {@code info} must be a struct that contains an {@code email} field.
*
* <p>This is the cheap check used in {@code SupportsPushDownV2Filters.pushPredicates} to decide which predicates
Expand All @@ -77,8 +75,7 @@ static boolean isPushable(Predicate predicate, Map<String, DataType> dataColumnT

/**
* Walks {@code parts} against {@code dataColumnTypes}, descending through {@link StructType} fields for
* dot-separated nested references. Returns true only when every part resolves to an actual field in the
* schema.
* dot-separated nested references. Returns true only when every part resolves to an actual field in the schema.
*/
private static boolean resolveFieldPath(String[] parts, Map<String, DataType> dataColumnTypes) {
if (parts.length == 0) {
Expand All @@ -102,7 +99,9 @@ private static boolean resolveFieldPath(String[] parts, Map<String, DataType> da
}

private static Optional<StructField> findField(StructType struct, String name) {
return Arrays.stream(struct.fields()).filter(structField -> structField.name().equals(name)).findFirst();
return Arrays.stream(struct.fields())
.filter(structField -> structField.name().equals(name))
.findFirst();
}

private static boolean isStructurallyPushable(Predicate predicate) {
Expand Down Expand Up @@ -135,7 +134,7 @@ private static boolean isStructurallyPushable(Predicate predicate) {
yield true;
}
case "STARTS_WITH", "ENDS_WITH", "CONTAINS" ->
children.length == 2 && isPushableFieldRef(children[0]) && isPushableStringLiteral(children[1]);
children.length == 2 && isPushableFieldRef(children[0]) && isPushableStringLiteral(children[1]);
// `BOOLEAN_EXPRESSION` wraps a bare boolean-valued child. We only handle the case
// where the child itself is a field reference (e.g. `WHERE bool_col`).
case "BOOLEAN_EXPRESSION" -> children.length == 1 && isPushableFieldRef(children[0]);
Expand Down Expand Up @@ -178,12 +177,12 @@ static Optional<Expression> convert(Predicate predicate) {
case "=", "<>", "!=", ">", ">=", "<", "<=" -> convertComparison(predicate.name(), children);
case "IS_NULL" -> children.length == 1 ? columnOf(children[0]).map(Expression::isNull) : Optional.empty();
case "IS_NOT_NULL" ->
children.length == 1 ? columnOf(children[0]).map(Expression::isNotNull) : Optional.empty();
children.length == 1 ? columnOf(children[0]).map(Expression::isNotNull) : Optional.empty();
case "IN" -> convertIn(children);
case "STARTS_WITH" ->
convertStringMatch(children, /* leadingWildcard= */ false, /* trailingWildcard= */ true);
convertStringMatch(children, /* leadingWildcard= */ false, /* trailingWildcard= */ true);
case "ENDS_WITH" ->
convertStringMatch(children, /* leadingWildcard= */ true, /* trailingWildcard= */ false);
convertStringMatch(children, /* leadingWildcard= */ true, /* trailingWildcard= */ false);
case "CONTAINS" -> convertStringMatch(children, /* leadingWildcard= */ true, /* trailingWildcard= */ true);
case "BOOLEAN_EXPRESSION" -> children.length == 1 ? columnOf(children[0]) : Optional.empty();
default -> Optional.empty();
Expand Down Expand Up @@ -327,9 +326,7 @@ private static boolean isFieldRefExpr(org.apache.spark.sql.connector.expressions
return expr instanceof NamedReference;
}

/**
* Returns the Vortex column expression for a Spark named reference, walking nested struct fields.
*/
/** Returns the Vortex column expression for a Spark named reference, walking nested struct fields. */
private static Optional<Expression> columnOf(org.apache.spark.sql.connector.expressions.Expression expr) {
if (!(expr instanceof NamedReference)) {
return Optional.empty();
Expand Down Expand Up @@ -501,9 +498,7 @@ private static Optional<Expression> convertLiteral(Object value, DataType dataTy
return Optional.empty();
}

/**
* Extract the unscaled integer value of a Spark decimal literal at the supplied {@code scale}.
*/
/** Extract the unscaled integer value of a Spark decimal literal at the supplied {@code scale}. */
private static BigInteger unscaledValueOf(Object value, int scale) {
BigDecimal decimal;
if (value instanceof Decimal) {
Expand Down
109 changes: 105 additions & 4 deletions java/vortex-spark/src/main/java/dev/vortex/spark/read/VortexScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,66 @@

package dev.vortex.spark.read;

import dev.vortex.api.DataSource;
import dev.vortex.api.Session;
import dev.vortex.jni.NativeFiles;
import dev.vortex.spark.VortexSparkSession;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;

/** Spark V2 {@link Scan} over a table of Vortex files. */
public final class VortexScan implements Scan {
/**
* Spark V2 {@link Scan} over a table of Vortex files.
*
* <p>Implements {@link SupportsReportStatistics} to surface both the row count Vortex records in each file footer and a
* Spark scan-size estimate. The byte estimate starts from the on-storage file sizes collected by
* {@code MultiFileDataSource}, then follows Spark's file scan convention by applying the SQL file-compression factor
* and scaling by the pushed read schema's default size relative to the full table schema's default size. When the
* listing did not return a size for one or more files the file-byte total is extrapolated before Spark scaling is
* applied.
*/
public final class VortexScan implements Scan, SupportsReportStatistics {

private final List<String> paths;
private final List<Column> tableColumns;
private final List<Column> readColumns;
private final Map<String, String> formatOptions;
private final Predicate[] pushedPredicates;

private volatile Statistics cachedStatistics;

/**
* Creates a new VortexScan for the specified file paths and columns. The caller is responsible for passing
* immutable collections; the constructor does not copy.
*
* @param paths the list of Vortex file paths to scan
* @param tableColumns the full table columns before projection pushdown
* @param readColumns the list of columns to read from the files
* @param pushedPredicates predicates pushed down by Spark; {@code null} or empty means no pushdown
*/
public VortexScan(
List<String> paths,
List<Column> tableColumns,
List<Column> readColumns,
Map<String, String> formatOptions,
Predicate[] pushedPredicates) {
Predicate[] pushedPredicates,
Map<String, String> formatOptions) {
this.paths = paths;
this.tableColumns = tableColumns;
this.readColumns = readColumns;
this.formatOptions = formatOptions;
this.pushedPredicates = pushedPredicates == null ? new Predicate[0] : pushedPredicates.clone();
Expand Down Expand Up @@ -83,4 +111,77 @@ public Batch toBatch() {
public ColumnarSupportMode columnarSupportMode() {
return ColumnarSupportMode.SUPPORTED;
}

/**
* Returns statistics for this scan.
*
* <p>Opens the Vortex {@link DataSource} on first invocation and caches the result. The row count is taken from the
* data source (sum of file-footer row counts; extrapolated from the first opened file when other files are
* deferred). {@link Statistics#sizeInBytes()} is derived from the per-file sizes reported by the filesystem
* listing, then adjusted by Spark's compression factor and the ratio between the pushed read schema and the full
* table schema. When a listing did not return a size for some file the file-byte total is extrapolated. When no
* file size is known at all the value is left empty so Spark falls back to its default heuristic.
*
* @return statistics with row-count and Spark scan-size estimates
*/
@Override
public Statistics estimateStatistics() {
Statistics local = cachedStatistics;
if (local != null) {
return local;
}
synchronized (this) {
if (cachedStatistics == null) {
cachedStatistics = computeStatistics();
}
return cachedStatistics;
}
}

private Statistics computeStatistics() {
Session session = VortexSparkSession.get(formatOptions);
// Expand directory paths to concrete files the way VortexBatchExec does, so we use the
// same per-path resolution end-to-end.
List<String> resolvedPaths = paths.stream()
.flatMap(path -> path.endsWith(".vortex")
? Stream.of(path)
: NativeFiles.listFiles(session, path, formatOptions).stream())
.collect(Collectors.toList());

if (resolvedPaths.isEmpty()) {
return new VortexStatistics(OptionalLong.empty(), OptionalLong.empty());
}

DataSource source = DataSource.open(session, resolvedPaths, formatOptions);
return new VortexStatistics(
source.rowCount().asOptional(),
scaleSizeInBytes(source.byteSize().asOptional()));
}

private OptionalLong scaleSizeInBytes(OptionalLong fileBytes) {
if (fileBytes.isEmpty()) {
return OptionalLong.empty();
}

StructType tableSchema = CatalogV2Util.v2ColumnsToStructType(tableColumns.toArray(new Column[0]));
StructType readSchema = readSchema();
int tableDefaultSize = tableSchema.defaultSize();
if (tableDefaultSize <= 0) {
return fileBytes;
}

double scaled = SQLConf.get().fileCompressionFactor()
* fileBytes.getAsLong()
/ tableDefaultSize
* readSchema.defaultSize();
return OptionalLong.of((long) scaled);
}

private record VortexStatistics(OptionalLong numRows, OptionalLong sizeInBytes) implements Statistics {

@Override
public Map<NamedReference, ColumnStatistics> columnStats() {
return new HashMap<>();
}
}
}
Loading