@@ -33,6 +33,7 @@
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -88,32 +89,31 @@ public void testReadStageTableMeta() throws Exception {

Table table = Spark3Util.loadIcebergTable(spark, tableName);
table.refresh();
String tableLocation = table.location();

try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
SparkTableCache.get().add(fileSetID, table);
stageTask(table, fileSetID, tasks);
Dataset<Row> scanDF2 =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.load(tableLocation);
.load(fileSetID);

assertThat(scanDF2.columns()).hasSize(2);
}

try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
String fileSetID = UUID.randomUUID().toString();
SparkTableCache.get().add(fileSetID, table);
stageTask(table, fileSetID, tasks);
Dataset<Row> scanDF =
spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_OPEN_COST, "0")
.option(SparkReadOptions.SCAN_TASK_SET_ID, fileSetID)
.load(tableLocation)
.load(fileSetID)
.select("*", "_pos");

List<Row> rows = scanDF.collectAsList();
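For reference, a minimal sketch of the staged-read flow the updated test above exercises, assuming only the SparkTableCache and DataFrame reader behavior visible in this diff: the table is registered in the cache under a file-set ID, and that ID (rather than the table location) is passed to load(...). The class name is illustrative, and stageTask(...) stands in for the test's own helper, whose implementation is not shown here.

import java.util.UUID;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkTableCache;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

abstract class StagedReadSketch {

  // Stages the planned tasks under fileSetID; in the test this is the existing
  // stageTask helper, so the signature here is an assumption.
  abstract void stageTask(Table table, String fileSetID, CloseableIterable<ScanTask> tasks);

  void readStagedTasks(SparkSession spark, Table table) throws Exception {
    try (CloseableIterable<ScanTask> tasks = table.newBatchScan().planFiles()) {
      String fileSetID = UUID.randomUUID().toString();
      // Register the table under the file-set ID so the reader can resolve it
      // from the cache rather than from a table location.
      SparkTableCache.get().add(fileSetID, table);
      stageTask(table, fileSetID, tasks);
      // The file-set ID itself is passed to load(...); the scan-task set is no
      // longer selected through a read option.
      Dataset<Row> scanDF =
          spark
              .read()
              .format("iceberg")
              .option(SparkReadOptions.FILE_OPEN_COST, "0")
              .load(fileSetID);
      scanDF.show(); // consume within the try block, as the test does
    }
  }
}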
@@ -830,7 +830,10 @@ public void testBinPackTableWithSpecialChars() {
List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero();
Table table = validationCatalog.loadTable(identifier);
assertThat(SparkTableCache.get().tables())
.as("Table cache must not contain the test table")
.noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
}

@TestTemplate
@@ -870,7 +873,10 @@ public void testSortTableWithSpecialChars() {
List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero();
Table table = validationCatalog.loadTable(identifier);
assertThat(SparkTableCache.get().tables())
.as("Table cache must not contain the test table")
.noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
}

@TestTemplate
@@ -910,7 +916,10 @@ public void testZOrderTableWithSpecialChars() {
List<Object[]> actualRecords = currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);

assertThat(SparkTableCache.get().size()).as("Table cache must be empty").isZero();
Table table = validationCatalog.loadTable(identifier);
assertThat(SparkTableCache.get().tables())
.as("Table cache must not contain the test table")
.noneMatch(cachedTable -> cachedTable.uuid().equals(table.uuid()));
}

@TestTemplate

This file was deleted.

@@ -131,10 +131,6 @@ public String tag() {
return confParser.stringConf().option(SparkReadOptions.TAG).parseOptional();
}

public String scanTaskSetId() {
return confParser.stringConf().option(SparkReadOptions.SCAN_TASK_SET_ID).parseOptional();
}

public boolean streamingSkipDeleteSnapshots() {
return confParser
.booleanConf()
@@ -62,9 +62,6 @@ private SparkReadOptions() {}
// Overrides the table's read.parquet.vectorization.batch-size
public static final String VECTORIZATION_BATCH_SIZE = "batch-size";

// Set ID that is used to fetch scan tasks
public static final String SCAN_TASK_SET_ID = "scan-task-set-id";

// skip snapshots of type delete while reading stream out of iceberg table
public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;
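With SparkReadOptions.SCAN_TASK_SET_ID and SparkReadConf#scanTaskSetId() removed above, the file-set ID is no longer supplied as a read option against the table location; it becomes the load target itself. A condensed before/after sketch under that assumption (class and method names are illustrative):

import org.apache.iceberg.Table;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class ScanTaskSetReadSketch {

  // Old path (removed by this PR): the scan-task set was chosen via a read
  // option while load(...) still pointed at the table location.
  Dataset<Row> readBefore(SparkSession spark, Table table, String fileSetID) {
    return spark
        .read()
        .format("iceberg")
        .option("scan-task-set-id", fileSetID) // was SparkReadOptions.SCAN_TASK_SET_ID
        .load(table.location());
  }

  // New path: the file-set ID registered in SparkTableCache is the load target.
  Dataset<Row> readAfter(SparkSession spark, String fileSetID) {
    return spark
        .read()
        .format("iceberg")
        .load(fileSetID);
  }
}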