Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2348,6 +2348,10 @@ drop_compact_storage_enabled: false
# write_consistency_levels_warned: []
# write_consistency_levels_disallowed: []
#
# Guardrail to warn or fail if no serial consistency level is provided for CAS operations. By default, this is turned off.
# warn_if_no_serial_consistency_level_provided_for_cas_enabled: false
# fail_if_no_serial_consistency_level_provided_for_cas_enabled: false
#
# Guardrail to warn or fail when writing partitions larger than threshold, expressed as 100MiB, 1GiB, etc.
# The guardrail is only checked when writing sstables (flush and compaction), and exceeding the fail threshold on that
# moment will only log an error message, without interrupting the operation.
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,8 @@ public static void setClientMode(boolean clientMode)
public volatile Set<ConsistencyLevel> read_consistency_levels_disallowed = Collections.emptySet();
public volatile Set<ConsistencyLevel> write_consistency_levels_warned = Collections.emptySet();
public volatile Set<ConsistencyLevel> write_consistency_levels_disallowed = Collections.emptySet();
public volatile boolean warn_if_no_serial_consistency_level_provided_for_cas_enabled = false;
public volatile boolean fail_if_no_serial_consistency_level_provided_for_cas_enabled = false;
public volatile boolean user_timestamps_enabled = true;
public volatile boolean alter_table_enabled = true;
public volatile boolean group_by_enabled = true;
Expand Down
28 changes: 28 additions & 0 deletions src/java/org/apache/cassandra/config/GuardrailsOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,34 @@ public void setWriteConsistencyLevelsDisallowed(Set<ConsistencyLevel> consistenc
x -> config.write_consistency_levels_disallowed = x);
}

@Override
public boolean getWarnIfNoSerialConsistencyLevelProvidedForCASEnabled()
{
return config.warn_if_no_serial_consistency_level_provided_for_cas_enabled;
}

public void setWarnIfNoSerialConsistencyLevelProvidedForCASEnabled(boolean enabled)
{
updatePropertyWithLogging("warn_if_no_serial_consistency_level_provided_for_cas_enabled",
enabled,
() -> config.warn_if_no_serial_consistency_level_provided_for_cas_enabled,
x -> config.warn_if_no_serial_consistency_level_provided_for_cas_enabled = x);
}

@Override
public boolean getFailIfNoSerialConsistencyLevelProvidedForCASEnabled()
{
return config.fail_if_no_serial_consistency_level_provided_for_cas_enabled;
}

public void setFailIfNoSerialConsistencyLevelProvidedForCASEnabled(boolean enabled)
{
updatePropertyWithLogging("fail_if_no_serial_consistency_level_provided_for_cas_enabled",
enabled,
() -> config.fail_if_no_serial_consistency_level_provided_for_cas_enabled,
x -> config.fail_if_no_serial_consistency_level_provided_for_cas_enabled = x);
}

@Override
@Nullable
public DataStorageSpec.LongBytesBound getPartitionSizeWarnThreshold()
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/cql3/BatchQueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public ConsistencyLevel getSerialConsistency()
return wrapped.getSerialConsistency();
}

public boolean serialConsistencyNotProvided()
{
return wrapped.serialConsistencyNotProvided();
}

