Skip to content

Hbase 写入分析 #103

@asu126

Description

@asu126

分析HBase Put操作流程

Exported on 2025/8/21 at GMT+8 13:58:59 from Cursor (1.1.5)


User

帮我分析下Hbase Put 操作流程


Cursor

我来帮你分析HBase Put操作的流程。让我先探索一下HBase的代码结构,找到相关的Put操作实现代码。

很好!我找到了Put.java文件。现在让我查看Put类的实现,以及相关的Mutation基类。

现在让我查看Mutation基类,了解Put操作的更多细节:

让我继续查看Mutation类的更多内容:

现在让我查看Table接口,了解Put操作是如何被调用的:

让我查看Table接口中的put方法:

现在让我查看Table接口中的put方法定义:

让我继续查看Table接口中的put方法:

现在让我查看Table接口中的put方法定义:

现在让我查看HRegion中的put方法实现,这是服务端的核心实现:

让我查看mutate方法的实现:

让我查看HRegion中的mutate方法:

现在让我查看HRegion中的mutate方法实现:

让我查看batchMutate方法的实现:

让我继续查看doMiniBatchMutate方法的实现:

现在让我查看客户端Table的实现,了解Put操作是如何从客户端发送到服务端的:

让我查看TableOverAsyncTable,这可能是同步Table接口的实现:

现在让我查看AsyncTableImpl中的put方法实现:

让我查看AsyncTableImpl中的put方法:

现在让我查看RawAsyncTableImpl中的put方法:

让我查看RawAsyncTableImpl中的put方法:

让我查看RawAsyncTableImpl中的put方法实现:

现在让我查看RequestConverter中的buildMutateRequest方法:

让我查看RequestConverter中的buildMutateRequest方法:

基于我对HBase代码的分析,现在我可以为你详细分析HBase Put操作的流程。让我总结一下整个流程:

HBase Put操作流程分析

1. 客户端层面 (Client Side)

1.1 Put对象创建

// 创建Put对象,指定行键
Put put = new Put(Bytes.toBytes("rowKey"));

// 添加列族、列限定符和值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));

// 可选:设置时间戳
put.setTimestamp(timestamp);

1.2 Put对象结构

  • Put类继承自Mutation类
  • 包含行键(row)、时间戳(ts)、列族映射(familyMap)等
  • 支持设置持久性(durability)、TTL、ACL等属性

1.3 客户端调用链

Table.put(Put put) 
  → TableOverAsyncTable.put(Put put)
    → AsyncTableImpl.put(Put put)
      → RawAsyncTableImpl.put(Put put)
        → 构建RPC请求并发送

2. RPC通信层

2.1 请求构建

  • 使用RequestConverter.buildMutateRequest()将Put对象转换为Protocol Buffer格式
  • 构建MutateRequest,包含:
    • Region信息
    • Mutation类型(PUT)
    • Put数据

2.2 网络传输

  • 通过HBase RPC框架发送到对应的RegionServer
  • 支持重试机制和超时控制

3. 服务端层面 (Server Side)

3.1 RegionServer接收

  • RSRpcServices接收RPC请求
  • 解析请求并调用对应Region的put方法

3.2 Region处理流程

HRegion.put(Put put)
  → mutate(Put put)
    → batchMutate(BatchOperation)
      → doMiniBatchMutate(BatchOperation)

3.3 核心处理步骤

步骤1: 资源检查和锁定

  • 检查Region是否只读
  • 检查资源是否充足
  • 获取行锁(Row Lock)确保原子性

步骤2: 预准备操作

  • 调用协处理器(Coprocessor)的prePut钩子
  • 验证列族是否存在
  • 准备时间戳

步骤3: WAL写入

  • 构建WAL编辑记录
  • 写入预写日志(Write-Ahead Log)
  • 同步WAL到磁盘(根据durability设置)

步骤4: MemStore更新

  • 将数据写入内存存储(MemStore)
  • 更新MVCC(多版本并发控制)信息

步骤5: 完成操作

  • 调用协处理器的postPut钩子
  • 释放行锁
  • 返回操作状态

4. 数据存储结构

4.1 KeyValue结构

KeyValue kv = new KeyValue(
    row,           // 行键
    family,        // 列族
    qualifier,     // 列限定符
    timestamp,     // 时间戳
    KeyValue.Type.Put,  // 操作类型
    value          // 值
);

4.2 存储层次

  • MemStore: 内存中的有序数据结构
  • HFile: 磁盘上的不可变文件
  • WAL: 预写日志,保证数据持久性

5. 关键特性

5.1 原子性

  • 同一行的所有列族操作是原子的
  • 通过行锁保证并发安全

5.2 持久性

  • 支持多种durability级别:
    • SKIP_WAL: 跳过WAL,性能最高但可能丢失数据
    • ASYNC_WAL: 异步写入WAL
    • SYNC_WAL: 同步写入WAL
    • FSYNC_WAL: 强制同步到磁盘

5.3 版本控制

  • 支持多版本存储
  • 时间戳用于版本管理
  • 可配置最大版本数

6. 性能优化

