Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public enum Parameter {
COLLECTION_TITLE("collection_title"),
FULL_METADATA_LINK("full_metadata_link"),
SUGGESTED_CITATION("suggested_citation"),
KEY("key")
;
KEY("key"),
OUTPUT_FORMAT("output_format");

private final String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.springframework.http.MediaType;

import java.io.Serializable;
import java.math.BigDecimal;
Expand All @@ -28,6 +29,43 @@ public static PropertyName fromString(String input) {
}
}

@Getter
public enum GeoServerOutputFormat {
    GML2("GML2", "application/gml+xml", "gml"),
    GML3("GML3", "application/gml+xml", "gml"),
    GML32("gml32", "application/gml+xml", "gml"),
    SHAPE_ZIP("shape-zip", "application/zip", "zip"), // also accepted as "SHAPE-ZIP" (case-insensitive)
    CSV("text/csv", "text/csv", "csv"), // also accepted as "csv" via enum-name fallback
    JSON(MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_JSON_VALUE, "json"), // also "json" for backward compatibility
    GEOJSON("application/geo+json", "application/geo+json", "geojson"),
    KML("KML", "application/vnd.google-earth.kml+xml", "kml"),
    UNKNOWN("unknown", "application/octet-stream", "bin");

    // Value sent to GeoServer as the WFS outputFormat request parameter.
    private final String value;
    // HTTP media type reported to clients for the downloaded payload.
    private final String mediaType;
    // File extension used when naming the downloaded file.
    private final String fileExtension;

    GeoServerOutputFormat(String value, String mediaType, String fileExtension) {
        this.value = value;
        this.mediaType = mediaType;
        this.fileExtension = fileExtension;
    }

    /**
     * Resolve a format from either its GeoServer request value (e.g.
     * "text/csv", "application/json", "shape-zip") or the enum constant
     * name (e.g. "csv", "json", "geojson"), compared case-insensitively.
     *
     * Fix: the previous implementation matched only {@code value}, so the
     * aliases promised by the inline comments above ("json" for backward
     * compatibility, "csv", "SHAPE-ZIP") fell through to {@link #UNKNOWN}
     * and were rejected by callers that treat UNKNOWN as unsupported.
     *
     * @param s raw format string from the request; may be {@code null}
     * @return the matching format, or {@link #UNKNOWN} when unrecognised
     */
    public static GeoServerOutputFormat fromString(String s) {
        for (GeoServerOutputFormat f : values()) {
            // equalsIgnoreCase is null-safe (returns false for null input),
            // preserving the original null -> UNKNOWN behavior.
            if (f.value.equalsIgnoreCase(s) || f.name().equalsIgnoreCase(s)) {
                return f;
            }
        }
        return UNKNOWN;
    }

    /**
     * @return the GeoServer-facing value, so the enum can be interpolated
     *         directly into WFS request URLs.
     */
    @Override
    public String toString() {
        return value;
    }
}

@Schema(description = "Property to be return")
private List<PropertyName> properties;

