205 changes: 100 additions & 105 deletions src/request/plan.rs
@@ -86,7 +86,7 @@ impl<Req: KvRequest + StoreRequest> StoreRequest for Dispatch<Req> {
const MULTI_REGION_CONCURRENCY: usize = 16;
const MULTI_STORES_CONCURRENCY: usize = 16;

fn is_grpc_error(e: &Error) -> bool {
pub(crate) fn is_grpc_error(e: &Error) -> bool {
matches!(e, Error::GrpcAPI(_) | Error::Grpc(_))
}

@@ -206,7 +206,7 @@ where
match backoff.next_delay_duration() {
Some(duration) => {
let region_error_resolved =
Self::handle_region_error(pd_client.clone(), e, region_store).await?;
handle_region_error(pd_client.clone(), e, region_store).await?;
// don't sleep if we have resolved the region error
if !region_error_resolved {
sleep(duration).await;
@@ -227,109 +227,6 @@
}
}

// Returns
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
async fn handle_region_error(
pd_client: Arc<PdC>,
e: errorpb::Error,
region_store: RegionStore,
) -> Result<bool> {
let ver_id = region_store.region_with_leader.ver_id();
let store_id = region_store.region_with_leader.get_store_id();
if let Some(not_leader) = e.not_leader {
if let Some(leader) = not_leader.leader {
match pd_client
.update_leader(region_store.region_with_leader.ver_id(), leader)
.await
{
Ok(_) => Ok(true),
Err(e) => {
pd_client.invalidate_region_cache(ver_id).await;
Err(e)
}
}
} else {
// The peer doesn't know who is the current leader. Generally it's because
// the Raft group is in an election, but it's possible that the peer is
// isolated and removed from the Raft group. So it's necessary to reload
// the region from PD.
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}
} else if e.store_not_match.is_some() {
pd_client.invalidate_region_cache(ver_id).await;
if let Ok(store_id) = store_id {
pd_client.invalidate_store_cache(store_id).await;
}
Ok(false)
} else if e.epoch_not_match.is_some() {
Self::on_region_epoch_not_match(
pd_client.clone(),
region_store,
e.epoch_not_match.unwrap(),
)
.await
} else if e.stale_command.is_some() || e.region_not_found.is_some() {
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
} else if e.server_is_busy.is_some()
|| e.raft_entry_too_large.is_some()
|| e.max_timestamp_not_synced.is_some()
{
Err(Error::RegionError(Box::new(e)))
} else {
// TODO: pass the logger around
// info!("unknwon region error: {:?}", e);
pd_client.invalidate_region_cache(ver_id).await;
if let Ok(store_id) = store_id {
pd_client.invalidate_store_cache(store_id).await;
}
Ok(false)
}
}

// Returns
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
async fn on_region_epoch_not_match(
pd_client: Arc<PdC>,
region_store: RegionStore,
error: EpochNotMatch,
) -> Result<bool> {
let ver_id = region_store.region_with_leader.ver_id();
if error.current_regions.is_empty() {
pd_client.invalidate_region_cache(ver_id).await;
return Ok(true);
}

for r in error.current_regions {
if r.id == region_store.region_with_leader.id() {
let region_epoch = r.region_epoch.unwrap();
let returned_conf_ver = region_epoch.conf_ver;
let returned_version = region_epoch.version;
let current_region_epoch = region_store
.region_with_leader
.region
.region_epoch
.clone()
.unwrap();
let current_conf_ver = current_region_epoch.conf_ver;
let current_version = current_region_epoch.version;

// Find whether the current region is ahead of TiKV's. If so, backoff.
if returned_conf_ver < current_conf_ver || returned_version < current_version {
return Ok(false);
}
}
}
// TODO: finer grained processing
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}

#[allow(clippy::too_many_arguments)]
async fn handle_other_error(
pd_client: Arc<PdC>,
@@ -365,6 +262,104 @@ where
}
}

// Returns
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
pub(crate) async fn handle_region_error<PdC: PdClient>(
pd_client: Arc<PdC>,
e: errorpb::Error,
region_store: RegionStore,
) -> Result<bool> {
let ver_id = region_store.region_with_leader.ver_id();
let store_id = region_store.region_with_leader.get_store_id();
if let Some(not_leader) = e.not_leader {
if let Some(leader) = not_leader.leader {
match pd_client
.update_leader(region_store.region_with_leader.ver_id(), leader)
.await
{
Ok(_) => Ok(true),
Err(e) => {
pd_client.invalidate_region_cache(ver_id).await;
Err(e)
}
}
} else {
// The peer doesn't know who is the current leader. Generally it's because
// the Raft group is in an election, but it's possible that the peer is
// isolated and removed from the Raft group. So it's necessary to reload
// the region from PD.
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}
} else if e.store_not_match.is_some() {
pd_client.invalidate_region_cache(ver_id).await;
if let Ok(store_id) = store_id {
pd_client.invalidate_store_cache(store_id).await;
}
Ok(false)
} else if e.epoch_not_match.is_some() {
on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await
} else if e.stale_command.is_some() || e.region_not_found.is_some() {
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
} else if e.server_is_busy.is_some()
|| e.raft_entry_too_large.is_some()
|| e.max_timestamp_not_synced.is_some()
{
Err(Error::RegionError(Box::new(e)))
} else {
// TODO: pass the logger around
// info!("unknwon region error: {:?}", e);
pd_client.invalidate_region_cache(ver_id).await;
if let Ok(store_id) = store_id {
pd_client.invalidate_store_cache(store_id).await;
}
Ok(false)
}
}

// Returns
// 1. Ok(true): error has been resolved, retry immediately
// 2. Ok(false): backoff, and then retry
// 3. Err(Error): can't be resolved, return the error to upper level
async fn on_region_epoch_not_match<PdC: PdClient>(
pd_client: Arc<PdC>,
region_store: RegionStore,
error: EpochNotMatch,
) -> Result<bool> {
let ver_id = region_store.region_with_leader.ver_id();
if error.current_regions.is_empty() {
pd_client.invalidate_region_cache(ver_id).await;
return Ok(true);
}

for r in error.current_regions {
if r.id == region_store.region_with_leader.id() {
let region_epoch = r.region_epoch.unwrap();
let returned_conf_ver = region_epoch.conf_ver;
let returned_version = region_epoch.version;
let current_region_epoch = region_store
.region_with_leader
.region
.region_epoch
.clone()
.unwrap();
let current_conf_ver = current_region_epoch.conf_ver;
let current_version = current_region_epoch.version;

// Find whether the current region is ahead of TiKV's. If so, backoff.
if returned_conf_ver < current_conf_ver || returned_version < current_version {
return Ok(false);
}
}
}
// TODO: finer grained processing
pd_client.invalidate_region_cache(ver_id).await;
Ok(false)
}

impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
fn clone(&self) -> Self {
RetryableMultiRegion {
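For context, a minimal sketch of how the three-way contract of the now `pub(crate)` `handle_region_error` is meant to be consumed, mirroring the `RetryableMultiRegion` loop shown above. The standalone helper and its name are illustrative assumptions, not part of this diff, and it assumes the same imports as src/request/plan.rs:

```rust
// Sketch only: makes one backoff/retry decision from a region error,
// following the same shape as the retry loop in RetryableMultiRegion.
async fn backoff_or_fail<PdC: PdClient>(
    pd_client: Arc<PdC>,
    backoff: &mut Backoff,
    e: errorpb::Error,
    region_store: RegionStore,
) -> Result<()> {
    match backoff.next_delay_duration() {
        // Backoff exhausted: surface the region error to the caller.
        None => Err(Error::RegionError(Box::new(e))),
        Some(duration) => {
            // Ok(true): error resolved, retry immediately (no sleep).
            // Ok(false): not resolved yet, sleep before retrying.
            // Err(_): unrecoverable, propagate.
            let resolved = handle_region_error(pd_client, e, region_store).await?;
            if !resolved {
                sleep(duration).await;
            }
            Ok(())
        }
    }
}
```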
35 changes: 33 additions & 2 deletions src/transaction/client.rs
@@ -9,6 +9,7 @@ use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
use crate::proto::pdpb::Timestamp;
use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
use crate::request::plan::CleanupLocksResult;
@@ -17,6 +18,7 @@ use crate::timestamp::TimestampExt;
use crate::transaction::lock::ResolveLocksOptions;
use crate::transaction::lowering::new_scan_lock_request;
use crate::transaction::lowering::new_unsafe_destroy_range_request;
use crate::transaction::resolve_locks;
use crate::transaction::ResolveLocksContext;
use crate::transaction::Snapshot;
use crate::transaction::Transaction;
@@ -290,9 +292,7 @@ impl<Cod: Codec> Client<Cod> {
plan.execute().await
}

// For test.
// Note: `batch_size` must be >= expected number of locks.
#[cfg(feature = "integration-tests")]
pub async fn scan_locks(
&self,
safepoint: &Timestamp,
@@ -308,6 +308,37 @@
plan.execute().await
}

/// Resolves the given locks and returns any that remain live.
///
/// This method retries until either all locks are resolved or the provided
/// `backoff` is exhausted. The `timestamp` is used as the caller start
/// timestamp when checking transaction status.
pub async fn resolve_locks(
&self,
locks: Vec<kvrpcpb::LockInfo>,
timestamp: Timestamp,
mut backoff: Backoff,
) -> Result<Vec<kvrpcpb::LockInfo>> {
let mut live_locks = locks;
loop {
live_locks = resolve_locks(live_locks, timestamp.clone(), self.pd.clone()).await?;
if live_locks.is_empty() {
return Ok(live_locks);
}

if backoff.is_none() {
return Ok(live_locks);
}

match backoff.next_delay_duration() {
None => return Ok(live_locks),
Some(delay_duration) => {
tokio::time::sleep(delay_duration).await;
}
}
}
}

/// Cleans up all keys in a range and quickly reclaim disk space.
///
/// The range can span over multiple regions.
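A hypothetical caller-side sketch of the new public `resolve_locks` method. Only the `resolve_locks` signature comes from this diff; the surrounding function, the import paths, the `Backoff` constructor choice, and how the `LockInfo` list is obtained (e.g. from `scan_locks`) are assumptions for illustration:

```rust
// Module paths follow the crate's internal layout and are assumptions for
// this sketch; adjust to the public re-exports of the published crate.
use tikv_client::proto::kvrpcpb;
use tikv_client::{Backoff, Result, TransactionClient};

// Illustrative only: resolve leftover locks, giving up once the supplied
// backoff is exhausted and reporting whatever is still live.
async fn cleanup_leftover_locks(
    client: &TransactionClient,
    locks: Vec<kvrpcpb::LockInfo>,
) -> Result<()> {
    // Use the current TSO timestamp as the caller start timestamp.
    let ts = client.current_timestamp().await?;
    // Constructor choice is an assumption; any crate Backoff works here.
    let backoff = Backoff::no_jitter_backoff(100, 1000, 5);
    let still_live = client.resolve_locks(locks, ts, backoff).await?;
    if !still_live.is_empty() {
        // Backoff was exhausted before every lock was resolved; the caller
        // can retry later or escalate.
    }
    Ok(())
}
```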