-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-17258: Add write threshold warnings for large partitions and tombstones #4556
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
bc5dc05
1495b84
317174a
2b8b958
0cc38ca
865a849
ed9cd52
7052758
c6951fb
66c4f9c
0763316
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.cassandra.db; | ||
|
|
||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import org.apache.cassandra.config.DataStorageSpec; | ||
| import org.apache.cassandra.config.DatabaseDescriptor; | ||
| import org.apache.cassandra.db.partitions.PartitionUpdate; | ||
| import org.apache.cassandra.net.ParamType; | ||
| import org.apache.cassandra.schema.Schema; | ||
| import org.apache.cassandra.schema.TableId; | ||
| import org.apache.cassandra.schema.TableMetadata; | ||
| import org.apache.cassandra.utils.NoSpamLogger; | ||
|
|
||
| /** | ||
| * Utility class for checking write threshold warnings on replicas. | ||
| * CASSANDRA-17258: paxos and accord do complex thread hand off and custom write logic which makes this patch complex, so was deferred | ||
| */ | ||
| public class WriteThresholds | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. paxos / accord were left out of this patch intentionally. Can you leave a comment in this class to denote that? something like |
||
| { | ||
| private static final Logger logger = LoggerFactory.getLogger(WriteThresholds.class); | ||
| private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); | ||
|
|
||
| /** | ||
| * Check write thresholds for all partition updates in a mutation. | ||
| * This method iterates through all partition updates in the mutation. | ||
| * | ||
| * @param mutation the mutation containing one or more partition updates | ||
| */ | ||
| public static void checkWriteThresholds(Mutation mutation) | ||
| { | ||
| if (!DatabaseDescriptor.isDaemonInitialized() || !DatabaseDescriptor.getWriteThresholdsEnabled()) | ||
| return; | ||
|
|
||
| DataStorageSpec.LongBytesBound sizeWarnThreshold = DatabaseDescriptor.getWriteSizeWarnThreshold(); | ||
| int tombstoneWarnThreshold = DatabaseDescriptor.getWriteTombstoneWarnThreshold(); | ||
|
|
||
| if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1) | ||
| return; | ||
|
|
||
| long sizeWarnBytes = sizeWarnThreshold != null ? sizeWarnThreshold.toBytes() : -1; | ||
|
|
||
| for (PartitionUpdate update : mutation.getPartitionUpdates()) | ||
| { | ||
| checkWriteThresholdsInternal(update, update.partitionKey(), sizeWarnBytes, tombstoneWarnThreshold); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Internal method to check write thresholds for a single partition update. | ||
| * This method looks up the partition in TopPartitionTracker and adds | ||
| * warning params to MessageParams if thresholds are exceeded. | ||
| * | ||
| * @param update the partition update being written | ||
| * @param key the partition key being written | ||
| * @param sizeWarnBytes size threshold in bytes, or -1 if disabled | ||
| * @param tombstoneWarnThreshold tombstone count threshold, or -1 if disabled | ||
| */ | ||
| private static void checkWriteThresholdsInternal(PartitionUpdate update, DecoratedKey key, | ||
| long sizeWarnBytes, int tombstoneWarnThreshold) | ||
| { | ||
| TableId tableId = update.metadata().id; | ||
| ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tableId); | ||
|
|
||
| if (cfs == null || cfs.topPartitions == null) | ||
| return; | ||
|
|
||
| long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key); | ||
| long estimatedTombstones = cfs.topPartitions.topTombstones().getEstimate(key); | ||
| TableMetadata meta = update.metadata(); | ||
|
|
||
| if (sizeWarnBytes != -1 && estimatedSize > sizeWarnBytes) | ||
| { | ||
| Number currentValue = MessageParams.get(ParamType.WRITE_SIZE_WARN); | ||
| long currentLong = currentValue != null ? currentValue.longValue() : -1; | ||
|
|
||
| if (currentLong < estimatedSize) | ||
| { | ||
| MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize); | ||
| noSpamLogger.warn("Write to {} partition {} triggered size warning; " + | ||
| "estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)", | ||
| meta, meta.partitionKeyType.toCQLString(key.getKey()), estimatedSize, sizeWarnBytes); | ||
| } | ||
| } | ||
|
|
||
| if (tombstoneWarnThreshold != -1 && estimatedTombstones > tombstoneWarnThreshold) | ||
| { | ||
| Number currentValue = MessageParams.get(ParamType.WRITE_TOMBSTONE_WARN); | ||
| long currentLong = currentValue != null ? currentValue.longValue() : -1; | ||
|
|
||
| if (currentLong < estimatedTombstones) | ||
| { | ||
| MessageParams.add(ParamType.WRITE_TOMBSTONE_WARN, (int) estimatedTombstones); | ||
| noSpamLogger.warn("Write to {} partition {} triggered tombstone warning; " + | ||
| "estimated tombstone count is {}, threshold is {} (see write_tombstone_warn_threshold)", | ||
| meta, meta.partitionKeyType.toCQLString(key.getKey()), estimatedTombstones, tombstoneWarnThreshold); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,8 @@ | |
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
|
|
||
| import com.carrotsearch.hppc.ObjectLongHashMap; | ||
| import com.carrotsearch.hppc.ObjectLongMap; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
|
|
||
| import org.apache.cassandra.concurrent.ScheduledExecutors; | ||
|
|
@@ -197,6 +199,7 @@ public String toString() | |
| public static class TopHolder | ||
| { | ||
| public final NavigableSet<TopPartition> top; | ||
| public final ObjectLongMap<DecoratedKey> lookup; | ||
| private final int maxTopPartitionCount; | ||
| private final long minTrackedValue; | ||
| private final Collection<Range<Token>> ranges; | ||
|
|
@@ -205,14 +208,15 @@ public static class TopHolder | |
|
|
||
| private TopHolder(int maxTopPartitionCount, long minTrackedValue, Collection<Range<Token>> ranges) | ||
| { | ||
| this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(), ranges, 0); | ||
| this(maxTopPartitionCount, minTrackedValue, new TreeSet<>(), new ObjectLongHashMap<>(), ranges, 0); | ||
| } | ||
|
|
||
| private TopHolder(int maxTopPartitionCount, long minTrackedValue, NavigableSet<TopPartition> top, Collection<Range<Token>> ranges, long lastUpdate) | ||
| private TopHolder(int maxTopPartitionCount, long minTrackedValue, NavigableSet<TopPartition> top, ObjectLongMap<DecoratedKey> lookup, Collection<Range<Token>> ranges, long lastUpdate) | ||
| { | ||
| this.maxTopPartitionCount = maxTopPartitionCount; | ||
| this.minTrackedValue = minTrackedValue; | ||
| this.top = top; | ||
| this.lookup = lookup; | ||
| this.ranges = ranges; | ||
| this.lastUpdate = lastUpdate; | ||
| } | ||
|
|
@@ -224,6 +228,7 @@ private TopHolder(StoredTopPartitions storedTopPartitions, | |
| this.maxTopPartitionCount = maxTopPartitionCount; | ||
| this.minTrackedValue = minTrackedValue; | ||
| top = new TreeSet<>(); | ||
| this.lookup = new ObjectLongHashMap<>(); | ||
| this.ranges = null; | ||
| this.lastUpdate = storedTopPartitions.lastUpdated; | ||
|
|
||
|
|
@@ -243,9 +248,11 @@ public void track(DecoratedKey key, long value) | |
| private void track(TopPartition tp) | ||
| { | ||
| top.add(tp); | ||
| lookup.put(tp.key, tp.value); | ||
| while (top.size() > maxTopPartitionCount) | ||
| { | ||
| top.pollLast(); | ||
| TopPartition p = top.pollLast(); | ||
| lookup.remove(p.key); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you remove from the lookup but you don't add?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My bad, missed the lookup.put() call. Thanks! |
||
| currentMinValue = top.last().value; | ||
| } | ||
| currentMinValue = Math.min(tp.value, currentMinValue); | ||
|
|
@@ -274,7 +281,12 @@ public TopHolder merge(TopHolder holder, Collection<Range<Token>> ownedRanges) | |
|
|
||
| private TopHolder cloneForMerging(long lastUpdate) | ||
| { | ||
| return new TopHolder(maxTopPartitionCount, minTrackedValue, new TreeSet<>(top), ranges, lastUpdate); | ||
| return new TopHolder(maxTopPartitionCount, minTrackedValue, new TreeSet<>(top), new ObjectLongHashMap<>(lookup), ranges, lastUpdate); | ||
| } | ||
|
|
||
| public long getEstimate(DecoratedKey dk) | ||
| { | ||
| return lookup.getOrDefault(dk, 0L); | ||
| } | ||
|
|
||
| public String toString() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't see this validated in
DatabaseDescriptor.applycan you make sure we validate?-1, 0+seem to be the only valid values, but where is this enforced?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
take a look at
applyReadThresholdsValidations. Prob best to just rename that method and put the check there =)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For write_size_warn_threshold:
The validation is enforced by the DataStorageSpec type itself and the constructor validates that any non-null value must be >= 0. For this type, null represents "disabled" (equivalent to -1 for int fields). So technically the valid values are either null OR >= 0
I think it will be a redundant validation if I add soemthing similar to read thresholds, what do you think ?