diff --git a/rocks/CMakeLists.txt b/rocks/CMakeLists.txt index 32f30271b3f4..9319db9445f4 100644 --- a/rocks/CMakeLists.txt +++ b/rocks/CMakeLists.txt @@ -5,7 +5,7 @@ include(SetupRocksDB) userver_module( rocks SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}" - LINK_LIBRARIES RocksDB::rocksdb + LINK_LIBRARIES_PRIVATE RocksDB::rocksdb UTEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/src/*_test.cpp" DEPENDS core ) diff --git a/rocks/include/userver/storages/rocks/client.hpp b/rocks/include/userver/storages/rocks/client.hpp deleted file mode 100644 index 012f893d12b8..000000000000 --- a/rocks/include/userver/storages/rocks/client.hpp +++ /dev/null @@ -1,72 +0,0 @@ -#pragma once - -/// @file userver/storages/rocks/client.hpp -/// @brief @copybrief storages::rocks::Client - -#include -#include - -#include - -#include - -USERVER_NAMESPACE_BEGIN - -namespace storages::rocks { - -/** - * @brief Client for working with RocksDB storage. - * - * This class provides an interface for interacting with the RocksDB database. - * To use the class, you need to specify the database path when creating an - * object. - */ -class Client final { -public: - /** - * @brief Constructor of the Client class. - * - * @param db_path The path to the RocksDB database. - * @param blocking_task_processor - task processor to execute blocking FS - * operations - */ - Client(const std::string& db_path, engine::TaskProcessor& blocking_task_processor); - - /** - * @brief Puts a record into the database. - * - * @param key The key of the record. - * @param value The value of the record. - */ - void Put(std::string_view key, std::string_view value); - - /** - * @brief Retrieves the value of a record from the database by key. - * - * @param key The key of the record. - */ - std::string Get(std::string_view key); - - /** - * @brief Deletes a record from the database by key. - * - * @param key The key of the record to be deleted. - */ - void Delete(std::string_view key); - - /** - * Checks the status of an operation and handles any errors based on the given - * method name. - * - * @param status The status of the operation to check. - * @param method_name The name of the method associated with the operation. - */ - void CheckStatus(rocksdb::Status status, std::string_view method_name); - -private: - std::unique_ptr db_; - engine::TaskProcessor& blocking_task_processor_; -}; -} // namespace storages::rocks - -USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/column_family.hpp b/rocks/include/userver/storages/rocks/column_family.hpp new file mode 100644 index 000000000000..d324dbc8dbf4 --- /dev/null +++ b/rocks/include/userver/storages/rocks/column_family.hpp @@ -0,0 +1,15 @@ +#pragma once + +namespace rocksdb { +class ColumnFamilyHandle; +} // namespace rocksdb + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +using ColumnFamilyHandle = rocksdb::ColumnFamilyHandle*; + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/component.hpp b/rocks/include/userver/storages/rocks/component.hpp index c0cff2f3412e..383c08099cb6 100644 --- a/rocks/include/userver/storages/rocks/component.hpp +++ b/rocks/include/userver/storages/rocks/component.hpp @@ -1,45 +1,40 @@ #pragma once /// @file userver/storages/rocks/component.hpp -/// @brief @copybrief rocks::Rocks +/// @brief @copybrief components::Rocks +#include +#include #include #include #include -#include -#include USERVER_NAMESPACE_BEGIN -namespace storages::rocks { +namespace components { -// clang-format off - -/// @ingroup userver_components -/// -/// @brief RocksDB client component. -/// ## Static options: -/// Name | Description | Default value -/// ---------------------------------- | ------------------------------------------------ | --------------- -/// task-processor | name of the task processor to run the blocking file operations | - -/// db-path | path to database file | - - -// clang-format on - -class Component : public components::ComponentBase { +/** + * @brief Component for configuring and managing RocksDB. + */ +class Rocks final : public components::ComponentBase { public: - Component(const components::ComponentConfig&, const components::ComponentContext&); - - ~Component() = default; + static constexpr std::string_view kName = "rocks"; + static yaml_config::Schema GetStaticConfigSchema(); - storages::rocks::ClientPtr MakeClient(); + /** + * @brief Constructor of the Rocks class. + */ + Rocks(const components::ComponentConfig&, const components::ComponentContext&); - static yaml_config::Schema GetStaticConfigSchema(); + /** + * @brief Return a pointer to the database instance. + */ + [[nodiscard]] const storages::rocks::DbPtr& GetDb() const; private: - storages::rocks::ClientPtr client_ptr_; + storages::rocks::DbPtr db_ptr_; }; -} // namespace storages::rocks +} // namespace components USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/db.hpp b/rocks/include/userver/storages/rocks/db.hpp new file mode 100644 index 000000000000..a680ba5ad7bf --- /dev/null +++ b/rocks/include/userver/storages/rocks/db.hpp @@ -0,0 +1,59 @@ +#pragma once + +/// @file userver/storages/rocks/db.hpp +/// @brief @copybrief storages::rocks::Db + +#include +#include +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { +class DbImpl; +} // namespace storages::rocks::detail + +namespace storages::rocks { + +class WriteBatch; + +struct DbOptions final { + std::optional compression; + std::optional compression_level; + std::optional bottommost_compression; + std::optional bottommost_compression_level; + std::optional use_direct_reads; + std::optional use_direct_io_for_flush_and_compaction; +}; + +class Db final { +public: + Db(const std::string& db_path, int max_background_jobs, const std::vector& column_families, + const DbOptions& db_options, engine::TaskProcessor& task_processor); + + [[nodiscard]] ColumnFamilyHandle GetColumnFamily(const std::string& name) const; + + void Put(std::string_view key, std::string_view value); + void Put(ColumnFamilyHandle column_family, std::string_view key, std::string_view value); + + void Delete(std::string_view key); + void Delete(ColumnFamilyHandle column_family, std::string_view key); + + void Write(WriteBatch& write_batch); + + [[nodiscard]] std::optional Get(std::string_view key) const; + [[nodiscard]] std::optional Get(ColumnFamilyHandle column_family, std::string_view key) const; + + [[nodiscard]] Snapshot GetSnapshot() const; + +private: + std::shared_ptr db_impl_; +}; + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/client_fwd.hpp b/rocks/include/userver/storages/rocks/db_fwd.hpp similarity index 70% rename from rocks/include/userver/storages/rocks/client_fwd.hpp rename to rocks/include/userver/storages/rocks/db_fwd.hpp index ecb360882890..ec0b4ea0d44f 100644 --- a/rocks/include/userver/storages/rocks/client_fwd.hpp +++ b/rocks/include/userver/storages/rocks/db_fwd.hpp @@ -5,10 +5,8 @@ USERVER_NAMESPACE_BEGIN namespace storages::rocks { - -class Client; -using ClientPtr = std::shared_ptr; - +class Db; +using DbPtr = std::shared_ptr; } // namespace storages::rocks USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/detail/db_impl.hpp b/rocks/include/userver/storages/rocks/detail/db_impl.hpp new file mode 100644 index 000000000000..3172469c95f3 --- /dev/null +++ b/rocks/include/userver/storages/rocks/detail/db_impl.hpp @@ -0,0 +1,67 @@ +#pragma once + +/// @file userver/storages/rocks/detail/db_impl.hpp +/// @brief @copybrief storages::rocks::detail::DbImpl + +#include +#include +#include +#include +#include +#include +#include + +namespace rocksdb { +class Options; +class WriteOptions; +class WriteBatch; +class ReadOptions; +class Iterator; +class Snapshot; +class DB; +class ColumnFamilyHandle; +} // namespace rocksdb + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { + +class DbImpl final { +public: + DbImpl(const rocksdb::Options& options, const std::string& db, const std::vector& column_families, + engine::TaskProcessor& task_processor); + ~DbImpl(); + + [[nodiscard]] rocksdb::ColumnFamilyHandle* GetColumnFamily(const std::string& name) const; + + void Put(const rocksdb::WriteOptions& options, std::string_view key, std::string_view value); + void Put(const rocksdb::WriteOptions& options, rocksdb::ColumnFamilyHandle* column_family, std::string_view key, + std::string_view value); + + void Delete(const rocksdb::WriteOptions& options, std::string_view key); + void Delete(const rocksdb::WriteOptions& options, rocksdb::ColumnFamilyHandle* column_family, std::string_view key); + + void Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch& write_batch); + + [[nodiscard]] std::optional Get(const rocksdb::ReadOptions& options, std::string_view key) const; + [[nodiscard]] std::optional Get(const rocksdb::ReadOptions& options, + rocksdb::ColumnFamilyHandle* column_family, std::string_view key) const; + + [[nodiscard]] std::unique_ptr NewIterator(const rocksdb::ReadOptions& options) const; + [[nodiscard]] std::unique_ptr NewIterator(const rocksdb::ReadOptions& options, + rocksdb::ColumnFamilyHandle* column_family) const; + + [[nodiscard]] const rocksdb::Snapshot* GetSnapshot() const; + void ReleaseSnapshot(const rocksdb::Snapshot* snapshot) const; + + [[nodiscard]] engine::TaskProcessor& GetTaskProcessor(); + +private: + std::unique_ptr db_; + engine::TaskProcessor& task_processor_; + std::unordered_map column_family_handles_; +}; + +} // namespace storages::rocks::detail + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/detail/iterator_impl.hpp b/rocks/include/userver/storages/rocks/detail/iterator_impl.hpp new file mode 100644 index 000000000000..cb9e6e16d59a --- /dev/null +++ b/rocks/include/userver/storages/rocks/detail/iterator_impl.hpp @@ -0,0 +1,49 @@ +#pragma once + +/// @file userver/storages/rocks/detail/iterator_impl.hpp +/// @brief @copybrief storages::rocks::detail::IteratorImpl + +#include +#include +#include + +namespace rocksdb { +class Iterator; +class Snapshot; +} // namespace rocksdb + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { + +class DbImpl; + +class IteratorImpl { +public: + IteratorImpl(const std::shared_ptr& db, const std::shared_ptr& snapshot, + std::unique_ptr iterator) noexcept; + virtual ~IteratorImpl(); + + IteratorImpl(IteratorImpl&&) noexcept; + IteratorImpl& operator=(IteratorImpl&&) noexcept; + + [[nodiscard]] bool Valid() const noexcept; + void SeekToFirst(); + void SeekToLast(); + void Seek(std::string_view slice); + void SeekForPrev(std::string_view slice); + void Next(); + void Prev(); + + [[nodiscard]] std::string Key() const; + [[nodiscard]] std::string Value() const; + +private: + std::shared_ptr db_impl_; + std::shared_ptr snapshot_; + std::unique_ptr iterator_; +}; + +} // namespace storages::rocks::detail + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/detail/snapshot_impl.hpp b/rocks/include/userver/storages/rocks/detail/snapshot_impl.hpp new file mode 100644 index 000000000000..4d61116ca251 --- /dev/null +++ b/rocks/include/userver/storages/rocks/detail/snapshot_impl.hpp @@ -0,0 +1,44 @@ +#pragma once + +/// @file userver/storages/rocks/detail/snapshot_impl.hpp +/// @brief @copybrief storages::rocks::detail::SnapshotImpl + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { +class Snapshot; +class ReadOptions; +class ColumnFamilyHandle; +} // namespace rocksdb + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { + +class DbImpl; + +class SnapshotImpl final { +public: + SnapshotImpl(const std::shared_ptr& db, const rocksdb::Snapshot* snapshot); + + [[nodiscard]] std::optional Get(rocksdb::ReadOptions& options, std::string_view key) const; + [[nodiscard]] std::optional Get(rocksdb::ReadOptions& options, + rocksdb::ColumnFamilyHandle* column_family, std::string_view key) const; + + [[nodiscard]] IteratorImpl NewIterator(rocksdb::ReadOptions& options) const; + [[nodiscard]] IteratorImpl NewIterator(rocksdb::ReadOptions& options, + rocksdb::ColumnFamilyHandle* column_family) const; + +private: + std::shared_ptr db_impl_; + std::shared_ptr snapshot_; +}; + +} // namespace storages::rocks::detail + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/exception.hpp b/rocks/include/userver/storages/rocks/exception.hpp index b679c5feffb3..18bbefaf2656 100644 --- a/rocks/include/userver/storages/rocks/exception.hpp +++ b/rocks/include/userver/storages/rocks/exception.hpp @@ -6,21 +6,21 @@ #include #include +namespace rocksdb { +class Status; +} // namespace rocksdb + USERVER_NAMESPACE_BEGIN namespace storages::rocks { -/// Generic rocks-related exception -class Exception : public std::runtime_error { -public: - using std::runtime_error::runtime_error; -}; +namespace detail { +void CheckStatus(const rocksdb::Status& status, std::string_view description); +} // namespace detail -/// Request execution failed -class RequestFailedException : public Exception { +class StatusNokException : public std::runtime_error { public: - RequestFailedException(std::string_view request_description, std::string_view status); - + StatusNokException(std::string_view description, std::string_view status); std::string_view GetStatusString() const; private: diff --git a/rocks/include/userver/storages/rocks/iterator.hpp b/rocks/include/userver/storages/rocks/iterator.hpp new file mode 100644 index 000000000000..549cd2c29487 --- /dev/null +++ b/rocks/include/userver/storages/rocks/iterator.hpp @@ -0,0 +1,150 @@ +#pragma once + +/// @file userver/storages/rocks/iterator.hpp +/// @brief @copybrief storages::rocks::Iterator + +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +enum class IteratorDirection : std::int8_t { kForward, kBackward }; + +class KeyValueView final { +public: + [[nodiscard]] auto Key() const { return iterator_impl_->Key(); } + [[nodiscard]] auto Value() const { return iterator_impl_->Value(); } + +private: + template + friend class Iterator; + + KeyValueView(const detail::IteratorImpl* iterator) noexcept : iterator_impl_{iterator} {} + void Reset(const detail::IteratorImpl* iterator) noexcept { iterator_impl_ = iterator; } + + const detail::IteratorImpl* iterator_impl_; +}; + +template +class IteratorSentinel {}; + +template +class Iterator final { +public: + using value_type = KeyValueView; + using reference = value_type&; + using pointer = value_type*; + + Iterator(detail::IteratorImpl&& iterator) noexcept + : iterator_impl_{std::move(iterator)}, kv_view_{&iterator_impl_} {} + + Iterator(Iterator&& other) noexcept + : iterator_impl_{std::move(other.iterator_impl_)}, kv_view_{&iterator_impl_} {} + + Iterator& operator=(Iterator&& other) noexcept { + iterator_impl_ = std::move(other.iterator_impl_); + kv_view_.Reset(&iterator_impl_); + return *this; + } + + [[nodiscard]] bool Valid() const noexcept { + return iterator_impl_.Valid(); + } + + bool operator==(const IteratorSentinel&) const { + return !iterator_impl_.Valid(); + } + + bool operator!=(const IteratorSentinel& sentinel) const { + return !(*this == sentinel); + } + + reference operator*() const { + return kv_view_; + } + + pointer operator->() const { + return &kv_view_; + } + + Iterator& operator++() { + if constexpr (D == IteratorDirection::kForward) { + iterator_impl_.Next(); + } else { + iterator_impl_.Prev(); + } + return *this; + } + + Iterator& operator--() { + if constexpr (D == IteratorDirection::kForward) { + iterator_impl_.Prev(); + } else { + iterator_impl_.Next(); + } + return *this; + } + + void SeekToFirst() & { + if constexpr (D == IteratorDirection::kForward) { + iterator_impl_.SeekToFirst(); + } else { + iterator_impl_.SeekToLast(); + } + } + + [[nodiscard]] auto SeekToFirst() && { + SeekToFirst(); + return std::move(*this); + } + + void SeekToLast() & { + if constexpr (D == IteratorDirection::kForward) { + iterator_impl_.SeekToLast(); + } else { + iterator_impl_.SeekToFirst(); + } + } + + [[nodiscard]] auto SeekToLast() && { + SeekToLast(); + return std::move(*this); + } + + void Seek(std::string_view key) & { + if constexpr (D == IteratorDirection::kForward) { + iterator_impl_.Seek(key); + } else { + iterator_impl_.SeekForPrev(key); + } + } + + [[nodiscard]] auto Seek(std::string_view key) && { + Seek(key); + return std::move(*this); + } + + void SeekForPrev(std::string_view key) & { + if constexpr (D == IteratorDirection::kForward) { + iterator_impl_.SeekForPrev(key); + } else { + iterator_impl_.Seek(key); + } + } + + [[nodiscard]] auto SeekForPrev(std::string_view key) && { + SeekForPrev(key); + return std::move(*this); + } + +private: + detail::IteratorImpl iterator_impl_; + mutable value_type kv_view_; +}; + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/snapshot.hpp b/rocks/include/userver/storages/rocks/snapshot.hpp new file mode 100644 index 000000000000..691c4683a954 --- /dev/null +++ b/rocks/include/userver/storages/rocks/snapshot.hpp @@ -0,0 +1,132 @@ +#pragma once + +/// @file userver/storages/rocks/snapshot.hpp +/// @brief @copybrief storages::rocks::Snapshot + +#include +#include +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +enum class SnapshotRangeLayout : std::uint8_t { kDefault = 0, kColumnFamily }; + +namespace detail { + +template +class SnapshotRangeImpl; + +template +class SnapshotRangeBase { +public: + template + auto NewIterator() const { + return static_cast(this)->template NewIterator(); + } + +private: + friend T; + SnapshotRangeBase(const detail::SnapshotImpl& snapshot) noexcept : snapshot_impl_{snapshot} {} + detail::SnapshotImpl snapshot_impl_; +}; + +template <> +class SnapshotRangeImpl + : public SnapshotRangeBase> { +public: + SnapshotRangeImpl(const detail::SnapshotImpl& snapshot) noexcept : SnapshotRangeBase{snapshot} {} + + template + Iterator NewIterator() const; +}; + +template <> +class SnapshotRangeImpl + : public SnapshotRangeBase> { +public: + SnapshotRangeImpl(const detail::SnapshotImpl& snapshot, ColumnFamilyHandle column_family) noexcept + : SnapshotRangeBase{snapshot}, column_family_{column_family} {} + + template + Iterator NewIterator() const; + +private: + ColumnFamilyHandle column_family_; +}; + +} // namespace detail + +class Db; + +template +class SnapshotRange final : public detail::SnapshotRangeImpl { +public: + using detail::SnapshotRangeImpl::SnapshotRangeImpl; + + [[nodiscard]] auto begin() const { + return this->template NewIterator().SeekToFirst(); + } + + [[nodiscard]] const Iterator cbegin() const { + return begin(); + } + + [[nodiscard]] auto end() const { + return IteratorSentinel{}; + } + + [[nodiscard]] const IteratorSentinel cend() const { + return end(); + } + + [[nodiscard]] auto rbegin() const { + return this->template NewIterator().SeekToFirst(); + } + + [[nodiscard]] const Iterator crbegin() const { + return rbegin(); + } + + [[nodiscard]] auto rend() const { + return IteratorSentinel{}; + } + + [[nodiscard]] const IteratorSentinel crend() const { + return rend(); + } +}; + +class Snapshot final { +public: + Snapshot(detail::SnapshotImpl&& snapshot) noexcept : snapshot_impl_{std::move(snapshot)} {} + + [[nodiscard]] auto AsRange() const { + return SnapshotRange{snapshot_impl_}; + } + + [[nodiscard]] auto AsRange(ColumnFamilyHandle column_family) const { + return SnapshotRange{snapshot_impl_, column_family}; + } + + [[nodiscard]] std::optional Get(std::string_view key) const; + [[nodiscard]] std::optional Get(ColumnFamilyHandle column_family, std::string_view key) const; + + template + [[nodiscard]] Iterator NewIterator() const; + + template + [[nodiscard]] Iterator NewIterator(ColumnFamilyHandle column_family) const; + +private: + detail::SnapshotImpl snapshot_impl_; +}; + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/include/userver/storages/rocks/write_batch.hpp b/rocks/include/userver/storages/rocks/write_batch.hpp new file mode 100644 index 000000000000..293e4ad3c6bc --- /dev/null +++ b/rocks/include/userver/storages/rocks/write_batch.hpp @@ -0,0 +1,43 @@ +#pragma once + +/// @file userver/storages/rocks/write_batch.hpp +/// @brief @copybrief storages::rocks::WriteBatch + +#include +#include +#include +#include + +namespace rocksdb { +class WriteBatch; +} // namespace rocksdb + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +class Db; + +class WriteBatch final { +public: + WriteBatch(std::size_t reserved_bytes = 0, std::size_t max_bytes = 0); + ~WriteBatch(); + + void Put(std::string_view key, std::string_view value); + void Put(ColumnFamilyHandle column_family, std::string_view key, std::string_view value); + + void Delete(std::string_view key); + void Delete(ColumnFamilyHandle column_family, std::string_view key); + +private: + friend class Db; + rocksdb::WriteBatch& GetPimpl(); + + static constexpr std::size_t kImplSize = 160; + static constexpr std::size_t kImplAlign = 16; + utils::FastPimpl write_batch_impl_; +}; + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/client.cpp b/rocks/src/storages/rocks/client.cpp deleted file mode 100644 index 74798a5d60db..000000000000 --- a/rocks/src/storages/rocks/client.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include - -#include - -#include -#include - -USERVER_NAMESPACE_BEGIN - -namespace storages::rocks { - -Client::Client(const std::string& db_path, engine::TaskProcessor& blocking_task_processor) - : blocking_task_processor_(blocking_task_processor) { - rocksdb::Options options; - options.create_if_missing = true; - - rocksdb::DB* db{}; - const rocksdb::Status status = rocksdb::DB::Open(options, db_path, &db); - db_.reset(db); - CheckStatus(status, "Create client"); -} - -void Client::Put(std::string_view key, std::string_view value) { - engine::AsyncNoSpan(blocking_task_processor_, [this, key, value] { - const rocksdb::Status status = db_->Put(rocksdb::WriteOptions(), key, value); - CheckStatus(status, "Put"); - }).Get(); -} - -std::string Client::Get(std::string_view key) { - return engine::AsyncNoSpan( - blocking_task_processor_, - [this, key] { - std::string res; - const rocksdb::Status status = db_->Get(rocksdb::ReadOptions(), key, &res); - CheckStatus(status, "Get"); - return res; - } - ).Get(); -} - -void Client::Delete(std::string_view key) { - return engine::AsyncNoSpan( - blocking_task_processor_, - [this, key] { - const rocksdb::Status status = db_->Delete(rocksdb::WriteOptions(), key); - CheckStatus(status, "Delete"); - } - ).Get(); -} - -void Client::CheckStatus(rocksdb::Status status, std::string_view method_name) { - if (!status.ok() && !status.IsNotFound()) { - throw USERVER_NAMESPACE::storages::rocks::RequestFailedException(method_name, status.ToString()); - } -} -} // namespace storages::rocks - -USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/client_test.cpp b/rocks/src/storages/rocks/client_test.cpp deleted file mode 100644 index 2dc89d1a51d2..000000000000 --- a/rocks/src/storages/rocks/client_test.cpp +++ /dev/null @@ -1,30 +0,0 @@ -#include -#include -#include - -USERVER_NAMESPACE_BEGIN - -namespace { - -// TAXICOMMON-10374 -UTEST(Rocks, DISABLED_CheckCRUD) { - storages::rocks::Client client{"/tmp/rocksdb_simple_example", engine::current_task::GetTaskProcessor()}; - - const std::string key = "key"; - - std::string res = client.Get(key); - EXPECT_EQ("", res); - - const std::string value = "value"; - client.Put(key, value); - res = client.Get(key); - EXPECT_EQ(value, res); - - client.Delete(key); - res = client.Get(key); - EXPECT_EQ("", res); -} - -} // namespace - -USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/component.cpp b/rocks/src/storages/rocks/component.cpp index 71bb35eab1b5..93fbf34ca373 100644 --- a/rocks/src/storages/rocks/component.cpp +++ b/rocks/src/storages/rocks/component.cpp @@ -1,37 +1,72 @@ #include -#include +#include -#include +#include #include USERVER_NAMESPACE_BEGIN -namespace storages::rocks { +namespace components { -Component::Component(const components::ComponentConfig& config, const components::ComponentContext& context) - : ComponentBase(config, context), - client_ptr_(std::make_shared( - config["db-path"].As(), - context.GetTaskProcessor(config["task-processor"].As()) - )) {} - -storages::rocks::ClientPtr Component::MakeClient() { return client_ptr_; } - -yaml_config::Schema Component::GetStaticConfigSchema() { +yaml_config::Schema Rocks::GetStaticConfigSchema() { return yaml_config::MergeSchemas(R"( type: object -description: Rocks client component +description: RocksDB component additionalProperties: false properties: - task-processor: + db_path: + type: string + description: path to the database file + max_background_jobs: + type: integer + minimum: 1 + description: maximum number of concurrent background jobs, including flushes and compactions + column_families: + type: array + items: + type: string + description: name of column family + description: list of initial column families + compression: type: string - description: name of the task processor to run the blocking file operations - db-path: + description: compress blocks using this compression algorithm + compression_level: + type: integer + description: compression level applicable to zstd and lz4 + bottommost_compression: type: string - description: path to database file + description: compress bottommost blocks using this compression algorithm + bottommost_compression_level: + type: integer + description: compression level applicable to zstd and lz4 + use_direct_reads: + type: boolean + description: enable direct I/O mode for read/write + use_direct_io_for_flush_and_compaction: + type: boolean + description: use O_DIRECT for writes in background flush and compactions )"); } -} // namespace storages::rocks + +Rocks::Rocks(const components::ComponentConfig& config, const components::ComponentContext& context) + : ComponentBase{config, context}, db_ptr_{std::make_shared( + config["db_path"].As(), + config["max_background_jobs"].As(), + config["column_families"].As>(), + storages::rocks::DbOptions{ + config["compression"].As>(), + config["compression_level"].As>(), + config["bottommost_compression"].As>(), + config["bottommost_compression_level"].As>(), + config["use_direct_reads"].As>(), + config["use_direct_io_for_flush_and_compaction"].As>() + }, + context.GetTaskProcessor("rocks-task-processor") + )} {} + +const storages::rocks::DbPtr& Rocks::GetDb() const { return db_ptr_; } + +} // namespace components USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/db.cpp b/rocks/src/storages/rocks/db.cpp new file mode 100644 index 000000000000..e2a1a0e45ada --- /dev/null +++ b/rocks/src/storages/rocks/db.cpp @@ -0,0 +1,101 @@ +#include +#include +#include +#include + +#include + +#include +#include +#include +#include + +namespace { + +constexpr struct { std::string_view name; rocksdb::CompressionType type; } kCompressionTable[] = { + {"no_compression", rocksdb::kNoCompression}, + {"lz4", rocksdb::kLZ4Compression}, + {"zstd", rocksdb::kZSTD} +}; + +rocksdb::CompressionType ParseCompressionType(std::string_view compression) { + for (const auto& entry : kCompressionTable) { + if (entry.name == compression) { + return entry.type; + } + } + throw std::runtime_error(fmt::format("Invalid compression type: {}", compression)); +} + +} // namespace + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +Db::Db(const std::string& db_path, int max_background_jobs, const std::vector& column_families, + const DbOptions& db_options, engine::TaskProcessor& task_processor) { + rocksdb::Options options; + options.create_if_missing = true; + options.create_missing_column_families = true; + options.max_background_jobs = max_background_jobs; + if (db_options.compression) options.compression = ParseCompressionType(*db_options.compression); + if (db_options.compression_level) options.compression_opts.level = *db_options.compression_level; + if (db_options.bottommost_compression) { + options.bottommost_compression = ParseCompressionType(*db_options.bottommost_compression); + } + if (db_options.bottommost_compression_level) { + options.bottommost_compression_opts.level = *db_options.bottommost_compression_level; + } + if (db_options.use_direct_reads) { + options.use_direct_reads = *db_options.use_direct_reads; + } + if (db_options.use_direct_io_for_flush_and_compaction) { + options.use_direct_io_for_flush_and_compaction = *db_options.use_direct_io_for_flush_and_compaction; + } + db_impl_ = std::make_shared(options, db_path, column_families, task_processor); +} + +ColumnFamilyHandle Db::GetColumnFamily(const std::string& name) const { + return db_impl_->GetColumnFamily(name); +} + +void Db::Put(std::string_view key, std::string_view value) { + db_impl_->Put(rocksdb::WriteOptions{}, key, value); +} + +void Db::Put(ColumnFamilyHandle column_family, std::string_view key, std::string_view value) { + db_impl_->Put(rocksdb::WriteOptions{}, column_family, key, value); +} + +void Db::Delete(std::string_view key) { + db_impl_->Delete(rocksdb::WriteOptions{}, key); +} + +void Db::Delete(ColumnFamilyHandle column_family, std::string_view key) { + db_impl_->Delete(rocksdb::WriteOptions{}, column_family, key); +} + +void Db::Write(WriteBatch& write_batch) { + db_impl_->Write(rocksdb::WriteOptions{}, write_batch.GetPimpl()); +} + +std::optional Db::Get(std::string_view key) const { + return db_impl_->Get(rocksdb::ReadOptions{}, key); +} + +std::optional Db::Get(ColumnFamilyHandle column_family, std::string_view key) const { + return db_impl_->Get(rocksdb::ReadOptions{}, column_family, key); +} + +Snapshot Db::GetSnapshot() const { + return Snapshot{ + detail::SnapshotImpl{ + db_impl_, db_impl_->GetSnapshot() + } + }; +} + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/db_test.cpp b/rocks/src/storages/rocks/db_test.cpp new file mode 100644 index 000000000000..9c560a0d5b14 --- /dev/null +++ b/rocks/src/storages/rocks/db_test.cpp @@ -0,0 +1,30 @@ +#include + +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace { + +// TAXICOMMON-10374 +UTEST(Rocks, DISABLED_CheckCRUD) { + storages::rocks::Db db{"/tmp/rocksdb_simple_example", 4, engine::current_task::GetTaskProcessor()}; + + const std::string key = "key"; + std::optional result = db.Get(key); + EXPECT_EQ(false, result.has_value()); + + db.Put(key, "value"); + result = db.Get(key); + EXPECT_EQ("value", result.value_or("")); + + db.Delete(key); + result = db.Get(key); + EXPECT_EQ("", result.value_or("")); +} + +} // namespace + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/detail/db_impl.cpp b/rocks/src/storages/rocks/detail/db_impl.cpp new file mode 100644 index 000000000000..622bcd53acce --- /dev/null +++ b/rocks/src/storages/rocks/detail/db_impl.cpp @@ -0,0 +1,137 @@ +#include + +#include + +#include +#include +#include + +#include +#include + +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { + +DbImpl::DbImpl(const rocksdb::Options& options, const std::string& db, + const std::vector& column_families, engine::TaskProcessor& task_processor) + : task_processor_{task_processor}, column_family_handles_{} { + // Prepare column family descriptors (they are useless after opening the database). + std::vector descriptors{}; + for (auto&& name : column_families) { + descriptors.push_back( + rocksdb::ColumnFamilyDescriptor{name, rocksdb::ColumnFamilyOptions{}} + ); + } + rocksdb::DB* dbptr{}; + std::vector handles{}; + auto status = rocksdb::DB::Open(options, db, descriptors, &handles, &dbptr); + CheckStatus(status, "Open"); + // Transfer descriptors from a simple vector to a hash table. + for (auto&& handle : handles) { + column_family_handles_[handle->GetName()] = handle; + } + // Complete the initialization by setting the smart pointer with current database instance. + db_.reset(dbptr); +} + +// https://github.com/facebook/rocksdb/wiki/Basic-Operations#closing-a-database +DbImpl::~DbImpl() { + auto task = + engine::AsyncNoSpan(task_processor_, [this]() { + rocksdb::Status status; + if (status = db_->SyncWAL(); !status.ok()) { + LOG_ERROR() << "Error synchronizing WAL: " << status.ToString(); + } + for (const auto& [name, handle] : this->column_family_handles_) { + if (status = db_->DestroyColumnFamilyHandle(handle); !status.ok()) { + LOG_ERROR() << "Error destroying column family handle (" << name << "): " << status.ToString(); + } + } + if (status = db_->Close(); !status.ok()) { + LOG_ERROR() << "Error closing: " << status.ToString(); + } + }); + if (task.WaitNothrow() == false) { + LOG_ERROR() << "Failed to close RocksDB gracefully"; + } +} + +rocksdb::ColumnFamilyHandle* DbImpl::GetColumnFamily(const std::string& name) const { + if (auto it = column_family_handles_.find(name); it != column_family_handles_.end()) { + return it->second; + } + throw std::runtime_error("No such column family is configured"); +} + +void DbImpl::Put(const rocksdb::WriteOptions& options, std::string_view key, std::string_view value) { + auto task = engine::AsyncNoSpan(task_processor_, [&]() { return db_->Put(options, key, value); }); + CheckStatus(task.Get(), "Put"); +} + +void DbImpl::Put(const rocksdb::WriteOptions& options, rocksdb::ColumnFamilyHandle* column_family, + std::string_view key, std::string_view value) { + auto task = engine::AsyncNoSpan(task_processor_, [&]() { return db_->Put(options, column_family, key, value); }); + CheckStatus(task.Get(), "Put"); +} + +void DbImpl::Delete(const rocksdb::WriteOptions& options, std::string_view key) { + auto task = engine::AsyncNoSpan(task_processor_, [&]() { return db_->Delete(options, key); }); + CheckStatus(task.Get(), "Delete"); +} + +void DbImpl::Delete(const rocksdb::WriteOptions& options, rocksdb::ColumnFamilyHandle* column_family, + std::string_view key) { + auto task = engine::AsyncNoSpan(task_processor_, [&]() { return db_->Delete(options, column_family, key); }); + CheckStatus(task.Get(), "Delete"); +} + +void DbImpl::Write(const rocksdb::WriteOptions& options, rocksdb::WriteBatch& write_batch) { + auto task = engine::AsyncNoSpan(task_processor_, [&]() { return db_->Write(options, &write_batch); }); + CheckStatus(task.Get(), "Write"); +} + +std::optional DbImpl::Get(const rocksdb::ReadOptions& options, std::string_view key) const { + std::string value; + auto status = engine::AsyncNoSpan(task_processor_, [&]() { return db_->Get(options, key, &value); }).Get(); + if (status.IsNotFound()) return {}; + CheckStatus(status, "Get"); + return value; +} + +std::optional DbImpl::Get(const rocksdb::ReadOptions& options, rocksdb::ColumnFamilyHandle* column_family, + std::string_view key) const { + std::string value; + auto status = engine::AsyncNoSpan(task_processor_, + [&]() { return db_->Get(options, column_family, key, &value); }).Get(); + if (status.IsNotFound()) return {}; + CheckStatus(status, "Get"); + return value; +} + +std::unique_ptr DbImpl::NewIterator(const rocksdb::ReadOptions& options) const { + return std::unique_ptr(db_->NewIterator(options)); +} + +std::unique_ptr DbImpl::NewIterator(const rocksdb::ReadOptions& options, + rocksdb::ColumnFamilyHandle* column_family) const { + return std::unique_ptr(db_->NewIterator(options, column_family)); +} + +const rocksdb::Snapshot* DbImpl::GetSnapshot() const { + return db_->GetSnapshot(); +} + +void DbImpl::ReleaseSnapshot(const rocksdb::Snapshot* snapshot) const { + db_->ReleaseSnapshot(snapshot); +} + +engine::TaskProcessor& DbImpl::GetTaskProcessor() { + return task_processor_; +} + +} // namespace storages::rocks::detail + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/detail/iterator_impl.cpp b/rocks/src/storages/rocks/detail/iterator_impl.cpp new file mode 100644 index 000000000000..b879e9b98542 --- /dev/null +++ b/rocks/src/storages/rocks/detail/iterator_impl.cpp @@ -0,0 +1,91 @@ +#include +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { + +IteratorImpl::IteratorImpl(const std::shared_ptr& db, const std::shared_ptr& snapshot, + std::unique_ptr iterator) noexcept : + db_impl_{db}, snapshot_{snapshot}, iterator_{std::move(iterator)} {} + +IteratorImpl::~IteratorImpl() = default; + +IteratorImpl::IteratorImpl(IteratorImpl&&) noexcept = default; +IteratorImpl& IteratorImpl::operator=(IteratorImpl&&) noexcept = default; + +bool IteratorImpl::Valid() const noexcept { + return iterator_->Valid(); +} + +void IteratorImpl::SeekToFirst() { + auto task = + engine::AsyncNoSpan(db_impl_->GetTaskProcessor(), [this]() { + iterator_->SeekToFirst(); + return iterator_->status(); + }); + CheckStatus(task.Get(), "SeekToFirst"); +} + +void IteratorImpl::SeekToLast() { + auto task = + engine::AsyncNoSpan(db_impl_->GetTaskProcessor(), [this]() { + iterator_->SeekToLast(); + return iterator_->status(); + }); + CheckStatus(task.Get(), "SeekToLast"); +} + +void IteratorImpl::Seek(std::string_view slice) { + auto task = + engine::AsyncNoSpan(db_impl_->GetTaskProcessor(), [this, slice]() { + iterator_->Seek(slice); + return iterator_->status(); + }); + CheckStatus(task.Get(), "Seek"); +} + +void IteratorImpl::SeekForPrev(std::string_view slice) { + auto task = + engine::AsyncNoSpan(db_impl_->GetTaskProcessor(), [this, slice]() { + iterator_->SeekForPrev(slice); + return iterator_->status(); + }); + CheckStatus(task.Get(), "SeekForPrev"); +} + +void IteratorImpl::Next() { + auto task = + engine::AsyncNoSpan(db_impl_->GetTaskProcessor(), [this]() { + iterator_->Next(); + return iterator_->status(); + }); + CheckStatus(task.Get(), "Next"); +} + +void IteratorImpl::Prev() { + auto task = + engine::AsyncNoSpan(db_impl_->GetTaskProcessor(), [this]() { + iterator_->Prev(); + return iterator_->status(); + }); + CheckStatus(task.Get(), "Prev"); +} + +std::string IteratorImpl::Key() const { + // TODO: Throw an exception on !Valid()? + return iterator_->key().ToString(); +} + +std::string IteratorImpl::Value() const { + // TODO: Throw an exception on !Valid()? + return iterator_->value().ToString(); +} + +} // namespace storages::rocks::detail + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/detail/snapshot_impl.cpp b/rocks/src/storages/rocks/detail/snapshot_impl.cpp new file mode 100644 index 000000000000..b117128fc6fe --- /dev/null +++ b/rocks/src/storages/rocks/detail/snapshot_impl.cpp @@ -0,0 +1,40 @@ +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks::detail { + +SnapshotImpl::SnapshotImpl(const std::shared_ptr& db, const rocksdb::Snapshot* snapshot) + : db_impl_{db}, snapshot_(snapshot, [db_impl_ = db.get()](const auto* snapshot) { + // Be careful with the order of destruction. This is done to avoid a single atomic increment =) + db_impl_->ReleaseSnapshot(snapshot); + }) {} + +std::optional SnapshotImpl::Get(rocksdb::ReadOptions& options, std::string_view key) const { + options.snapshot = snapshot_.get(); + return db_impl_->Get(options, key); +} + +std::optional SnapshotImpl::Get(rocksdb::ReadOptions& options, rocksdb::ColumnFamilyHandle* column_family, + std::string_view key) const { + options.snapshot = snapshot_.get(); + return db_impl_->Get(options, column_family, key); +} + +IteratorImpl SnapshotImpl::NewIterator(rocksdb::ReadOptions& options) const { + options.snapshot = snapshot_.get(); + return detail::IteratorImpl{db_impl_, snapshot_, db_impl_->NewIterator(options)}; +} + +IteratorImpl SnapshotImpl::NewIterator(rocksdb::ReadOptions& options, + rocksdb::ColumnFamilyHandle* column_family) const { + options.snapshot = snapshot_.get(); + return detail::IteratorImpl{db_impl_, snapshot_, db_impl_->NewIterator(options, column_family)}; +} + +} // namespace storages::rocks::detail + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/exception.cpp b/rocks/src/storages/rocks/exception.cpp index 185cb71a43f2..e0f17fb20c97 100644 --- a/rocks/src/storages/rocks/exception.cpp +++ b/rocks/src/storages/rocks/exception.cpp @@ -1,15 +1,25 @@ #include - +#include #include USERVER_NAMESPACE_BEGIN namespace storages::rocks { -RequestFailedException::RequestFailedException(std::string_view request_description, std::string_view status) - : Exception(fmt::format("{} request failed with status '{}'", request_description, status)), status_(status) {} +namespace detail { + +void CheckStatus(const rocksdb::Status& status, std::string_view description) { + if (!status.ok()) { + throw USERVER_NAMESPACE::storages::rocks::StatusNokException(description, status.ToString()); + } +} + +} // namespace detail + +StatusNokException::StatusNokException(std::string_view description, std::string_view status) + : std::runtime_error(fmt::format("{} request failed with status '{}'", description, status)), status_(status) {} -std::string_view RequestFailedException::GetStatusString() const { return status_; } +std::string_view StatusNokException::GetStatusString() const { return status_; } } // namespace storages::rocks diff --git a/rocks/src/storages/rocks/snapshot.cpp b/rocks/src/storages/rocks/snapshot.cpp new file mode 100644 index 000000000000..991ed83c8dcd --- /dev/null +++ b/rocks/src/storages/rocks/snapshot.cpp @@ -0,0 +1,68 @@ +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +namespace detail { + +template +Iterator detail::SnapshotRangeImpl::NewIterator() const { + rocksdb::ReadOptions options{}; + return {snapshot_impl_.NewIterator(options)}; +} + +template Iterator +detail::SnapshotRangeImpl::NewIterator() const; +template Iterator +detail::SnapshotRangeImpl::NewIterator() const; + +template +Iterator detail::SnapshotRangeImpl::NewIterator() const { + rocksdb::ReadOptions options{}; + return {snapshot_impl_.NewIterator(options, column_family_)}; +} + +template Iterator +detail::SnapshotRangeImpl::NewIterator() const; +template Iterator +detail::SnapshotRangeImpl::NewIterator() const; + +} // namespace detail + +std::optional Snapshot::Get(std::string_view key) const { + rocksdb::ReadOptions options{}; + return snapshot_impl_.Get(options, key); +} + +std::optional Snapshot::Get(ColumnFamilyHandle column_family, std::string_view key) const { + rocksdb::ReadOptions options{}; + return snapshot_impl_.Get(options, column_family, key); +} + +template +Iterator Snapshot::NewIterator() const { + rocksdb::ReadOptions options{}; + return {snapshot_impl_.NewIterator(options)}; +} + +template Iterator +Snapshot::NewIterator() const; +template Iterator +Snapshot::NewIterator() const; + +template +Iterator Snapshot::NewIterator(ColumnFamilyHandle column_family) const { + rocksdb::ReadOptions options{}; + return {snapshot_impl_.NewIterator(options, column_family)}; +} + +template Iterator +Snapshot::NewIterator(ColumnFamilyHandle column_family) const; +template Iterator +Snapshot::NewIterator(ColumnFamilyHandle column_family) const; + +} // namespace storages::rocks + +USERVER_NAMESPACE_END diff --git a/rocks/src/storages/rocks/write_batch.cpp b/rocks/src/storages/rocks/write_batch.cpp new file mode 100644 index 000000000000..93bb8b6186ac --- /dev/null +++ b/rocks/src/storages/rocks/write_batch.cpp @@ -0,0 +1,40 @@ +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace storages::rocks { + +WriteBatch::WriteBatch(std::size_t reserved_bytes, std::size_t max_bytes) + : write_batch_impl_{reserved_bytes, max_bytes} {} + +WriteBatch::~WriteBatch() = default; + +void WriteBatch::Put(std::string_view key, std::string_view value) { + auto status = write_batch_impl_->Put(key, value); + detail::CheckStatus(status, "Put"); +} + +void WriteBatch::Put(ColumnFamilyHandle column_family, std::string_view key, std::string_view value) { + auto status = write_batch_impl_->Put(column_family, key, value); + detail::CheckStatus(status, "Put"); +} + +void WriteBatch::Delete(std::string_view key) { + auto status = write_batch_impl_->Delete(key); + detail::CheckStatus(status, "Delete"); +} + +void WriteBatch::Delete(ColumnFamilyHandle column_family, std::string_view key) { + auto status = write_batch_impl_->Delete(column_family, key); + detail::CheckStatus(status, "Delete"); +} + +rocksdb::WriteBatch& WriteBatch::GetPimpl() { + return *write_batch_impl_; +} + +} // namespace storages::rocks + +USERVER_NAMESPACE_END