public List<Object> getQueryOrIdList()
{
return queryOrIdList;
Expand Down
26 changes: 18 additions & 8 deletions src/java/org/apache/cassandra/cql3/QueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import com.google.common.collect.ImmutableList;

import org.apache.commons.lang3.builder.ToStringBuilder;
Expand Down Expand Up @@ -71,17 +73,20 @@ public abstract class QueryOptions

public static QueryOptions forInternalCalls(ConsistencyLevel consistency, List<ByteBuffer> values)
{
return new DefaultQueryOptions(consistency, values, ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT, ProtocolVersion.V3);
SpecificOptions specificOptions = new SpecificOptions(-1, null, ConsistencyLevel.SERIAL, Long.MIN_VALUE, null, UNSET_NOWINSEC);
return new DefaultQueryOptions(consistency, values, ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, specificOptions, ProtocolVersion.V3);
}

public static QueryOptions forInternalCallsWithNowInSec(long nowInSec, ConsistencyLevel consistency, List<ByteBuffer> values)
{
return new DefaultQueryOptions(consistency, values, ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT.withNowInSec(nowInSec), ProtocolVersion.CURRENT);
SpecificOptions specificOptions = new SpecificOptions(-1, null, ConsistencyLevel.SERIAL, Long.MIN_VALUE, null, nowInSec);
return new DefaultQueryOptions(consistency, values, ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, specificOptions, ProtocolVersion.CURRENT);
}

public static QueryOptions forInternalCalls(List<ByteBuffer> values)
{
return new DefaultQueryOptions(ConsistencyLevel.ONE, values, ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, SpecificOptions.DEFAULT, ProtocolVersion.V3);
SpecificOptions specificOptions = new SpecificOptions(-1, null, ConsistencyLevel.SERIAL, Long.MIN_VALUE, null, UNSET_NOWINSEC);
return new DefaultQueryOptions(ConsistencyLevel.ONE, values, ByteArrayUtil.EMPTY_ARRAY_OF_BYTE_ARRAYS, false, specificOptions, ProtocolVersion.V3);
}

public static QueryOptions forProtocolVersion(ProtocolVersion protocolVersion)
Expand Down Expand Up @@ -249,7 +254,12 @@ public PagingState getPagingState()
/** Serial consistency for conditional updates. */
public ConsistencyLevel getSerialConsistency()
{
return getSpecificOptions().serialConsistency;
return getSpecificOptions().serialConsistency == null ? ConsistencyLevel.SERIAL : getSpecificOptions().serialConsistency;
}

public boolean serialConsistencyNotProvided()
{
return getSpecificOptions().serialConsistency == null;
}

public long getTimestamp(QueryState state)
Expand Down Expand Up @@ -642,14 +652,14 @@ static class SpecificOptions

private SpecificOptions(int pageSize,
PagingState state,
ConsistencyLevel serialConsistency,
@Nullable ConsistencyLevel serialConsistency,
long timestamp,
String keyspace,
long nowInSeconds)
{
this.pageSize = pageSize;
this.state = state;
this.serialConsistency = serialConsistency == null ? ConsistencyLevel.SERIAL : serialConsistency;
this.serialConsistency = serialConsistency;
this.timestamp = timestamp;
this.keyspace = keyspace;
this.nowInSeconds = nowInSeconds;
Expand Down Expand Up @@ -743,7 +753,7 @@ public QueryOptions decode(ByteBuf body, ProtocolVersion version)
{
int pageSize = Flag.contains(flags, Flag.PAGE_SIZE) ? body.readInt() : -1;
PagingState pagingState = Flag.contains(flags, Flag.PAGING_STATE) ? PagingState.deserialize(CBUtil.readValueNoCopy(body), version) : null;
ConsistencyLevel serialConsistency = Flag.contains(flags, Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : ConsistencyLevel.SERIAL;
ConsistencyLevel serialConsistency = Flag.contains(flags, Flag.SERIAL_CONSISTENCY) ? CBUtil.readConsistencyLevel(body) : null;
long timestamp = Long.MIN_VALUE;
if (Flag.contains(flags, Flag.TIMESTAMP))
{
Expand Down Expand Up @@ -830,7 +840,7 @@ private int gatherFlags(QueryOptions options, ProtocolVersion version)
flags = Flag.add(flags, Flag.PAGE_SIZE);
if (options.getPagingState() != null)
flags = Flag.add(flags, Flag.PAGING_STATE);
if (options.getSerialConsistency() != ConsistencyLevel.SERIAL)
if (options.getSpecificOptions().serialConsistency != null)
flags = Flag.add(flags, Flag.SERIAL_CONSISTENCY);
if (options.getSpecificOptions().timestamp != Long.MIN_VALUE)
flags = Flag.add(flags, Flag.TIMESTAMP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,8 @@ private void updatePartitionsPerBatchMetrics(int updatedPartitions)

private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, Dispatcher.RequestTime requestTime)
{
Guardrails.serialConsistency.guard(options.serialConsistencyNotProvided(), state.getClientState());

Pair<CQL3CasRequest, Set<ColumnMetadata>> p = makeCasRequest(options, state, requestTime);
CQL3CasRequest casRequest = p.left;
Set<ColumnMetadata> columnsWithConditions = p.right;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,8 @@ private ResultMessage executeWithoutCondition(QueryState queryState, QueryOption

private ResultMessage executeWithCondition(QueryState queryState, QueryOptions options, Dispatcher.RequestTime requestTime)
{
Guardrails.serialConsistency.guard(options.serialConsistencyNotProvided(), queryState.getClientState());

CQL3CasRequest request = makeCasRequest(queryState, options, requestTime);

try (RowIterator result = StorageProxy.cas(keyspace(),
Expand Down
34 changes: 34 additions & 0 deletions src/java/org/apache/cassandra/db/guardrails/Guardrails.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,16 @@ public final class Guardrails implements GuardrailsMBean
state -> CONFIG_PROVIDER.getOrCreate(state).getWriteConsistencyLevelsDisallowed(),
"write consistency levels");

/**
* Guardrail on if a client specifies a serial consistency level for CAS operations.
*/
public static final Predicates<Boolean> serialConsistency =
new Predicates<>("warn_when_using_default_serial_consistency_on_cas",
null,
state -> x -> x && CONFIG_PROVIDER.getOrCreate(state).getWarnIfNoSerialConsistencyLevelProvidedForCASEnabled(),
state -> x -> x && CONFIG_PROVIDER.getOrCreate(state).getFailIfNoSerialConsistencyLevelProvidedForCASEnabled(),
(isWarning, value) -> "Query did not provide a serial consistency level.");

/**
* Guardrail on the size of a partition.
*/
Expand Down Expand Up @@ -1448,6 +1458,30 @@ public void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels)
DEFAULT_CONFIG.setWriteConsistencyLevelsDisallowed(fromCSV(consistencyLevels, ConsistencyLevel::fromString));
}

@Override
public boolean getWarnIfNoSerialConsistencyLevelProvidedForCASEnabled()
{
return DEFAULT_CONFIG.getWarnIfNoSerialConsistencyLevelProvidedForCASEnabled();
}

@Override
public void setWarnIfNoSerialConsistencyLevelProvidedForCASEnabled(boolean enabled)
{
DEFAULT_CONFIG.setWarnIfNoSerialConsistencyLevelProvidedForCASEnabled(enabled);
}

@Override
public boolean getFailIfNoSerialConsistencyLevelProvidedForCASEnabled()
{
return DEFAULT_CONFIG.getFailIfNoSerialConsistencyLevelProvidedForCASEnabled();
}

@Override
public void setFailIfNoSerialConsistencyLevelProvidedForCASEnabled(boolean enabled)
{
DEFAULT_CONFIG.setFailIfNoSerialConsistencyLevelProvidedForCASEnabled(enabled);
}

@Override
public int getFieldsPerUDTWarnThreshold()
{
Expand Down
12 changes: 12 additions & 0 deletions src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ public interface GuardrailsConfig
*/
Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed();

/**
* @return {@code true} warn if no serial consistency level is provided for CAS operations.
* {@code false} does not warn when no serial consistency is used on CAS operations.
*/
boolean getWarnIfNoSerialConsistencyLevelProvidedForCASEnabled();

/**
* @return {@code true} fail if no serial consistency level is provided for CAS operations.
* {@code false} does not fail when no serial consistency is used on CAS operations.
*/
boolean getFailIfNoSerialConsistencyLevelProvidedForCASEnabled();

/**
* @return The threshold to warn when writing partitions larger than threshold.
*/
Expand Down
22 changes: 22 additions & 0 deletions src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,28 @@ public interface GuardrailsMBean
*/
void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);

/**
* @return {@code true} warn if no serial consistency level is provided for CAS operations.
* {@code false} does not warn when no serial consistency is used on CAS operations.
*/
boolean getWarnIfNoSerialConsistencyLevelProvidedForCASEnabled();

/**
* @param enabled {@code true} if when no serial consistency is provided a client warning is produced.
*/
void setWarnIfNoSerialConsistencyLevelProvidedForCASEnabled(boolean enabled);

/**
* @return {@code true} fail if no serial consistency level is provided for CAS operations.
* {@code false} does not fail when no serial consistency is used on CAS operations.
*/
boolean getFailIfNoSerialConsistencyLevelProvidedForCASEnabled();

/**
* @param enabled {@code true} if when no serial consistency is provided the query fails.
*/
void setFailIfNoSerialConsistencyLevelProvidedForCASEnabled(boolean enabled);

/**
* @return The threshold to warn when encountering partitions larger than threshold, as a string formatted as in,
* for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} value means disabled.
Expand Down
Loading