6.1 批量操作

  • 支持批量Put操作
  • 减少RPC调用次数

6.2 异步操作

  • 提供异步API
  • 支持CompletableFuture

6.3 缓冲机制

  • BufferedMutator提供客户端缓冲
  • 批量发送提高吞吐量

7. 错误处理

7.1 重试机制

  • 网络异常自动重试
  • 可配置重试次数和间隔

7.2 异常类型

  • NoSuchColumnFamilyException: 列族不存在
  • RegionTooBusyException: Region过载
  • FailedSanityCheckException: 数据验证失败

这个流程展示了HBase Put操作从客户端到服务端的完整路径,体现了HBase作为分布式NoSQL数据库的架构特点和设计理念。

// 创建Put对象,指定行键
Put put = new Put(Bytes.toBytes("rowKey"));

// 添加列族、列限定符和值
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));

// 可选:设置时间戳
put.setTimestamp(timestamp);
Table.put(Put put) 
  → TableOverAsyncTable.put(Put put)
    → AsyncTableImpl.put(Put put)
      → RawAsyncTableImpl.put(Put put)
        → 构建RPC请求并发送
HRegion.put(Put put)
  → mutate(Put put)
    → batchMutate(BatchOperation)
      → doMiniBatchMutate(BatchOperation)
KeyValue kv = new KeyValue(
    row,           // 行键
    family,        // 列族
    qualifier,     // 列限定符
    timestamp,     // 时间戳
    KeyValue.Type.Put,  // 操作类型
    value          // 值
);

User

多版本特性:如果服务端设置了只保留一个版本,帮我找一下在写入时候对多版本的处理代码


Cursor

我来帮你找到HBase在写入时对多版本处理的代码,特别是当服务端设置为只保留一个版本时的处理逻辑。

首先让我搜索相关的代码:

现在让我搜索更具体的多版本处理代码:

让我查看这个关键的代码段:

现在让我搜索更多关于多版本处理的代码:

让我搜索Store类中的upsert方法:

让我查看HStore中的upsert方法:

现在让我查看AbstractMemStore中的upsert方法:

让我查看AbstractMemStore中单个cell的upsert方法:

让我查看AbstractMemStore中的doUpsert方法:

现在让我查看MutableSegment中的upsert方法:

现在让我搜索更多关于多版本控制的代码,特别是关于maxVersions的处理:

现在让我搜索更多关于多版本控制的代码,特别是关于版本清理的逻辑:

让我搜索更多关于多版本控制的代码:

现在让我查看ScanInfo类,了解minVersions和maxVersions的处理:

现在让我查看NewVersionBehaviorTracker类,了解新版本行为跟踪器的实现:

现在让我查看ScanWildcardColumnTracker类,了解通配符列跟踪器的实现:

基于我对HBase代码的深入分析,现在我可以为你详细解释HBase在写入时对多版本的处理,特别是当服务端设置为只保留一个版本时的处理逻辑。

HBase多版本特性:写入时的版本处理代码分析

1. 核心判断逻辑

HRegion.java中,有一个关键的判断来决定是否使用upsert模式:

boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
  store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else {
  store.add(cells, memstoreAccounting);
}

关键点

  • maxVersions == 1时,HBase会使用upsert模式
  • maxVersions > 1时,使用普通的add模式

2. Upsert模式的实现

2.1 调用链

HRegion.applyToMemStore() 
  → HStore.upsert() 
    → AbstractMemStore.upsert() 
      → MutableSegment.upsert()

2.2 MutableSegment.upsert()的核心逻辑

public void upsert(ExtendedCell cell, long readpoint, MemStoreSizing memStoreSizing,
  boolean sizeAddedPreOperation) {
  internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);

  // Get the Cells for the row/family/qualifier regardless of timestamp.
  // For this case we want to clean up any other puts
  ExtendedCell firstCell =
    PrivateCellUtil.createFirstOnRowColTS(cell, HConstants.LATEST_TIMESTAMP);
  SortedSet<ExtendedCell> ss = this.tailSet(firstCell);
  Iterator<ExtendedCell> it = ss.iterator();
  // versions visible to oldest scanner
  int versionsVisible = 0;
  while (it.hasNext()) {
    ExtendedCell cur = it.next();

    if (cell == cur) {
      // ignore the one just put in
      continue;
    }
    // check that this is the row and column we are interested in, otherwise bail
    if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
      // only remove Puts that concurrent scanners cannot possibly see
      if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
        if (versionsVisible >= 1) {
          // if we get here we have seen at least one version visible to the oldest scanner,
          // which means we can prove that no scanner will see this version

          // false means there was a change, so give us the size.
          int cellLen = getCellLength(cur);
          long heapSize = heapSizeChange(cur, true);
          long offHeapSize = offHeapSizeChange(cur, true);
          incMemStoreSize(-cellLen, -heapSize, -offHeapSize, -1);
          if (memStoreSizing != null) {
            memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize, 1);
          }
          it.remove();
        } else {
          versionsVisible++;
        }
      }
    } else {
      // past the row or column, done
      break;
    }
  }
}

3. 版本清理的关键逻辑

