Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions junction/ConcurrentMap_Grampa.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,73 @@ class ConcurrentMap_Grampa {
}
}

bool putValueIfNotExists(Value desired) {
TURF_ASSERT(desired != Value(ValueTraits::NullValue));
TURF_ASSERT(desired != Value(ValueTraits::Redirect));
TURF_ASSERT(m_cell); // Cell must have been found or inserted
TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
for (;;) {
Value oldValue = m_value;
if(oldValue == Value(ValueTraits::NullValue)) {
if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
// Exchange was successful. Return previous value.
TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
uptr(desired));
m_value = desired; // Leave the mutator in a valid state
return true;
}
} else {
return false;
}
// The CAS failed and m_value has been updated with the latest value.
if (m_value != Value(ValueTraits::Redirect)) {
TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
uptr(m_value));
if (m_value != Value(ValueTraits::NullValue)) {
TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value",
uptr(m_table), uptr(m_value));
}
// There was a racing write (or erase) to this cell.
return false;
}
// We've encountered a Redirect value. Help finish the migration.
TURF_TRACE(ConcurrentMap_Grampa, 18, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
Hash hash = m_cell->hash.load(turf::Relaxed);
for (;;) {
// Help complete the migration.
m_table->jobCoordinator.participate();
// Try again in the latest table.
// FIXME: locateTable() could return false if the map is concurrently cleared (m_root set to 0).
// This is not concern yet since clear() is not implemented.
bool exists = m_map.locateTable(m_table, m_sizeMask, hash);
TURF_ASSERT(exists);
TURF_UNUSED(exists);
m_value = Value(ValueTraits::NullValue);
ureg overflowIdx;
switch (Details::insertOrFind(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
case Details::InsertResult_AlreadyFound:
m_value = m_cell->value.load(turf::Consume);
if (m_value == Value(ValueTraits::Redirect)) {
TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
uptr(m_value));
break;
}
goto breakOuter;
case Details::InsertResult_InsertedNew:
goto breakOuter;
case Details::InsertResult_Overflow:
TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
overflowIdx);
Details::beginTableMigration(m_map, m_table, overflowIdx);
break;
}
// We were redirected... again
}
breakOuter:;
// Try again in the new table.
}
}

void assignValue(Value desired) {
exchangeValue(desired);
}
Expand Down Expand Up @@ -483,6 +550,11 @@ class ConcurrentMap_Grampa {
}
}

bool insert(Key key, Value desired) {
Mutator iter(*this, key);
return iter.putValueIfNotExists(desired);
}

Value assign(Key key, Value desired) {
Mutator iter(*this, key);
return iter.exchangeValue(desired);
Expand Down