Expand All @@ -52,6 +90,9 @@ public static PropertyName fromString(String input) {
@Schema(description = "Y")
private BigDecimal y;

@Schema(description = "Data output format")
private GeoServerOutputFormat outputFormat;

@Schema(description = "Wave buoy name")
private String waveBuoy;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import au.org.aodn.ogcapi.server.core.util.GeometryUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
Expand All @@ -27,23 +28,25 @@ public class DownloadWfsDataService {
private final WfsServer wfsServer;
private final RestTemplate restTemplate;
private final HttpEntity<?> pretendUserEntity;
private final int chunkSize;

public DownloadWfsDataService(
WmsServer wmsServer,
WfsServer wfsServer,
RestTemplate restTemplate,
@Qualifier("pretendUserEntity") HttpEntity<?> pretendUserEntity
@Qualifier("pretendUserEntity") HttpEntity<?> pretendUserEntity,
@Value("${app.sse.chunkSize:16384}") int chunkSize
) {
this.wmsServer = wmsServer;
this.wfsServer = wfsServer;
this.restTemplate = restTemplate;
this.pretendUserEntity = pretendUserEntity;
this.chunkSize = chunkSize;
}

/**
* Build CQL filter for temporal and spatial constraints
*/
private String buildCqlFilter(String startDate, String endDate, Object multiPolygon, WfsFields wfsFieldModel) {
protected String buildCqlFilter(String startDate, String endDate, Object multiPolygon, WfsFields wfsFieldModel) {
StringBuilder cqlFilter = new StringBuilder();

if (wfsFieldModel == null || wfsFieldModel.getFields() == null) {
Expand All @@ -52,23 +55,23 @@ private String buildCqlFilter(String startDate, String endDate, Object multiPoly

List<WfsField> fields = wfsFieldModel.getFields();

// Find temporal field
Optional<WfsField> temporalField = fields.stream()
// Possible to have multiple days, better to consider all
List<WfsField> temporalField = fields.stream()
.filter(field -> "dateTime".equals(field.getType()) || "date".equals(field.getType()))
.findFirst();
.toList();

// Add temporal filter only if both dates are specified
if (temporalField.isPresent() && startDate != null && !startDate.isEmpty() && endDate != null && !endDate.isEmpty()) {
String fieldName = temporalField.get().getName();
cqlFilter.append(fieldName)
.append(" DURING ")
.append(startDate).append("T00:00:00Z/")
.append(endDate).append("T23:59:59Z");
if (!temporalField.isEmpty() && startDate != null && !startDate.isEmpty() && endDate != null && !endDate.isEmpty()) {
List<String> cqls = new ArrayList<>();
temporalField.forEach(temp ->
cqls.add(String.format("(%s DURING %sT00:00:00Z/%sT23:59:59Z)", temp.getName(), startDate, endDate))
);
cqlFilter.append("(").append(String.join(" OR ", cqls)).append(")");
}

// Find geometry field
Optional<WfsField> geometryField = fields.stream()
.filter(field -> "geometrypropertytype".equals(field.getType()))
.filter(field -> "geometrypropertytype".equalsIgnoreCase(field.getType()))
.findFirst();

// Add spatial filter
Expand Down Expand Up @@ -101,7 +104,8 @@ public String prepareWfsRequestUrl(
String endDate,
Object multiPolygon,
List<String> fields,
String layerName) {
String layerName,
String outputFormat) {

DescribeLayerResponse describeLayerResponse = wmsServer.describeLayer(uuid, FeatureRequest.builder().layerName(layerName).build());

Expand All @@ -114,15 +118,27 @@ public String prepareWfsRequestUrl(
wfsServerUrl = describeLayerResponse.getLayerDescription().getWfs();
wfsTypeName = describeLayerResponse.getLayerDescription().getQuery().getTypeName();

wfsFieldModel = wfsServer.getDownloadableFields(uuid, WfsServer.WfsFeatureRequest.builder().layerName(wfsTypeName).server(wfsServerUrl).build());
wfsFieldModel = wfsServer.getDownloadableFields(
uuid,
WfsServer.WfsFeatureRequest.builder()
.layerName(wfsTypeName)
.server(wfsServerUrl)
.build()
);
log.info("WFSFieldModel by describeLayer: {}", wfsFieldModel);
} else {
Optional<String> featureServerUrl = wfsServer.getFeatureServerUrlByTitle(uuid, layerName);

if (featureServerUrl.isPresent()) {
wfsServerUrl = featureServerUrl.get();
wfsTypeName = layerName;
wfsFieldModel = wfsServer.getDownloadableFields(uuid, WfsServer.WfsFeatureRequest.builder().layerName(wfsTypeName).server(wfsServerUrl).build());
wfsFieldModel = wfsServer.getDownloadableFields(
uuid,
WfsServer.WfsFeatureRequest.builder()
.layerName(wfsTypeName)
.server(wfsServerUrl)
.build()
);
log.info("WFSFieldModel by wfs typename: {}", wfsFieldModel);
} else {
throw new IllegalArgumentException("No WFS server URL found for the given UUID and layer name");
Expand All @@ -137,7 +153,12 @@ public String prepareWfsRequestUrl(
String cqlFilter = buildCqlFilter(validStartDate, validEndDate, multiPolygon, wfsFieldModel);

// Build final WFS request URL
String wfsRequestUrl = wfsServer.createWfsRequestUrl(wfsServerUrl, wfsTypeName, fields, cqlFilter, null);
String wfsRequestUrl = wfsServer.createWfsRequestUrl(
wfsServerUrl,
wfsTypeName,
fields,
cqlFilter,
outputFormat);

log.info("Prepared WFS request URL: {}", wfsRequestUrl);
return wfsRequestUrl;
Expand All @@ -149,6 +170,7 @@ public void executeWfsRequestWithSse(
String wfsRequestUrl,
String uuid,
String layerName,
String outputFormat,
SseEmitter emitter,
AtomicBoolean wfsServerResponded) {
restTemplate.execute(
Expand All @@ -175,59 +197,35 @@ public void executeWfsRequestWithSse(
int bytesRead;
long totalBytes = 0;
int chunkNumber = 0;
long lastProgressTime = System.currentTimeMillis();
ByteArrayOutputStream chunkBuffer = new ByteArrayOutputStream();

while ((bytesRead = inputStream.read(buffer)) != -1) {
chunkBuffer.write(buffer, 0, bytesRead);
totalBytes += bytesRead;

long currentTime = System.currentTimeMillis();

// Send chunk when buffer is full OR every 2 seconds
if (chunkBuffer.size() >= 16384 ||
(currentTime - lastProgressTime >= 2000)) {

// Send when buffer >= 16KB **and** size divisible by 3 (for clean Base64)
while (chunkBuffer.size() >= chunkSize && chunkBuffer.size() % 3 == 0) {
byte[] chunkBytes = chunkBuffer.toByteArray();

// Ensure Base64 alignment
// Base64 works in 3-byte groups, so chunk size should be divisible by 3
int alignedSize = (chunkBytes.length / 3) * 3;

if (alignedSize > 0) {
// Send the aligned portion
byte[] alignedChunk = Arrays.copyOf(chunkBytes, alignedSize);
String encodedData = Base64.getEncoder().encodeToString(alignedChunk);

emitter.send(SseEmitter.event()
.name("file-chunk")
.data(Map.of(
"chunkNumber", ++chunkNumber,
"data", encodedData,
"chunkSize", alignedChunk.length,
"totalBytes", totalBytes,
"timestamp", currentTime
))
.id(String.valueOf(chunkNumber)));

// Keep the remaining bytes for next chunk
if (alignedSize < chunkBytes.length) {
byte[] remainder = Arrays.copyOfRange(chunkBytes, alignedSize, chunkBytes.length);
chunkBuffer.reset();
chunkBuffer.write(remainder);
} else {
chunkBuffer.reset();
}

lastProgressTime = currentTime;
}
String encodedData = Base64.getEncoder().encodeToString(chunkBytes);

emitter.send(SseEmitter.event()
.name("file-chunk")
.data(Map.of(
"chunkNumber", ++chunkNumber,
"data", encodedData,
"chunkSize", chunkBytes.length,
"totalBytes", totalBytes,
"timestamp", System.currentTimeMillis()
))
.id(String.valueOf(chunkNumber)));

chunkBuffer.reset();
}
}

// Send final chunk if any remains
// Final chunk (may not be %3==0, but client Base64 decoder usually handles it)
if (chunkBuffer.size() > 0) {
String encodedData = Base64.getEncoder()
.encodeToString(chunkBuffer.toByteArray());
String encodedData = Base64.getEncoder().encodeToString(chunkBuffer.toByteArray());
emitter.send(SseEmitter.event()
.name("file-chunk")
.data(Map.of(
Expand All @@ -246,7 +244,8 @@ public void executeWfsRequestWithSse(
"totalBytes", totalBytes,
"totalChunks", chunkNumber,
"message", "WFS data download completed successfully",
"filename", layerName + "_" + uuid + ".csv"
"media-type", FeatureRequest.GeoServerOutputFormat.fromString(outputFormat).getMediaType(),
"filename", String.format("%s_%s.%s", layerName, uuid, FeatureRequest.GeoServerOutputFormat.fromString(outputFormat).getFileExtension())
)));

// Close SSE connection with completion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,25 +144,30 @@ public ResponseEntity<?> getWfsFieldValue(String collectionId, FeatureRequest re

String result = wfsServer.getFieldValues(collectionId, wfsFeatureRequest, new ParameterizedTypeReference<>() {});

FeatureJSON json = new FeatureJSON();
try {
@SuppressWarnings("unchecked")
FeatureCollection<SimpleFeatureType, SimpleFeature> collection = json.readFeatureCollection(new StringReader(result));
try(FeatureIterator<SimpleFeature> i = collection.features()) {
Map<String, List<Object>> results = new HashMap<>();
while(i.hasNext()) {
SimpleFeature s = i.next();
s.getProperties()
.forEach(property -> {
results.computeIfAbsent(property.getName().toString(), k -> new ArrayList<>());
results.get(property.getName().toString()).add(s.getAttribute(property.getName()));
});
if(result != null) {
FeatureJSON json = new FeatureJSON();
try {
@SuppressWarnings("unchecked")
FeatureCollection<SimpleFeatureType, SimpleFeature> collection = json.readFeatureCollection(new StringReader(result));
try (FeatureIterator<SimpleFeature> i = collection.features()) {
Map<String, List<Object>> results = new HashMap<>();
while (i.hasNext()) {
SimpleFeature s = i.next();
s.getProperties()
.forEach(property -> {
results.computeIfAbsent(property.getName().toString(), k -> new ArrayList<>());
results.get(property.getName().toString()).add(s.getAttribute(property.getName()));
});
}
return ResponseEntity.ok().body(results);
}
return ResponseEntity.ok().body(results);
} catch (IOException e) {
return ResponseEntity.badRequest().body(e.getMessage());
}
}
catch (IOException e) {
return ResponseEntity.badRequest().body(e.getMessage());
else {
// Field not found
return ResponseEntity.notFound().build();
}
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import au.org.aodn.ogcapi.server.core.model.enumeration.DatasetDownloadEnums;
import au.org.aodn.ogcapi.server.core.model.enumeration.InlineResponseKeyEnum;
import au.org.aodn.ogcapi.server.core.model.enumeration.ProcessIdEnum;
import au.org.aodn.ogcapi.server.core.model.ogc.FeatureRequest;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -125,14 +126,19 @@ public SseEmitter downloadWfsSse(
String startDate = body.getInputs().get(DatasetDownloadEnums.Parameter.START_DATE.getValue()).toString();
String endDate = body.getInputs().get(DatasetDownloadEnums.Parameter.END_DATE.getValue()).toString();
String layerName = body.getInputs().get(DatasetDownloadEnums.Parameter.LAYER_NAME.getValue()).toString();
String outputFormat = body.getInputs().get(DatasetDownloadEnums.Parameter.OUTPUT_FORMAT.getValue()).toString();
var multiPolygon = body.getInputs().get(DatasetDownloadEnums.Parameter.MULTI_POLYGON.getValue());

List<String> fields = body.getInputs().get(DatasetDownloadEnums.Parameter.FIELDS.getValue()) instanceof List<?> list
? list.stream().map(String::valueOf).toList()
: null;

if(FeatureRequest.GeoServerOutputFormat.fromString(outputFormat) == FeatureRequest.GeoServerOutputFormat.UNKNOWN) {
throw new IllegalArgumentException(String.format("Output format %s not supported", outputFormat));
}

return restServices.downloadWfsDataWithSse(
uuid, startDate, endDate, multiPolygon, fields, layerName, emitter
uuid, startDate, endDate, multiPolygon, fields, layerName, outputFormat, emitter
);

} catch (Exception e) {
Expand Down
Loading
Loading