issue=1258, t-cache support block-level cache evict#1266
issue=1258, t-cache support block-level cache evict #1266 — caijieming-ng wants to merge 19 commits into baidu:master from
Conversation
72a31b8 to
140eabf
Compare
|
build |
src/leveldb/util/block_cache.cc
Outdated
| for (uint32_t i = 0; i < c_valid.size(); ++i) { | ||
| MutexLock lockgard(&cache_->mu_); | ||
| CacheBlock* block = c_valid[i]; | ||
| block->cv.Wait(); |
src/io/utils_leveldb.cc
Outdated
| #include "leveldb/comparator.h" | ||
| #include "leveldb/env_dfs.h" | ||
| #include "leveldb/env_flash.h" | ||
| #include "leveldb/block_cache.h" |
src/io/utils_leveldb.cc
Outdated
| DECLARE_string(tera_tabletnode_path_prefix); | ||
| DECLARE_string(tera_dfs_so_path); | ||
| DECLARE_string(tera_dfs_conf); | ||
| DECLARE_int32(tera_leveldb_block_cache_env_num_thread); |
|
|
||
| // Tcache: default env | ||
| static pthread_once_t block_cache_once = PTHREAD_ONCE_INIT; | ||
| static leveldb::Env* default_block_cache_env; |
There was a problem hiding this comment.
为啥不像mem及flash一样,弄成函数内的静态变量
| // found in the LICENSE file. | ||
|
|
||
| #ifndef STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H | ||
| #define STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H |
There was a problem hiding this comment.
STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H_?
src/tabletnode/tabletnode_impl.cc
Outdated
| // compitable with legacy FlashEnv | ||
| leveldb::FlashEnv* flash_env = (leveldb::FlashEnv*)io::LeveldbFlashEnv(); | ||
| flash_env->SetFlashPath(FLAGS_tera_tabletnode_cache_paths, | ||
| FLAGS_tera_io_cache_path_vanish_allowed); |
| return reinterpret_cast<LRUHandle*>(handle)->value; | ||
| } | ||
|
|
||
| uint64_t NewId() { |
There was a problem hiding this comment.
这个接口在block cache中没有用
|
|
||
| // An entry is a variable length heap-allocated structure. Entries | ||
| // are kept in a circular doubly linked list ordered by access time. | ||
| struct LRUHandle { |
| Env* NewBlockCacheEnv(Env* base); | ||
|
|
||
| } // leveldb | ||
| #endif |
There was a problem hiding this comment.
STOREAGE_LEVELDB_UTIL_BLOCK_CACHE_H_
| // Dummy head of LRU list. | ||
| // lru.prev is newest entry, lru.next is oldest entry. | ||
| //LRUHandle hot_lru_; | ||
| //LRUHandle cold_lru_; |
| return usage_; | ||
| } | ||
|
|
||
| class LRU2QCache: public Cache { |
There was a problem hiding this comment.
没太明白和原生LRUCache的区别,除了在找坑方面有区别,其它的有吗?
There was a problem hiding this comment.
原生lru是可以超的,这个lru是描述持久化的ssd上block的lru,不能超
src/leveldb/util/block_cache.cc
Outdated
| // Tcache | ||
| ///////////////////////////////////////////// | ||
| uint64_t kBlockSize = 4096UL; | ||
| uint64_t kDataSetSize = 134217728UL; |
There was a problem hiding this comment.
搞成128 * 1024 * 1024或者128 << 20这种?
|
|
||
| Status NewRandomAccessFile(const std::string& fname, | ||
| RandomAccessFile** result); // cache Pread | ||
| static void BlockDeleter(const Slice& key, void* v); |
| BlockCacheImpl* cache = new BlockCacheImpl(options); | ||
| Status s = cache->LoadCache(); | ||
| assert(s.ok()); | ||
| cache_vec_.push_back(cache); // no need lock |
There was a problem hiding this comment.
这是ts启动时,加载cache(没有开销),不是load tablet的加载;
src/leveldb/util/block_cache.cc
Outdated
| MutexLock l(&mu_); | ||
| if (tmp_storage_ == NULL) { | ||
| tmp_storage_ = new std::string(); | ||
| tmp_storage_->resize(0); |
| tmp_storage_->resize(0); | ||
| block_list_.push_back(tmp_storage_); | ||
| } | ||
| uint32_t begin = offset_ / block_size_; |
src/leveldb/util/block_cache.cc
Outdated
| Slice buf(data.data() + tmp_size, data.size() - tmp_size); | ||
| for (uint32_t i = begin + 1; i <= end; ++i) { | ||
| tmp_storage_ = new std::string(); | ||
| tmp_storage_->resize(0); |
src/leveldb/util/block_cache.cc
Outdated
| tmp_storage_ = new std::string(); | ||
| tmp_storage_->resize(0); | ||
| block_list_.push_back(tmp_storage_); | ||
| if (i < end) { // last block |
| for (uint32_t i = begin + 1; i <= end; ++i) { | ||
| tmp_storage_ = new std::string(); | ||
| tmp_storage_->resize(0); | ||
| block_list_.push_back(tmp_storage_); |
src/leveldb/util/block_cache.cc
Outdated
| Log("[%s] begin release %s\n", cache_->WorkPath().c_str(), fname_.c_str()); | ||
| MutexLock lockgard(&cache_->mu_); | ||
| uint64_t block_idx; | ||
| std::string* block_data = write_buffer_.PopBackBlock(&block_idx); |
There was a problem hiding this comment.
因为前面的块是通过BGFlush异步下刷的(按block size粒度,不满足这个粒度的,BGFlush不处理),只有close的时候,才能知道文件确实结束了,所以在此时刷最后一个不满粒度的数据块
src/leveldb/util/block_cache.cc
Outdated
| dfs_file_ = NULL; | ||
| } | ||
|
|
||
| Log("[%s] begin release %s\n", cache_->WorkPath().c_str(), fname_.c_str()); |
src/leveldb/util/block_cache.cc
Outdated
| public: | ||
| BlockCacheWritableFile(BlockCacheImpl* c, const std::string& fname, Status* s) | ||
| : cache_(c), | ||
| bg_cv_(&c->mu_), |
There was a problem hiding this comment.
不暴露出来这个锁可以吗?由BlockCacheImpl提供一个阻塞的接口
src/leveldb/util/block_cache.cc
Outdated
| cache_->options_.block_size, | ||
| s->ToString().c_str()); | ||
|
|
||
| MutexLock lockgard(&cache_->mu_); |
src/leveldb/util/block_cache.cc
Outdated
| mu_.AssertHeld(); | ||
| uint64_t fid = 0; | ||
| std::string key = "FNAME#" + fname; | ||
| mu_.Unlock(); |
There was a problem hiding this comment.
感觉略怪,为啥这里突然把锁放了,而调的时候前面特意加个锁
src/leveldb/util/block_cache.cc
Outdated
|
|
||
| Waiter* w = NULL; | ||
| LockKeyMap::iterator it = lock_key_.find(key); | ||
| if (it != lock_key_.end()){ |
src/leveldb/util/block_cache.cc
Outdated
| } | ||
| write_buffer_.Append(data); | ||
|
|
||
| MutexLock lockgard(&cache_->mu_); |
| Log("[%s] Begin BGFlush: %s\n", cache_->WorkPath().c_str(), fname_.c_str()); | ||
| MutexLock lockgard(&cache_->mu_); | ||
| uint64_t block_idx; | ||
| std::string* block_data = write_buffer_.PopFrontBlock(&block_idx); |
There was a problem hiding this comment.
前面扔了好多task,这里不搞成while一波流主要是为了并发考虑?
| port::CondVar cv(&cache_->mu_); | ||
| cv.Wait(10); // timewait 10ms retry | ||
| } | ||
| block->state = 0; |
There was a problem hiding this comment.
可以改成断言,此时这个block必然是个无效块
| s->ToString().c_str()); | ||
|
|
||
| MutexLock lockgard(&cache_->mu_); | ||
| fid_ = cache_->FileId(fname_); |
| s = block->s; // degrade read | ||
| } | ||
| block->Clear(kCacheBlockLocked); | ||
| block->cv.SignalAll(); |
There was a problem hiding this comment.
唤醒其他线程的c_lock中等待的block
|
|
||
| if (!block->Test(kCacheBlockLocked) && | ||
| block->Test(kCacheBlockValid)) { | ||
| block->Set(kCacheBlockLocked | kCacheBlockCacheRead); |
| block->WaitOnClear(kCacheBlockDfsRead); | ||
| block->Set(kCacheBlockCacheFill); | ||
| if (!block->s.ok() && s.ok()) { | ||
| s = block->s; // degrade read |
There was a problem hiding this comment.
如果这里失败,是不是后面可以直接跳到dfs->Read了
| new_fid_ = prev_fid_ + options_.fid_batch_num; | ||
| Log("[block_cache %s]: reuse block cache: prev_fid: %lu, new_fid: %lu\n", | ||
| dbname.c_str(), prev_fid_, new_fid_); | ||
| s = Status::OK(); |
| uint64_t BlockCacheImpl::AllocFileId() { // no more than fid_batch_num | ||
| mu_.AssertHeld(); | ||
| uint64_t fid = ++new_fid_; | ||
| while (new_fid_ - prev_fid_ >= options_.fid_batch_num) { |
There was a problem hiding this comment.
防止leveldb写失败
src/leveldb/util/block_cache.cc
Outdated
| PutFixed64(&hkey, block->block_idx); | ||
| block->sid = lc.sid; | ||
| block->cache_block_idx = DecodeFixed64(lkey.data()); | ||
| block->state = (block->Test(kCacheBlockValid)) ? kCacheBlockValid : 0; |
src/leveldb/util/block_cache.cc
Outdated
| lc.KeyToString().c_str(), | ||
| lc.ValToString().c_str(), | ||
| s.ToString().c_str()); | ||
| } else if (lc.type == kDataSetKey) { |
There was a problem hiding this comment.
这个分支实际上是一个load操作,是不是放别处比较好
There was a problem hiding this comment.
这个函数主要维护一个lock table,给整个blockcacheimpl内部对象的并发控制使用;比如leveldb的读修改写,或并发的data set file的load,都走同一套并发控制逻辑;
bugfix: 1. cache reload core 2. support aio engine 3. cache fill TEST PASS
bugfix: 1. cache reload core 2. support aio engine 3. cache fill TEST PASS
| @@ -0,0 +1,101 @@ | |||
| // Copyright (c) 2015, Baidu.com, Inc. All Rights Reserved | |||
There was a problem hiding this comment.
这个看起来好别扭,既然是新文件,为啥会有个2015
| namespace leveldb { | ||
|
|
||
| /** | ||
| * Keep adding ticker's here. |
There was a problem hiding this comment.
这些代码是其他地方抄过来的么,质量有保证么,会不会带来版权问题?
There was a problem hiding this comment.
删掉不需要的统计项,从rocksdb的性能统计方式参考的, :-(
There was a problem hiding this comment.
这种方式调性能问题,比日志方式有效;glog和leveldb打一条日志,大概在10微秒,很费。
|
|
||
| // LRU cache implementation | ||
|
|
||
| // An entry is a variable length heap-allocated structure. Entries |
1. lock optimize
1. lock optimize
|
您的执著和坚持,令人感动!继续加油~ |
|
同赞 |
bugfix for cache evict
bugfix for cache evict
No description provided.