Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
35
36
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7f34dd9500921f4c8501e75e62488659cb276fb4
3b235ed1f2fc3977cfc6f99a74123c0097ef9795
6 changes: 3 additions & 3 deletions .github/scripts/copy_sources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ echo "Copying sources..."
cp -r $1/ydb/public/sdk/cpp/* $tmp_dir
echo "tmp_dir: $tmp_dir"

rm -r $tmp_dir/client
rm -r $tmp_dir/src/client/arrow
rm -r $tmp_dir/src/client/cms
rm -r $tmp_dir/src/client/config
Expand All @@ -23,8 +22,10 @@ rm -r $tmp_dir/include/ydb-cpp-sdk/client/draft
rm -r $tmp_dir/tests/unit/client/draft

mkdir -p $tmp_dir/src/api/client/yc_private
mkdir -p $tmp_dir/src/api/client/yc_private/accessservice
mkdir -p $tmp_dir/src/api/client/yc_public

cp -r $1/ydb/public/api/client/yc_private/accessservice/sensitive.proto $tmp_dir/src/api/client/yc_private/accessservice/sensitive.proto
cp -r $1/ydb/public/api/client/yc_private/iam $tmp_dir/src/api/client/yc_private
cp -r $1/ydb/public/api/client/yc_private/operation $tmp_dir/src/api/client/yc_private
cp -r $1/ydb/public/api/client/yc_public/common $tmp_dir/src/api/client/yc_public
Expand All @@ -33,7 +34,7 @@ cp -r $1/ydb/public/api/grpc $tmp_dir/src/api
cp -r $1/ydb/public/api/protos $tmp_dir/src/api

rm -r $tmp_dir/src/api/protos/out
rm $tmp_dir/include/ydb-cpp-sdk/type_switcher.h $tmp_dir/include/ydb-cpp-sdk/client/proto/private.h $tmp_dir/src/version.h
rm $tmp_dir/include/ydb-cpp-sdk/type_switcher.h $tmp_dir/src/version.h

cp -r $2/util $tmp_dir
cp -r $2/library $tmp_dir
Expand All @@ -57,7 +58,6 @@ cp $2/tests/slo_workloads/.dockerignore $tmp_dir/tests/slo_workloads
cp $2/tests/slo_workloads/Dockerfile $tmp_dir/tests/slo_workloads

cp $2/include/ydb-cpp-sdk/type_switcher.h $tmp_dir/include/ydb-cpp-sdk/type_switcher.h
cp $2/include/ydb-cpp-sdk/client/proto/private.h $tmp_dir/include/ydb-cpp-sdk/client/proto/private.h
cp $2/src/version.h $tmp_dir/src/version.h

cd $2
Expand Down
12 changes: 12 additions & 0 deletions include/ydb-cpp-sdk/client/discovery/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,21 @@ class TWhoAmIResult : public TStatus {
TWhoAmIResult(TStatus&& status, const Ydb::Discovery::WhoAmIResult& proto);
const std::string& GetUserName() const;
const std::vector<std::string>& GetGroups() const;
bool IsAdministrationAllowed() const;
bool IsMonitoringAllowed() const;
bool IsViewerAllowed() const;
bool IsDatabaseAllowed() const;
bool IsRegisterNodeAllowed() const;
bool IsBootstrapAllowed() const;
private:
std::string UserName_;
std::vector<std::string> Groups_;
bool IsAdministrationAllowed_ = false;
bool IsMonitoringAllowed_ = false;
bool IsViewerAllowed_ = false;
bool IsDatabaseAllowed_ = false;
bool IsRegisterNodeAllowed_ = false;
bool IsBootstrapAllowed_ = false;
};

using TAsyncWhoAmIResult = NThreading::TFuture<TWhoAmIResult>;
Expand Down
7 changes: 7 additions & 0 deletions include/ydb-cpp-sdk/client/driver/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ class TDriverConfig {
//! default: true, 30, 5, 10 for linux, and true and OS default for others POSIX
TDriverConfig& SetTcpKeepAliveSettings(bool enable, size_t idle, size_t count, size_t interval);

//! Set TCP_NODELAY socket option
//! enable - if true TCP_NODELAY is enabled (default, no Nagle algorithm, low latency, packet fragmentation)
//! - if false TCP_NODELAY is disabled (Nagle algorithm enabled, reduced packet fragmentation)
//! NOTE: This affects network performance. Disable only if you want to reduce packet fragmentation.
//! default: true
TDriverConfig& SetTcpNoDelay(bool enable);

//! Enable or disable drain of client logic (e.g. session pool drain) during dtor call
TDriverConfig& SetDrainOnDtors(bool allowed);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ class TFederatedTopicClient {
std::shared_ptr<IFederatedReadSession> CreateReadSession(const TFederatedReadSessionSettings& settings);

//! Create write session.
// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings);
std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings);
std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings);

struct TClusterInfo {
Expand Down
8 changes: 8 additions & 0 deletions include/ydb-cpp-sdk/client/topic/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ class TTopicClient {

//! Create write session.
std::shared_ptr<ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TWriteSessionSettings& settings);

//! Create simple blocking keyed write session. Experimental feature. DO NOT USE IN PRODUCTION.
std::shared_ptr<ISimpleBlockingKeyedWriteSession> CreateSimpleBlockingKeyedWriteSession(const TKeyedWriteSessionSettings& settings);

//! Create keyed write session. Experimental feature. DO NOT USE IN PRODUCTION.
std::shared_ptr<IKeyedWriteSession> CreateKeyedWriteSession(const TKeyedWriteSessionSettings& settings);

//! Create write session.
std::shared_ptr<IWriteSession> CreateWriteSession(const TWriteSessionSettings& settings);

// Commit offset
Expand Down
3 changes: 3 additions & 0 deletions include/ydb-cpp-sdk/client/topic/read_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ struct TReadSessionSettings: public TRequestSettings<TReadSessionSettings> {

//! Log.
FLUENT_SETTING_OPTIONAL(TLog, Log);

//! InFlightMemoryController.
FLUENT_SETTING_OPTIONAL(std::uint64_t, PartitionMaxInFlightBytes);
};

struct TReadSessionGetEventSettings : public TCommonClientSettingsBase<TReadSessionGetEventSettings> {
Expand Down
83 changes: 83 additions & 0 deletions include/ydb-cpp-sdk/client/topic/write_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,45 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
FLUENT_SETTING_DEFAULT(bool, ValidateSeqNo, true);
};

struct TKeyedWriteSessionSettings : public TWriteSessionSettings {
using TSelf = TKeyedWriteSessionSettings;

enum class EPartitionChooserStrategy {
Bound,
Hash,
};

TKeyedWriteSessionSettings() = default;
TKeyedWriteSessionSettings(const TKeyedWriteSessionSettings&) = default;
TKeyedWriteSessionSettings(TKeyedWriteSessionSettings&&) = default;

TKeyedWriteSessionSettings& operator=(const TKeyedWriteSessionSettings&) = default;
TKeyedWriteSessionSettings& operator=(TKeyedWriteSessionSettings&&) = default;

//! Session lifetime.
FLUENT_SETTING_DEFAULT(TDuration, SubSessionIdleTimeout, TDuration::Seconds(30));

//! Partition chooser strategy.
FLUENT_SETTING_DEFAULT(EPartitionChooserStrategy, PartitionChooserStrategy, EPartitionChooserStrategy::Bound);

//! Hasher function.
FLUENT_SETTING_DEFAULT(std::function<std::string(const std::string_view key)>, PartitioningKeyHasher, DefaultPartitioningKeyHasher);

//! Default partitioning key hasher.
//! Uses MurmurHash.
static std::string DefaultPartitioningKeyHasher(const std::string_view key);

//! ProducerId prefix to use.
//! ProducerId is generated as ProducerIdPrefix + partition id.
FLUENT_SETTING(std::string, ProducerIdPrefix);

//! SessionID to use.
FLUENT_SETTING_DEFAULT(std::string, SessionId, "");

private:
using TWriteSessionSettings::ProducerId;
};

//! Contains the message to write and all the options.
struct TWriteMessage {
using TSelf = TWriteMessage;
Expand Down Expand Up @@ -276,4 +315,48 @@ class IWriteSession {
virtual ~IWriteSession() = default;
};

//! Keyed write session. Experimental SDK. DO NOT USE IN PRODUCTION.
class IKeyedWriteSession {
public:
//! Write single message.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
virtual void Write(TContinuationToken&& continuationToken, const std::string& key, TWriteMessage&& message,
TTransactionBase* tx = nullptr) = 0;

//! Future that is set when next event is available.
virtual NThreading::TFuture<void> WaitEvent() = 0;

//! Wait and return next event. Use WaitEvent() for non-blocking wait.
virtual std::optional<TWriteSessionEvent::TEvent> GetEvent(bool block = false) = 0;

//! Get several events in one call.
//! If blocking = false, instantly returns up to maxEventsCount available events.
//! If blocking = true, blocks till maxEventsCount events are available.
//! If maxEventsCount is unset, write session decides the count to return itself.
virtual std::vector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, std::optional<size_t> maxEventsCount = std::nullopt) = 0;

virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0;
virtual TWriterCounters::TPtr GetCounters() = 0;
virtual ~IKeyedWriteSession() = default;
};

//! Simple blocking keyed write session. Experimental SDK. DO NOT USE IN PRODUCTION.
class ISimpleBlockingKeyedWriteSession {
public:
//! Write single message.
//! continuationToken - a token earlier provided to client with ReadyToAccept event.
virtual bool Write(const std::string& key, TWriteMessage&& message, TTransactionBase* tx = nullptr,
TDuration blockTimeout = TDuration::Max()) = 0;

//! Wait for all writes to complete (no more that closeTimeout()), then close.
//! Return true if all writes were completed and acked, false if timeout was reached and some writes were aborted.
virtual bool Close(TDuration closeTimeout = TDuration::Max()) = 0;

//! Writer counters with different stats (see TWriterConuters).
virtual TWriterCounters::TPtr GetCounters() = 0;

//! Close() with timeout = 0 and destroy everything instantly.
virtual ~ISimpleBlockingKeyedWriteSession() = default;
};

} // namespace NYdb::NTopic
2 changes: 2 additions & 0 deletions library/cpp/threading/future/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ if (YDB_SDK_TESTS)
unit
)
endif()

add_subdirectory(subscription)
31 changes: 31 additions & 0 deletions library/cpp/threading/future/subscription/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
_ydb_sdk_add_library(threading-future-subscription)

target_link_libraries(threading-future-subscription PUBLIC
yutil
threading-future
)

target_sources(threading-future-subscription
PRIVATE
subscription.cpp
wait_all.cpp
wait_all_or_exception.cpp
wait_any.cpp
)

_ydb_sdk_install_targets(TARGETS threading-future-subscription)

if (YDB_SDK_TESTS)
add_ydb_test(NAME future-subscription-ut
SOURCES
subscription_ut.cpp
wait_all_ut.cpp
wait_all_or_exception_ut.cpp
wait_any_ut.cpp
wait_ut_common.cpp
LINK_LIBRARIES
threading-future-subscription
LABELS
unit
)
endif()
104 changes: 104 additions & 0 deletions library/cpp/threading/future/subscription/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
Subscriptions manager and wait primitives library
=================================================

Wait primitives
---------------

All wait primitives are futures those being signaled when some or all of theirs dependencies are signaled.
Wait privimitives could be constructed either from an initializer_list or from a standard container of futures.

1. WaitAll is signaled when all its dependencies are signaled:

```C++
#include <library/cpp/threading/subscriptions/wait_all.h>

auto w = NWait::WaitAll({ future1, future2, ..., futureN });
...
w.Wait(); // wait for all futures
```

2. WaitAny is signaled when any of its dependencies is signaled:

```C++
#include <library/cpp/threading/subscriptions/wait_any.h>

auto w = NWait::WaitAny(TVector<TFuture<T>>{ future1, future2, ..., futureN });
...
w.Wait(); // wait for any future
```

3. WaitAllOrException is signaled when all its dependencies are signaled with values or any dependency is signaled with an exception:

```C++
#include <library/cpp/threading/subscriptions/wait_all_or_exception.h>

auto w = NWait::WaitAllOrException(TVector<TFuture<T>>{ future1, future2, ..., futureN });
...
w.Wait(); // wait for all values or for an exception
```

Subscriptions manager
---------------------

The subscription manager can manage multiple links beetween futures and callbacks. Multiple managed subscriptions to a single future shares just a single underlying subscription to the future. That allows dynamic creation and deletion of subscriptions and efficient implementation of different wait primitives.
The subscription manager could be used in the following way:

1. Subscribe to a single future:

```C++
#include <library/cpp/threading/subscriptions/subscription.h>

TFuture<int> LongOperation();

...
auto future = LongRunnigOperation();
auto m = MakeSubsriptionManager<int>();
auto id = m->Subscribe(future, [](TFuture<int> const& f) {
try {
auto value = f.GetValue();
...
} catch (...) {
... // handle exception
}
});
if (id.has_value()) {
... // Callback will run asynchronously
} else {
... // Future has been signaled already. The callback has been invoked synchronously
}
```

Note that a callback could be invoked synchronously during a Subscribe call. In this case the returned optional will have no value.

2. Unsubscribe from a single future:

```C++
// id holds the subscription id from a previous Subscribe call
m->Unsubscribe(id.value());
```

There is no need to call Unsubscribe if the callback has been called. In this case Unsubscribe will do nothing. And it is safe to call Unsubscribe with the same id multiple times.

3. Subscribe a single callback to multiple futures:

```C++
auto ids = m->Subscribe({ future1, future2, ..., futureN }, [](auto&& f) { ... });
...
```

Futures could be passed to Subscribe method either via an initializer_list or via a standard container like vector or list. Subscribe method accept an optional boolean parameter revertOnSignaled. If the parameter is false (default) then all subscriptions will be performed regardless of the futures states and the returned vector will have a subscription id for each future (even if callback has been executed synchronously for some futures). Otherwise the method will stop on the first signaled future (the callback will be synchronously called for it), no subscriptions will be created and an empty vector will be returned.

4. Unsubscribe multiple subscriptions:

```C++
// ids is the vector or subscription ids
m->Unsubscribe(ids);
```

The vector of IDs could be a result of a previous Subscribe call or an arbitrary set of IDs of previously created subscriptions.

5. If you do not want to instantiate a new instance of the subscription manager it is possible to use the default instance:

```C++
auto m = TSubscriptionManager<T>::Default();
```
Loading
Loading