3.1 版本可见性判断

int versionsVisible = 0;
// ...
if (versionsVisible >= 1) {
  // 可以安全删除旧版本
  it.remove();
} else {
  versionsVisible++;
}

核心逻辑

  • versionsVisible >= 1时,说明已经有一个版本对最老的扫描器可见
  • 此时可以安全删除旧版本,因为不会有扫描器能看到它

3.2 删除条件

if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
  // 只有Put类型的Cell且序列ID小于等于读点才能删除
}

删除条件

  1. 必须是Put类型的Cell(不包括Delete标记)
  2. 序列ID必须小于等于当前读点(readpoint)
  3. 已经有一个版本对扫描器可见

4. 与普通Add模式的区别

4.1 普通Add模式

// 当maxVersions > 1时
store.add(cells, memstoreAccounting);
  • 直接添加新版本到MemStore
  • 不删除旧版本
  • 版本数量由maxVersions控制

4.2 Upsert模式

// 当maxVersions == 1时
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
  • 添加新版本的同时清理旧版本
  • 确保只保留一个版本
  • 立即进行版本清理,而不是等到读取时

5. 版本控制的其他机制

5.1 读取时的版本控制

ScanWildcardColumnTracker中:

// keep the KV if required by minversions or it is not expired, yet
if (currentCount <= minVersions || !isExpired(timestamp)) {

5.2 新版本行为跟踪器

NewVersionBehaviorTracker中:

if (countCurrentCol == minVersions) {
  return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
}
if (countCurrentCol > minVersions) {
  // This may not be reached, only for safety.
  return MatchCode.SEEK_NEXT_COL;
}

6. 总结

当HBase服务端设置maxVersions = 1时:

  1. 写入时立即清理:使用upsert模式,在写入新版本的同时立即清理旧版本
  2. 版本可见性保证:确保清理的版本不会被任何扫描器看到
  3. 内存优化:及时释放旧版本占用的内存空间
  4. 性能提升:避免在读取时进行版本过滤,提高查询性能

这种设计体现了HBase在性能和数据一致性之间的平衡,通过写入时的主动版本管理,既保证了数据的正确性,又优化了存储和查询性能。

boolean upsert = delta && store.getColumnFamilyDescriptor().getMaxVersions() == 1;
if (upsert) {
  store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
} else {
  store.add(cells, memstoreAccounting);
}
HRegion.applyToMemStore() 
  → HStore.upsert() 
    → AbstractMemStore.upsert() 
      → MutableSegment.upsert()
public void upsert(ExtendedCell cell, long readpoint, MemStoreSizing memStoreSizing,
  boolean sizeAddedPreOperation) {
  internalAdd(cell, false, memStoreSizing, sizeAddedPreOperation);

  // Get the Cells for the row/family/qualifier regardless of timestamp.
  // For this case we want to clean up any other puts
  ExtendedCell firstCell =
    PrivateCellUtil.createFirstOnRowColTS(cell, HConstants.LATEST_TIMESTAMP);
  SortedSet<ExtendedCell> ss = this.tailSet(firstCell);
  Iterator<ExtendedCell> it = ss.iterator();
  // versions visible to oldest scanner
  int versionsVisible = 0;
  while (it.hasNext()) {
    ExtendedCell cur = it.next();

    if (cell == cur) {
      // ignore the one just put in
      continue;
    }
    // check that this is the row and column we are interested in, otherwise bail
    if (CellUtil.matchingRows(cell, cur) && CellUtil.matchingQualifier(cell, cur)) {
      // only remove Puts that concurrent scanners cannot possibly see
      if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
        if (versionsVisible >= 1) {
          // if we get here we have seen at least one version visible to the oldest scanner,
          // which means we can prove that no scanner will see this version

          // false means there was a change, so give us the size.
          int cellLen = getCellLength(cur);
          long heapSize = heapSizeChange(cur, true);
          long offHeapSize = offHeapSizeChange(cur, true);
          incMemStoreSize(-cellLen, -heapSize, -offHeapSize, -1);
          if (memStoreSizing != null) {
            memStoreSizing.decMemStoreSize(cellLen, heapSize, offHeapSize, 1);
          }
          it.remove();
        } else {
          versionsVisible++;
        }
      }
    } else {
      // past the row or column, done
      break;
    }
  }
}
int versionsVisible = 0;
// ...
if (versionsVisible >= 1) {
  // 可以安全删除旧版本
  it.remove();
} else {
  versionsVisible++;
}
if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && cur.getSequenceId() <= readpoint) {
  // 只有Put类型的Cell且序列ID小于等于读点才能删除
}
// 当maxVersions > 1时
store.add(cells, memstoreAccounting);
// 当maxVersions == 1时
store.upsert(cells, getSmallestReadPoint(), memstoreAccounting);
// keep the KV if required by minversions or it is not expired, yet
if (currentCount <= minVersions || !isExpired(timestamp)) {
if (countCurrentCol == minVersions) {
  return MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
}
if (countCurrentCol > minVersions) {
  // This may not be reached, only for safety.
  return MatchCode.SEEK_NEXT_COL;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions